Python socket.gethostbyname_ex() 多线程失败

1 投票
3 回答
3428 浏览
提问于 2025-04-17 12:35

我写了一个脚本,目的是用多线程的方式把多个主机名转换成IP地址。

但是它在某个随机的地方会出错并且卡住。这个问题怎么解决呢?

num_threads = 100
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database')
cur = conn.cursor()
def mexec(befehl):
    cur = conn.cursor()
    cur.execute(befehl)

websites = ['facebook.com','facebook.org' ... ... ... ...] \#10.000 websites in array
queue = Queue()
def getips(i, q):
    while True:
        #--resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb
        except (socket.gaierror):
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
        q.task_done()
#Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()
#Place work in queue
for site in websites:
    queue.put(site)
#Wait until worker threads are done to exit
queue.join()

3 个回答

0

你可能会发现使用 concurrent.futures 比直接使用 threadingmultiprocessingQueue 更简单:

#!/usr/bin/env python3
import socket
# pip install futures on Python 2.x
from concurrent.futures import ThreadPoolExecutor as Executor

hosts = "youtube.com google.com facebook.com yahoo.com live.com".split()*100
with Executor(max_workers=20) as pool:
     for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60):
         print(results)

注意:你可以很容易地从使用线程切换到使用进程:

from concurrent.futures import ProcessPoolExecutor as Executor

如果你的操作系统上 gethostbyname_ex() 不是线程安全的,比如说在 OSX 上 可能就是这样

如果你想处理在 gethostbyname_ex() 中可能出现的异常:

import concurrent.futures

with Executor(max_workers=20) as pool:
    future2host = dict((pool.submit(socket.gethostbyname_ex, h), h)
                       for h in hosts)
    for f in concurrent.futures.as_completed(future2host, timeout=60):
        e = f.exception()
        print(f.result() if e is None else "{0}: {1}".format(future2host[f], e))

这和 文档中的例子 类似。

1

我最开始的想法是,你可能因为DNS的负载过重而出现错误——也就是说,你的解析器可能不允许你在同一时间内发送太多请求。


另外,我发现了一些问题:

  1. 你在while循环中没有正确赋值site——其实用for循环遍历队列会更好。在你现在的版本中,你使用的是模块级别的site变量,这可能会导致某些请求被重复发送,而其他请求则被跳过。

    在这个地方,你可以控制队列是否还有待处理的条目。如果都没有了,你就可以结束这个线程。

  2. 出于安全考虑,你最好这样做:

    def mexec(befehl, args=None):
        cur = conn.cursor()
        cur.execute(befehl, args)
    

    这样之后可以进行:

    mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
    

为了与未来的协议保持兼容,你应该使用socket.getaddrinfo(),而不是socket.gethostbyname_ex(site)。这样你可以获取到所有你想要的IP(最开始可以限制为IPv4,但之后切换到IPv6会更容易),并且可以把它们全部存入数据库。


关于你的队列,代码示例可以是:

def queue_iterator(q):
    """Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling."""
    while True:
        try:
            item = q.get(block=q.is_filling, timeout=.1)
            yield item
            q.task_done() # indicate that task is done.
        except Empty:
            # If q is still filling, continue.
            # If q is empty and not filling any longer, return.
            if not q.is_filling: return

def getips(i, q):
    for site in queue_iterator(q):
        #--resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
        except (socket.gaierror):
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
# Indicate it is filling.
q.is_filling = True
#Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()
#Place work in queue
for site in websites:
    queue.put(site)
queue.is_filling = False # we are done filling, if q becomes empty, we are done.
#Wait until worker threads are done to exit
queue.join()

这样就可以解决问题。


另一个问题是你在MySQL中并行插入数据。你一次只能执行一个MySQL查询。所以你可以通过threading.Lock()RLock()来保护访问,或者把结果放到另一个队列中,由另一个线程来处理,这样还可以进行合并。

3

你可以使用一个特殊的值来告诉线程没有工作要做,然后让线程结束,而不是使用 queue.task_done()queue.join()

#!/usr/bin/env python
import socket
from Queue import Queue
from threading import Thread

def getips(queue):
    for site in iter(queue.get, None):
        try: # resolve hostname
            result = socket.gethostbyname_ex(site)
        except IOError, e:
            print("error %s reason: %s" % (site, e))
        else:
            print("done %s %s" % (site, result))

def main():
    websites = "youtube google non-existent.example facebook yahoo live".split()
    websites = [name+'.com' for name in websites]

    # Spawn thread pool
    queue = Queue()
    threads = [Thread(target=getips, args=(queue,)) for _ in range(20)]
    for t in threads:
        t.daemon = True
        t.start()

    # Place work in queue
    for site in websites: queue.put(site)
    # Put sentinel to signal the end
    for _ in threads: queue.put(None)
    # Wait for completion
    for t in threads: t.join()

main()

gethostbyname_ex() 这个函数已经过时了。如果你想同时支持 IPv4 和 IPv6 地址,可以使用 socket.getaddrinfo() 来代替。

撰写回答