在Python中创建处理队列

1 投票
3 回答
753 浏览
提问于 2025-04-17 10:44

我有一个邮箱设置,每当收到邮件时就会触发一个Python脚本。这个脚本会执行几个功能,大约需要30秒的时间,并且会在MYSQL数据库中写入一条记录。

一切运行得很顺利,直到在第一次邮件发送后不到30秒又发送了第二封邮件。第二封邮件处理得没问题,但第一次邮件在数据库中却出现了损坏的记录。

我想在脚本还没有处理完前一封邮件的时候,把邮件数据放在一个队列里,

msg=email.message_from_file(sys.stdin)

以便等脚本处理完再继续处理后面的邮件。

我使用的是Python 2.5。有没有人能推荐一个可以实现这个功能的包或脚本呢?

3 个回答

2

虽然Celery是个很不错的软件,但在这种情况下使用它就像用大锤子钉钉子一样不合适。从概念上讲,你确实是在寻找一个工作队列(Celery提供的就是这个),但是你用来触发脚本的电子邮件收件箱本身也是一个可以用作工作队列的工具。

更直接的解决办法是让Python的工作脚本直接去检查邮件服务器(比如用内置的poplib),每隔几秒钟获取一次新邮件,然后逐个处理这些新邮件。这样可以确保你的脚本一次只做一件事,避免同时运行两个副本。

例如,你可以把现有的脚本放在一个像这样(来自上面链接的文档)的函数里:

import getpass, poplib
from time import sleep

M = poplib.POP3('localhost')
M.user(getpass.getuser())
M.pass_(getpass.getpass())
while True:
    numMessages = len(M.list()[1])
    for i in range(numMessages):
        email = '\n'.join(M.retr(i+1)[1])
        # This is what your script normally does:
        do_work_for_message(email)
    sleep(5)

编辑:语法

2

我建议你去看看 http://celeryproject.org/

我很确定这个网站能完全满足你的需求。

2

我发现这是一个简单的方法,可以避免在上一个定时任务还在运行的时候再去执行新的定时任务。

fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 

这样会引发一个IO错误,我会通过让进程自己结束来处理这个错误。

想了解更多信息,可以查看这个链接:http://docs.python.org/library/fcntl.html#fcntl.lockf

总之,你可以很容易地用同样的思路来确保一次只运行一个任务,这其实和排队不太一样(因为任何等待的进程都有可能获得锁),但它能实现你想要的效果。

import fcntl
import time
fd = open('lock_file', 'w')
fcntl.lockf(fd, fcntl.LOCK_EX)
# optionally write pid to another file so you have an indicator
# of the currently running process
print 'Hello'
time.sleep(1)

你也可以直接使用这个链接中的内容:http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes,它正好能满足你的需求。

撰写回答