我正在使用asyncio
和multiprocessing
库来运行两个进程,每个进程都有一个服务器实例在不同端口上侦听传入消息
为了识别每个客户机,我想在两个进程之间共享一个dict
,以更新已知客户机的列表。为了实现这一点,我决定使用一个Tuple[StreamReader, StreamWriter]
查找键,为这个连接分配一个Client
对象
但是,只要我插入或只是访问共享dict,程序就会崩溃,并显示以下错误消息:
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
if not await self.handle_message(reader, writer, buffer):
File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
client = self.syncmanager.get_api_client((reader, writer))
File "<string>", line 2, in get_api_client
File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
当然,我查找了错误消息并找到了this question,但我真的不明白这里的原因是什么。据我所知,造成此崩溃的原因是StreamReader
和StreamWriter
不能被pickle/serialized以在进程之间共享。如果这就是原因,有没有办法对它们进行pickle处理,也许可以通过修补reducer函数来使用不同的pickler
我设法找到了一个解决办法,同时还保留了
asyncio
和multiprocessing
库,而没有任何其他库首先,由于
StreamReader
和StreamWriter
对象是不可拾取的,因此我不得不使用socket
。这很容易通过一个简单的功能实现:套接字被插入到共享对象中(例如
Manager().dict()
或者甚至是一个自定义类,您必须通过自定义BaseManager
实例注册该类)。现在,由于应用程序基于asyncio
构建并利用库提供的流,我们可以通过以下方式轻松地将socket
转换回一对StreamReader
和StreamWriter
:其中
self.node_sock
是通过共享对象传递的socket
实例您可能对使用SyncManager感兴趣。只需确保在结束时调用
shutdown
关闭管理器,这样就不会留下僵尸进程相关问题 更多 >
编程相关推荐