在Python中并行多次运行类方法
我实现了一个Python的socket服务器,它可以把多个摄像头拍摄的图像数据发送给客户端。我的请求处理类大概是这样的:
class RequestHandler(SocketServer.BaseRequestHandler):
def handle(self):
while True:
data = self.request.recv(1024)
if data.endswith('0000000050'): # client requests data
for camera_id, camera_path in _video_devices.iteritems():
message = self.create_image_transfer_message(camera_id, camera_path)
self.request.sendto(message, self.client_address)
def create_image_transfer_message(self, camera_id, camera_path):
# somecode ...
由于客户端的原因,我必须使用这个socket服务器。虽然它能正常工作,但问题是它是一个接一个地处理请求,这样在上传摄像头图像时就会有很大的延迟。我希望能并行创建传输消息,并在调用之间设置一个小延迟。
我尝试使用multiprocessing中的池类来实现:
import multiprocessing
class RequestHandler(SocketServer.BaseRequestHandler):
def handle(self):
...
pool = multiprocessing.Pool(processes=4)
messages = [pool.apply(self.create_image_transfer_message, args=(camera_id, camera_path)) for camId, camPath in _video_devices.iteritems()]
但是这样会抛出错误:
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我想知道有没有其他方法可以并行创建这些传输消息,并在调用之间设置一个固定的延迟?
编辑:
我使用来自多个摄像头的数据来创建响应消息。问题是,如果我让图像抓取的程序运行得太接近,就会出现图像伪影,因为USB总线会过载。我发现,如果以0.2秒的延迟顺序调用图像抓取,就能解决这个问题。因为在图像抓取函数运行时,摄像头并不是一直在发送数据,所以延迟的并行调用可以得到好的图像,并且它们之间的延迟也很小。
2 个回答
0
这段代码对我来说解决了问题:
class RequestHandler(SocketServer.BaseRequestHandler):
def handle(self):
while True:
data = self.request.recv(1024)
if data.endswith('0000000050'): # client requests data
process_manager = multiprocessing.Manager()
messaging_queue = process_manager.Queue()
jobs = []
for camId, camPath in _video_devices.iteritems():
p = multiprocessing.Process(target=self.create_image_transfer_message,
args=(camera_id, camera_path, messaging_queue))
jobs.append(p)
p.start()
time.sleep(0.3)
# wait for all processes to finish
for p in jobs:
p.join()
while not messaging_queue.empty():
self.request.sendto(messaging_queue.get(), self.client_address)
1
我觉得你已经走在正确的道路上了,不用放弃你之前的工作。
这是我通过谷歌搜索“多进程类方法”找到的一个关于如何在多进程中使用类方法的答案。
from multiprocessing import Pool
import time
pool = Pool(processes=2)
def unwrap_self_f(arg, **kwarg):
return C.create_image_transfer_message(*arg, **kwarg)
class RequestHandler(SocketServer.BaseRequestHandler):
@classmethod
def create_image_transfer_message(cls, camera_id, camera_path):
# your logic goes here
def handle(self):
while True:
data = self.request.recv(1024)
if not data.endswith('0000000050'): # client requests data
continue
pool.map(unwrap_self_f,
(
(camera_id, camera_path)
for camera_id, camera_path in _video_devices.iteritems()
)
)
需要注意的是,如果你想从工作进程中返回值,那么你需要了解一下如何使用共享资源,可以参考这个答案 - 如何获取传递给multiprocessing.Process的函数的返回值?