如何使用Python multiprocessing.P实现Java FixedThreadPool

2024-05-29 04:59:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要在Python中使用一个进程池。要求如下:

这个游泳池有一个固定的大小,比如10。我有许多作业要提交给人才库(N>;10)。 在Java中,可以使用FixedThreadPool来实现这个目的。作业被提交,一旦一个线程执行完一个任务,客户机就可以提交下一个任务。因此,如果当前有10个任务正在运行,则客户端无法提交第11个任务。但是如果一个任务完成了,客户端就可以向可用线程提交下一个任务。在

这是我用来测试一些想法的代码:

import multiprocessing, time


def printStuff(number):
    print number
    if number % 2 : time.sleep(0.5)
    return number*number

pool = multiprocessing.Pool(5, None, None, None)   
a = []

def execute():
    def resultAggregator(n):
        print 'aggregator called...'
        a.append(n)
    for i in range (0, 34):

        # With callback
        #pool.apply_async(printStuff, [i], None, resultAggregator)
        #print "called for ", i

        # Without callback
        res = pool.apply_async(printStuff, [i])
        print "called for" , i, "returned ", res.get()

    pool.close() # disable sumitting any more tasks
    pool.join() # wait for all the worker to finish

execute()
print a

在资源获取()块,直到printStuff返回。使用回调变量甚至不调用printStuff。注意在这两种情况下,末尾的a是空的。在

有什么想法如何实现上述行为?代码片段会很棒,但它足够一个指向我不知道的现有库函数的指针,或者只是简单地加入一些想法。在


Tags: 代码none客户端numberforexecutetimedef
1条回答
网友
1楼 · 发布于 2024-05-29 04:59:16

我不知道Java的FixedThreadPool,但我可以修复您的代码;-)

你显然不想使用res.get(),对吧?所以我忽略这一部分。.apply_async()的问题是调用不正确。我很惊讶没有提出例外!参数列表应该是元组,而不是列表(对于内置的apply()函数)。对于关键字args参数,None不起作用。如果没有要传递的关键字参数,请忽略它(如下所示),或者传递一个空字典({})。在

这里的其他更改更具装饰性:引入了IO锁以防止终端输出被置乱,并引入了__name__ == "__main__"检查以确保清晰,从而使代码也能在Windows上运行:

import multiprocessing, time

def getlock(lck):
    global iolock
    iolock = lck

def printStuff(number):
    with iolock:
        print number
    if number % 2:
        time.sleep(0.5)
    return number*number

def execute():
    def resultAggregator(n):
        with iolock:
            print 'aggregator called...'
        a.append(n)

    for i in range(34):
        pool.apply_async(printStuff, (i,), callback=resultAggregator)
        with iolock:
            print "called for ", i

if __name__ == "__main__":
    a = []
    iolock = multiprocessing.Lock()
    pool = multiprocessing.Pool(5, getlock, (iolock,))   
    execute()
    pool.close()
    pool.join()
    print a

稍后:错误

如果您为关键字参数传递None,但multiprocessing将其取消,则实际上会引发异常。唉,这是异步技巧的一个常见问题:没有好的方法来引发异常!它们发生在与你的“主程序”当时正在做的无关的上下文中。在

至少python3.3.2的.apply_async()实现也有一个可选的error_callback参数。我不知道是什么时候引进的。如果您提供它,异步异常将传递给它,因此您可以决定如何报告(或记录,或忽略…)它们。添加此功能:

^{pr2}$

将调用改为:

pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)

生成以ouch()结尾的回溯,异常详细信息如下:

TypeError: printStuff() argument after ** must be a mapping, not NoneType

因此,至少在最近的Python中,您可以不让异步错误以无形的方式传递。在

问答

Could you explain the "global iolock" declaration within getLock() ? I thought it defines a global variable for each subprocess, but changing the name from iolock to iiolock in ? "main" makes iolock unknown to the worker processes.

对不起,我不能从中看出你到底做了什么。名称iolock旨在成为所有进程、主进程和子进程的全局名称。这是因为我的代码中的所有进程都使用名称iolock。在

如果,例如,“通过改变名字…”你的意思是你只是替换了

iolock = multiprocessing.Lock()

iiolock = multiprocessing.Lock()

那你就有个例外了:

Traceback (most recent call last):
  ...
    pool = multiprocessing.Pool(5, getlock, (iolock,))
NameError: global name 'iolock' is not defined

如果您还将该行(pool = ...)也改为使用iiolock,那么当主进程中的resultAggregator试图使用iolock时,您将得到一个不同的异常:

Exception in thread Thread-3:
Traceback (most recent call last):
  ...
  File "mpool.py", line 19, in resultAggregator
    with iolock:
NameError: global name 'iolock' is not defined

所以我不知道你到底做了什么。在

Also declaring printStuff within execute causes a silent error (code does not progress past printing "called for")

那不行。Python中的函数没有声明-def是一个可执行的语句。在执行def printstuff之前,printStuff的代码不存在。因为只有主程序执行execute(),所以execute()中的函数def只存在于主程序中。这是真的

    pool.apply_async(printStuff, (i,), callback=resultAggregator)

printStuff传递给子进程,但是所有传递的东西都是通过在发送端进行pickle和在接收端取消pickle来工作的,并且函数对象不能被pickle。你确定没有遇到这样的错误吗?公司名称:

_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

(我在这里使用的是Python3——可能在Python2下是不同的)。在

在任何情况下,Python不是Java-不要疯狂地嵌套-保持简单;-)子进程使用的每个函数和类都应该在模块级定义(class也是Python中的可执行语句!Python中唯一的“声明”是globalnonlocal语句)。在

更多问答

You are right about the assumptions. I changed to iiolock in all places except main

还是不知道你到底做了什么。对于这样的事情,你真的需要发布代码,而不仅仅是描述你做了什么。我只能用另外一种方式来猜测——这真的很痛苦;——)这个怎么样:如果你有新的问题,打开一个新的问题?在

根据您在这里所描述的(“除main之外的所有地方”),您将在execute()中得到一个异常,因为新的iiolock名称将不存在于主进程中(主进程是execute()运行的唯一进程,并且您说您在main()中更改了旧的iolock)。但你没有提到例外,所以我猜你并没有真正做到你说的那样(“除了梅因以外的所有地方”)。在

and was expecting that the new Process just get the same lock passed as a param to initialization function but each has its own global iiolock variable. How can multiple processes share the same variable anyway (aren't the memory contents different for each process???).

有两个答案;-)最直接相关的是iolock(在我最初的代码中,我真的不知道你的代码现在是什么样子)是一个由multiprocessing(它是一个mp.Lock())创建的对象,并通过mp.Pool()传递给子进程:

pool = multiprocessing.Pool(5, getlock, (iolock,)) 
                                         ^^^^^^

mp控制着这里的一切,并在幕后做了一件大事,以确保这个{}在进程之间具有一致的状态。它不仅仅是任何旧变量,它是mp所知道的,以及{}实现的所有行为。在

第二个答案是,您也可以使用mp在“共享内存”中创建一些类型的数据。请参阅mp.Valuemp.Arraymultiprocessing.sharedctypes的文档。这些价值观是真正(物理上)跨流程共享的。在

但是除了那些(由mp实现的对象和从mp获得的“共享内存”)之外,您是对的:没有其他值在进程之间共享(无论是物理上还是语义上)。通过在不同的mp同步点对对象进行pickle(序列化)和unpickle(从pickle字符串重构一个对象的值)来完成所有其他类型的对象值的通信(比如当您在mp.Queue上对一个对象进行.get()处理时)。在

相关问题 更多 >

    热门问题