观测器设计模式的一种简洁实现

GreenRocket的Python项目详细描述


绿色火箭

绿色火箭是一种简单紧凑的观测器实现 (或发布/订阅)通过信号的设计模式。

使用Base One创建特定信号:

>>> fromgreenrocketimportSignal>>> classMySignal(Signal):... pass...

订阅处理程序:

>>> @MySignal.subscribe... defhandler(signal):... print('handler: '+repr(signal))...

火警信号:

>>> MySignal().fire()handler: MySignal()

如果使用的是asyncio,则还可以使用协程作为处理程序 并使用await Signal.afire()异步触发信号 yield from Signal().afire()。方法afire()与 同步处理程序。

注意,该信号在继承上传播,即基址的所有订户 启动子一时将调用信号:

>>> @Signal.subscribe... defbase_handler(signal):... print('base_handler: '+repr(signal))...>>> MySignal().fire()handler: MySignal()
base_handler: MySignal()

取消订阅处理程序:

>>> MySignal.unsubscribe(handler)>>> MySignal().fire()base_handler: MySignal()

处理程序是使用弱引用订阅的。所以如果你创建并订阅 本地作用域中的处理程序(例如在生成器中),它将 自动取消订阅。

>>> defgen():... @MySignal.subscribe... deflocal_handler(signal):... print('local_handler: '+repr(signal))... yield1...>>> forvalueingen():... MySignal(value=value).fire()...local_handler: MySignal(value=1)
base_handler: MySignal(value=1)
>>> importgc# PyPy fails the following test without>>> _=gc.collect()# explicit call of garbage collector.>>> MySignal(value=2).fire()base_handler: MySignal(value=2)
>>> Signal.unsubscribe(base_handler)

如上所示,信号构造函数接受关键字参数。这些 参数可用作信号的属性:

>>> s=MySignal(a=1,b=2)>>> s.a1
>>> s.b2

信号抑制处理程序调用时引发的任何异常。它使用 名为greenrocket的记录器,来自标准logging模块,用于记录错误和 调试信息。

库还提供了Watchman类作为一种方便的测试方法 信号。

为特定信号创建监视人:

>>> fromgreenrocketimportWatchman>>> watchman=Watchman(MySignal)

火警信号:

>>> MySignal(x=1).fire()

测试信号:

>>> watchman.assert_fired_with(x=1)>>> watchman.assert_fired_with(x=2)# DOCTEST: +ellipsisTraceback (most recent call last):
...AssertionError: Failed assertion on MySignal.x: 1 != 2>>> watchman.assert_fired_with(x=1,y=2)# DOCTEST: +ellipsisTraceback (most recent call last):
...AssertionError: MySignal has no attribute y

watchman对象将每个激发的信号保存到其日志:

>>> watchman.log[MySignal(x=1)]
>>> MySignal(x=2).fire()>>> watchman.log[MySignal(x=1), MySignal(x=2)]

方法assert_fired_with测试来自 默认日志:

>>> watchman.assert_fired_with(x=2)

但您可以指定要测试的对象:

>>> watchman.assert_fired_with(-2,x=1)

更改

0.30

  • 添加了返回awaitable to support的Signal.afire()方法 基于协程的信号处理程序
  • 放弃了对Python2.6和3.2的支持

0.22

  • 添加了作为测试助手的watchman类

0.21

  • 已删除分发依赖项
  • 改进的测试

0.20

  • 已通过引用将处理程序订阅机制从订阅更改为 弱引用订阅

0.11

  • 修复了程序终止时记录器松动的问题

0.1

  • 初始版本

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java系统。出来打印导致延迟?   java如何使用dasein API连接Azure云(blob存储)   java如何将Jframe cardlayout中的“card”从属于card的Jpanel更改为另一个类?   java如何在单个消息框中显示循环的所有迭代?   java如何设置netbeans保存项目的操作?   java网站的某些选项在web视图中不起作用   java如何在安卓中打开从右到左的菜单滑动条   java更容易反转由静态方法(函数接口)内联创建的比较器?   映射Java HashMap。获取(键)和树形图。获取equals和compareTo方法的(键)用法   java Health endpoints只显示“status:up”,不显示敏感信息   java当我一直按back按钮登录时,字段显示以前插入的用户数据   JTable单元中的java图像显示   go Java vs.Golang for HOTP(rfc4226)   java使用函数链减少分支和清理代码,这有意义吗   java我应该为每个查询创建一个新的DB连接吗?   java推荐的函数调用方法(是否使用CompiledScript?)   java截断分区和地板分区有什么区别?   没有spring引导的java Profile特定属性文件?   异常如何在java中从控制台读取密码?