处理多处理通信的小型python库

pymulproc的Python项目详细描述


https://travis-ci.com/d2gex/pymulproc.svg?branch=masterhttps://img.shields.io/badge/pypi_package-0.1.2-brightgreen.svghttps://img.shields.io/badge/coverage-98%25-brightgreen.svg

pymulproc是一个小库,用于处理多个进程之间的通信,而不需要外部 python标准库以外的依赖项。它完全基于多处理库,并提供 管道和队列通信的公共接口。

Pymulproc协议

pymulproc使用简单的python列表作为与以下字段通信的基础:

  1. request:一个必需的字符串,指示另一个对等方操作的内容。
  2. sender pid:一个必需的整数,指示发送数据报的处理的PID。
  3. recipient pid:一个可选的整数,表示此消息所指向的进程的PID。
  4. data:一个可选的python数据结构,其中包含用于另一个对等方的信息。

反过来,数据结构中发送的请求可以是任何内容,但是pymulproc使用以下标准:

  1. REQ_DO:指示其他对等方执行任务的请求。
  2. REQ_FINISHED:指示已完成任务的另一个对等方的请求。
  3. REQ_DIE:指示另一个对等方尽快停止并死亡的请求。实际上是毒丸。

有效消息结构的示例如下所示。消息的长度必须始终为4

['DO',1234,12345,{'value':20}]# request, sender PID, recipient PID, data['DO',1234,12345,None]# request, sender PID, recipient PID['DIE',1234,None,None]# request, sender PID

Pymulproc API

pymulproc为父进程与其子进程之间的会话交换提供了一个通用的api接口 对于管道和队列通信,如下所示:

  1. send:为1:1会话发送消息或将消息放入1:m、m:1和m:m的joinedQueue 对话。
  2. receive:检查管道中或队列前面是否有消息,以便进程 询问。如果消息不是用于使查询为假的过程,则返回。万一 通信通过队列完成,该方法将消息放回队列中,以便目标进程可以 再拿回来。

队列的通信再添加两个特定的方法:queue_empty以检查队列中是否没有任务和 queue_join在那里等待,直到所有队列都是空的。

在下面显示的send方法签名的可选参数中,值得突出显示sender_pid。 如果提供,则将此类整数作为发送者PID添加到消息中。否则该过程将添加自己的 PID。

send将在引发 例外。在实例化连接处理程序时可以配置尝试次数-请参见QueueCommunicationApi 类的构造函数以获取进一步的详细信息。

@abc.abstractmethoddefsend(self,request,sender_pid=None,recipient_pid=None,data=None):

receive是一个高阶函数,它可以接受与应用的函数关联的func关键字参数 对队列前面消息的操作。如果结果是真的,那么这条消息是为了查询 过程。否则它将“重新插入”队列的后面,以便其他进程检查它。

如果没有传递参数,则可以理解队列前面的消息始终用于查询过程。

另外,如果block=True被传递给receive,查询队列的进程将在队列 仍然是空的。然后它将“唤醒”并在另一个进程通过发送信息时再次检查队列 send

检查信息是否用于查询过程的标准总是失败的示例如下:

child.receive(lambdax:False)

安装pymulproc

pymulprocPyPI上可用,因此您可以使用pip

$ pip install pymulproc

Pymulproc管道公司通信示例

下面是一个父进程和单个子进程之间管道通信的简单示例:

importmultiprocessingfrompymulprocimportfactory,mpq_protocoldeftest_simple_pipe_communication():pipe_factory=factory.PipeCommunication()parent=pipe_factory.parent()child=pipe_factory.child()defcall_child(_child):_child.send(mpq_protocol.REQ_TEST_CHILD)child_process=multiprocessing.Process(name='child_process_',target=call_child,args=(child,))child_process.start()child_process.join()message=parent.receive()request_offset=mpq_protocol.S_PID_OFFSET-1assertmessage[request_offset]==mpq_protocol.REQ_TEST_CHILD

pymulproc simple 1:n队列通信示例

下面的示例显示子进程如何将一些数据发送回父进程。注意父进程如何传递no func 参数receive,因为子进程放置在队列中的所有消息都是针对父进程本身的:

importmultiprocessingfrompymulprocimportfactory,mpq_protocolclassChildProcess:def__init__(self,identifier,parent_pid,conn):self.id=identifierself.conn=connself.parent_pid=parent_pidself.pid=multiprocessing.current_process().piddefis_message_for_me(self,message):'''The message is for me if either the recipient_pid coincides with my pid or is None - None indicates
        that the message is for everyone
        '''returnmessage[mpq_protocol.S_PID_OFFSET+1]==self.pidornotmessage[mpq_protocol.S_PID_OFFSET+1]defrun(self,**kwargs):'''Sends the data passed as keyword parameter to the parent process:
        '''data=kwargs.get('data',None)self.conn.send(mpq_protocol.REQ_FINISHED,data=data)defcall_child(identifier,parent_pid,q_factory,data):child=ChildProcess(identifier,parent_pid,q_factory.child())child.run(data=data)deftest_children_to_parent_communication():'''Simple test where all child processes send a message to the parent process

    All children are initiated with a value that is sent to the parent for it to process it.
    '''queue_factory=factory.QueueCommunication()parent=queue_factory.parent()parent_pid=multiprocessing.current_process().pid# Prepare list of processes to start and pass the value = 3 to each child processchild_processes=[]val=3foroffsetinrange(5):child_process=multiprocessing.Process(name=f'child_process_{offset}',target=call_child,args=(offset+1,parent_pid,queue_factory,val))child_processes.append(child_process)# Start processesforchildinchild_processes:child.start()# Wait for the processes to finishforchildinchild_processes:child.join()# Receive the data from all childrencounter=0data_offset=mpq_protocol.S_PID_OFFSET+2whilenotparent.queue_empty():message=parent.receive()counter+=message[data_offset]# Ensure the queue is empty - no loose stringsparent.queue_join()# Ensure we got the right data from childrenassertcounter==val*len(child_processes)

更多示例

要获得更复杂的示例,请查看测试test_parent_full_duplex_communication_with_children_stress_test,其中 父进程和子进程之间发生全双工通信。也会给所有的孩子发毒丸 当他们不再需要的时候。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
JAVA Tictoe Minimax算法不断引发异常   java弹性时间计算器字符串开关   java从表单post操作中检索值   java Selenium webdriver无法在youtube上找到元素   java如何自动填写XFA(PDF)表单?   java为什么我的秒表程序不能运行?   raspberry pi禁用java中的其他声音   java如何配置web。xml,glassfishweb。JSF的xml文件?   使用浏览器运行自动测试时出现java错误。如何运行它?   java如何阻止Swing程序在每次向JTextArea添加文本时调整组件的大小   javajavax。注射注射无效   java如何改进Solaris服务器配置   java如何在elasticsearch后端脱机时处理Hibernate搜索启动   java TCPsocket的延迟很差,除非持续流式传输