我需要在Python中使用一个进程池。要求如下:
这个游泳池有一个固定的大小,比如10。我有许多作业要提交给人才库(N>;10)。 在Java中,可以使用FixedThreadPool来实现这个目的。作业被提交,一旦一个线程执行完一个任务,客户机就可以提交下一个任务。因此,如果当前有10个任务正在运行,则客户端无法提交第11个任务。但是如果一个任务完成了,客户端就可以向可用线程提交下一个任务。在
这是我用来测试一些想法的代码:
import multiprocessing, time
def printStuff(number):
print number
if number % 2 : time.sleep(0.5)
return number*number
pool = multiprocessing.Pool(5, None, None, None)
a = []
def execute():
def resultAggregator(n):
print 'aggregator called...'
a.append(n)
for i in range (0, 34):
# With callback
#pool.apply_async(printStuff, [i], None, resultAggregator)
#print "called for ", i
# Without callback
res = pool.apply_async(printStuff, [i])
print "called for" , i, "returned ", res.get()
pool.close() # disable sumitting any more tasks
pool.join() # wait for all the worker to finish
execute()
print a
在资源获取()块,直到printStuff
返回。使用回调变量甚至不调用printStuff
。注意在这两种情况下,末尾的a
是空的。在
有什么想法如何实现上述行为?代码片段会很棒,但它足够一个指向我不知道的现有库函数的指针,或者只是简单地加入一些想法。在
我不知道Java的
FixedThreadPool
,但我可以修复您的代码;-)你显然不想使用
res.get()
,对吧?所以我忽略这一部分。.apply_async()
的问题是调用不正确。我很惊讶没有提出例外!参数列表应该是元组,而不是列表(对于内置的apply()
函数)。对于关键字args参数,None
不起作用。如果没有要传递的关键字参数,请忽略它(如下所示),或者传递一个空字典({}
)。在这里的其他更改更具装饰性:引入了IO锁以防止终端输出被置乱,并引入了
__name__ == "__main__"
检查以确保清晰,从而使代码也能在Windows上运行:稍后:错误
如果您为关键字参数传递
None
,但multiprocessing
将其取消,则实际上会引发异常。唉,这是异步技巧的一个常见问题:没有好的方法来引发异常!它们发生在与你的“主程序”当时正在做的无关的上下文中。在至少python3.3.2的
^{pr2}$.apply_async()
实现也有一个可选的error_callback
参数。我不知道是什么时候引进的。如果您提供它,异步异常将传递给它,因此您可以决定如何报告(或记录,或忽略…)它们。添加此功能:将调用改为:
生成以
ouch()
结尾的回溯,异常详细信息如下:因此,至少在最近的Python中,您可以不让异步错误以无形的方式传递。在
问答
对不起,我不能从中看出你到底做了什么。名称
iolock
旨在成为所有进程、主进程和子进程的全局名称。这是因为我的代码中的所有进程都使用名称iolock
。在如果,例如,“通过改变名字…”你的意思是你只是替换了
到
那你就有个例外了:
如果您还将该行(
pool = ...
)也改为使用iiolock
,那么当主进程中的resultAggregator
试图使用iolock
时,您将得到一个不同的异常:所以我不知道你到底做了什么。在
那不行。Python中的函数没有声明-
def
是一个可执行的语句。在执行def printstuff
之前,printStuff
的代码不存在。因为只有主程序执行execute()
,所以execute()
中的函数def
只存在于主程序中。这是真的将
printStuff
传递给子进程,但是所有传递的东西都是通过在发送端进行pickle和在接收端取消pickle来工作的,并且函数对象不能被pickle。你确定没有遇到这样的错误吗?公司名称:(我在这里使用的是Python3——可能在Python2下是不同的)。在
在任何情况下,Python不是Java-不要疯狂地嵌套-保持简单;-)子进程使用的每个函数和类都应该在模块级定义(
class
也是Python中的可执行语句!Python中唯一的“声明”是global
和nonlocal
语句)。在更多问答
还是不知道你到底做了什么。对于这样的事情,你真的需要发布代码,而不仅仅是描述你做了什么。我只能用另外一种方式来猜测——这真的很痛苦;——)这个怎么样:如果你有新的问题,打开一个新的问题?在
根据您在这里所描述的(“除main之外的所有地方”),您将在
execute()
中得到一个异常,因为新的iiolock
名称将不存在于主进程中(主进程是execute()
运行的唯一进程,并且您说您在main()
中更改了旧的iolock
)。但你没有提到例外,所以我猜你并没有真正做到你说的那样(“除了梅因以外的所有地方”)。在有两个答案;-)最直接相关的是
iolock
(在我最初的代码中,我真的不知道你的代码现在是什么样子)是一个由multiprocessing
(它是一个mp.Lock()
)创建的对象,并通过mp.Pool()
传递给子进程:mp
控制着这里的一切,并在幕后做了一件大事,以确保这个{mp
所知道的,以及{第二个答案是,您也可以使用
mp
在“共享内存”中创建一些类型的数据。请参阅mp.Value
和mp.Array
和multiprocessing.sharedctypes
的文档。这些价值观是真正(物理上)跨流程共享的。在但是除了那些(由
mp
实现的对象和从mp
获得的“共享内存”)之外,您是对的:没有其他值在进程之间共享(无论是物理上还是语义上)。通过在不同的mp
同步点对对象进行pickle(序列化)和unpickle(从pickle字符串重构一个对象的值)来完成所有其他类型的对象值的通信(比如当您在mp.Queue
上对一个对象进行.get()
处理时)。在相关问题 更多 >
编程相关推荐