How to send jobs from multiple client threads to a multiprocessing worker pool and return the results (Python)?

Published 2024-04-16 10:44:36 on Python中文网 (Q&A channel)

I need to evaluate an expensive computational function many times and want to use all processor cores. It would be relatively simple if I had all the sets of function arguments at once: I could just use multiprocessing.Pool.map. However, I do not get all the evaluations at once, and I want to avoid launching a separate process for each one. So I want to start a pool of workers (no problem), send the clients' jobs to them (via a queue, no problem), and then return the results to the clients (but how?).
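For contrast, the simple batch case looks like this (a minimal sketch; `expensive_f` and `run_all` are illustrative names, the real simulation would replace the body of `expensive_f`):

```python
import multiprocessing


def expensive_f(args):
    # stand-in for the real expensive simulation
    x, y = args
    return x * x + y * y


def run_all(param_sets):
    # one call covers the whole batch; Pool.map blocks until every job is done
    with multiprocessing.Pool() as pool:
        return pool.map(expensive_f, param_sets)
```

This only works when every parameter set is known before the pool is used, which is exactly what my recursive scheme below cannot guarantee.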

More specifically, I need to evaluate my function on a multidimensional grid of parameters. In my case, evaluating it at every grid point can be avoided by exploiting two particular properties of the function: it is monotonically non-decreasing in all parameters, and it takes only a few discrete values. So if the function values at two opposite corner points of a sub-grid are equal, the value is the same at every point in between. We therefore split the grid into parts and recurse. However, the recursion means I cannot easily use Pool.map.
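The shortcut itself is independent of the multiprocessing machinery. Here is a single-threaded sketch of the recursive fill (`fill_recursive` is my illustrative name; it assumes a square grid whose side is a power of two, so the quarters always split evenly, as in the full code below):

```python
import numpy as np


def fill_recursive(f, grid_x, grid_y, Z, ix, iy):
    """Fill Z over the index box ix=(ix0,ix1), iy=(iy0,iy1): if f agrees at
    the bottom-left and top-right corners, the whole box gets that value
    (monotonicity + few discrete values); otherwise split into quarters."""
    (ix0, ix1), (iy0, iy1) = ix, iy
    z0 = f(grid_x[ix0, iy0], grid_y[ix0, iy0])
    if ix0 == ix1 and iy0 == iy1:
        Z[ix0, iy0] = z0          # single point: recursion base case
        return
    z1 = f(grid_x[ix1, iy1], grid_y[ix1, iy1])
    if z0 == z1:
        Z[ix0:ix1 + 1, iy0:iy1 + 1] = z0   # whole box has one value
        return
    xh = (ix1 - ix0 + 1) // 2
    yh = (iy1 - iy0 + 1) // 2
    for sub_ix in ((ix0, ix0 + xh - 1), (ix0 + xh, ix1)):
        for sub_iy in ((iy0, iy0 + yh - 1), (iy0 + yh, iy1)):
            fill_recursive(f, grid_x, grid_y, Z, sub_ix, sub_iy)
```

The multithreaded version below replaces the direct `f(...)` calls with `getz()`, which is where the worker pool comes in.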

I actually found a working solution, but I suspect there may be a more direct and reliable way.
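One candidate for such a more direct way (a sketch, not what I ended up using; `run_clients` and `expensive_f` are illustrative names) would be concurrent.futures.ProcessPoolExecutor: each client thread submits its own job and blocks on the returned Future, so no hand-rolled dispatcher or result dict is needed:

```python
from concurrent.futures import ProcessPoolExecutor
import threading


def expensive_f(x, y):
    # stand-in for the real expensive simulation
    return x + y


def client(pool, x, y, out, i):
    # each client thread submits one job and blocks on its own Future;
    # the executor routes the result back, no dispatcher thread needed
    future = pool.submit(expensive_f, x, y)
    out[i] = future.result()


def run_clients(jobs):
    out = [None] * len(jobs)
    with ProcessPoolExecutor() as pool:
        threads = [threading.Thread(target=client, args=(pool, x, y, out, i))
                   for i, (x, y) in enumerate(jobs)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    return out
```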

I set up a separate dispatcher thread. It collects all the results from the workers via a results queue and lets the clients pick up their own results. The code below is for a simplified (2D) case.

Edit: in response to Shihab Shahriar's comment. Most of the code is needed to make the whole thing run, but is not directly related to the question. Specifically:

contpl() and sample_f() are auxiliary functions. In the real problem, a complex and expensive simulation takes the place of sample_f().

recfill() is the recursive function. It needs the results of the expensive computation and receives them by calling getz(). It recursively launches instances of itself in separate threads. The other details are unimportant.

getz() is the essential part of my solution: it is the proxy between the recursive function above and the worker pool. It sends the job parameters (tagged with an id) to the worker pool via the queue task_q, then waits for an event from dispatcher(), checks whether the computation result has arrived, and returns it to the caller. Since the parent recfill() instances run in multiple threads, so does getz().

Worker() — instances of this class run in separate processes, wait for jobs to arrive on the queue task_q, call the "expensive function", and put the result, tagged with the id, into results_q.

dispatcher() runs in a separate thread, receives results from results_q, and puts them into a shared dict indexed by id. It then sets an event so the getz() instances can check whether their result has arrived.

main — starts the workers, starts the dispatcher, calls recfill(), and cleans up.


from concurrent.futures import ThreadPoolExecutor as Pool
import multiprocessing
import threading

import numpy as np

import matplotlib.pyplot as plt


def contpl(p1, p2, Z):
    """
    plots color density plot of XYZf
    """
    plt.figure(figsize=(5,5))
    plt.contourf(p1, p2, Z, 3, cmap='RdYlGn')
    plt.show()


def sample_f(x, y):
    """
    simple sample monotonic function to plot quarter-circles
    returns integer values from 0 to 2
    """
    return np.round(1.4 * np.sqrt(x**2 + y**2))


def getz(ix, iy, mp_params):
    """
    gets the values of the function sample_f for (x, y) values
    from the parameter arrays p1, p2 for the indices ix, iy
    if the value has not been already computed,
    send the job to a worker, then wait until the result is ready
    """
    task_q = mp_params["task_q"]
    result_event = mp_params["result_event"]
    result_dict = mp_params["result_dict"]
    num_workers = mp_params["num_workers"]
    results_q = mp_params["results_q"]

    z = Zarr[ix, iy]
    if z >= 0:
        return z     # nice, the point has already been calculated
    # otherwise z is -1 from the array initialization
    else:
        # compute "flattened index" of a point as id
        dims = Zarr.shape
        id = np.ravel_multi_index((ix, iy), dims)

        task_q.put((id, p1[ix, iy], p2[ix, iy])) # send the job, targeted by the id, to workers

        # now wait until the dispatcher signals that a result has arrived
        while True:
            result_event.wait()
            try:
                # anything for me?
                z = result_dict.pop(id)
                result_event.clear()
                break
            except KeyError:
                pass
        # now the point is computed, write the value into the array
        Zarr[ix, iy] = z
        return z

def recfill(ix, iy, mp_params):
    """
    recursive function to compute values of a monotonic function
    on a 2D square (sub-)array of parameters
    """

    (ix0, ix1) = ix # x indices
    (iy0, iy1) = iy # y indices

    z0 = getz(ix0, iy0, mp_params) # get the bottom left point

    # if the array size is one in all dimensions, we reached the recursion limit
    if (ix0 == ix1) and (iy0 == iy1):
        return

    else:
        # get the top right point
        z1 = getz(ix1, iy1, mp_params)
        # if the values for bottom left and top right are equal, they are the same for all
        # elements in between
        if z0 == z1:
            Zarr[ix0:ix1+1, iy0:iy1+1] = z0 # fill in the subarray
            return # and we are done for this recursion branch

        else:
            # divide the sub-array by half in each dimension
            xhalf = (ix1 - ix0 + 1) // 2
            yhalf = (iy1 - iy0 + 1) // 2

            ixlo = (ix0, ix0+xhalf-1)
            iylo = (iy0, iy0+yhalf-1)
            ixhi = (ix0+xhalf, ix1)
            iyhi = (iy0+yhalf, iy1)

            # prepare arguments for the map function
            l1 = [(ixlo, iylo), (ixlo, iyhi), (ixhi, iylo), (ixhi, iyhi)]
            (ixs, iys) = zip(*l1)
            mpps = [mp_params]*4

            # and now a multithreaded recursive call for each quarter of the initial sub-array
            with Pool() as p:
                p.map(recfill, ixs, iys, mpps)

            return


class Worker(multiprocessing.Process):
    """
    adapted from
    https://pymotw.com/3/multiprocessing/communication.html
    """
    def __init__(self, mp_params):
        multiprocessing.Process.__init__(self)
        self.task_queue = mp_params["task_q"]
        self.result_queue = mp_params["results_q"]

    def run(self):
        proc_name = self.name
        while True:
            job = self.task_queue.get()
            if job is None:
                print('{}: Exiting'.format(proc_name))
                break

            (id, x, y) = job
            result = sample_f(x, y)

            answer = (id, result)
            self.result_queue.put(answer)


def dispatcher(mp_params):
    """
    receives the computation results from the results queue,
    puts them into a shared dictionary,
    and notifies all clients per event,
    that they should check the dictionary,
    if there is anything for them
    """
    result_event = mp_params["result_event"]
    result_dict = mp_params["result_dict"]
    results_q = mp_params["results_q"]

    while True:
        qitem = results_q.get()
        if qitem is not None:
            (id, result) = qitem
            result_dict[id] = result
            result_event.set()
        else:
            break


if __name__ == '__main__':

    result_event = threading.Event()
    num_workers = multiprocessing.cpu_count()
    task_q = multiprocessing.SimpleQueue()
    results_q = multiprocessing.Queue() # why using SimpleQueue here would hang the program?
    result_dict = {}


    mp_params = {}
    mp_params["task_q"] = task_q
    mp_params["results_q"] = results_q
    mp_params["result_dict"] = result_dict
    mp_params["result_event"] = result_event
    mp_params["num_workers"] = num_workers


    print('Creating {} workers'.format(num_workers))

    workers = [Worker(mp_params) for i in range(num_workers)]
    for w in workers:
        w.start()

    # creating dispatcher thread
    t = threading.Thread(target=dispatcher, args=(mp_params, ))
    t.start()

    # creating parameter arrays
    arrsize = 128
    xvec = np.linspace(0, 1, arrsize)
    yvec = np.linspace(0, 1, arrsize)
    (p1, p2) = np.meshgrid(xvec, yvec)

    # initialize the results array
    # our sample_f returns only non-negative values
    # therefore fill in with -1 to indicate the values
    # which have not been computed yet
    Zarr = np.full_like(p1, -1, dtype=np.int8)

    # now call our recursive function
    # to compute all array values
    recfill((0,arrsize-1), (0,arrsize-1), mp_params)

    # clean up
    for i in range(num_workers):
        task_q.put(None) # stop all workers

    results_q.put(None) # stop dispatcher
    t.join()

    # plot the results
    contpl(p1, p2, Zarr)


    # and check the results by comparing with directly
    # calculated values
    Z = sample_f(p1, p2)
    assert np.all(Z == Zarr)
