redis的一个小包装器,提供对fifo队列的访问。
saferedisqueue的Python项目详细描述
安全重新排队
一个小包装器,包装redis(使用"标准" 提供 访问fifo队列并允许上游代码将弹出项标记为 处理成功(确认)或不成功(失败)。
失败的项目将自动重新排队。此外,还为项目保留备份 既没有确认也没有失败,也就是说,以防消费者崩溃。这个 一旦一个用户出现,备份项目将重新排队 再次,
所有操作都是原子的,有多个生产者和消费者是安全的 同时访问同一队列。
与redis.py的版本兼容性
<表> < COLGROUP > < COL/> < COL/> <广告> redis.py 安全重新队列 < /广告> <正文> 2.4.10-2.6.x
用作库
>>> queue = SafeRedisQueue(name='test')
>>> queue.put("Hello World")
'595d43b2-2e49-4e96-a1d2-0848d1c7f0d3'
>>> queue.put("Foo bar")
'1df060eb-b578-499d-bede-20db9da8184e'
>>> queue.get()
('595d43b2-2e49-4e96-a1d2-0848d1c7f0d3', 'Hello World')
>>> queue.get()
('1df060eb-b578-499d-bede-20db9da8184e', 'Foo bar')
注意:为了与以前的版本兼容,存在两个别名 push/pop 。尽快开始使用新的"放置/获取"(Put/Get)术语,因为"推/弹出"(Push/Pop)将在将来的版本中删除。
确认和失败
>>> queue.put("Good stuff") >>> queue.put("Bad stuff") >>> uid_good, payload_good = queue.get() >>> uid_bad, payload_bad = queue.get() ... # process the payloads... ... >>> queue.ack(uid_good) # done with that one >>> queue.fail(uid_bad) # something didn't work out with that one, let's requeue >>> uid, payload = queue.get() # get again; we get the requeued payload again >>> uid == uid_bad True ... # try again ... >>> queue.ack(uid) # now it worked; ACK the stuff now
获取超时
saferedisqueue.get 接受超时参数:
- 0(默认)永远阻塞,等待项目
- 正数将阻止该秒数
- 负超时将禁用阻塞
构造函数参数
saferedisqueue 接受 *参数,**kwargs 并将它们传递给 redis.strictredis ,所以您需要什么就用什么。
三个异常 ,请在关键字参数中使用它们进行配置 saferedisqueue 本身:
< DL>使用的快捷方式,而不是主机/端口/db/密码组合。 与redis库一样接受"redis url",例如:
- redis://[:password]@localhost:6379/0
- Unix://[:password]@/path/to/socket.sock?dB=0</LI>
使用此关键字参数时,所有位置参数(通常 一个主机)被忽略。这两者是等价的:
- saferedisqueue('localhost', 端口=6379,数据库=7)
- saferedisqueue(url='redis://localhost:6379/7')
用于redis中键的前缀。默认值:"0",这将创建 redis数据库中的以下键:
- SRQ:0:项目
- srq:0:队列
- srq:0:ackbuf
- SRQ:0:备份
- srq:0:backuplock
以秒为单位的时间间隔(默认值:60),在此时间段内,未确认的项 自动重新排队。(它们从内部ackbuf和备份数据中移动 重新构造队列。)
通过 无 禁用自动清洗。默认启用!
用于项的可选序列化程序。默认值:无
请随意编写自己的序列化程序。它只需要一个 转储 以及加载方法。pickle、json等模块, simplejson 可以开箱即用。
但是,请注意,在使用python 3时, json 模块必须 默认情况下,包装并不能正确处理 是由底层的redis.py网络代码发出的。这应该 作品:
class MyJSONSerializer(object): @staticmethod def loads(bytes): return json.loads(bytes.decode('utf-8')) @staticmethod def dumps(data): return json.dumps(data) queue = SafeRedisQueue(name='foobar',serializer=MyJSONSerializer)
在版本3.0.0中添加
对于快速脏测试,可以使用命令行中的脚本将内容放入队列:
$ echo "Hello World" | python saferedisqueue.py producer
…然后再拿出来:
$ python saferedisqueue.py consumer cbdabbc8-1c0f-4eb0-8733-fdb62a9c0fa6 Hello World