Celery:组中一个子任务总是超时

1 投票
1 回答
583 浏览
提问于 2025-04-28 21:11

我在使用Celery的功能时遇到了一些烦人的问题。

我需要定期检查一堆主机解析到的IP地址,以确保这些IP没有变化。为此,我有一个字典,里面存储了< 主机名, IP地址 >,我需要验证这些信息。例如:

REQUIRED_HOSTS = {
    'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
    'stackoverflow.com': {'198.252.206.16'}
}

所以我只需要定期遍历REQUIRED_HOSTS.keys(),解析主机名,看看解析出来的IP是否和我记录的不同。(这点不难理解)

为了提高效率,我让每个主机名并行解析。我为此创建了一个子任务(它使用dnspython进行解析):

@my_tasks.task
def resolve_hostname(hostname, resolver=None):
    """ This subtask resolves the 'hostname' to its IP addresses. It's
    intended to be used in the 'compare_required_ips' function to resolve
    names in parallel """
    if resolver is None:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers

    try:
        return (hostname,
                {hst.address for hst in resolver.query(hostname)})
    except Exception, e:
        logger.exception("Got %s when trying to resolve hostname=%s"
                         % (type(e), hostname))
        raise e

现在,查询所有主机名并生成子任务的方法如下:

@my_taks.task
def compare_required_ips():
    """ This method verifies that the IPs haven't changed. """
    retval = []
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
    retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
    logger.info("Going to compare IPs for %s hostnames=%s"
                % (len(required_hosts.REQUIRED_HOSTS.keys()),
                   required_hosts.REQUIRED_HOSTS.keys()))
    ip_subtasks = group(
        [resolve_hostname.s(hostname, resolver=resolver)
         for hostname in required_hosts.REQUIRED_HOSTS.keys()]
    )()
    for hostname, ips in ip_subtasks.get(timeout=90):
        retrieved_hosts[hostname] = ips

    for hostname in required_hosts.REQUIRED_HOSTS:
        if (required_hosts.REQUIRED_HOSTS[hostname]
                != retrieved_hosts[hostname]):
            retval.append(hostname)
            logger.error(
                "IP resolution mismatch. hostname=%s resolve_target=%s"
                ", resolve_actual=%s (mismatch=%s)"
                % (hostname,
                   required_hosts.REQUIRED_HOSTS[hostname],
                   retrieved_hosts[hostname],
                   (required_hosts.REQUIRED_HOSTS[hostname]
                    ^ retrieved_hosts[hostname]))
            )
    return retval

这也很简单……只需遍历REQUIRED_HOSTS的键(也就是主机名),为每个主机名生成一个子任务进行解析,然后用90秒的超时来收集结果(这个超时在for hostname, ips in ip_subtasks.get(timeout=90)这一行中设置)

现在的问题是,除了一个子任务外,所有子任务都在90秒内成功完成。然后父任务(compare_required_ips)因为timeout=90而失败,而在父任务失败后,那个子任务却成功完成。我尝试过增加和减少超时时间,但子任务总是按照我在group创建时指定的超时来执行,这导致主任务报告失败。

我还手动运行了解析(没有使用celery任务,而是用普通的线程),结果在毫秒内就解析完成。每次测试都是如此。我认为这不是dns.resolver.Resolver() 的问题。所有迹象都表明这个类解析得非常快,但子任务、组,或者……Celery中的某个部分似乎并不知道这一点(不过只有一个子任务是这样)

我使用的是celery==3.1.9celery-with-redis==3.0flower==0.6.0来进行监控。

任何帮助、提示或测试建议都将非常感谢。

暂无标签

1 个回答

2

一个问题可能是因为启动同步子任务而导致的死锁。compare_required_ips 是一个 celery 任务。在这个任务内部,你在等待一组 resolve_hostname 任务完成,这样做效率很低。

所以你需要把这个

ip_subtasks = group(
        [resolve_hostname.s(hostname, resolver=resolver)
         for hostname in required_hosts.REQUIRED_HOSTS.keys()]
    )()

改成

ip_subtasks = group(
        [resolve_hostname.s(hostname, resolver=resolver)
         for hostname in required_hosts.REQUIRED_HOSTS.keys()]
    ).delay()

这样可以让你所有的任务异步启动,从而避免死锁。

而且

你不应该在 compare_required_ips 任务里面使用 ip_subtasks.get()(即使 ip_subtask 只需要极短的时间)。你需要为此写一个新函数,或者使用 celery 的任务成功信号

撰写回答