PipeFrame用python编写多处理数据管道的简单模块。
pipeframe的Python项目详细描述
管架
什么是管道?在
In computing, a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. Some amount of buffer storage is often inserted between elements.
来源:维基百科
PipeFrame是一个小的pipe行frame帮助您利用python多处理库处理数据(流或批处理)。在
安装
pip上提供的软件包,要在您的环境中安装它,只需执行以下操作:
pip install pipeframe
入门
创建管道
首先应该创建管道,它应该继承自pipeframe.core.PipelineEngine
并包含steps
类属性:
frompipeframe.coreimportPipelineEngineclassYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]
管道将针对steps
函数执行每个条目。你可以
定义要对数据执行的任何数量的函数,即执行顺序
将遵循您在步骤列表中定义的相同顺序。在
您的函数应该将要处理的记录作为参数接收并返回 修改后的记录和用于绕过后续步骤执行的布尔值 在数据上(False)或继续管道流(True)。在
^{pr2}$您还必须提供一个名为feed
的函数,该函数将为您的进程提供一些数据:
classYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]deffeed(self,bucket):req=requests.get('https://www.reddit.com/r/all/top.json',headers={'User-agent':'pipeframe'})ifreq.status_code==200:data=req.json()['data']['children']forentryindata:bucket.put(entry['data'])
运行管道
要执行新创建的管道,必须使用PipeFrame executor调用它:
frompipeframe.coreimportPipeFramepipe_frame=PipeFrame(cpu_count=16,stream_buffer_size=50000)pipe_frame.run(YourCustomPipeline)
cpu_count
和{
- cpu计数:一个整数,默认为机器中的核心数减去1
- 缓冲区大小:默认为10000的整数
流还是批处理?在
默认情况下,管道将以批处理模式运行,这意味着提要函数将在该步骤之前运行并完成 函数开始。您必须知道有多少数据条目将进入队列,并调整buffer_size 根据这一点。在
如果使source='stream',则提要函数将在step函数、feed和processing之后启动 将并行发生。在这种情况下,您应该将timeout属性调整为一个足够高的值,以防止 由于队列中暂时缺少数据而导致管道终止(对于数据摄取速度低于 你的处理能力)。在
示例:
classYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]source='stream'timeout=10deffeed(self,bucket):forentryininfinite_stream_of_data():bucket.put(entry)
在上面的示例中,您的工人将等待10秒,等待无限长的\u stream_of_data()函数生成 要处理的新数据,如果10秒内没有新数据到达,则工作进程将终止,因为流已干涸。在
完整的例子
frompipeframe.coreimportPipelineEngine,PipeFrameimportfcntlimportjsondefclear_entry(entry):entry['new_number']=0returnentry,Truedefpower(entry):entry['new_number']=entry['number']**entry['number']returnentry,Truedefwrite_to_disk(entry):""" Lock the file, write entry, release the file. """withopen("log","a")asfh:fcntl.flock(fh,fcntl.LOCK_EX)fh.write(json.dumps(entry['number'])+'\n')fcntl.flock(fh,fcntl.LOCK_UN)returnentry,TrueclassPowerDataPipeline(PipelineEngine):steps=[clear_entry,power,write_to_disk]source='batch'@staticmethoddeffeed(bucket):x=1000000foriinrange(10):x+=1000entry={'number':x}bucket.put(entry)# With all cpu - 1pipe_frame=PipeFrame()pipe_frame.run(PowerDataPipeline)# With 2 cpuspipe_frame=PipeFrame(cpu_count=2)pipe_frame.run(PowerDataPipeline)
- 项目
标签: