单进程、持久的多生产者、多消费者队列。

pqueue的Python项目详细描述


pqueue是一个简单的python持久(基于磁盘)fifo队列。

pqueue目标是快速和简单。开发最初是基于 在Queuelib代码上。

要求

  • python 2.7或python 3.x
  • 无外部库要求

安装

可以通过python包索引(pypi)或从 来源。

使用pip安装:

$ pip install pqueue

使用简易安装进行安装:

$ easy_install pqueue

如果您下载了源tarball,可以通过运行 以下(作为根):

# python setup.py install

如何使用

pqueue提供单个fifo队列实现。

下面是fifo队列的使用示例:

>>> from pqueue import Queue
>>> q = Queue("tmpqueue")
>>> q.put(b'a')
>>> q.put(b'b')
>>> q.put(b'c')
>>> q.get()
b'a'
>>> del q
>>> q = Queue("tmpqueue")
>>> q.get()
b'b'
>>> q.get()
b'c'
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/Queue.py", line 190, in get_nowait
    return self.get(False)
  File "/usr/lib/python2.7/Queue.py", line 165, in get
    raise Empty
Queue.Empty

queue对象与python的“queue”模块(或python中的“queue”)相同 3.x),不同的是它需要一个参数'path'来指示 持久化队列数据和指示排队项目数的“chunksize” 应该存储在每个文件中。上可用的相同“maxsize”参数 已维护系统级“队列”。

换言之,它的工作方式与python的队列完全相同,不同之处在于 突然中断是ACID-guaranteed

q = Queue()

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

注意pqueue不打算被多个进程使用。

它是怎么工作的?

在名为的分块文件上,按顺序使用pickle序列化推送的数据。 QNNNN,最大的“CukSKEY”元素,都存储在给定的“路径”上。

队列由“头”和“尾”组成。推送数据在“头”上, 提取的数据显示为“尾部”。

“info”文件在“path”中被pickle,具有以下“dict”:

  • “head”:三个整数的列表,head文件的索引,以及 写入的元素,以及上次写入的文件位置。
  • “tail”:三个整数的列表,tail文件的索引,以及 读取的元素,以及上次读取的文件位置。
  • “size”:队列中的元素数。
  • “chunkSize”:应存储在每个磁盘队列中的元素数 文件。

读写操作都依赖于磁盘上的顺序事务。在 为了达到酸性要求,这些修改受到 队列锁。

如果由于任何原因,应用程序在头部中间停止工作 写入,第二次执行将通过截断 部分头写。

在“get”上,只有当您第一次调用“task\u done”时,“info”文件才不会更新, 只有在第一次的情况下,你必须按顺序调用它。

“info”文件按以下方式更新:临时文件(使用 “mkstemp”)是用新数据创建的,然后移到前面的“info”上 文件。这是这样设计的,因为posix的“rename”保证是原子的。

如果突然中断,可能会出现以下情况之一:

  • 最后一次按下的元素可能会发生部分写入,在这种情况下 最后推送的元素将被丢弃。
  • 从队列中提取的元素可能正在处理,在这种情况下,第二个 运行将再次使用相同的元素。

测试

测试位于pqueue/tests目录中。他们可以用 python的默认unittest模块,命令如下:

./runtests.py

输出应该如下:

./runtests.py
test_GarbageOnHead (pqueue.tests.test_queue.PersistenceTest)
Adds garbage to the queue head and let the internal integrity ... ok
test_MultiThreaded (pqueue.tests.test_queue.PersistenceTest)
Create consumer and producer threads, check parallelism ... ok
test_OpenCloseOneHundred (pqueue.tests.test_queue.PersistenceTest)
Write 1000 items, close, reopen checking if all items are there ... ok
test_OpenCloseSingle (pqueue.tests.test_queue.PersistenceTest)
Write 1 item, close, reopen checking if same item is there ... ok
test_PartialWrite (pqueue.tests.test_queue.PersistenceTest)
Test recovery from previous crash w/ partial write ... ok
test_RandomReadWrite (pqueue.tests.test_queue.PersistenceTest)
Test random read/write ... ok

----------------------------------------------------------------------
Ran 6 tests in 1.301s

OK

许可证

此软件是根据BSD许可证授权的。请参阅 完整许可证文本的顶级分发目录。

版本控制

这个软件遵循Semantic Versioning

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java对角差分   不满意链接错误:org。opencv。海吉。海吉。imread_0(OpenCV Java)   java在Eclipse中使用mavenreleaseplugin   java在加载时冻结JVM/Tomcat,而safepointlog记录使用safepoints“ThreadStop”条目   c#在visual studio中进行调试时是否可以像在eclipse(java)中那样编辑代码   java更改生成的jaxb类的类名和包结构   java我怎样才能同时使用JTextField和JLabel?   Java swing使用按钮更改包含图标的标签的颜色   java如何实例化。我在运行时得到的类文件   java无法使用XPath为站点160by2定位WebElements mobile和message box。com和way2sms。通用域名格式   java将MYSQL日期时间值转换为UTC时区格式yyyyMMdd'T'HH:mm:ss'Z'   java在Bundle中,如何找到给定键的值的类型?   使用Java流使用数据库游标   java阻止在过滤适配器数据时重新加载图像[Android]