单进程、持久的多生产者、多消费者队列。

pqueue的Python项目详细描述


pqueue是一个简单的python持久(基于磁盘)fifo队列。

pqueue目标是快速和简单。开发最初是基于 在Queuelib代码上。

要求

  • python 2.7或python 3.x
  • 无外部库要求

安装

可以通过python包索引(pypi)或从 来源。

使用pip安装:

$ pip install pqueue

使用简易安装进行安装:

$ easy_install pqueue

如果您下载了源tarball,可以通过运行 以下(作为根):

# python setup.py install

如何使用

pqueue提供单个fifo队列实现。

下面是fifo队列的使用示例:

>>> from pqueue import Queue
>>> q = Queue("tmpqueue")
>>> q.put(b'a')
>>> q.put(b'b')
>>> q.put(b'c')
>>> q.get()
b'a'
>>> del q
>>> q = Queue("tmpqueue")
>>> q.get()
b'b'
>>> q.get()
b'c'
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/Queue.py", line 190, in get_nowait
    return self.get(False)
  File "/usr/lib/python2.7/Queue.py", line 165, in get
    raise Empty
Queue.Empty

queue对象与python的“queue”模块(或python中的“queue”)相同 3.x),不同的是它需要一个参数'path'来指示 持久化队列数据和指示排队项目数的“chunksize” 应该存储在每个文件中。上可用的相同“maxsize”参数 已维护系统级“队列”。

换言之,它的工作方式与python的队列完全相同,不同之处在于 突然中断是ACID-guaranteed

q = Queue()

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

注意pqueue不打算被多个进程使用。

它是怎么工作的?

在名为的分块文件上,按顺序使用pickle序列化推送的数据。 QNNNN,最大的“CukSKEY”元素,都存储在给定的“路径”上。

队列由“头”和“尾”组成。推送数据在“头”上, 提取的数据显示为“尾部”。

“info”文件在“path”中被pickle,具有以下“dict”:

  • “head”:三个整数的列表,head文件的索引,以及 写入的元素,以及上次写入的文件位置。
  • “tail”:三个整数的列表,tail文件的索引,以及 读取的元素,以及上次读取的文件位置。
  • “size”:队列中的元素数。
  • “chunkSize”:应存储在每个磁盘队列中的元素数 文件。

读写操作都依赖于磁盘上的顺序事务。在 为了达到酸性要求,这些修改受到 队列锁。

如果由于任何原因,应用程序在头部中间停止工作 写入,第二次执行将通过截断 部分头写。

在“get”上,只有当您第一次调用“task\u done”时,“info”文件才不会更新, 只有在第一次的情况下,你必须按顺序调用它。

“info”文件按以下方式更新:临时文件(使用 “mkstemp”)是用新数据创建的,然后移到前面的“info”上 文件。这是这样设计的,因为posix的“rename”保证是原子的。

如果突然中断,可能会出现以下情况之一:

  • 最后一次按下的元素可能会发生部分写入,在这种情况下 最后推送的元素将被丢弃。
  • 从队列中提取的元素可能正在处理,在这种情况下,第二个 运行将再次使用相同的元素。

测试

测试位于pqueue/tests目录中。他们可以用 python的默认unittest模块,命令如下:

./runtests.py

输出应该如下:

./runtests.py
test_GarbageOnHead (pqueue.tests.test_queue.PersistenceTest)
Adds garbage to the queue head and let the internal integrity ... ok
test_MultiThreaded (pqueue.tests.test_queue.PersistenceTest)
Create consumer and producer threads, check parallelism ... ok
test_OpenCloseOneHundred (pqueue.tests.test_queue.PersistenceTest)
Write 1000 items, close, reopen checking if all items are there ... ok
test_OpenCloseSingle (pqueue.tests.test_queue.PersistenceTest)
Write 1 item, close, reopen checking if same item is there ... ok
test_PartialWrite (pqueue.tests.test_queue.PersistenceTest)
Test recovery from previous crash w/ partial write ... ok
test_RandomReadWrite (pqueue.tests.test_queue.PersistenceTest)
Test random read/write ... ok

----------------------------------------------------------------------
Ran 6 tests in 1.301s

OK

许可证

此软件是根据BSD许可证授权的。请参阅 完整许可证文本的顶级分发目录。

版本控制

这个软件遵循Semantic Versioning

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
由于测试失败,java testcontainers maven构建失败   java实现jacobi算法实现laplace方程   java中的多线程:如何在不等待所有线程使用ExecutorService完成任务的情况下终止所有线程的执行?   java Hello World不在Android Studio 3中工作   ubuntu Tomcat7的Java版本不正确   java Javafx内存泄漏   对于手动实现的Spring数据存储库方法,我应该使用Java8默认方法吗?   googleappengine中的java添加过滤查询   html当使用JSOUP库在Java中读取标签时,如何保留标签(如<br>、<ul>、<li>、<p>等)的含义?   编码为什么jasper生成的报告在Java中不显示西里尔语(保加利亚语)?   java有没有办法隐藏当前位置和jdk动作?   java找出编译原型文件的版本   有没有办法在运行时更改java方法的访问修饰符?   语法字符串。。。Java中的参数   java数组元素在添加其他元素时会相互覆盖   eclipse中的java GWT项目   java如何为spring rest模板请求将动态json属性名映射到jackson   java无法在Windows 10上找到特定的JDK   在xml字符串和java字符串之间提取正则表达式子字符串