pacer是一个轻量级的python包,用于实现分布式数据处理工作流。

pacer的Python项目详细描述


起搏器

关于

pacer是用于实现分布式 数据处理工作流。而不是定义 DAG哪个 为从源到最终结果的数据流建模pacer使用 与嵌套函数调用非常相似的拉模型。运行这样的 工作流从结果节点开始并递归地将工作委托给 输入。

最初我们开发了pacer,用于在 emzed,用于分析lcms数据的框架。

Pacer是怎么工作的?

引擎盖下有两个核心组件:

  • 一个用于管理链式计算的分布式计算:

    pacer中的处理步骤只是一些python函数 附加注释。pacer尝试计算尽可能多的处理 尽可能并行地执行步骤,因为这样的函数必须 应用于不同的数据集,或者它有多个输入和 这些是同时计算的。

  • 保留在文件系统上的分布式缓存

    在输入部分修改的情况下a pacer工作流 不确定所需的更新计算,但使用分布式 用于将单个处理步骤的输入值映射到 他们的最终结果。所以工作流的重复运行没有改变 输入将运行完整的工作流,并返回所有处理步骤 立即得到已知结果。运行未知的工作流 或者修改后的输入将只执行所需的计算和 更新缓存。

这两个组件是独立的,可以单独使用。

示例

我们提供了一些简单的例子来说明它是多么容易使用 pacer。您可以找到这些示例,我们将其扩展为打印更多 在git存储库中的examples/文件夹中记录信息。

在现实世界中,lcms工作流不会像 在下面使用,但运行较长的计算步骤,如运行lcms 调峰器和随后的调峰器。

如何声明管道

在本例中,我们的输入源是一个python字符串列表 ["a", "bc", "def"]和一个数字元组(1, 2)。非常 简单示例工作流计算每个字符串的长度 将其与元组中的每个数字相乘。这个很简单的例子 可以用纯python实现如下:

import itertools

def main():

    def length(what):
        return len(what)

    def multiply(a, b):
        return a * b

    words = ["a", "bc", "def"]
    multipliers = (1, 2)

    result = [multiply(length(w), v) for (w, v) in itertools.product(words, multipliers)]
    assert result == [1, 2, 2, 4, 3, 6]

if __name__ == "__main__":
    main()

为了将这些计算转换为智能并行处理 我们使用applyjoin函数修饰符的管道 pacer,并使用 函数调用。

from pacer import apply, output, join, Engine

def main():

    @apply
    def length(what):
        return len(what)

    @output
    @join
    def multiply(a, b):
        return a * b

    words = ["a", "bc", "def"]
    multipliers = (1, 2)

    # now we DECLARE the workflow (no execution at that time):
    workflow = multiply(length(words), multipliers)

if __name__ == "__main__":
    main()

在三个cpu核上运行这个工作流现在很容易。在这种情况下 计算步骤并行运行:

Engine.set_number_of_processes(3)
workflow.start_computations()
result = workflow.get_all_in_order()

assert result == [1, 2, 2, 4, 3, 6]

Pacers方法在修改输入数据

的情况下计算所需更新

如前所述,Pacer不确定需要的更新 修改输入数据但使用分布式缓存时的计算 相反。因此,再次运行工作流将获取 不受更改影响的已知计算结果,然后开始 输入参数未知的计算。

我们又用了装修工。利用上面的例子只需要很少的 调整:

from pacer import apply, join, output, Engine, CacheBuilder

cache = CacheBuiler("/tmp/cache_000")

@apply
@cache
def length(what):
    return len(what)

@output
@join
@cache
def multiply(a, b):
    return a * b

# inputs to workflow
words = ["a", "bc", "def"]
multipliers = (1, 2)

workflow = multiply(length(words), multipliers)

# run workflow
Engine.set_number_of_processes(3)
workflow.start_computations()
result = workflow.get_all_in_order()

assert result == [1, 2, 2, 4, 3, 6]

如果从命令行运行这些示例,则会看到日志记录结果 显示单个步骤的并行执行和缓存命中避免 重新计算。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
多线程:当服务器在javasocket中可用时,如何自动重新连接到服务器   java如何从listview适配器调用我的后台服务的公共方法   java中央身份验证服务器体系结构和身份验证流程   我的Java项目的空指针异常   java系统输出到文本字段   java实体外观的swing Gui问题   java JVM消耗100%的CPU   java更改谷歌表单的背景色   java如何在2d数组上使用比较器对用户输入进行排序   从泛型原子引用获取特定类对象的java方法给出了类型错误。有办法解决这个问题吗?   java Hot从netsuite获取上次修改的信用备忘录记录   java libGDX屏幕之间的简单淡入淡出过渡?   java无法注册到textsecure服务器,无法正常工作   java如何使用关键字作为枚举常量   java Eclipse没有从central maven repo下载插件   java Rails/设计不可处理的实体   ImageView背景中的java添加带动画的乐蒂   在java中,我们可以添加到列表中的元素的最大数量是多少?   java | Ultra noob |可以将按键发送到后台窗口吗?