用于分布式处理的持久消息传递

muppet的Python项目详细描述


muppetmutual的python实现。muppet为跨进程或计算机边界的简单消息传递提供远程通道,为跨进程或计算机边界的持久消息传递提供durablechannel。remotechannel和durablechannel都使用redis进行消息存储。

远程频道

远程频道遵循pub-sub模型,在该模型中,在频道上发送的每一条消息都将广播给在该频道上侦听的所有订阅者。

用法:

frommuppetimportRemoteChannel# define the callback to receive messagesdefcallback(message):print("received:",message)# we are done with the receiverreceiver.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a remote channel to send messagessender=RemoteChannel("greeting",redis_options)# create a remote channel to receive messagesreceiver=RemoteChannel("greeting",redis_options)# listen for messages by passing the callbackreceiver.listen(callback)# send a messagesender.send("hello")# we are done with the sendersender.end()

耐用频道

持久通道遵循队列模型,其中在通道上发送的消息由侦听该通道的任何一个接收器接收。使用durablechannel,发送者可以发送超时的消息,因此当消息在指定的超时内未被回复时,会通知他们。每封邮件都保证在指定的超时时间内回复,否则,将通过回调通知发件人。

用法:

frommuppetimportDurableChanneldeftimeout_callback(message):print"timed out:",message# we are done with the workerworker.end()# we are done with dispatcherdispatcher.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a durable channel to dispatch messagesdispatcher=DurableChannel("dispatcher.1",redis_options)# create a durable channel to receive messages, note the 3rd argument which is the callback for handling timeoutsworker=DurableChannel("worker.1",redis_options,timeout_callback)# dispatch a message to worker.1dispatcher.send(content="task",to="worker.1")# receive the messagemessage=worker.receive()print"received message:",message["content"]# reply to the messageworker.reply(message=message,response="reply",timeout=5000)# receive the replyreply=dispatcher.receive()print"received reply:",reply["content"]# we are happy with the replydispatcher.close(reply)# we are done with dispatcher and workerworker.end()dispatcher.end()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
使用MongoDB进行java漏斗分析?   java如何重构此代码以仅执行一次方法logViolation(),并在单个字符串中获取字符串变量(speedType)的所有值   java如何将方向向量转换为角度?   Java中json文件中的股票价格   java有没有一种方法可以查看网站中的哪些资源加载了selenium?   java文件定位器可以指向不同文件服务器上的目录吗?   java GWT等待SetVisibleRange和ClearData完成   macos Java Applet无法在Mac OS下接收鼠标输入   java BottomNavigationView不支持单次单击必须单击两次   MS SQL Server 2012中的java:我的数据库名称是J.3.0.0\u DEV我无法在Oracle SQL Developer IDE中连接它?   java对JFileChooser的修改,只显示文件夹内容   java如何将动态对象查询更改为criteria builder或更好的性能   java中的swing新字体类型   java Hibernate。删除未使用的条目   上载CSV文件并将其转换为Java模型对象   java如何将信息添加到库的日志消息中?   JavaEclipseGit:使用egit从另一个分支、标记或引用打开版本   使用Firebase控制台消息的java开放视频