pika和rabbitMQ每次都从队列中获取一条消息,但仍在python中调用回调函数

2024-10-02 12:22:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我希望流程优雅地退出,直到当前任务完成。(如果它在回调函数中执行某些操作)。逻辑很简单,一旦有一个SIGINT,我就抛出is_interrupt,在我的无限while循环中,我不断检查队列中是否有消息

如果队列中有消息,我将生成一个子进程来执行回调函数。在回调函数中,它以sys.exit(0)结束,以确保一旦子函数完成回调,它将退出

目前我面临的问题是:

如果发送方将一些消息发送到队列中,然后接收方(下面的代码)启动。它可以处理那些已存在的消息,然后继续进入回调函数。子进程不断获取spawn,但失败。(我不明白是什么原因导致这种情况发生。一旦孩子完成任务,它就会消失,主进程将继续循环并等待消息

如果接收者先启动,然后发送者发布一些消息,那么接收者将什么也收不到

请帮帮我,谢谢

import pika, sys, os, signal, requests

is_interrupt = 0

def signal_handler(sig, frame):
    global is_interrupt
    is_interrupt = 1

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    queue_state = channel.queue_declare(queue='task_queue', durable=True,  
                                        passive=True)
    
    # Register signal handler. 
    signal.signal(signal.SIGINT, signal_handler)

    def callback(ch, method, properties, body):
        # Do something here..
        os._exit(0)

    print('Worker waiting for messages. To exit press CTRL+C')

    while(True):
        if is_interrupt == 1:
            print("\nExit")
            os._exit(0)
        queue_empty = queue_state.message_count == 0
        #print(queue_empty)
        if not queue_empty:
            child_pid = os.fork()
            if child_pid == 0:
                method, properties, body = channel.basic_get(queue='task_queue', auto_ack=True)
                callback(channel, method, properties, body)
            else:
                queue_empty = True
                os.wait()

if __name__ == '__main__':
    main()

Tags: 函数true消息signalif队列queue进程

热门问题