websocket和Flask之间有什么区别

2024-10-01 13:46:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个BS卡夫卡监控工具。该节目将收听卡夫卡主题,并不断输出该主题的新信息。那么,哪种方法是将这些消息不断发送到浏览器端的最佳方法呢

该程序使用flask,所以目前我使用流_和_上下文将新消息发送到浏览器端。目前这是可行的,但我想知道这是否是将stream_与_上下文一起使用的正确方案,因为大多数情况下用于下载和视频流?或者我应该用websocket

@read_controller.route('/v1/listenkafka/<string:kafkaId>', methods=['GET'])
def start_stream(kafkaId):
    try:
        mykafka_json = eval(my_storage.get(kafkaId))
        mykafka = kafkaserver(ip=mykafka_json['ip'], id=kafkaId, port=mykafka_json['port'])
        return Response(stream_with_context(mykafka.consume_topic(mykafka_json['topic'])))
    except Exception as e:
        print(f"{e}")
        return jsonify(f"{e}"), 400
#The generator listen to kafka and feed to stream

def consume_topic(self, topic, groupid='test-consumer-group'):
    consumer = KafkaConsumer(topic,
                             group_id=groupid,
                             bootstrap_servers=[f"{self.ip}:{self.port}"])
    print(f"Topic: {topic}@{self.ip}:{self.port} starts steaming at {datetime.now()}")
    try:
        for messages in consumer:
            mykafka_json = eval(my_storage.get(self.id))
            print(mykafka_json)
            if mykafka_json['flag']:
                my_storage.delete(self.id)
                return
            else:
                message = {'topic':messages.topic,
                           'partition':messages.partition,
                           'offset':messages.offset,
                           'key':messages.key,
                           'value':messages.value}
                print (message['value'])
                yield message['value']
    except StopIteration as e:
        #TODO:: handle return
        print(e)
    finally:
        print(f"Topic-{topic} finish at {datetime.now()}")

那么,在这个场景中,我应该将流_与_上下文一起使用,还是应该切换到使用WebSock

谢谢


Tags: selfipidjsonstreamtopicreturnvalue