批评这段Python代码(使用线程池的爬虫)
这段Python代码怎么样?需要大家给点意见。 这段代码有个错误,有时候会打印出“全部等待 - 可以完成!”然后就卡住了(之后就不再有任何动作发生了……)但我找不到为什么会这样。
这是一个使用线程池的网站爬虫:
import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread
# Worker states: a worker is W_WORK while executing a task and W_WAIT once it
# has finished one and is idle (state stays None until the first task runs).
W_WAIT = 1
W_WORK = 0
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, pool, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
self.pool = pool
self.state = None
def is_wait(self):
return self.state == W_WAIT
def run(self):
while True:
#if all workers wait - time to exsit
print "CHECK WAIT: !!! ",self.pool.is_all_wait()
if self.pool.is_all_wait():
print "ALL WAIT - CAN FINISH!"
return
try:
func, args, kargs = self.tasks.get(timeout=3)
except Empty:
print "task wait timeout"
continue
self.state = W_WORK
print "START !!! in thread %s" % str(self)
#print args
try: func(*args, **kargs)
except Exception, e: print e
print "!!! STOP in thread %s", str(self)
self.tasks.task_done()
self.state = W_WAIT
#threads can fast empty it!
#if self.tasks.qsize() == 0:
# print "QUIT!!!!!!"
# break
class ThreadPool:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads):
        # One shared, unbounded queue feeds every worker thread.
        self.tasks = Queue()
        self.workers = [Worker(self, self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue."""
        self.tasks.join()

    def is_all_wait(self):
        # True only when every worker reports the waiting state.
        return all(w.is_wait() for w in self.workers)
# Crawl state shared (unsynchronized) across all worker threads.
visited = set()             # URLs already scheduled for processing
queue = Queue()             # NOTE(review): unused -- the pool owns its own queue
external_links_set = set()  # NOTE(review): never populated in this script
internal_links_set = set()  # same-host links discovered so far
external_links = 0          # NOTE(review): never updated
def process(pool,host,url):
    """Fetch *url*, collect same-host links, and queue unseen ones.

    pool -- ThreadPool used to schedule follow-up crawls
    host -- hostname the crawl is restricted to
    url  -- absolute http:// URL to download
    """
    try:
        content = urlopen(url).read()
    # BUG FIX: network failures (IOError and subclasses such as HTTPError)
    # previously escaped this handler and were only printed by the worker's
    # blanket except; treat them like undecodable pages and skip the URL.
    except (IOError, UnicodeDecodeError):
        return
    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
        try:
            href = link['href']
        except KeyError:
            # Anchor without an href attribute -- nothing to follow.
            continue
        # Make relative links absolute, then keep only same-host ones.
        if not href.startswith('http://'):
            href = 'http://%s%s' % (host, href)
        if not href.startswith('http://%s%s' % (host, '/')):
            continue
        internal_links_set.add(href)
        # NOTE(review): check-then-add on the shared `visited` set is not
        # synchronized; two threads can race and queue the same URL twice.
        if href not in visited:
            visited.add(href)
            pool.add_task(process,pool,host,href)
def start(host,charset,num_threads=20):
    """Crawl http://<host>/ with a pool of worker threads.

    host        -- hostname to crawl
    charset     -- page charset; NOTE(review): currently unused, kept for
                   interface compatibility with existing callers
    num_threads -- size of the thread pool (default 20, the old hard-coded
                   constant, so existing calls behave identically)
    """
    pool = ThreadPool(num_threads)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_completion()
# Entry point: crawl this host; blocks until the pool believes all workers wait.
start('evgenm.com','utf8')
谢谢大家的帮助!我做了新的实现: 你们觉得这段代码#2怎么样? ==================================尝试 #2=======================================
import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread
# Sentinel worker state: set by Worker.stop() to ask the thread to exit its loop.
W_STOP = 1
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, pool, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.pool = pool
self.state = None
self.start()
def stop(self):
self.state = W_STOP
def run(self):
while True:
if self.state == W_STOP:
print "\ncalled stop"
break
try:
func, args, kargs = self.tasks.get(timeout=3)
except Empty:
continue
print "\n***START*** %s" % str(self)
try:
func(*args, **kargs)
except Exception, e:
print e
print "\n***STOP*** %s", str(self)
self.tasks.task_done()
class ThreadPool:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads):
        # A single unbounded queue is shared by every worker thread.
        self.tasks = Queue()
        self.workers = [Worker(self, self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue."""
        self.tasks.join()

    def stop_threads(self):
        """Flag every worker to exit its run() loop."""
        for worker in self.workers:
            worker.stop()

    def wait_stop(self):
        """Drain the queue, then ask all workers to shut down."""
        self.wait_completion()
        self.stop_threads()
# Crawl state shared (unsynchronized) across all worker threads.
visited = set()             # URLs already scheduled for processing
queue = Queue()             # NOTE(review): unused -- the pool owns its own queue
external_links_set = set()  # NOTE(review): never populated in this script
internal_links_set = set()  # same-host links discovered so far
external_links = 0          # NOTE(review): never updated
def process(pool,host,url):
    """Download *url* and queue every not-yet-seen same-host link.

    pool -- ThreadPool used to schedule follow-up crawls
    host -- hostname the crawl is restricted to
    url  -- absolute http:// URL to download
    """
    try:
        content = urlopen(url).read()
    except UnicodeDecodeError:
        return
    # All links we keep must start with "http://<host>/".
    prefix = 'http://%s%s' % (host, '/')
    anchors = BeautifulSoup(content, parseOnlyThese=SoupStrainer('a'))
    for anchor in anchors:
        try:
            target = anchor['href']
        except KeyError:
            continue  # anchor carries no href attribute
        # Turn relative links into absolute ones on this host.
        if not target.startswith('http://'):
            target = 'http://%s%s' % (host, target)
        if not target.startswith(prefix):
            continue  # off-site link -- ignore
        internal_links_set.add(target)
        if target in visited:
            continue
        visited.add(target)
        pool.add_task(process,pool,host,target)
def start(host,charset,num_threads=20):
    """Crawl http://<host>/ and shut the pool down once the queue drains.

    host        -- hostname to crawl
    charset     -- page charset; NOTE(review): currently unused, kept for
                   interface compatibility with existing callers
    num_threads -- size of the thread pool (default 20, the old hard-coded
                   constant, so existing calls behave identically)
    """
    pool = ThreadPool(num_threads)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_stop()
# Entry point: crawl this host, then join the queue and stop the workers.
start('evgenm.com','utf8')
2 个回答
0
我对Python有一些基础知识,但我听说Python中的线程并不是很有用?我看到很多文章在批评全局锁这个东西。
1
你在不同的线程之间共享状态(比如在 `is_all_wait` 里),但没有进行任何同步处理。而且,“所有线程都在等待”并不能可靠地说明队列已经空了(比如,它们可能都正处在获取任务的过程中)。我怀疑,有时候线程在队列真正变空之前就退出了。如果这种情况发生得比较频繁,队列里就会剩下没有任何线程去执行的任务,这样 `queue.join()` 就会永远等下去。
我的建议是:
- 去掉 `is_all_wait` —— 它并不是一个可靠的结束指示;
- 去掉 worker 的 `state` —— 它其实并不必要;
- 依靠 `queue.join()` 来判断所有任务何时处理完毕。
如果你确实需要终止线程(比如,这是一个更大的、长期运行的程序的一部分),那么在 `queue.join()` 返回之后再去做。