python流处理。

faust的Python项目详细描述


python流处理

构建状态coverage //opensource.org/licenses/bsd-3-clause" rel="nofollow"> bsd licenseFaust可通过车轮安装支持的Python版本。支持python实现。 /a>

<表> < COL/> < COL/> <正文> 版本:1.7.4 网站: http://faust.readthedocs.io/ 下载:http://pypi.org/project/faust" rel="nofollow">http://pypi.org/project/faust 来源:http://github.com/robinhood/faust" rel="nofollow">http://github.com/robinhood/faust 关键字:分布式、流、异步、处理、数据、队列 < > <表>
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust

浮士德是一个流处理库,从 kafka流 到python。

它用于构建高性能分布式系统 以及每天处理数十亿事件的实时数据管道。

Faust同时提供流处理和事件处理。 与诸如 卡夫卡流 阿帕奇火花 / 风暴 / 萨姆扎 / 燧石

它不使用dsl,它只是python! 这意味着您可以使用所有喜爱的python库 流处理时:numpy、pytorch、pandas、nltk、django, 烧瓶,sqlalchemy,++

Faust需要Python3.6或更高版本才能使用新的-异步编程指南-in-python-with-asyncio-232e2afa44f6" rel="nofollow">异步/等待语法, 以及可变类型注释。

下面是处理传入订单流的示例:

app=faust.App('myapp',broker='kafka://localhost')# Models describe how messages are serialized:# {"account_id": "3fae-...", amount": 3}classOrder(faust.Record):account_id:stramount:int@app.agent(value_type=Order)asyncdeforder(orders):asyncfororderinorders:# process infinite stream of orders.print(f'Order for {order.account_id}: {order.amount}')

代理装饰器定义了一个"流处理器",本质上 从卡夫卡的主题中消费,并为它收到的每个事件做一些事情。

代理是一个异步定义函数,因此也可以执行 其他异步操作,如Web请求。

这个系统可以像数据库一样保持状态。 表被命名为分布式键/值存储,您可以使用 作为常规的python字典。

表在每台机器上以超高速本地存储 用c++编写的嵌入式数据库,名为rocksdb

表还可以存储可选"窗口化"的聚合计数。 所以你可以追踪 "最后一天的点击次数",或 "上一小时的点击次数。"例如。比如 卡夫卡流 , 我们支持翻滚、跳跃、滑动时间窗口和旧窗口 可以过期以停止数据填充。

为了提高可靠性,我们使用kafka主题作为"预写日志"。 每当更改密钥时,我们都会发布到更改日志。 备用节点使用此更改日志来保留精确的副本 并在任何节点出现故障时启用即时恢复。

对于用户来说,表只是一个字典,但是数据在 重新启动并跨节点复制,以便故障转移时其他节点可以接管 自动。

您可以按URL计算页面浏览量:

# data sent to 'clicks' topic sharded by URL key.# e.g. key="http://example.com" value="1"click_topic=app.topic('clicks',key_type=str,value_type=int)# default value for missing URL will be 0 with `default=int`counts=app.Table('click_counts',default=int)@app.agent(click_topic)asyncdefcount_click(clicks):asyncforurl,countinclicks.items():counts[url]+=count

发送到kafka主题的数据是分区的,这意味着 点击将被url切分,这样每个计数 对于同一个url,将传递到同一个faust worker实例。

Faust支持任何类型的流数据:字节、Unicode和序列化 结构,但也附带了使用现代python的"模型" 描述如何序列化流中的键和值的语法:

# Order is a json serialized dictionary,# having these fields:classOrder(faust.Record):account_id:strproduct_id:strprice:floatquantity:float=1.0orders_topic=app.topic('orders',key_type=str,value_type=Order)@app.agent(orders_topic)asyncdefprocess_order(orders):asyncfororderinorders:# process each order using regular Pythontotal_price=order.price*order.quantityawaitsend_order_received_email(order.account_id,order)

Faust是静态类型的,使用类型检查器, 因此,您可以在编写应用程序时利用静态类型。

浮士德的源代码很小,组织得很好,是一个很好的 学习卡夫卡流实现的资源

< DL>
在介绍页中了解有关浮士德的更多信息
阅读更多关于faust、系统要求、安装说明的信息, 社区资源等。
或直接转到快速启动教程
通过对流媒体应用程序进行编程来查看Faust的运行情况。
然后浏览用户指南
按主题组织的深入信息。

浮士德是……

< DL>
简单
浮士德非常容易使用。开始使用其他流处理 解决方案您有复杂的hello world项目,并且 基础设施要求。浮士德只需要卡夫卡, 剩下的只是python,所以如果您知道python,就可以使用faust来完成 流处理,它几乎可以与任何东西集成。

以下是您可以制作的一个更简单的应用程序:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

你可能被异步和等待关键字吓到了, 但是您不必知道异步是如何工作的 浮士德:只要模仿这些例子,你就没事了。

示例应用程序启动两个任务:一个是处理st瑞姆, 另一个是向该流发送事件的后台线程。 在实际应用程序中,系统将发布 您的处理器可以使用的Kafka主题的事件, 后台线程只需将数据输入 例子:

高可用性
浮士德是高度可用的,可以在网络问题和服务器上生存下来 撞车事故。当节点发生故障时,可以自动恢复, 并且表中有备用节点将接管。
分布式
根据需要启动更多应用程序实例。
快速
一个单核faust worker实例已经可以处理数万个 我们有理由相信吞吐量 一旦我们能够支持更优化的卡夫卡客户机,就可以增加。
灵活
浮士德只是python,流是一个无限的异步迭代器。 如果你知道如何使用python,你已经知道如何使用faust, 它可以和你最喜欢的python库一起工作,比如django,flask, sqlalchemy、ntlk、numpy、scipy、tensorflow等。

安装

您可以通过python包索引(pypi)安装faust 或从源头。

使用pip安装

$ pip install -U faust

faust还定义了一组可以使用的setuptools 安装Faust和给定功能的依赖项。

您可以在您的需求中或在 pip 使用括号的命令行。使用逗号分隔多个捆绑包:

$ pip install "faust[rocksdb]"$ pip install "faust[rocksdb,uvloop,fast,redis]"

提供以下捆绑包:

商店 <表> < COL/> < COL/> <正文> 浮士德[rocksdb]

用于存储浮士德表状态的 rocksdb

建议在生产中使用。

< > <表>
缓存
<表> < COL/> < COL/> <正文> 浮士德[redis] :将 redis用作简单的缓存后端(memcached样式)。 < > <表>
优化 <表> < COL/> < COL/> <正文> 浮士德[快速] :用于将所有可用的C加速扩展安装到浮士德核心。 < > <表>
传感器
<表> < COL/> < COL/> <正文> 浮士德[datadog] :用于使用datadog浮士德监视器。 浮士德[statsd] :用于使用浮士德显示器。 < > <表>
事件循环 <表> < COL/> < COL/> <正文> 浮士德[uvloop] :用于使用浮士德的 uvloop 浮士德[gevent] :用于使用浮士德与 gevent 浮士德[eventlet] : 用于在eventlet中使用faust < > <表>
调试 <表> < COL/> < COL/> <正文> faust[debug] :用于使用aiomonitor连接和调试正在运行的faust工作进程。 浮士德[setprocTitle] : 安装了 setproctTitle 模块后,Faust Worker将 使用它在 ps / top 列表中设置一个更好的进程名。 还与 fast debug 捆绑包一起安装。 < > <表>

从源代码下载和安装

从下载最新版本的浮士德 http://pypi.org/project/faust

您可以通过执行以下操作来安装它:

$ tar xvfz faust-0.0.0.tar.gz
$cd faust-0.0.0
$ python setup.py build
# python setup.py install

如果您当前没有使用virtualenv。

使用开发版本

带pip的

您可以使用以下命令安装浮士德的最新快照 pip 命令:

$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust

FAQ

我能把浮士德和django/烧瓶等一起用吗?

是的!使用 gevent eventlet 作为与 异步

使用 gevent

这种方法适用于任何可以工作的阻塞python库 用 gevent

使用 gevent 需要安装 aiogevent 模块, 您可以将它作为一个捆绑包与faust一起安装:

# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
0

然后要实际使用 gevent 作为事件循环,您必须 使用 -l <;faust --loop>; 选项进入 faust 程序:

# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
1

或者在入口点脚本的顶部添加import mode.loop.gevent

# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
2

记住:这是非常重要的,这是在最顶端的模块, 并在导入库之前执行。

使用eventlet

这种方法适用于任何可以使用 eventlet

使用eventlet需要安装aioeventlet模块, 您可以将其作为捆绑包与Faust一起安装:

# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
3

然后要实际使用eventlet作为事件循环,您必须 将 -l <;faust --loop>; 参数用于 faust 程序:

# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
4

或者在入口点脚本的顶部添加import mode.loop.eventlet

# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
5 < div >

警告

很重要的是这是在模块的最上面, 并在导入库之前执行。

我能用浮士德和龙卷风吗? 是的!使用 tornado.platform.asyncio 桥: http://www.tornadorWeb.org/en/stable/asyncio.html

我能用《浮士德》和《扭曲》吗?

是的!使用异步反应器实现: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html

是否支持Python3.5或更早版本?

目前还没有支持Python3.5的计划,但是欢迎使用 为项目做出贡献。

以下是实现此目标所需的一些步骤:

  • 将变量注释重写为注释的源代码转换

    例如,代码:

    # Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
    6
  • 重写异步函数的源代码转换

    < Buff行情>

    例如,代码:

    # Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
    7

    必须重写为:

    # Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
    8

是否支持python 2?

目前还没有支持Python2的计划,但是欢迎您为 项目(上面问题中的细节也与Python2相关)。

在本地运行faust应用程序时,rocksdb出错,导致打开的文件数超过最大值。我该怎么解决?

您可能需要增加打开文件的最大数量限制。这个 以下文章解释了如何在OSX上执行此操作: https://blog。dekstroza.io/ulimit在osx el capitan上的恶作剧/

《浮士德》支持哪些卡夫卡版本?

浮士德支持卡夫卡,版本>;=0.10。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java我有一个(单个)类别和子类别树,现在我想在其中添加项目作为treeNode   java使用ICU将输出数字(字符串)拼写为整数   在Java中,检查字符是否为元音的最佳方法是什么?   如何解决这个问题。println Apache jkenvar SSL_CLIENT_DN从mod_SSL到java、javascript或html?   有没有办法比较两个Java war文件   java spark sql问题:第一个匹配行上的联接表:rank()不工作   数学模型   所有类文件中的java错误:无法解析R   在64位Windows上发送ctrlbreak到java进程,在32位Windows上发送信号   java是什么让spring boot控制台变得多彩?   java在当前时间和下周六之间还有剩余的分钟/小时吗?   java强制从控制台输入有效的If/Else扫描程序   用组成员显示组名的java   java MediaCodec编码dequeueInputBuffer返回信息\u稍后重试\u?   java是否可以为整个struts webapp配置统一的日期格式格式?   java无法更改#vbox:focused上的vbox边框   java如何解析没有标记的JSON对象   java组织。jsoup。选择选择器$SelectorParseException:无法分析查询“”:位于“”的意外标记