处理多处理通信的小型python库
pymulproc的Python项目详细描述
pymulproc是一个小库,用于处理多个进程之间的通信,而不需要外部 python标准库以外的依赖项。它完全基于多处理库,并提供 管道和队列通信的公共接口。
Pymulproc协议
pymulproc使用简单的python列表作为与以下字段通信的基础:
- request:一个必需的字符串,指示另一个对等方操作的内容。
- sender pid:一个必需的整数,指示发送数据报的处理的PID。
- recipient pid:一个可选的整数,表示此消息所指向的进程的PID。
- data:一个可选的python数据结构,其中包含用于另一个对等方的信息。
反过来,数据结构中发送的请求可以是任何内容,但是pymulproc使用以下标准:
- REQ_DO:指示其他对等方执行任务的请求。
- REQ_FINISHED:指示已完成任务的另一个对等方的请求。
- REQ_DIE:指示另一个对等方尽快停止并死亡的请求。实际上是毒丸。
有效消息结构的示例如下所示。消息的长度必须始终为4:
['DO',1234,12345,{'value':20}]# request, sender PID, recipient PID, data['DO',1234,12345,None]# request, sender PID, recipient PID['DIE',1234,None,None]# request, sender PID
Pymulproc API
pymulproc为父进程与其子进程之间的会话交换提供了一个通用的api接口 对于管道和队列通信,如下所示:
- send:为1:1会话发送消息或将消息放入1:m、m:1和m:m的joinedQueue 对话。
- receive:检查管道中或队列前面是否有消息,以便进程 询问。如果消息不是用于使查询为假的过程,则返回。万一 通信通过队列完成,该方法将消息放回队列中,以便目标进程可以 再拿回来。
队列的通信再添加两个特定的方法:queue_empty以检查队列中是否没有任务和 queue_join在那里等待,直到所有队列都是空的。
在下面显示的send方法签名的可选参数中,值得突出显示sender_pid。 如果提供,则将此类整数作为发送者PID添加到消息中。否则该过程将添加自己的 PID。
send将在引发 例外。在实例化连接处理程序时可以配置尝试次数-请参见QueueCommunicationApi 类的构造函数以获取进一步的详细信息。
@abc.abstractmethoddefsend(self,request,sender_pid=None,recipient_pid=None,data=None):
receive是一个高阶函数,它可以接受与应用的函数关联的func关键字参数 对队列前面消息的操作。如果结果是真的,那么这条消息是为了查询 过程。否则它将“重新插入”队列的后面,以便其他进程检查它。
如果没有传递参数,则可以理解队列前面的消息始终用于查询过程。
另外,如果block=True被传递给receive,查询队列的进程将在队列 仍然是空的。然后它将“唤醒”并在另一个进程通过发送信息时再次检查队列 send。
检查信息是否用于查询过程的标准总是失败的示例如下:
child.receive(lambdax:False)
Pymulproc管道公司通信示例
下面是一个父进程和单个子进程之间管道通信的简单示例:
importmultiprocessingfrompymulprocimportfactory,mpq_protocoldeftest_simple_pipe_communication():pipe_factory=factory.PipeCommunication()parent=pipe_factory.parent()child=pipe_factory.child()defcall_child(_child):_child.send(mpq_protocol.REQ_TEST_CHILD)child_process=multiprocessing.Process(name='child_process_',target=call_child,args=(child,))child_process.start()child_process.join()message=parent.receive()request_offset=mpq_protocol.S_PID_OFFSET-1assertmessage[request_offset]==mpq_protocol.REQ_TEST_CHILD
pymulproc simple 1:n队列通信示例
下面的示例显示子进程如何将一些数据发送回父进程。注意父进程如何传递no func 参数receive,因为子进程放置在队列中的所有消息都是针对父进程本身的:
importmultiprocessingfrompymulprocimportfactory,mpq_protocolclassChildProcess:def__init__(self,identifier,parent_pid,conn):self.id=identifierself.conn=connself.parent_pid=parent_pidself.pid=multiprocessing.current_process().piddefis_message_for_me(self,message):'''The message is for me if either the recipient_pid coincides with my pid or is None - None indicates that the message is for everyone '''returnmessage[mpq_protocol.S_PID_OFFSET+1]==self.pidornotmessage[mpq_protocol.S_PID_OFFSET+1]defrun(self,**kwargs):'''Sends the data passed as keyword parameter to the parent process: '''data=kwargs.get('data',None)self.conn.send(mpq_protocol.REQ_FINISHED,data=data)defcall_child(identifier,parent_pid,q_factory,data):child=ChildProcess(identifier,parent_pid,q_factory.child())child.run(data=data)deftest_children_to_parent_communication():'''Simple test where all child processes send a message to the parent process All children are initiated with a value that is sent to the parent for it to process it. '''queue_factory=factory.QueueCommunication()parent=queue_factory.parent()parent_pid=multiprocessing.current_process().pid# Prepare list of processes to start and pass the value = 3 to each child processchild_processes=[]val=3foroffsetinrange(5):child_process=multiprocessing.Process(name=f'child_process_{offset}',target=call_child,args=(offset+1,parent_pid,queue_factory,val))child_processes.append(child_process)# Start processesforchildinchild_processes:child.start()# Wait for the processes to finishforchildinchild_processes:child.join()# Receive the data from all childrencounter=0data_offset=mpq_protocol.S_PID_OFFSET+2whilenotparent.queue_empty():message=parent.receive()counter+=message[data_offset]# Ensure the queue is empty - no loose stringsparent.queue_join()# Ensure we got the right data from childrenassertcounter==val*len(child_processes)
更多示例
要获得更复杂的示例,请查看测试test_parent_full_duplex_communication_with_children_stress_test,其中 父进程和子进程之间发生全双工通信。也会给所有的孩子发毒丸 当他们不再需要的时候。