python-socketio with multiprocessing

Posted 2024-06-26 13:50:05


I have been struggling with a pickling error that is driving me crazy. My masterEngine class looks like this:

import eventlet
import socketio
import multiprocessing
from multiprocessing import Queue
from multi import SIOSerever

class masterEngine:

    if __name__ == '__main__': 
            
        serverObj = SIOSerever()

        try:
            receiveData = multiprocessing.Process(target=serverObj.run)
            receiveData.start()

            receiveProcess = multiprocessing.Process(target=serverObj.fetchFromQueue)
            receiveProcess.start()

            receiveData.join()
            receiveProcess.join()
            
        except Exception as error:
            print(error)

I also have another file called multi, which looks like this:

import multiprocessing
from multiprocessing import Queue
import eventlet
import socketio

class SIOSerever:

  def __init__(self):
    self.cycletimeQueue = Queue()
    self.sio = socketio.Server(cors_allowed_origins='*',logger=False)
    self.app = socketio.WSGIApp(self.sio, static_files={'/': 'index.html',})
    self.ws_server = eventlet.listen(('0.0.0.0', 5000))

    @self.sio.on('production')
    def p_message(sid, message):
      self.cycletimeQueue.put(message)
      print("I logged : "+str(message))

  def run(self):
    eventlet.wsgi.server(self.ws_server, self.app)

  def fetchFromQueue(self):
    while True:
      cycle = self.cycletimeQueue.get()
      print(cycle)

As you can see, I try to create two processes from run and fetchFromQueue, and I want them to run independently of each other.

My run function starts the python-socketio server, to which I send some data from an HTML web page (this works perfectly without multiprocessing). I then try to push the received data onto a queue so that my other function can retrieve and process it.

I need to perform a set of time-consuming operations on the data received from the socket, which is why I push everything onto a queue.

When I run the masterEngine class, I get the following output:

Can't pickle <class 'threading.Thread'>: it's not the same object as threading.Thread
I ended!
[Finished in 0.5s]

Could you help me figure out what I am doing wrong?


1 Answer
Answered 2024-06-26 13:50:05

From the multiprocessing programming guidelines:

Explicitly pass resources to child processes

On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.

So I modified your example slightly, stripping out everything that is not essential, to show one way of explicitly passing the shared queue to every process that uses it:

import multiprocessing

MAX = 5

class SIOSerever:

  def __init__(self, queue):
    self.cycletimeQueue = queue

  def run(self):
    for i in range(MAX):
      self.cycletimeQueue.put(i)

  @staticmethod
  def fetchFromQueue(cycletimeQueue):
    while True:
      cycle = cycletimeQueue.get()
      print(cycle)
      if cycle >= MAX - 1:
        break


def start_server(queue):
    server = SIOSerever(queue)
    server.run()


if __name__ == '__main__':

    try:
        queue = multiprocessing.Queue()
        receiveData = multiprocessing.Process(target=start_server, args=(queue,))
        receiveData.start()

        receiveProcess = multiprocessing.Process(target=SIOSerever.fetchFromQueue, args=(queue,))
        receiveProcess.start()

        receiveData.join()
        receiveProcess.join()

    except Exception as error:
        print(error)
Output:

0
1
...
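
Applying the same principle back to the original socket.io setup, one possible sketch (untested, reusing the port, event name and index.html from the question) is to pass the queue in explicitly and to create the socketio.Server, WSGIApp and eventlet listener only inside run(), i.e. inside the child process, so that none of the unpicklable threading objects ever have to cross a process boundary:

import multiprocessing

import eventlet
import socketio


class SIOSerever:

    def __init__(self, queue):
        # Only picklable state lives on the instance.
        self.cycletimeQueue = queue

    def run(self):
        # The socket.io server and the eventlet listener are created here,
        # in the child process, so they never need to be pickled.
        sio = socketio.Server(cors_allowed_origins='*', logger=False)
        app = socketio.WSGIApp(sio, static_files={'/': 'index.html'})

        @sio.on('production')
        def p_message(sid, message):
            self.cycletimeQueue.put(message)
            print("I logged : " + str(message))

        eventlet.wsgi.server(eventlet.listen(('0.0.0.0', 5000)), app)

    @staticmethod
    def fetchFromQueue(cycletimeQueue):
        while True:
            cycle = cycletimeQueue.get()
            print(cycle)


def start_server(queue):
    server = SIOSerever(queue)
    server.run()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    receiveData = multiprocessing.Process(target=start_server, args=(queue,))
    receiveData.start()

    receiveProcess = multiprocessing.Process(target=SIOSerever.fetchFromQueue, args=(queue,))
    receiveProcess.start()

    receiveData.join()
    receiveProcess.join()

On Unix with the fork start method the original layout may also work, because the child inherits the parent's objects instead of pickling them; constructing the server inside the child keeps the code compatible with the spawn start method, which is the default on Windows and, since Python 3.8, on macOS.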
