为什么我的平行化方法不能扩展?

2024-06-02 04:56:55 发布

您现在位置:Python中文网/ 问答频道 /正文

  • 我必须迭代一个大集合(>;50GB)
  • 我使用游标和多处理池以及队列作为通信工具。
    • 速度很慢(大约1500个文档/秒)

我是否可以加快处理速度以换取更多的内存使用

def dowork(args):

    uid = int(args.get('uid'))
    if map_userid_visits.get(uid):
        map_userid_visits[uid] += 1
    else:
        map_userid_visits[uid] = 1

def main():

    manager = Manager()
    map_userid_visits = manager.dict()
    start_time = time.time()
    print ('Start Time', start_time)
    cur = cursor.Cursor(mycollection)
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    iteration = 0
    for user_event in cur:
        pool.apply(dowork, (user_event, ))
    pool.close()
    pool.join()
    print map_userid_visits

Tags: mapuidgettimedefmanagerargsstart
1条回答
网友
1楼 · 发布于 2024-06-02 04:56:55

你的方法无法扩展。主要的问题是一次只发送一行,这样就可以做一个非常轻量级的操作。这意味着序列化开销远远大于工作本身

同样来自multiprocessing documentation

apply(func[, args[, kwds]])¶

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

相关问题 更多 >