使用自定义函数将dask包写入数据库

2024-05-23 15:36:51 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在dask bag上运行一个函数,将数据转储到NoSQL DB中,如:

def write_to_db(x):
   # code to write into db
   db.insert_many(x)
   return

def func():
   # code to process each element
   for col in int_cols:
       try:
           x[col] = int(x[col])
       except (ValueError, TypeError):
           x[col] = None

import dask.bag as db

bag = db.read_text(...)
bag = bag.map_partitions(csv.DictReader).map(func).map_partitions(write_to_db)
bag.compute()

现在,当我查看dask任务图时,在每个分区完成write_to_db函数之后,它显示为memory,而不是released

我的问题是:

  1. 如何告诉dask没有返回值,从而将内存标记为已释放?例如,在下图中,我希望右侧的红色方块标记为released,即蓝色
  2. func()释放GIL吗?有没有办法优化这种计算
  3. 我做这种计算的方法正确吗?(通过将自定义函数传递到map_分区,插入数据库)

Dask Task Graph


Tags: to函数mapdbdefcodecoldask
1条回答
网友
1楼 · 发布于 2024-05-23 15:36:51
  1. 是的,Dask在内存中保存了隐式返回None值作为结果,但是这些值很小,我不担心。您的compute()的输出将是一组None(实际上,为了保持袋子排列,您可能希望将其列为一个列表)
  2. Dask不会为您发布GIL,但您调用的DB函数可能会读取该项目的文档;如果它确实而不是释放GIL,您可能会看到更多进程和更少线程/进程的性能更好
  3. 这似乎是一个好办法。使用dask.delayed的版本可能具有相同的行数

相关问题 更多 >