PipeFrame用python编写多处理数据管道的简单模块。

pipeframe的Python项目详细描述


管架

什么是管道?在

In computing, a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. Some amount of buffer storage is often inserted between elements.

来源:维基百科

PipeFrame是一个小的pipeframe帮助您利用python多处理库处理数据(流或批处理)。在

安装

pip上提供的软件包,要在您的环境中安装它,只需执行以下操作:

pip install pipeframe

入门

创建管道

首先应该创建管道,它应该继承自pipeframe.core.PipelineEngine 并包含steps类属性:

frompipeframe.coreimportPipelineEngineclassYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]

管道将针对steps函数执行每个条目。你可以 定义要对数据执行的任何数量的函数,即执行顺序 将遵循您在步骤列表中定义的相同顺序。在

您的函数应该将要处理的记录作为参数接收并返回 修改后的记录和用于绕过后续步骤执行的布尔值 在数据上(False)或继续管道流(True)。在

^{pr2}$

您还必须提供一个名为feed的函数,该函数将为您的进程提供一些数据:

classYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]deffeed(self,bucket):req=requests.get('https://www.reddit.com/r/all/top.json',headers={'User-agent':'pipeframe'})ifreq.status_code==200:data=req.json()['data']['children']forentryindata:bucket.put(entry['data'])

运行管道

要执行新创建的管道,必须使用PipeFrame executor调用它:

frompipeframe.coreimportPipeFramepipe_frame=PipeFrame(cpu_count=16,stream_buffer_size=50000)pipe_frame.run(YourCustomPipeline)

cpu_count和{}是可选参数:

  • cpu计数:一个整数,默认为机器中的核心数减去1
  • 缓冲区大小:默认为10000的整数

流还是批处理?在

默认情况下,管道将以批处理模式运行,这意味着提要函数将在该步骤之前运行并完成 函数开始。您必须知道有多少数据条目将进入队列,并调整buffer_size 根据这一点。在

如果使source='stream',则提要函数将在step函数、feed和processing之后启动 将并行发生。在这种情况下,您应该将timeout属性调整为一个足够高的值,以防止 由于队列中暂时缺少数据而导致管道终止(对于数据摄取速度低于 你的处理能力)。在

示例:

classYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]source='stream'timeout=10deffeed(self,bucket):forentryininfinite_stream_of_data():bucket.put(entry)

在上面的示例中,您的工人将等待10秒,等待无限长的\u stream_of_data()函数生成 要处理的新数据,如果10秒内没有新数据到达,则工作进程将终止,因为流已干涸。在

完整的例子

frompipeframe.coreimportPipelineEngine,PipeFrameimportfcntlimportjsondefclear_entry(entry):entry['new_number']=0returnentry,Truedefpower(entry):entry['new_number']=entry['number']**entry['number']returnentry,Truedefwrite_to_disk(entry):"""    Lock the file, write entry, release the file.    """withopen("log","a")asfh:fcntl.flock(fh,fcntl.LOCK_EX)fh.write(json.dumps(entry['number'])+'\n')fcntl.flock(fh,fcntl.LOCK_UN)returnentry,TrueclassPowerDataPipeline(PipelineEngine):steps=[clear_entry,power,write_to_disk]source='batch'@staticmethoddeffeed(bucket):x=1000000foriinrange(10):x+=1000entry={'number':x}bucket.put(entry)# With all cpu  - 1pipe_frame=PipeFrame()pipe_frame.run(PowerDataPipeline)# With 2 cpuspipe_frame=PipeFrame(cpu_count=2)pipe_frame.run(PowerDataPipeline)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java何时可以运行。toString()是否有可能返回重复的字符串?   使用REST进行Java应用程序登录验证?   java测试onErrorResume()Spring Webflux   java设置一个单元格样式,使数字显示为百分比ApachePOI   java仅替换regex az09   java将字符串附加到文件   java Hibernate:如何在集合中查找对象   当独立客户端为Web服务实例化代理时,java WebSphere会生成ClassNotFoundException   java简单算法。我做不好   java我的代码有什么问题?我想用Android制作一个“cardflip”动画   java如何模拟Springbean及其自动连接的参数?   java在Android中将arraylist对象的某些参数显示到列表视图中   java setOnclickListener(此)错误   java自动连接未按类型连接bean   java如何禁止在Viewpager上滑动?   java代码检查每个if语句吗?   java NIO选择器OP_READ和OP_WRITE,关于处理它们的一些问题   java如何在不锁定文件的情况下获取文件大小   Oculus Rift的Java API?   java是一种选择。仍然需要fork来设置bootClasspath