多线程环境与如pickle或json的模块
我正在使用“import threading”和Python 3.4。简单来说,我有一个主线程和一个子线程。我需要从子线程中把我的字典保存到文件里。在线程的函数里,我有一个变量:
def thread_function(...)
def save_to_file():
this_thread_data.my_dict or nonlocal this_thread_data.my_dict
... json or pickle
this_thread_data = local()
this_thread_data.my_dict = {...}
...
当我使用pickle时,我遇到了错误
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
当我使用json时,我也遇到了错误
TypeError: <threading.Event object at 0x7f49115a9588> is not JSON serializable
在多线程环境下,pickle或json能用吗?还是说我需要用其他东西?
谢谢。
3 个回答
在Python中,线程处理(threading)和多进程处理(multiprocessing)以及数据序列化(pickling)有一些问题和限制,除非你使用一些标准库以外的工具。
如果你使用一个叫做 pathos.multiprocessing
的库,它是 multiprocessing
的一个分支,你就可以在多进程的 map
函数中直接使用类和类的方法。这是因为它使用了 dill
,而不是 pickle
或 cPickle
,而 dill
能够序列化几乎所有的Python对象。 pathos.multiprocessing
也提供了一个与标准Python模块类似的线程模块接口。
pathos.multiprocessing
还提供了一个异步的 map 函数……而且它可以处理多个参数的函数(例如 map(math.pow, [1,2,3], [4,5,6])
)。
你可以查看以下链接了解更多信息: 多进程和dill可以一起做什么?
还有: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> #from pathos.multiprocessing import ThreadingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
在字典中有一些不寻常的东西也没关系……
>>> d = {'1':add, '2':t, '3':Test, '4':range(10), '5':1}
>>>
>>> def items(x):
... return x[0],x[1]
...
>>> p.map(items, d.items())
[('1', <function add at 0x103b7e2a8>), ('3', <class '__main__.Test'>), ('2', <__main__.Test object at 0x103b7ad90>), ('5', 1), ('4', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]
顺便说一下,如果你想序列化一个线程锁,也可以做到。
>>> import dill as pickle
>>> import threading
>>> lock = threading.Lock()
>>>
>>> pickle.loads(pickle.dumps(lock))
<thread.lock object at 0x10c534650>
看起来你想构建一种闭包,能够自动将函数调用存储到文件中,或者至少存储为一个序列化的字符串。如果这是你想要的,你可以试试 klepto
,它提供了一个装饰器,你可以把它应用到你的函数上,这样你就可以将结果缓存到内存、磁盘或数据库中。 Klepto
可以使用 pickle 或 json,但它是通过 dill
增强的,所以它几乎可以序列化Python中的任何东西——所以不用担心你的字典里有什么……只需序列化它。
from klepto import lru_cache as memoize
from klepto.keymaps import picklemap
dumps = picklemap(serializer='dill')
class Adder(object):
"""A simple class with a memoized method"""
@memoize(keymap=dumps, ignore=('self','**'))
def __call__(self, x, *args, **kwds):
debug = kwds.get('debug', False)
if debug:
print ('debug:', x, args, kwds)
return sum((x,)+args)
add = __call__
add = Adder()
assert add(2,0) == 2
assert add(2,0,z=4) == 2 # cached (ignore z)
assert add(2,0,debug=False) == 2 # cached (ignore debug)
assert add(1,2,debug=False) == 3
assert add(1,2,debug=True) == 3 # cached (ignore debug)
assert add(4) == 4
assert add(x=4) == 4 # cached
Klepto
让你在重新启动代码时可以访问所有缓存的结果。在这种情况下,你可以选择一个文件或数据库作为后端,然后确保你执行 add.dump()
来保存归档……然后重新启动Python或其他程序,再执行 add.load()
来加载归档的结果。
你可以在这里获取代码: https://github.com/uqfoundation
在多线程环境中使用pickle和json是可以的,但可能不是线程安全的,所以要确保在使用时数据不会被改变,比如可以使用锁来保护数据。需要注意的是,你保存到硬盘上的数据类型是有限制的。
并不是所有的对象都可以被序列化,正如你发现的那样。最简单的方法是确保你的字典里只包含那些可以被pickle或json序列化的值。例如,你的字典里似乎存储了一个锁对象,这导致pickle无法正常工作。你可以创建一个新的字典,只包含可以被序列化的值,然后再进行序列化。
另外,如果你想创建一个自定义对象来存储数据,你可以告诉pickle如何序列化它。这种方法比较高级,可能在你的情况下并不必要,但你可以在这里找到更多的文档: https://docs.python.org/3.4/library/pickle.html#pickling-class-instances