单进程、持久的多生产者、多消费者队列。
pqueue的Python项目详细描述
pqueue是一个简单的python持久(基于磁盘)fifo队列。
pqueue目标是快速和简单。开发最初是基于 在Queuelib代码上。
要求
- python 2.7或python 3.x
- 无外部库要求
安装
可以通过python包索引(pypi)或从 来源。
使用pip安装:
$ pip install pqueue
使用简易安装进行安装:
$ easy_install pqueue
如果您下载了源tarball,可以通过运行 以下(作为根):
# python setup.py install
如何使用
pqueue提供单个fifo队列实现。
下面是fifo队列的使用示例:
>>> from pqueue import Queue >>> q = Queue("tmpqueue") >>> q.put(b'a') >>> q.put(b'b') >>> q.put(b'c') >>> q.get() b'a' >>> del q >>> q = Queue("tmpqueue") >>> q.get() b'b' >>> q.get() b'c' >>> q.get_nowait() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/lib/python2.7/Queue.py", line 190, in get_nowait return self.get(False) File "/usr/lib/python2.7/Queue.py", line 165, in get raise Empty Queue.Empty
queue对象与python的“queue”模块(或python中的“queue”)相同 3.x),不同的是它需要一个参数'path'来指示 持久化队列数据和指示排队项目数的“chunksize” 应该存储在每个文件中。上可用的相同“maxsize”参数 已维护系统级“队列”。
换言之,它的工作方式与python的队列完全相同,不同之处在于 突然中断是ACID-guaranteed:
q = Queue() def worker(): while True: item = q.get() do_work(item) q.task_done() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # block until all tasks are done
注意pqueue不打算被多个进程使用。
它是怎么工作的?
在名为的分块文件上,按顺序使用pickle序列化推送的数据。 QNNNN,最大的“CukSKEY”元素,都存储在给定的“路径”上。
队列由“头”和“尾”组成。推送数据在“头”上, 提取的数据显示为“尾部”。
“info”文件在“path”中被pickle,具有以下“dict”:
- “head”:三个整数的列表,head文件的索引,以及 写入的元素,以及上次写入的文件位置。
- “tail”:三个整数的列表,tail文件的索引,以及 读取的元素,以及上次读取的文件位置。
- “size”:队列中的元素数。
- “chunkSize”:应存储在每个磁盘队列中的元素数 文件。
读写操作都依赖于磁盘上的顺序事务。在 为了达到酸性要求,这些修改受到 队列锁。
如果由于任何原因,应用程序在头部中间停止工作 写入,第二次执行将通过截断 部分头写。
在“get”上,只有当您第一次调用“task\u done”时,“info”文件才不会更新, 只有在第一次的情况下,你必须按顺序调用它。
“info”文件按以下方式更新:临时文件(使用 “mkstemp”)是用新数据创建的,然后移到前面的“info”上 文件。这是这样设计的,因为posix的“rename”保证是原子的。
如果突然中断,可能会出现以下情况之一:
- 最后一次按下的元素可能会发生部分写入,在这种情况下 最后推送的元素将被丢弃。
- 从队列中提取的元素可能正在处理,在这种情况下,第二个 运行将再次使用相同的元素。
测试
测试位于pqueue/tests目录中。他们可以用 python的默认unittest模块,命令如下:
./runtests.py
输出应该如下:
./runtests.py test_GarbageOnHead (pqueue.tests.test_queue.PersistenceTest) Adds garbage to the queue head and let the internal integrity ... ok test_MultiThreaded (pqueue.tests.test_queue.PersistenceTest) Create consumer and producer threads, check parallelism ... ok test_OpenCloseOneHundred (pqueue.tests.test_queue.PersistenceTest) Write 1000 items, close, reopen checking if all items are there ... ok test_OpenCloseSingle (pqueue.tests.test_queue.PersistenceTest) Write 1 item, close, reopen checking if same item is there ... ok test_PartialWrite (pqueue.tests.test_queue.PersistenceTest) Test recovery from previous crash w/ partial write ... ok test_RandomReadWrite (pqueue.tests.test_queue.PersistenceTest) Test random read/write ... ok ---------------------------------------------------------------------- Ran 6 tests in 1.301s OK
许可证
此软件是根据BSD许可证授权的。请参阅 完整许可证文本的顶级分发目录。
版本控制
这个软件遵循Semantic Versioning