python多处理的生产者/消费者问题

2024-05-11 06:59:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在写一个有一个生产者和多个消费者的服务器程序, 令我困惑的是,只有第一个放入队列的任务生产者 已消耗,在此之后,排队的任务不再被消耗,它们将保留 永远排队。

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

producer是一个HTTP服务器,它在接收到任务后将任务放入队列 用户的请求。似乎消费者流程仍然 当队列中有新任务时被阻塞,这很奇怪。

另外两个与上述无关的问题,我不确定 最好将HTTP服务器放在自己的进程中,而不是放在主进程中 进程,如果是,我怎样才能使主进程始终运行 子进程结束。第二个问题,什么是阻止 HTTP服务器正常吗?

编辑:添加生产者代码,它只是一个简单的python wsgi服务器:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()

Tags: fromimportself服务器task队列queueput
3条回答

“第二个问题,什么是优雅地停止HTTP服务器的最佳方法?”

这很难。

进程间通信有两种选择:

  • 带外控制。服务器有另一种通信机制。另一个套接字、Unix信号或其他东西。其他内容可能是服务器本地目录中的“立即停止”文件。看起来很奇怪,但它确实工作得很好,而且比引入一个select循环来监听多个套接字或引入一个信号处理程序来捕捉Unis信号更简单。

    “立即停止”文件很容易实现。evwsgi.run()循环仅在每次请求后检查此文件。要使服务器停止,您需要创建文件,执行一个/control请求(这将得到一个500个错误或其他东西,这并不重要),服务器应该逐渐停止。请记住删除stop now文件,否则服务器将无法重新启动。

  • 带内控制。服务器有另一个URL(/stop)来停止它。表面上看,这似乎是一个安全噩梦,但这完全取决于该服务器将在何处以及如何使用。因为它看起来是一个简单的内部请求队列包装器,所以这个额外的URL工作得很好。

    要使其工作,您需要编写自己的evwsgi.run()版本,该版本可以通过设置某些变量以脱离循环的方式终止。

编辑

您可能不想终止服务器,因为您不知道它的工作线程的状态。你需要给服务器发信号,然后你只需要等到它正常完成。

如果要强制终止服务器,则os.kill()(或multiprocessing.terminate)将起作用。当然,你不知道孩子们在做什么。

我认为web服务器部分肯定有问题,因为它工作得很好:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers[i].join()
        self.queue.close()


Manager().start()

样本输出:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1

相关问题 更多 >