在Python中并行多次运行类方法

0 投票
2 回答
2385 浏览
提问于 2025-04-27 23:07

我实现了一个Python的socket服务器,它可以把多个摄像头拍摄的图像数据发送给客户端。我的请求处理类大概是这样的:

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if data.endswith('0000000050'): # client requests data

                for camera_id, camera_path in _video_devices.iteritems():
                    message = self.create_image_transfer_message(camera_id, camera_path)
                    self.request.sendto(message, self.client_address)

     def  create_image_transfer_message(self, camera_id, camera_path):
         # somecode ...

由于客户端的原因,我必须使用这个socket服务器。虽然它能正常工作,但问题是它是一个接一个地处理请求,这样在上传摄像头图像时就会有很大的延迟。我希望能并行创建传输消息,并在调用之间设置一个小延迟。

我尝试使用multiprocessing中的池类来实现:

import multiprocessing

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):

    ...

   pool = multiprocessing.Pool(processes=4)
   messages = [pool.apply(self.create_image_transfer_message, args=(camera_id, camera_path)) for camId, camPath in _video_devices.iteritems()]

但是这样会抛出错误:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我想知道有没有其他方法可以并行创建这些传输消息,并在调用之间设置一个固定的延迟?

编辑:

我使用来自多个摄像头的数据来创建响应消息。问题是,如果我让图像抓取的程序运行得太接近,就会出现图像伪影,因为USB总线会过载。我发现,如果以0.2秒的延迟顺序调用图像抓取,就能解决这个问题。因为在图像抓取函数运行时,摄像头并不是一直在发送数据,所以延迟的并行调用可以得到好的图像,并且它们之间的延迟也很小。

暂无标签

2 个回答

0

这段代码对我来说解决了问题:

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if data.endswith('0000000050'): # client requests data

                process_manager = multiprocessing.Manager()
                messaging_queue = process_manager.Queue()
                jobs = []

                for camId, camPath in _video_devices.iteritems():
                    p = multiprocessing.Process(target=self.create_image_transfer_message,
                                                args=(camera_id, camera_path, messaging_queue))
                    jobs.append(p)
                    p.start()
                    time.sleep(0.3)

                # wait for all processes to finish
                for p in jobs:
                    p.join()

                while not messaging_queue.empty():
                    self.request.sendto(messaging_queue.get(), self.client_address)
1

我觉得你已经走在正确的道路上了,不用放弃你之前的工作。

这是我通过谷歌搜索“多进程类方法”找到的一个关于如何在多进程中使用类方法的答案。

from multiprocessing import Pool
import time

pool = Pool(processes=2)

def unwrap_self_f(arg, **kwarg):
    return C.create_image_transfer_message(*arg, **kwarg)

class RequestHandler(SocketServer.BaseRequestHandler):

    @classmethod
    def create_image_transfer_message(cls, camera_id, camera_path):
        # your logic goes here

    def handle(self):
        while True:
            data = self.request.recv(1024)
            if not data.endswith('0000000050'): # client requests data
                continue

            pool.map(unwrap_self_f, 
                (
                    (camera_id, camera_path)
                    for camera_id, camera_path in _video_devices.iteritems()
                )
            )

需要注意的是,如果你想从工作进程中返回值,那么你需要了解一下如何使用共享资源,可以参考这个答案 - 如何获取传递给multiprocessing.Process的函数的返回值?

撰写回答