基于流的异步编程

aiopype的Python项目详细描述


所有类型

python异步数据管道

aiopype允许使用 简单明了的开发方法。

aiopype创建一个集中的消息处理程序,以允许 处理器作为独立的非阻塞消息工作 生产者/消费者。

aiopype有4个主要概念:

  • 流量
  • 经理
  • 处理器
  • 消息处理程序

流量

流是aiopype的主要组件。流程是 运行管道管理器的可靠性。

Flow负责:

  • 启动所有注册经理
  • 处理管理器故障
  • 报告错误
  • 重新启动失败的管理器

经理

管理器负责注册从顶部到 底部。这意味着它必须注册一个源并连接到 消费者,直到管道最终输出。

处理器

处理器是消息的使用者/生产者。

来源

源是处理器的特殊情况。他们的特点是 它们可以永远运行,是任何管道的起点。

源的示例可能是:

  • 一个REST API轮询器
  • 一个Websocket客户机
  • 一个Cron作业

消息处理程序

消息处理程序是允许aiopype 规模。

流将以一个或多个源作为每个源的起点 注册经理。一旦源生成事件,消息将 触发后,处理程序将识别并触发相应的 处理程序。

有两个可用的消息处理程序:

  • 同步协议
  • 异步协议

同步协议

同步事件处理程序,顾名思义,是同步的, 这意味着一旦源发出消息,就必须在 管道的末端和源头可以继续正常工作 行为。这有利于开发,但不能满足 允许组件所需的异步事件驱动模式 孤立。

异步协议

SyncProtocol和AsyncProtocol的主要区别在于 后者使用分离的事件循环来评估是否有新消息 在等待处理的队列中,而第一个简单地开始处理 即时收到消息。这允许完全隔离 处理器。

示例

苹果股票处理器。

来源

我们的数据源是Yahoo Finance,用于从^{tt6}收集数据$ 股票价格。我们将使用aiopypeRestSource作为基类。

fromaiopype.sourcesimportRestSourceclassYahooRestSource(RestSource):"""
  Yahoo REST API source.
  """def__init__(self,name,handler,symbol):super().__init__(name,handler,'http://finance.yahoo.com/webservice/v1/symbols/{}/quote?format=json&view=detail'.format(symbol),{'exception_threshold':10,'request_interval':30})

处理器

我们的样本处理程序将从返回的 杰森。

fromaiopypeimportProcessorclassHandleRawData(Processor):defhandle(self,data,time):self.emit('price',time,data['list']['resources'][0]['resource']['fields']['price'])

输出

我们的输出处理器将价格数据写入csv文件。

importcsvclassCSVOutput(Processor):def__init__(self,name,handler,filename):super().__init__(name,handler)self.filename=filenamewithopen(self.filename,'w',newline='')ascsvfile:writer=csv.writer(csvfile,delimiter=';')writer.writerow(['time','price'])defwrite(self,time,price):withopen(self.filename,'w',newline='')ascsvfile:writer=csv.writer(csvfile,delimiter=';')writer.writerow([time,price])

经理

管理器将实例化SourceProcessorOutput。 它将把Sourcedata事件连接到Processor.handle 处理程序和Processor'sprice事件到Output.write处理程序。 这将是我们的数据管道。

fromaiopypeimportManagerclassYahooManager(Manager):name='yahoo_apple'def__init__(self,handler):super().__init__(handler)self.processor=HandleRawData(self.build_processor_name('processor'),self.handler)self.source=YahooRestSource(self.build_processor_name('source'),self.handler,'AAPL')self.writer=CSVOutput(self.build_processor_name('writer'),self.handler,'yahoo_appl.csv')self.source.on('data',self.processor.handle)self.processor.on('price',self.writer.write)

流量

我们的流配置将只有yahoo_apple管理器。

fromaiopypeimportAsyncFlowclassFlowConfig(object):FLOWS=['yahoo_apple']dataflow=AsyncFlow(FlowConfig())

主要方法:

只需启动数据流。

if__name__=="__main__":dataflow.start()

运行示例

在名为example.py的文件中编译上述所有代码并运行:

python example.py

在制品:

这种分散的机制使得分布式管道成为可能, 如果我们有协调人在节点之间。

变更日志

0.1.4/2016-07-14

  • #10避免未完成 流量(@jalpedrinha)

0.1.3/2016-07-11

  • #8修复异步协议 终止条件(@jalpedrinha)

0.1.2/2016-07-06

  • #6处理异常 来自异步协议侦听器(@jalpedrinha)

0.1.1/2016-07-05

  • #4避免失败 Pusher客户端断开(@jalpedrinha)

0.1.0/2016-07-05

  • #1添加流管理器 和处理器(@jalpedrinha)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何使用“Wed,01 Jul 2015 17:32:41 EDT”解析字符串   java Storm apache升级(1.0.0到2.0.0)   java类驻留在不同的目录中,而不是包指定的目录。为什么?   将Java中的图像缩放到非常小的维度   java如何通过子文档从自定义方面访问ElasticSearch parentdoc字段   java如何在RationalSoftwareArchitect中使用findbugs?   Java中的事件提升处理   java值被添加到arrayList的所有索引中,而不是在“”时添加到最后一个索引中。正在使用arraylist的add()方法   JFrame中的java JPanel派生类   java如何用循环和异步方法模拟类   java Android阻止可绘制背景超出视图范围   为客户排序Java阵列   java Apache poi如何将工作表设置为枚举位置值属性?   java Rhino在使用自定义类参数调用javascript函数时出错   java格式化日期从年月日到年月日   spring如何修复java。lang.illegalargumentexception在此特定场景中是否尝试创建具有null实体的合并事件?   java如何创建更好的对象