流水线并行库

parallelpipe的Python项目详细描述


Build Status

ParallelPipe是一个用于Python的管道并行化库。

管道由一个或多个阶段组成。每一阶段取输出 作为输入并对其执行某些操作 像map、filter、reduce等,这是normal的扩展 生产者/消费者模式,我们可以有多个阶段。每个阶段 接收队列中的输入数据并将结果推送到另一个队列 这与下一阶段有关。

在本例中,我们定义了一个stage函数,它将 迭代器返回url并在 下载:

fromparallelpipeimportstageimportrequests@stage(workers=4)deffetch_urls(urls):forurlinurls:result=requests.get(url)yieldresult.content

要使用此阶段,请运行

urls=['http://test.com',...]pipe=urls|fetch_urlsforcontentinpipe.results():print(len(content))

我们建造了一个只有一个阶段的基本管道。这个阶段有4名工人 它将开始并行处理输入url。主要工序 将在其中一个内容可用时立即接收下载的内容 并打印相应的长度。注意,管道输入可以是 任何iterable;这将自动包装到一个阶段中。

假设我们对html中的标题字符串感兴趣 内容。我们可以添加另一个阶段来执行此操作:

importreRE_TITLE=re.compile("<title>(.*?)</title>",re.M)@stage(workers=2)defget_titles(contents):forcontentincontents:match=RE_TITLE.search(content)ifmatchisnotNone:yieldmatch.group(1)pipe=urls|fetch_urls|get_titlesfortitleinpipe.results():print(title)

同样,第二阶段将在内容完成后立即开始处理 是可用的,并产生他的产出。注意,这个任务也是 并行化,因为我们将workers设置为2。你可以看到这个阶段 不是一张精确的地图,因为返回的标题数量可能少于 文件编号(我们检查是否存在标题标签)。

现在我们再添加一个阶段以返回最常见的标题:

fromcollectionsimportCounter@stage()defmost_common(titles):commons=Counter(titles).most_common(1)yieldcommons[0]pipe=urls|fetch_urls|get_titles|most_commonprint(pipe.execute())

要计算最常见的标题,我们需要汇总所有结果,因此 我们只能用一个工人。我们还使用pipe.execute(),而不是 pipe.results(),因为我们知道只会返回一个结果。

参数级

@stage(workers=4)defadd_n(input,n):fornumberininput:yieldnumber+npipe=range(100)|add_n(7)forresultinpipe.results():print(result)

在本例中,stage函数不仅需要输入迭代器 但也有一个或多个额外的参数来执行他的计算。在 在我们构建管道时,我们可以配置这个额外的参数 只需把它们作为输入调用舞台。记住,所有参数 可以传递,但第一个是必需输入的除外 迭代器。

映射阶段

如果您的阶段执行纯映射,即它只返回一个结果 使用^{tt3}可以简化代码的每个输入元素$ 装饰工:

fromparallelpipeimportmap_stage@map_stage(workers=4)defadd_n(number,n):returnnumber+n

队列大小

构建阶段时,可以定义其输出队列的大小。 如果当前阶段可以 比下一阶段的消费速度快得多。在这个 case,一旦达到队列大小,就停止处理他的输入 等待消费者释放一个插槽。

# only 30 elements can queue in output before blocking this stage@stage(workers=4,qsize=30)defadd_n(input,n):fornumberininput:yieldnumber+n

默认情况下qsize=0,这意味着队列没有限制。

设置阶段

设置阶段队列,还可以在定义 阶段调用setup()方法。

add_n.setup(workers=2,qsize=0)

直接使用stage类

到目前为止,我们在函数上使用decorator构建了阶段,但是我们也可以 直接使用stage类:

fromparallelpipeimportStagedefadd_n(input,n):fornumberininput:yieldnumber+npipe=Stage(range,10)|Stage(add_n,5)

正如您在前面的示例中所看到的,stage类将 迭代器函数及其所需的任何额外参数。第一阶段 是生产者,因此不会使用任何输入迭代器调用。当我们使用 stage类明确地说,我们可以使用setup()来配置 我们需要的工人和队列大小:

pipe=Stage(range,10).setup(qsize=5)|Stage(add_n,5).setup(workers=2)

setup()方法返回stage本身,因此我们可以设置它 在管道定义期间。

异常处理

在执行stage函数期间,可能会发生异常。什么时候? 阶段检测到它将自动使用和忽略的异常 前一阶段的所有输入,然后a TaskException将 在主要的过程中。

@stage(workers=2)defadd_one(numbers):fornumberinnumbers:yieldnumber+1
>>>pipe=[2,3,"ops",7]|add_one>>>print(sum(pipe.results()))Processadd_one-0:Traceback(mostrecentcalllast):File"/Users/gt/miniconda2/lib/python2.7/multiprocessing/process.py",line258,in_bootstrapself.run()File"/Users/gt/Desktop/code/parallelpipe/parallelpipe.py",line67,inrunforiteminres:File"example.py",line7,inadd_oneyieldnumber+1TypeError:cannotconcatenate'str'and'int'objectsTraceback(mostrecentcalllast):File"example.py",line10,in<module>print(sum(pipe.results()))File"/Users/gt/Desktop/code/parallelpipe/parallelpipe.py",line249,inresultsraiseTaskException(msg)parallelpipe.TaskException:Thetask"add_one-0"raisedTypeError("cannot concatenate 'str' and 'int' objects",)
<如果你想避免一个坏的输入阻塞你的管道 当然可以捕获stage函数中的任何异常,以便 管道可以继续并产生其余的结果。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java GridBagLayout不填充区域   java Memozied Fibonacci未运行与常规Fibonacci解决方案   Java Web启动未启动问题   Java中异常和if-then的区别   java从命令提示符运行批处理文件获取错误   socket在Java中验证SSL证书的公共名称   如何在JAVA中检查字符串数组中的相等字   用java语言将音频文件转换成文本文件的语音识别   java为什么foo(1,2,3)没有传递给varargs方法foo(对象…)作为整数[]   java通过蓝牙将奇怪的数据从Arduino传输到Android   java ContainerRequestFilter获取空entitystream   java如何从安卓 studio中删除不兼容类型错误   基本Java错误   在Spring引导中使用REST API时发生java错误   javascript通过从SQL查询派生的URL打开页面