我需要在我的应用程序中实现一个断路器模式,所以我尝试用PyBreaker来实现它。该应用程序运行正常,但我不知道如何测试它,看看断路器是否工作良好。这是我的CircuitBreakerListener
类:
import pybreaker
import logging
class CircuitBreakerListener(pybreaker.CircuitBreakerListener):
def failure(self, cb, exc):
logging.info("CRITICAL {0}: The last request caused a system error".format(cb))
def state_change(self, cb, old_state, new_state):
logging.info("Circuit Breaker {0}: The {1} state changed to {2} state".format(cb, old_state, new_state))
和我的函数调用:
from starlette.responses import JSONResponse
def circuitbreaker_call(circuitbreaker, data):
if circuitbreaker.current_state == 'open':
data = JSONResponse(status_code=404, content={"Error": "Oops, something went wrong. Try again in 10 seconds"})
print(circuitbreaker.current_state)
return data
据我所知,当Exception
发生时,断路器应该改变其状态。我这样做是为了抛出一个异常:
db_breaker = pybreaker.CircuitBreaker(fail_max=1, reset_timeout=5, listeners=[CircuitBreakerListener()])
@db_breaker
def get_projects(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Project).offset(skip).limit(limit).all() == 'a'
我的终点是:
@app.get("/projects", response_model=List[schemas.Project])
def get_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
return circuitbreaker_call(db_breaker, crud.get_projects(db, skip=skip, limit=limit))
但是,我没有得到带有“Oops,出错了”消息的404代码,而是在控制台中得到了500个内部服务器错误,并且/projects
端点上没有任何真正的更改。有没有办法触发断路器并测试它,看它是否真的工作
目前没有回答
相关问题 更多 >
编程相关推荐