AsyncResult.successful() 返回 false,get() 抛出属性错误
我正在尝试第一次使用Python的ThreadPool,目的是为了加快一些非常慢的日志解析。
可惜的是,这似乎没有正常工作。我在网上搜索时找不到类似的情况。我调用了pool.join()
来等待线程完成,然后再遍历它们以获取返回值。然而,我发现虽然AsyncResult.ready()返回的是true,但AsyncResult.successful()却返回false。而当我调用get()时,又出现了属性错误。
Traceback (most recent call last):
File "C:\Users\luke.timothy\Documents\Aptana Studio 3 Workspace\Monitor\monitor.py", line 175, in <module>
stamp = threads[i].get()
File "C:\Python27\lib\multiprocessing\pool.py", line 528, in get
raise self._value
AttributeError: _strptime
我还发现,当join()
函数返回时,只有4个线程完成了。这让我感到意外,因为根据文档,我以为join会等待所有线程完成后再返回。我还发现,如果在访问返回值之前对每个线程调用AsyncResult.wait()
,那么什么也不会发生。它根本不等待。
这是我的代码:
def parse(all_logs):
current_time_local = datetime.now()
print "Parsing."
stamps = []
for line in all_logs:
match = re.match(match_string, line)
if match:
for i in range(4):
if match.group(1 + (i * 3)):
wheren = match.group(1 + (i * 3)).rstrip().strip("[").strip("]").split(",")
break
stamp = datetime.strptime(wheren[0], "%Y-%m-%d %H:%M:%S")
if stamp.day == current_time_local.day or (stamp.day == current_time_local.day-1 and stamp.hour >= current_time_local.hour):
try:
name, aliases, ipaddrlist = socket.gethostbyaddr(wheren[1].split(":")[1])
except:
continue
stamps.append(Event(stamp,name,match.groups()))
print "Returning stamps."
return stamps
pool = ThreadPool(processes=8)
threads = []
for i in range(8):
range_begin = i * logs_fraction
range_end = range_begin + logs_fraction
print "begin: " + str(range_begin) + " end: " + str(range_end)
thread_args = []
thread_args.extend(all_logs[range_begin:range_end])
threads.append(pool.apply_async(parse, (thread_args, )))
pool.close()
pool.join()
for i in range(8):
print "Getting thread " + str(i+1)
print threads[i].ready()
print threads[i].successful()
print "Thread Ready."
stamp = threads[i].get()
print stamp
stamps.extend(stamp)
有人能帮忙吗?我以前从未使用过这个模块,经过我的搜索发现,学习它的资料相当稀少。官方的Python文档也只能帮我到这里……
1 个回答
根据这个链接,你遇到了datetime库的线程安全问题。
上周五,我遇到了一个Python的bug,这个周末我花了一些时间来调查这个bug,并写了这篇文章来解释根本原因。我不是Python专家,而是C语言程序员。如果你发现有什么错误,请纠正我。
我在这里提取了一个最小的示例:
#!/usr/bin/env python import thread import time def thread_fn(): for _ in xrange(1, 10): for _ in xrange(1, 100): time.strptime("2013-06-02", "%Y-%m-%d") for _ in xrange(10): thread.start_new_thread(thread_fn, ()) time.sleep(1)
上面的代码有时会抛出一个异常:
AttributeError: _strptime_time
,你可以在自己的环境中运行它,看看输出。我检查了Python-2.7.2(Mac默认)和Python-2.7.3(从源代码编译)。我随机得到了这个错误,这意味着有时这个脚本运行得很好!
接下来是解决方法:
你应该意识到这将是一个多线程问题,对吧?这里是
time_strptime
的实现:static PyObject * time_strptime(PyObject *self, PyObject *args) { PyObject *strptime_module = PyImport_ImportModuleNoBlock("_strptime"); PyObject *strptime_result; if (!strptime_module) return NULL; strptime_result = PyObject_CallMethod(strptime_module, "_strptime_time", "O", args); Py_DECREF(strptime_module); return strptime_result; }
每次调用这个函数时,它都会尝试加载模块“_strptime”。API PyImport_ImportModuleNoBlock的算法是,如果有一个线程正在导入这个模块,它会抛出异常,而不是在那儿阻塞。这避免了重复导入模块和潜在的死锁。
但是在多线程环境中,当一个线程试图导入_strptime,但还没有完全导入时,另一个线程直接尝试调用
strptime_module._strptime_time
。这就是为什么会出现这个bug。如果你理解了这个bug发生的原因,你应该心里已经有了解决方法。其实这非常简单。你只需要在启动线程之前调用一次
strptime
。
所以,看起来你可以通过在创建线程之前直接导入_strptime
来解决这个问题。
这是官方的bug报告,里面有更多信息。