如何使用python FastAPI异步运行kafka consumer

2024-10-05 11:29:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用Kafka python和FastAPI。我能够成功运行我的消费者,但它不允许任何其他功能工作。它在使用消息时陷入for循环。我希望我的消费者能够持续运行,也希望其他功能能够正常工作。 我怎样才能做到这一点? 在下面的代码中,方法isolve_kafka_consumer()开始运行,但当我进行post调用时,它不会调用我的router和log_requests()方法

import os
from typing import List


from isolveconsumer.commonutils.base_logger import LoggerManager
from isolveconsumer.routers.application_router import application_router
from isolveconsumer.client.isolve_consumer_client import  IsolveConsumer
from isolveconsumer.configuration.read_secrets import set_env_variables
from fastapi import FastAPI, Request
import uvicorn
import time
import random
import string
from datetime import datetime
import threading
from isolveconsumer.commonutils.isolve_trigger_utils import Constants, EnvironmentUtils, ConfigueParser, \
    ConfigConstants
LOGGER = LoggerManager().get_logger(Constants.ISOLVE_CONSUMER)
env = EnvironmentUtils.get_env_var(var_name=Constants.EPASS_ENV, default_value=Constants.LOCAL)
config = ConfigueParser()
LOGGER.info("Executing env is {}".format(env))

# Data from multiple sources is published as events for isolve
isolve_consumer = FastAPI(
    title="iSolveConsumer",
    description="This is the consumer for isolve",
    version="1.0.0",
)
# all routers
isolve_consumer.include_router(application_router, prefix="/isolveconsumers/v1", tags=["application"])

# kafka_consumer = None

@isolve_consumer.on_event("startup")
async def startup_event():
    LOGGER.info("setting all the env variables required for iSolve")
    LOGGER.info("Starting app at {}".format(datetime.now()))
    set_env_variables()

@isolve_consumer.on_event("startup")
async def isolve_kafka_consumer():
    LOGGER.info("creating the isolve consumer for consuming messages")
    topic = config[env][ConfigConstants.KAFKA_TOPIC]
    group_id = config[env][ConfigConstants.KAFKA_CONSUMER_GROUP]
    bootstrap_servers: List[str] = config[env][ConfigConstants.BOOTSTRAP_SERVERS]
    LOGGER.info(f"List : {bootstrap_servers}")
    security_protocol = config[env][ConfigConstants.SECURITY_PROTOCOL]
    cafile = config[env][ConfigConstants.SSL_CAFILE]
    certfile = config[env][ConfigConstants.SSL_CERTFILE]
    keyfile = config[env][ConfigConstants.SSL_KEYFILE]
    password = os.environ.get("ssl_password")
    consumer = IsolveConsumer(topic, group_id,bootstrap_servers,cafile,certfile,keyfile,password,security_protocol)
    LOGGER.info(f"Consumer Object : {consumer}")
    consumer.isolve_events_consumer()

async def log_requests(request: Request, call_next):
    idem = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
    start_date = datetime.now()
    start_time = time.time()
    LOGGER.info("Requests logged")
    if str(request.url).__contains__("/isolveconsumers/v1/health"):
        LOGGER.info("Entered Health")
        response = await call_next(request)
    else:
        LOGGER.info(f"rid={idem} start request path={request.url.path} start time={start_date}")
        response = await call_next(request)
        end_date = datetime.now()
        end_time = time.time()
        process_time = (time.time() - start_time) * 1000
        formatted_process_time = '{0:.2f}'.format(process_time)
        LOGGER.info(
            f"rid={idem} completed_in={formatted_process_time}ms status_code={response.status_code} end time={end_date}")

    return response
#
if __name__ == "__main__":
    uvicorn.run("isolve_consumer:isolve_consumer", host="0.0.0.0", port=8084)

Tags: fromimportinfoenvconfigfortimeconsumer

热门问题