Celery:组中一个子任务总是超时
我在使用Celery的组功能时遇到了一些烦人的问题。
我需要定期检查一堆主机解析到的IP地址,以确保这些IP没有变化。为此,我有一个字典,里面存储了< 主机名, IP地址 >
,我需要验证这些信息。例如:
REQUIRED_HOSTS = {
'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
'stackoverflow.com': {'198.252.206.16'}
}
所以我只需要定期遍历REQUIRED_HOSTS.keys()
,解析主机名,看看解析出来的IP是否和我记录的不同。(这点不难理解)
为了提高效率,我让每个主机名并行解析。我为此创建了一个子任务(它使用dnspython进行解析):
@my_tasks.task
def resolve_hostname(hostname, resolver=None):
""" This subtask resolves the 'hostname' to its IP addresses. It's
intended to be used in the 'compare_required_ips' function to resolve
names in parallel """
if resolver is None:
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
try:
return (hostname,
{hst.address for hst in resolver.query(hostname)})
except Exception, e:
logger.exception("Got %s when trying to resolve hostname=%s"
% (type(e), hostname))
raise e
现在,查询所有主机名并生成子任务的方法如下:
@my_taks.task
def compare_required_ips():
""" This method verifies that the IPs haven't changed. """
retval = []
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
logger.info("Going to compare IPs for %s hostnames=%s"
% (len(required_hosts.REQUIRED_HOSTS.keys()),
required_hosts.REQUIRED_HOSTS.keys()))
ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()
for hostname, ips in ip_subtasks.get(timeout=90):
retrieved_hosts[hostname] = ips
for hostname in required_hosts.REQUIRED_HOSTS:
if (required_hosts.REQUIRED_HOSTS[hostname]
!= retrieved_hosts[hostname]):
retval.append(hostname)
logger.error(
"IP resolution mismatch. hostname=%s resolve_target=%s"
", resolve_actual=%s (mismatch=%s)"
% (hostname,
required_hosts.REQUIRED_HOSTS[hostname],
retrieved_hosts[hostname],
(required_hosts.REQUIRED_HOSTS[hostname]
^ retrieved_hosts[hostname]))
)
return retval
这也很简单……只需遍历REQUIRED_HOSTS
的键(也就是主机名),为每个主机名生成一个子任务进行解析,然后用90秒的超时来收集结果(这个超时在for hostname, ips in ip_subtasks.get(timeout=90)
这一行中设置)
现在的问题是,除了一个子任务外,所有子任务都在90秒内成功完成。然后父任务(compare_required_ips
)因为timeout=90
而失败,而在父任务失败后,那个子任务却成功完成。我尝试过增加和减少超时时间,但子任务总是按照我在group
创建时指定的超时来执行,这导致主任务报告失败。
我还手动运行了解析(没有使用celery任务,而是用普通的线程),结果在毫秒内就解析完成。每次测试都是如此。我认为这不是dns.resolver.Resolver()
类的问题。所有迹象都表明这个类解析得非常快,但子任务、组,或者……Celery中的某个部分似乎并不知道这一点(不过只有一个子任务是这样)
我使用的是celery==3.1.9
、celery-with-redis==3.0
和flower==0.6.0
来进行监控。
任何帮助、提示或测试建议都将非常感谢。
1 个回答
一个问题可能是因为启动同步子任务而导致的死锁。compare_required_ips
是一个 celery 任务。在这个任务内部,你在等待一组 resolve_hostname
任务完成,这样做效率很低。
所以你需要把这个
ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()
改成
ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
).delay()
这样可以让你所有的任务异步启动,从而避免死锁。
而且
你不应该在 compare_required_ips
任务里面使用 ip_subtasks.get()
(即使 ip_subtask
只需要极短的时间)。你需要为此写一个新函数,或者使用 celery 的任务成功信号。