aiomisc中的aiomisc依赖-依赖注入

aiomisc-dependenc的Python项目详细描述


使用aiodine库和 支持pytest fixture风格的依赖注入。

Installation

从pypi安装:

pip3 install aiomisc aiomisc-dependency

How to use

Register dependency

要注册依赖项,可以使用aiomisc_dependency.dependencydecorator。

fromaiomisc_dependencyimportdependency@dependencyasyncdefpg_engine():pg_engine=awaitcreate_engine(dsn=pg_url)yieldpg_enginepg_engine.close()awaitpg_engine.wait_closed()

如您所见,依赖项可以是异步生成器函数。屈服后编码 将在拆卸时执行以正确关闭依赖项。

还支持协同路由函数、非异步函数和生成器。

Use dependency

要使用依赖项,需要将其名称添加到__dependencies__属性中 为每一个依赖它的服务。将插入指定的依赖项 作为入口点启动时的服务属性。

fromcontextlibimportsuppressimportaiohttpfromaiomiscimportServicefromaiomisc.service.aiohttpimportAIOHTTPServiceclassHealthcheckService(Service):__dependencies__=('pg_engine',)asyncdefcreate_application(self):app=aiohttp.web.Application()app.add_routes([aiohttp.web.get('/ping',self.healthcheck_handler)])returnappasyncdefhealthcheck_handler(self,request):pg_status=Falsewithsuppress(Exception):asyncwithself.pg_engine.acquire()asconn:awaitconn.execute('SELECT 1')pg_status=Truereturnaiohttp.web.json_response({'db':pg_status},status=(200ifpg_statuselse500),)classRESTService(AIOHTTPService):__dependencies__=('pg_engine',)...

如果在入口点启动时找不到任何必需的依赖项, RuntimeError将被筹集。

通过将依赖项添加到服务的kw参数中,可以手动设置依赖项 创造。这在测试中可能很方便。

fromunittestimportMockdeftest_rest_service():pg_engine_mock=Mock()service=RESTService(pg_engine=pg_engine_mock)...

Dependencies for dependencies

可以将依赖项用作其他依赖项的参数。参数将 自动注射。

@dependencyasyncdefpg_connection(pg_engine):asyncwithpg_engine.acquire()asconn:yieldconn

^{tt1}$ built-in dependency

如果依赖项需要,可以使用内置的loop依赖项 事件循环实例。

importaioredis@dependencyasyncdefredis_pool(loop):pool=aioredis.create_pool(redis_url,loop=loop)yieldpoolpool.close()awaitpool.wait_closed()

LICENSE

麻省理工学院

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java在下面的多线程程序中出错   java获取com。jcraft。jsch。JSCHEException:通过TeamCity构建时身份验证失败   java在Spring+Hibernate中设置H2数据库而不使用XML文件会产生NullPointerException   java无法运行程序,权限被拒绝   设计方法:过程还是有大量数据的Java?   java sbt不支持的专业。在Jdk 1.7中执行时出现的小版本52.0错误   java随机(?)ElementNotVisibleException   java如何确定输入到文本字段中的时间是否在预先设定的两个打开和关闭时间范围内?   java Map reduce驱动程序代码不工作   Android java客户端突然停止   java J2ssh客户端经常断开连接   java动态标量子查询   java试图通过使用嵌套循环获取非重复数字。。(二维阵列)   使用java通知用户来自socket的新请求   java我们应该为DAO使用spring单例吗