在进程之间共享异步IO对象

2024-10-01 22:30:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用asynciomultiprocessing库来运行两个进程,每个进程都有一个服务器实例在不同端口上侦听传入消息

为了识别每个客户机,我想在两个进程之间共享一个dict,以更新已知客户机的列表。为了实现这一点,我决定使用一个Tuple[StreamReader, StreamWriter]查找键,为这个连接分配一个Client对象

但是,只要我插入或只是访问共享dict,程序就会崩溃,并显示以下错误消息:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

当然,我查找了错误消息并找到了this question,但我真的不明白这里的原因是什么。据我所知,造成此崩溃的原因是StreamReaderStreamWriter不能被pickle/serialized以在进程之间共享。如果这就是原因,有没有办法对它们进行pickle处理,也许可以通过修补reducer函数来使用不同的pickler


Tags: inpyselfclient消息hometask进程
2条回答

我设法找到了一个解决办法,同时还保留了asynciomultiprocessing库,而没有任何其他库

首先,由于StreamReaderStreamWriter对象是不可拾取的,因此我不得不使用socket。这很容易通过一个简单的功能实现:

def get_socket(writer: StreamWriter):
    fileno = writer.get_extra_info('socket').fileno()
    return socket.fromfd(fileno, AddressFamily.AF_INET, socket.SOCK_STREAM)

套接字被插入到共享对象中(例如Manager().dict()或者甚至是一个自定义类,您必须通过自定义BaseManager实例注册该类)。现在,由于应用程序基于asyncio构建并利用库提供的流,我们可以通过以下方式轻松地将socket转换回一对StreamReaderStreamWriter

node_reader, node_writer = await asyncio.open_connection(sock=self.node_sock)
node_writer.write(mesg_text)
await node_writer.drain()

其中self.node_sock是通过共享对象传递的socket实例

您可能对使用SyncManager感兴趣。只需确保在结束时调用shutdown关闭管理器,这样就不会留下僵尸进程

from multiprocessing.managers import SyncManager
from multiprocessing import Process
import signal

my_manager = SyncManager()

# to avoid closing the manager by ctrl+C. be sure to handle KeyboardInterrupt errors and close the manager accordingly
def manager_init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

my_manager.start(manager_init)

my_dict = my_manager.dict()
my_dict["clients"] = my_manager.list()
def my_process(my_id, the_dict):
    for i in range(3):
        the_dict["clients"].append(f"{my_id}_{i}")

processes = []
for j in range(4):
    processes.append(Process(target=my_process, args=(j,my_dict)))

for p in processes:
    p.start()

for p in processes:
    p.join()

print(my_dict["clients"])
# ['0_0', '2_0', '0_1', '3_0', '1_0', '0_2', '1_1', '2_1', '3_1', '1_2', '2_2', '3_2']

my_manager.shutdown()



相关问题 更多 >

    热门问题