multiprocessing.Queue与gevent兼容吗?
有人知道这段代码有什么问题吗?它一直在“加载”,没有任何输出。“Sites”是一个包含几十个字符串的列表。
num_worker_threads = 30
def mwRegisterWorker():
while True:
try:
print q.get()
finally:
pass
q = multiprocessing.JoinableQueue()
for i in range(num_worker_threads):
gevent.spawn(mwRegisterWorker)
for site in sites:
q.put(site)
q.join() # block until all tasks are done
2 个回答
3
建议使用 gevent.queue.JoinableQueue
。绿色线程(gevent
内部使用的)既不是普通的线程,也不是进程,而是一种带有用户级调度的协程。
11
gevent.spawn()
创建的是绿色线程,而不是进程(更准确地说,所有的绿色线程都在一个操作系统线程中运行)。所以 multiprocessing.JoinableQueue
在这里不合适。
gevent
是基于 协作式 多任务处理的,也就是说,只有当你调用一个阻塞函数,切换到 gevent
的事件循环时,其他的绿色线程才会运行。例如,下面的 conn
使用了为 gevent
修改过的套接字方法,这样在等待网站回复时,其他的绿色线程可以继续运行。如果没有 pool.join()
,它会把控制权交给运行事件循环的绿色线程,那么就不会建立任何连接。
如果你想在向多个网站发送请求时限制并发量,可以使用 gevent.pool.Pool
:
#!/usr/bin/env python
from gevent.pool import Pool
from gevent import monkey; monkey.patch_socket()
import httplib # now it can be used from multiple greenlets
import logging
info = logging.getLogger().info
def process(site):
"""Make HEAD request to the `site`."""
conn = httplib.HTTPConnection(site)
try:
conn.request("HEAD", "/")
res = conn.getresponse()
except IOError, e:
info("error %s reason: %s" % (site, e))
else:
info("%s %s %s" % (site, res.status, res.reason))
finally:
conn.close()
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(msg)s")
num_worker_threads = 2
pool = Pool(num_worker_threads)
sites = ["google.com", "bing.com", "duckduckgo.com", "stackoverflow.com"]*3
for site in sites:
pool.apply_async(process, args=(site,))
pool.join()
if __name__=="__main__":
main()