Python:函数的并行执行

2024-06-10 21:31:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我想并行执行一组任务。我在一个类中定义了一个函数,它接受参数并基于参数执行操作。班级结构如下。你知道吗

from threading import Thread
from concurrent.futures import *
class Test(object):

  def process_dataframe(self,id:int):
    print(id*id)


  def run_task(self):
    thd = []
    for i in range(1,10): 
      thd.append( "self.process_dataframe({0})".format(i))
    self.run_functions_in_parallel(thd)

  def run_functions_in_parallel(self,fns)->bool:
    def wrap_function(self,fnToCall):
      try:
        eval(fnToCall)
        return ("0")
      except Exception as e:
        return "{0}".format(e)

    thd = []
    isError = False
    executor = ThreadPoolExecutor(max_workers=len(fns))
    errorMessage = ""

    for fn in fns:     
      t = executor.submit(wrap_function,self,fn)
      thd.append(t)

    for td in thd:
      ret = td.result()
      if ret != "0":
        isError = True
        errorMessage = errorMessage + """
        """ + ret
    if isError == True:
      print (errorMessage)
      raise Exception (errorMessage)
    else:
      return True


d=Test()
d.run_task()

我已经设法使它工作和任务执行正常。我想知道是否有更好/更简单的方法来达到同样的目的。我想让run\u函数在\u parallel method中保持泛型,这样它就可以作为模块中的通用方法使用。你知道吗


Tags: 函数runinselfidtrueforreturn
1条回答
网友
1楼 · 发布于 2024-06-10 21:31:49

您不需要使用包装器,因为ThreadPoolExecutor以更好的方式捕获错误。一个总是返回True或引发错误的函数不需要返回值,但是如果您有返回值的函数,您希望并行调用,则应该返回它们的结果。 使用魔术字符串作为错误指示器是个坏主意。^一个KeyError: 0的{}也导致"0"。最好使用一个惟一的值,比如在本例中的None。 如果不需要,不要使用eval。在您的例子中,可以使用partial。 不要对max_workers使用太大的值。你知道吗

from functools import partial
from concurrent.futures import ThreadPoolExecutor

class Test(object):
    def process_dataframe(self, id):
        print(id*id)

    def run_task(self):
        functions = []
        for i in range(1,10): 
            functions.append(partial(self.process_dataframe, i))
        self.run_functions_in_parallel(functions)

    def run_functions_in_parallel(self, functions, max_workers=8):
        executor = ThreadPoolExecutor(max_workers=max_workers)
        futures = [
            executor.submit(function)
            for function in functions
        ]

        errors = []
        results = []
        for future in futures:
            try:
                result = future.result()
            except Exception as e:
                errors.append(e)
            else:
                results.append(result)
        if errors:
            raise Exception(errors)
        return results

d = Test()
d.run_task()

相关问题 更多 >