Python multiprocessing: scheduling tasks

Posted 2024-05-23 14:32:40


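I have 8 CPU cores and 200 tasks to run. Each task is isolated; there is no need to wait for or share results. I'm looking for a way to run at most 8 tasks/processes at a time, so that whenever one of them finishes, one of the remaining tasks is automatically started in a new process.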

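How can I tell when a child process has finished so that I can start a new one? I first tried using Process (multiprocessing) directly, but found it hard to figure out. Then I tried Pool and ran into a pickle problem, because I need to instantiate my objects dynamically.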

Edit: adding my Pool code

class Collectorparallel():

    def fire(self, obj):
        collectorController = Collectorcontroller()
        collectorController.crawlTask(obj)

    def start(self):
        log_to_stderr(logging.DEBUG)
        pluginObjectList = []
        for pluginName in self.settingModel.getAllCollectorName():
            name = pluginName.capitalize()
            # Get the plugin class and instantiate an object
            module = __import__('plugins.' + pluginName, fromlist=[name])
            pluginClass = getattr(module, name)
            pluginObject = pluginClass()
            pluginObjectList.append(pluginObject)

        pool = Pool(8)
        # This is the call that fails: self.fire is a bound method
        jobs = pool.map(self.fire, pluginObjectList)
        pool.close()

        print(pluginObjectList)

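pluginObjectList holds the instantiated plugin objects. Running the Pool version fails with:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed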

But the Process version works fine.


3 Answers

The solution to your problem is trivial. First, note that methods cannot be pickled. In fact, only the types listed in pickle's documentation can be pickled:

  • None, True, and False
  • integers, long integers, floating point numbers, complex numbers
  • normal and Unicode strings
  • tuples, lists, sets, and dictionaries containing only picklable objects
  • functions defined at the top level of a module
  • built-in functions defined at the top level of a module
  • classes that are defined at the top level of a module
  • instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section The pickle protocol for details).

[...]

Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised. [4]

Similarly, classes are pickled by named reference, so the same restrictions in the unpickling environment apply. Note that none of the class’s code or data is pickled[...]
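The rules quoted above can be checked directly with pickle.dumps. The following is only a minimal sketch: top_level and is_picklable are hypothetical names made up for the illustration, and the details of what fails differ between Python versions (as far as I know, Python 3 can usually pickle bound methods of picklable instances, so the instancemethod error from the question is specific to Python 2).

```python
import pickle


def top_level(x):
    """A function defined at module top level: pickled by name reference."""
    return x * 2


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    # Plain data built only from picklable objects
    print(is_picklable(None), is_picklable((1, 2.0, "s", {"k": [3]})))
    # A top-level function is looked up by name in its module
    print(is_picklable(top_level))
    # A lambda has no importable name that pickle could refer to
    print(is_picklable(lambda x: x * 2))
```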

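Obviously, a method is not a function defined at the top level of a module, so it cannot be pickled (read that part of the documentation carefully to avoid pickle problems in the future!). But replacing the method with a global function and passing self as an additional argument is absolutely trivial: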

import itertools as it
import logging
from multiprocessing import Pool, log_to_stderr


def global_fire(argument):
    # Pool.map passes a single argument, so unpack the (self, obj) pair here
    self, obj = argument
    self.fire(obj)


class Collectorparallel():

    def fire(self, obj):
        collectorController = Collectorcontroller()
        collectorController.crawlTask(obj)

    def start(self):
        log_to_stderr(logging.DEBUG)
        pluginObjectList = []
        for pluginName in self.settingModel.getAllCollectorName():
            name = pluginName.capitalize()
            # Get the plugin class and instantiate an object
            module = __import__('plugins.' + pluginName, fromlist=[name])
            pluginClass = getattr(module, name)
            pluginObject = pluginClass()
            pluginObjectList.append(pluginObject)

        pool = Pool(8)
        # global_fire is a top-level function, so it can be pickled by name
        jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList))
        pool.close()

        print(pluginObjectList)

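Note that since Pool.map calls the given function with only one argument, we have to "pack" self and the actual argument together. To do that, I zipped it.repeat(self) with the original iterable.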
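To try the packing trick without the plugin machinery, here is a self-contained sketch. Scaler and call_fire are hypothetical stand-ins invented for this example (they are not part of the asker's code), and it assumes the instance only holds picklable state, since the instance itself is sent to the worker processes.

```python
import itertools as it
from multiprocessing import Pool


class Scaler(object):
    """Hypothetical stand-in for Collectorparallel; holds only picklable state."""

    def __init__(self, factor):
        self.factor = factor

    def fire(self, value):
        return self.factor * value

    def start(self, values, workers=2):
        pool = Pool(workers)
        try:
            # Each task is one (self, value) tuple, unpacked in call_fire
            return pool.map(call_fire, zip(it.repeat(self), values))
        finally:
            pool.close()
            pool.join()


def call_fire(packed):
    """Top-level helper: unpack (instance, argument) and call the method."""
    instance, value = packed
    return instance.fire(value)


if __name__ == "__main__":
    print(Scaler(3).start([1, 2, 3, 4]))
```

The main guard matters on platforms where worker processes re-import the main module instead of forking.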

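If you don't care about the order in which the calls complete, using pool.imap_unordered may give better performance. However, it returns an iterable rather than a list, so if you want a list of results you have to write jobs = list(pool.imap_unordered(...)).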
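Applied to the numbers in the question (200 isolated tasks, 8 cores), that could look roughly like the sketch below. run_task and run_all are hypothetical names, and the squaring is just a placeholder for real work. The pool keeps at most 8 worker processes busy and hands each one a new task as soon as it finishes the previous one, so no manual bookkeeping is needed.

```python
from multiprocessing import Pool


def run_task(task_id):
    """Hypothetical isolated task; returns (task_id, result)."""
    return task_id, task_id * task_id


def run_all(task_ids, workers=8):
    """Run every task with at most `workers` processes busy at a time."""
    pool = Pool(workers)
    try:
        # Results arrive in completion order, not submission order
        results = list(pool.imap_unordered(run_task, task_ids))
    finally:
        pool.close()
        pool.join()
    return results


if __name__ == "__main__":
    results = run_all(range(200))
    # Sort by task id if the original order matters afterwards
    print(len(results), sorted(results)[:3])
```

Returning the task id alongside the result is one way to match results back to their inputs, since the completion order is not guaranteed.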

I'm no expert on multiprocessing in Python, but I tried a few things with the help of http://www.tutorialspoint.com/python/python_multithreading.htm and http://www.devshed.com/c/a/Python/Basic-Threading-in-Python/1/.

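For example, you can use the isAlive method to answer your question (it is spelled is_alive() on multiprocessing.Process and on threads in current Python versions).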
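A rough sketch of that idea: poll is_alive() on the workers that have been started and launch a pending one whenever a slot frees up. run_limited and task are hypothetical names, and the polling interval is an arbitrary choice. The loop only relies on start() and is_alive(), so it should work for multiprocessing.Process as well as threading.Thread objects.

```python
import time
from multiprocessing import Process


def run_limited(workers, limit=8, poll_interval=0.05):
    """Start each worker so that at most `limit` are tracked as running.

    `workers` are unstarted objects with start() and is_alive(), such as
    multiprocessing.Process or threading.Thread. Returns the largest
    number of workers that were tracked as running at the same time.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    pending = list(workers)
    running = []
    peak = 0
    while pending or running:
        # Drop workers that have finished since the last poll
        running = [w for w in running if w.is_alive()]
        # Fill the free slots with workers that have not started yet
        while pending and len(running) < limit:
            worker = pending.pop(0)
            worker.start()
            running.append(worker)
        peak = max(peak, len(running))
        if running:
            time.sleep(poll_interval)
    return peak


def task(seconds):
    """Hypothetical isolated task."""
    time.sleep(seconds)


if __name__ == "__main__":
    procs = [Process(target=task, args=(0.1,)) for _ in range(20)]
    print("peak concurrency:", run_limited(procs, limit=8))
```

Compared with a Pool this is more manual work, but it needs no pickling of the callable through Pool.map and gives one fresh process per task.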

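A caveat: this depends somewhat on your deployment and situation, but my current setup is as follows.

I have a worker program, and I start 6 copies of it (I have 6 cores). Each worker does the following:

  1. Connect to a Redis instance
  2. Try to pop some work off a specific list
  3. Push log information back
  4. Idle, or terminate when there is no work left in the "queue"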

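Each program is then essentially independent, while the required work is still distributed through a separate queueing system. Since you have no middleman in your process, this may be a solution to your problem.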
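The worker steps above can be sketched as a small loop. This is only an illustration under assumptions: worker_loop, InMemoryListStore and the key names are made up, and the in-memory store merely imitates the two list operations used here so the example runs without a server. In a real deployment you would presumably pass a redis-py client instead, whose lpop returns None when the list is empty and whose rpush appends to the list (note that such a client returns bytes unless configured to decode responses).

```python
class InMemoryListStore(object):
    """Stand-in exposing the two list calls the worker needs (lpop, rpush)."""

    def __init__(self, lists=None):
        self.lists = dict((k, list(v)) for k, v in (lists or {}).items())

    def lpop(self, key):
        items = self.lists.get(key)
        return items.pop(0) if items else None

    def rpush(self, key, value):
        self.lists.setdefault(key, []).append(value)


def worker_loop(client, work_key, log_key, handle):
    """Pop jobs until the list is empty; return how many were processed."""
    processed = 0
    while True:
        job = client.lpop(work_key)  # step 2: try to pop some work
        if job is None:              # step 4: nothing left, terminate
            return processed
        result = handle(job)
        # step 3: push log information back
        client.rpush(log_key, "done: %s" % (result,))
        processed += 1


if __name__ == "__main__":
    store = InMemoryListStore({"work": ["a", "b", "c"]})
    print(worker_loop(store, "work", "log", str.upper), store.lists["log"])
```

Because every copy of the worker pops from the same list, starting one copy per core gives the "run N at a time" behaviour without any parent process tracking the children.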
