我正在开发一个BS卡夫卡监控工具。该节目将收听卡夫卡主题,并不断输出该主题的新信息。那么,哪种方法是将这些消息不断发送到浏览器端的最佳方法呢
该程序使用flask,所以目前我使用流_和_上下文将新消息发送到浏览器端。目前这是可行的,但我想知道这是否是将stream_与_上下文一起使用的正确方案,因为大多数情况下用于下载和视频流?或者我应该用websocket
@read_controller.route('/v1/listenkafka/<string:kafkaId>', methods=['GET'])
def start_stream(kafkaId):
try:
mykafka_json = eval(my_storage.get(kafkaId))
mykafka = kafkaserver(ip=mykafka_json['ip'], id=kafkaId, port=mykafka_json['port'])
return Response(stream_with_context(mykafka.consume_topic(mykafka_json['topic'])))
except Exception as e:
print(f"{e}")
return jsonify(f"{e}"), 400
#The generator listen to kafka and feed to stream
def consume_topic(self, topic, groupid='test-consumer-group'):
consumer = KafkaConsumer(topic,
group_id=groupid,
bootstrap_servers=[f"{self.ip}:{self.port}"])
print(f"Topic: {topic}@{self.ip}:{self.port} starts steaming at {datetime.now()}")
try:
for messages in consumer:
mykafka_json = eval(my_storage.get(self.id))
print(mykafka_json)
if mykafka_json['flag']:
my_storage.delete(self.id)
return
else:
message = {'topic':messages.topic,
'partition':messages.partition,
'offset':messages.offset,
'key':messages.key,
'value':messages.value}
print (message['value'])
yield message['value']
except StopIteration as e:
#TODO:: handle return
print(e)
finally:
print(f"Topic-{topic} finish at {datetime.now()}")
那么,在这个场景中,我应该将流_与_上下文一起使用,还是应该切换到使用WebSock
谢谢
目前没有回答
相关问题 更多 >
编程相关推荐