使用aws-sqs的多工作管道机制

sqspipes的Python项目详细描述


使用aws sqs的多工作管道机制。

说明

  1. 安装最新版本的软件包:pip install sqspipes

  2. 创建客户机

    fromsqspipesimportTaskClientclient=TaskClient(domain='my-app',aws_key='YOUR_AWS_KEY',aws_secret='YOUR_AWS_SECRET',aws_region='us-west-2')

    确保提供的aws_key对sqs具有完全访问权限 服务,因为它需要能够创建和删除队列。

    还要确保提供的aws_regionus-west-2us-east-2,因为其他区域不支持fifo队列 这个包裹用的。

  3. 定义您可能拥有的任务:

    importosimportsysimportrandomimportstringimporttimedef_generate(max_size):return''.join(random.choice(string.ascii_lowercase)for_inrange(random.randint(1,max_size)))def_reduce(value,keep='vowels'):vowels=['a','e','i','o','u',]result=[vforvinvalueif(vinvowels)==(keep=='vowels')]returnvalue,''.join(result)def_count(data):value,vowels=datareturnvalue,len(vowels)

    在本例中,我们有一个简单的流程,如下所示:

    生成单词->;将单词缩减为只包含元音->;计算缩减的 单词

    这类似于map reduce算法,但是使用这个模块 您可能有许多层,每个层在其中转换原始数据 另一种方式。这些层(tasks)然后像bash一样组合在一起 管道,其中任务的输出是下一个任务的输入。

    注意以下几点:

    1. 每个task的第一个参数将由 前一个的输出,除了 第一个任务。
    2. 每个任务的输出都应该是json可序列化的。
    3. 如果不希望任务返回None。 在加工线上继续。可以这样做,例如 因为您的任务是从数据库中选择的,所以您可以返回 None如果数据库为空。如果出于任何原因你想 处理None就像正常的任务输出/输入一样,您可以通过 ignore_none=False作为^{tt12}的参数$ 构造函数。在这种情况下,可以使用以下命令返回 空任务输出。
    fromsqspipesimportEmptyTaskOutputdefmy_task()# your task's logic herereturnEmptyTaskOutput()# for some reason, None is a valid task output# later in your code...TaskClient(domain='my-app',aws_key='YOUR_AWS_KEY',aws_secret='YOUR_AWS_SECRET',aws_region='us-west-2',ignore_none=False)
  4. 注册任务

    现在您已经创建了各种tasks,您只需 定义它们的顺序和其他运行时参数,如下所示:

    client.register_tasks([{'method':_generate,'workers':32,'interval':0.1},{'method':_reduce,'workers':2},{'method':_count,'workers':16}])

    每个任务都支持以下键:

    `method`:
        A callable object. This is the function that will actually be executed.
        For all tasks except for the first one, the first argument of this method
        will be the result of the previous task's method.
    
    `name`:
        The name of this tasks.
        If no name is provided, the method's name is automatically used.
    
    `workers`:
        The number of worker threads that will be processing messages in parallel.
        Defaults to 1.
    
    `priorities`:
        The number of different priority levels, where 0 is the lowest possible priority.
        Defaults to 1, maximum value is 16.
    
    `interval`:
        Only applies to the first task.
        Number of seconds to wait between each execution.
        Can either be an number, or a callable that returns an number (e.g `lambda: random.random() * 5`)
        Defaults to 0.
    
  5. 执行任务

    执行我们描述的任务的脚本看起来像 这个:

    # script.py fileimportsysdefgenerate(workers):forresinclient.run('_generate',args=(10,),iterate=True,workers=workers):print(res)defreduce(workers):forresinclient.run('_reduce',iterate=True,workers=workers):print('%s -> %s'%res)defcount(workers):forresultinclient.run('_count',iterate=True,workers=workers):print('%s -> %d'%result)try:n_workers=int(sys.argv[2])exceptValueError:n_workers=Nonetry:ifsys.argv[1]=='generate':generate(n_workers)elifsys.argv[1]=='reduce':reduce(n_workers)elifsys.argv[1]=='count':count(n_workers)else:raiseValueError('Invalid argument: must be one of generate, reduce or count')exceptIndexError:raiseValueError('Script argument is required')

    在本例中,我们有一个基于 参数,执行前面定义的三个任务之一 一步一步。请注意,您可以进行以下设置:

    1. 运行命令^{tt14}的机器m1$ 这将创造出8个工人,他们将为 处理。
    2. 运行命令^{tt15}的机器m2$ 这将创造出16个工人,他们只会减少他们的语言 元音。
    3. 本例中的计算机可以是另一个节点(vm,physical 但任务当然可以在同一台计算机上运行 还有基础设施。
    4. 其中一个任务上未处理的异常将导致 整个任务运行器。这是故意的,否则如果 未处理的异常被“吞并”,这将更难 调试问题,甚至发现并跟踪那些“丢失的” 包装。你可以处理任何你想要的异常 可能的方式。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java中的多个控制台或显示屏?   java Guava:是否可能不使用多重映射映射所有条目。索引()?   java转换键值对作为JSON响应的对象?   java读取一个文本文件,然后计算字母频率,并从高到低列出它们   java Apache CXF为客户提供SEI   java如何在SQL查询中“转义”整个字符串   将JavaServlet定义为主网页servlet注释不起作用   运行jar文件时发生java FileNotFoundException   java有两种加载FXML的方法;为什么一个比另一个更受欢迎?   java无法切换到timeofindia站点页面中的帧   java Firebase Firestore在连接丢失后需要很长时间才能重新连接   java使用来自SQLite的通用数据填充RecyclerView/ListView   当我使用offer和poll独占访问它时,java是LinkedList线程安全的吗?   如何使用包含Java命名空间的XPath检索XML数据?   Spring Boot的java Elasticsearch Searchguard配置   java数组中的数组值赋值?   java保存成功,但更新失败使用Jointable的多对多Spring JPA和额外列   kotlin什么是java。构造器。单()?   简单解析例程的java问题