我尝试使用async调用多个pykafka使用者函数。但是,第一个pykafka消费者函数将阻止另一个函数工作。在
QueueConsumer库:
import json
from pykafka import KafkaClient
import configparser
import asyncio
class QueueConsumer(object):
def __init__(self):
config = configparser.ConfigParser()
config.read('config.ini')
self.config = config
async def test(self):
defaultTopic = 'test'
client = KafkaClient(hosts=self.config['kafka']['host'])
topic = client.topics[defaultTopic.encode('utf-8')]
consumer = topic.get_simple_consumer()
# msg = next(consumer)
for message in consumer:
print(defaultTopic+' '+message.value.decode("utf-8"))
async def coba(self):
defaultTopic = 'coba'
client = KafkaClient(hosts=self.config['kafka']['host'])
topic = client.topics[defaultTopic.encode('utf-8')]
consumer = topic.get_simple_consumer()
# msg = next(consumer)
for message in consumer:
print(defaultTopic+' '+message.value.decode("utf-8"))
然后我调用这些函数:
^{pr2}$结果将只返回来自主题“test”的队列消息。在
编辑: 我尝试添加另一个函数
async def factorial(self, name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
然后打电话说:
queueConsumer.test(),
queueConsumer.coba(),
queueConsumer.factorial('a',3),
queueConsumer.factorial('b',5),
queueConsumer.factorial('c',7),
执行一些print from factorial函数。但是当调用test或coba中的print时,它只会停止其他的。在
SimpleConsumer.consume
是一个阻塞调用,因此您需要调整代码以定期轮询新消息,同时在轮询之间放弃对其他异步操作的控制来接管。实现这一点的一种方法是在KafkaClient
上使用use_greenlets=True
kwarg,依赖gevent来处理多个异步操作之间的控制流。在相关问题 更多 >
编程相关推荐