如何正确读取和处理现有Flask应用程序的NSQ消息?

2024-06-14 02:15:25 发布

您现在位置:Python中文网/ 问答频道 /正文

所以我的问题是:

  1. 我有一个现有的Flask应用程序,它有几个Celery周期性任务(所以Flask应用程序正在运行,处理任务的Celery workers正在运行)。在

现在在我们的基础设施中出现了NSQ和第三个将消息推送到NSQ的应用程序,所以我必须向现有的Flask应用程序添加功能,以便从NSQ读取这些消息并做一些工作(可能在DB中创建实体或启动celry任务)

  1. 我看了pynsq官方客户,它基于龙卷风.IOLoop,所以它要求IOLoop始终启动,文档中的示例运行良好,但我不知道如何在一个线程中使用现有的Flask应用程序运行Reader。

  2. 我查看了基于gevent的adgnsq库,似乎可以在单独的线程中启动它的阅读器。

因此,我认为可能的方法是:

  1. 可能是在单独的线程中运行Reader(run_forever),所以当Reader发现message时,我可以将一些工作委托给回调处理程序函数,该函数在\u message事件上启动。

  2. 当我可以从队列中读取消息并处理它时,可能存在这种方式,例如从Celery任务中,我每30秒运行一次。但我无法找到,当读者像无限循环一样启动并读取消息时,我看到的所有库都提供了长轮询方式。

  3. 创建一个单独的应用程序,负责从NSQ读取消息,并重新使用其中现有的Flask应用程序中的一些代码。(或者让这个应用程序像从NSQ到我的Flask应用程序的“门户”)。

另外,当Flask应用程序启动时,我尝试在线程中启动Reader,我在gnsq库中获得了一些成功,它的阅读器连接到NSQ,并且在消息上运行handler的代码,但是当我尝试运行时,使用基于IOLoop的pynsqnsq运行()在线程中它失败了,似乎我不能运行“pynsq”nsq运行()不在主线程中)

请给我指一下正确的方向,我有点不明白怎么做才好。在

另外,也许有人可以给我一些建议,从同步代码中读取来自NSQ的消息的正确方法,我的意思是,当一些代码在celery task中启动时,在该代码中我可以以某种方式读取和处理NSQ消息吗?在


Tags: 方法函数代码应用程序消息flaskmessage方式