python流处理。
faust的Python项目详细描述
python流处理
//opensource.org/licenses/bsd-3-clause" rel="nofollow">
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
浮士德是一个流处理库,从 kafka流 到python。
它用于构建高性能分布式系统 以及每天处理数十亿事件的实时数据管道。
Faust同时提供流处理和事件处理。 与诸如 卡夫卡流 , 阿帕奇火花 / 风暴 / 萨姆扎 / 燧石 ,
它不使用dsl,它只是python! 这意味着您可以使用所有喜爱的python库 流处理时:numpy、pytorch、pandas、nltk、django, 烧瓶,sqlalchemy,++
Faust需要Python3.6或更高版本才能使用新的-异步编程指南-in-python-with-asyncio-232e2afa44f6" rel="nofollow">异步/等待语法, 以及可变类型注释。
下面是处理传入订单流的示例:
app=faust.App('myapp',broker='kafka://localhost')# Models describe how messages are serialized:# {"account_id": "3fae-...", amount": 3}classOrder(faust.Record):account_id:stramount:int@app.agent(value_type=Order)asyncdeforder(orders):asyncfororderinorders:# process infinite stream of orders.print(f'Order for {order.account_id}: {order.amount}')
代理装饰器定义了一个"流处理器",本质上 从卡夫卡的主题中消费,并为它收到的每个事件做一些事情。
代理是一个异步定义函数,因此也可以执行 其他异步操作,如Web请求。
这个系统可以像数据库一样保持状态。 表被命名为分布式键/值存储,您可以使用 作为常规的python字典。
表在每台机器上以超高速本地存储 用c++编写的嵌入式数据库,名为rocksdb
表还可以存储可选"窗口化"的聚合计数。 所以你可以追踪 "最后一天的点击次数",或 "上一小时的点击次数。"例如。比如 卡夫卡流 , 我们支持翻滚、跳跃、滑动时间窗口和旧窗口 可以过期以停止数据填充。
为了提高可靠性,我们使用kafka主题作为"预写日志"。 每当更改密钥时,我们都会发布到更改日志。 备用节点使用此更改日志来保留精确的副本 并在任何节点出现故障时启用即时恢复。
对于用户来说,表只是一个字典,但是数据在 重新启动并跨节点复制,以便故障转移时其他节点可以接管 自动。
您可以按URL计算页面浏览量:
# data sent to 'clicks' topic sharded by URL key.# e.g. key="http://example.com" value="1"click_topic=app.topic('clicks',key_type=str,value_type=int)# default value for missing URL will be 0 with `default=int`counts=app.Table('click_counts',default=int)@app.agent(click_topic)asyncdefcount_click(clicks):asyncforurl,countinclicks.items():counts[url]+=count
发送到kafka主题的数据是分区的,这意味着 点击将被url切分,这样每个计数 对于同一个url,将传递到同一个faust worker实例。
Faust支持任何类型的流数据:字节、Unicode和序列化 结构,但也附带了使用现代python的"模型" 描述如何序列化流中的键和值的语法:
# Order is a json serialized dictionary,# having these fields:classOrder(faust.Record):account_id:strproduct_id:strprice:floatquantity:float=1.0orders_topic=app.topic('orders',key_type=str,value_type=Order)@app.agent(orders_topic)asyncdefprocess_order(orders):asyncfororderinorders:# process each order using regular Pythontotal_price=order.price*order.quantityawaitsend_order_received_email(order.account_id,order)
Faust是静态类型的,使用类型检查器, 因此,您可以在编写应用程序时利用静态类型。
浮士德的源代码很小,组织得很好,是一个很好的 学习卡夫卡流实现的资源
< DL>浮士德是……
< DL>以下是您可以制作的一个更简单的应用程序:
import faust class Greeting(faust.Record): from_name: str to_name: str app = faust.App('hello-app', broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting) @app.agent(topic) async def hello(greetings): async for greeting in greetings: print(f'Hello from {greeting.from_name} to {greeting.to_name}') @app.timer(interval=1.0) async def example_sender(app): await hello.send( value=Greeting(from_name='Faust', to_name='you'), ) if __name__ == '__main__': app.main()
你可能被异步和等待关键字吓到了, 但是您不必知道异步是如何工作的 浮士德:只要模仿这些例子,你就没事了。
示例应用程序启动两个任务:一个是处理st瑞姆, 另一个是向该流发送事件的后台线程。 在实际应用程序中,系统将发布 您的处理器可以使用的Kafka主题的事件, 后台线程只需将数据输入 例子:
安装
您可以通过python包索引(pypi)安装faust 或从源头。
使用pip安装
$ pip install -U faust
束
faust还定义了一组可以使用的setuptools 安装Faust和给定功能的依赖项。
您可以在您的需求中或在 pip 使用括号的命令行。使用逗号分隔多个捆绑包:
$ pip install "faust[rocksdb]"$ pip install "faust[rocksdb,uvloop,fast,redis]"
提供以下捆绑包:
缓存
<表> < COL/> < COL/> <正文> 浮士德[redis] :将 redis用作简单的缓存后端(memcached样式)。 < > <表>优化 <表> < COL/> < COL/> <正文> 浮士德[快速] :用于将所有可用的C加速扩展安装到浮士德核心。 < > <表>
传感器
<表> < COL/> < COL/> <正文> 浮士德[datadog] :用于使用datadog浮士德监视器。 浮士德[statsd] :用于使用浮士德显示器。 < > <表>事件循环 <表> < COL/> < COL/> <正文> 浮士德[uvloop] :用于使用浮士德的 uvloop 浮士德[gevent] :用于使用浮士德与 gevent 浮士德[eventlet] : 用于在eventlet中使用faust < > <表>
调试 <表> < COL/> < COL/> <正文> faust[debug] :用于使用aiomonitor连接和调试正在运行的faust工作进程。 浮士德[setprocTitle] : 安装了 setproctTitle 模块后,Faust Worker将 使用它在 ps / top 列表中设置一个更好的进程名。 还与 fast 和 debug 捆绑包一起安装。 < > <表>
从源代码下载和安装
从下载最新版本的浮士德 http://pypi.org/project/faust
您可以通过执行以下操作来安装它:
$ tar xvfz faust-0.0.0.tar.gz $cd faust-0.0.0 $ python setup.py build # python setup.py install
如果您当前没有使用virtualenv。
使用开发版本
带pip的
您可以使用以下命令安装浮士德的最新快照
pip
命令:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
FAQ
我能把浮士德和django/烧瓶等一起用吗?
是的!使用 gevent 或 eventlet 作为与 异步使用 gevent
这种方法适用于任何可以工作的阻塞python库 用 gevent
使用 gevent 需要安装 aiogevent 模块, 您可以将它作为一个捆绑包与faust一起安装:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust0
然后要实际使用 gevent 作为事件循环,您必须 使用 -l <;faust --loop>; 选项进入 faust 程序:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust1
或者在入口点脚本的顶部添加import mode.loop.gevent
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust2
记住:这是非常重要的,这是在最顶端的模块, 并在导入库之前执行。
使用eventlet
这种方法适用于任何可以使用 eventlet
使用eventlet需要安装aioeventlet模块, 您可以将其作为捆绑包与Faust一起安装:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust3
然后要实际使用eventlet作为事件循环,您必须 将 -l <;faust --loop>; 参数用于 faust 程序:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust4
或者在入口点脚本的顶部添加import mode.loop.eventlet
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust5 < div >
警告
很重要的是这是在模块的最上面, 并在导入库之前执行。
我能用《浮士德》和《扭曲》吗?
是的!使用异步反应器实现: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html是否支持Python3.5或更早版本?
目前还没有支持Python3.5的计划,但是欢迎使用 为项目做出贡献。
以下是实现此目标所需的一些步骤:
将变量注释重写为注释的源代码转换
例如,代码:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
6重写异步函数的源代码转换
< Buff行情>例如,代码:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
7必须重写为:
# Python Streams# Forever scalable event processing & in-memory durable K/V store;# as a library w/ asyncio & static typing.importfaust
8
是否支持python 2?
目前还没有支持Python2的计划,但是欢迎您为 项目(上面问题中的细节也与Python2相关)。
在本地运行faust应用程序时,rocksdb出错,导致打开的文件数超过最大值。我该怎么解决?
您可能需要增加打开文件的最大数量限制。这个 以下文章解释了如何在OSX上执行此操作: https://blog。dekstroza.io/ulimit在osx el capitan上的恶作剧/
浮士德支持卡夫卡,版本>;=0.10。