pacer是一个轻量级的python包,用于实现分布式数据处理工作流。
pacer的Python项目详细描述
起搏器
关于
pacer是用于实现分布式 数据处理工作流。而不是定义 DAG哪个 为从源到最终结果的数据流建模pacer使用 与嵌套函数调用非常相似的拉模型。运行这样的 工作流从结果节点开始并递归地将工作委托给 输入。
最初我们开发了pacer,用于在 emzed,用于分析lcms数据的框架。
Pacer是怎么工作的?
引擎盖下有两个核心组件:
一个用于管理链式计算的分布式计算:
pacer中的处理步骤只是一些python函数 附加注释。pacer尝试计算尽可能多的处理 尽可能并行地执行步骤,因为这样的函数必须 应用于不同的数据集,或者它有多个输入和 这些是同时计算的。
保留在文件系统上的分布式缓存
在输入部分修改的情况下a pacer工作流 不确定所需的更新计算,但使用分布式 用于将单个处理步骤的输入值映射到 他们的最终结果。所以工作流的重复运行没有改变 输入将运行完整的工作流,并返回所有处理步骤 立即得到已知结果。运行未知的工作流 或者修改后的输入将只执行所需的计算和 更新缓存。
这两个组件是独立的,可以单独使用。
示例
我们提供了一些简单的例子来说明它是多么容易使用 pacer。您可以找到这些示例,我们将其扩展为打印更多 在git存储库中的examples/文件夹中记录信息。
在现实世界中,lcms工作流不会像 在下面使用,但运行较长的计算步骤,如运行lcms 调峰器和随后的调峰器。
如何声明管道
在本例中,我们的输入源是一个python字符串列表 ["a", "bc", "def"]和一个数字元组(1, 2)。非常 简单示例工作流计算每个字符串的长度 将其与元组中的每个数字相乘。这个很简单的例子 可以用纯python实现如下:
import itertools def main(): def length(what): return len(what) def multiply(a, b): return a * b words = ["a", "bc", "def"] multipliers = (1, 2) result = [multiply(length(w), v) for (w, v) in itertools.product(words, multipliers)] assert result == [1, 2, 2, 4, 3, 6] if __name__ == "__main__": main()
为了将这些计算转换为智能并行处理 我们使用apply和join函数修饰符的管道 pacer,并使用 函数调用。
from pacer import apply, output, join, Engine def main(): @apply def length(what): return len(what) @output @join def multiply(a, b): return a * b words = ["a", "bc", "def"] multipliers = (1, 2) # now we DECLARE the workflow (no execution at that time): workflow = multiply(length(words), multipliers) if __name__ == "__main__": main()
在三个cpu核上运行这个工作流现在很容易。在这种情况下 计算步骤并行运行:
Engine.set_number_of_processes(3) workflow.start_computations() result = workflow.get_all_in_order() assert result == [1, 2, 2, 4, 3, 6]
Pacers方法在修改输入数据
的情况下计算所需更新如前所述,Pacer不确定需要的更新 修改输入数据但使用分布式缓存时的计算 相反。因此,再次运行工作流将获取 不受更改影响的已知计算结果,然后开始 输入参数未知的计算。
我们又用了装修工。利用上面的例子只需要很少的 调整:
from pacer import apply, join, output, Engine, CacheBuilder cache = CacheBuiler("/tmp/cache_000") @apply @cache def length(what): return len(what) @output @join @cache def multiply(a, b): return a * b # inputs to workflow words = ["a", "bc", "def"] multipliers = (1, 2) workflow = multiply(length(words), multipliers) # run workflow Engine.set_number_of_processes(3) workflow.start_computations() result = workflow.get_all_in_order() assert result == [1, 2, 2, 4, 3, 6]
如果从命令行运行这些示例,则会看到日志记录结果 显示单个步骤的并行执行和缓存命中避免 重新计算。