如何测试pybreaker并查看它是否工作

2024-09-29 23:20:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要在我的应用程序中实现一个断路器模式,所以我尝试用PyBreaker来实现它。该应用程序运行正常,但我不知道如何测试它,看看断路器是否工作良好。这是我的CircuitBreakerListener类:

import pybreaker
import logging

class CircuitBreakerListener(pybreaker.CircuitBreakerListener):

    def failure(self, cb, exc):
        logging.info("CRITICAL {0}: The last request caused a system error".format(cb))

    def state_change(self, cb, old_state, new_state):
        logging.info("Circuit Breaker {0}: The {1} state changed to {2} state".format(cb, old_state, new_state))

和我的函数调用:

from starlette.responses import JSONResponse

def circuitbreaker_call(circuitbreaker, data):
    if circuitbreaker.current_state == 'open':
        data = JSONResponse(status_code=404, content={"Error": "Oops, something went wrong. Try again in 10 seconds"})
    print(circuitbreaker.current_state)
    return data

据我所知,当Exception发生时,断路器应该改变其状态。我这样做是为了抛出一个异常:

db_breaker = pybreaker.CircuitBreaker(fail_max=1, reset_timeout=5, listeners=[CircuitBreakerListener()])

@db_breaker
def get_projects(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Project).offset(skip).limit(limit).all() == 'a'

我的终点是:

@app.get("/projects", response_model=List[schemas.Project])
def get_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    return circuitbreaker_call(db_breaker, crud.get_projects(db, skip=skip, limit=limit))

但是,我没有得到带有“Oops,出错了”消息的404代码,而是在控制台中得到了500个内部服务器错误,并且/projects端点上没有任何真正的更改。有没有办法触发断路器并测试它,看它是否真的工作


Tags: importdbgetloggingdefintprojectsstate

热门问题