简单python多处理管理器

mp-manager的Python项目详细描述


简单的多处理管理工具

使用python多处理处理处理大量数据

概念

其主要思想是利用多处理,通过构建源队列并处理要在工作者中处理的项。

多进程管理是使用MPYManager .Manager .Py < /CITE >借助 MPXManager。PurryTyHelpP.Py提供事件、锁、共享值和统计、异常处理助手和 ExtStase

多处理的工作线程是基于处理程序类构建的。

处理程序

处理程序必须继承mp_manager.base_handler.basehandler并至少实现:

  1. ClassMethodbuild_items()必须返回iterable个项供工作人员处理
  2. \u handle\u item()用于处理当前项

他们还可以实现:

  1. 日志项()由Manager用作日志记录源
  2. \u reset_stats()用于在内部重置与当前项相关的工作进程dict stats

示例用法

#!/usr/bin/env python3.7
# -*- coding: utf-8 -*-

import math
import random
import time

from mp_manager.base_handler import BaseHandler
from mp_manager.process_helpers import Stats, ns_container
from mp_manager.utils import build_batches


class SampleHandler(BaseHandler):

    batch_size = 2

    @classmethod
    def build_items(cls, *args, **kwargs):
        total = 100
        batches_total = int(math.ceil(total / cls.batch_size))

        Stats.set_stats_val('items_total', total)
        Stats.set_stats_val('batches_total', batches_total)
        ns_container.set_shared_value('shared_data', cls._data_to_share(),
                                      True)
        for i, val in enumerate(build_batches(range(total), cls.batch_size)):
            yield i, val

    @property
    def log_item(self):
        return f'{self.current_item[0]} / {Stats.stats.batches_total}'

    @classmethod
    def _data_to_share(cls):
        return dict(somekey="some value")

    def _reset_stats(self):
        self.stats = {
            'items_processed': 0,
            'items_positive': 0,
            'items_negative': 0,
        }

    def _handle_item(self):
        num, items = self.current_item
        for item in items:
            self._process_number(item)
        Stats.inc_stats_vals(self.stats)
        self.logger.info(Stats.get_stats())

    def _process_number(self, item):
        time.sleep(random.random())
        value = math.factorial(item) / math.cos(item)
        self.stats['items_processed'] += 1
        if value > 0:
            self.stats['items_positive'] += 1
        else:
            self.stats['items_negative'] += 1


if __name__ == '__main__':
    import logging
    from mp_manager.manager import ImportManager

    logging.basicConfig(level=logging.DEBUG)
    status = ImportManager(SampleHandler, num_workers=8).run()

    print(status)
    exit(status.code)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何在sqlite数据库中保存特定列的历史记录   java如何更改/更新timeseriechart名称(JFreeChart)   java如何将整数转换为可绘制的   汇编什么解释Java的字节码   java查找已编译的类版本号   我应该什么时候在ColdFusion应用程序中使用Java?   java当一个实体的两个字段为(unique=true)时,如何处理JPA异常?   java为什么在所有其他实例都正确的情况下返回错误的布尔值?   java Hibernate每次都准备语句   java停留在平均字长上   对Java和日语字符进行编码   java如何将导致异常的方法的错误消息传递给侦听器中的onTestFailure方法   java代码没有打印结果   java为什么私有内部接口的方法必须是公共的?   休眠发生错误。有关详细信息,请参阅错误日志。JAVAlang.NullPointerException