How can I achieve parallelism in a Databricks notebook?

Published 2024-04-23 19:37:24

I want parallelism in Databricks, meaning my code should compute on separate CPU cores rather than on threads of a single core. For that I used multiprocessing:

import multiprocessing as mp
from multiprocessing.pool import ThreadPool

def hello(name):
    print("Hello " + name + "\n")

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    # pool = ThreadPool(mp.cpu_count()) # this works in both Databricks and local computer
    pool.map(hello, ["spark", "databricks"])
    pool.close()

But it gives me an error: PicklingError: Can't pickle <function hello at 0x7fc136cdbd08>: attribute lookup hello on __main__ failed
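The error comes from how `pickle` serializes functions: a plain function is pickled by reference, i.e. as its module plus qualified name, and the receiving worker process must be able to look that name up again. In a Databricks notebook the cell code apparently runs in an environment where `hello` cannot be re-resolved on `__main__` inside the worker, so the lookup fails. A minimal sketch of the same mechanism (the function names here are illustrative, not from the notebook):

```python
import pickle

def top_level():
    return "ok"

def make_local():
    # Defined inside another function: not reachable by its
    # qualified name from outside, so it cannot be pickled
    def local_fn():
        return "nope"
    return local_fn

# A module-level function pickles fine: only "module + name" is stored
restored = pickle.loads(pickle.dumps(top_level))

# A function that cannot be looked up by name fails to pickle,
# analogous to a notebook-defined function seen from a worker process
try:
    pickle.dumps(make_local())
    pickling_failed = False
except (pickle.PicklingError, AttributeError):
    pickling_failed = True
```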

The full stack trace:

---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<command-500228233074730> in <module>
      8     pool = mp.Pool(mp.cpu_count())
      9     # pool = ThreadPool(mp.cpu_count())
---> 10     pool.map(hello, ["spark", "databricks"])
     11     pool.close()

/usr/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

/usr/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    429                         break
    430                     try:
--> 431                         put(task)
    432                     except Exception as e:
    433                         job, idx = task[:2]

/usr/lib/python3.7/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

/usr/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

PicklingError: Can't pickle <function hello at 0x7fc136cdbd08>: attribute lookup hello on __main__ failed

However, when I run the same code on my local computer, it works fine (you can test it too).

How can I solve this problem?
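As the commented-out line in the snippet notes, `multiprocessing.pool.ThreadPool` already works in both environments. It exposes the same `map` API as `mp.Pool` but runs workers as threads inside the current process, so the function never needs to be pickled. A minimal sketch (note this avoids the PicklingError but is not true multi-core CPU parallelism, since threads are subject to the GIL):

```python
from multiprocessing.pool import ThreadPool

def hello(name):
    return "Hello " + name

# ThreadPool mirrors mp.Pool's API but uses threads in the current
# process, so no serialization of the function is required
with ThreadPool(4) as pool:
    results = pool.map(hello, ["spark", "databricks"])
```

This is a reasonable fit for I/O-bound work; for CPU-bound work on Databricks, distributing tasks through Spark itself is the more common route.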
