Python多进程与SocketServer实例的通信
我有一组需要互相沟通的进程,叫它们A、B和C。A需要和B、C沟通;B需要和A、C沟通;C需要和A、B沟通。A、B和C可以在不同的机器上,也可以在同一台机器上。
我想通过套接字来进行通信,如果它们都在同一台机器上,就用“localhost”(比如A用11111端口,B用22222端口,等等)。这样的话,非本地的进程就像本地进程一样处理。为了实现这一点,我打算为A、B和C各设置一个SocketServer实例,每个实例都知道其他两个的地址。每当需要通信时,比如A要给B发送信息,A就会打开一个到B的套接字并写入数据。然后B的服务器会一直运行,读取数据并将其存储在一个列表中,以便后续使用。
我遇到的问题是,存储的信息在finish_request
方法(负责监听)和__call__
方法(负责发送)之间没有共享。(服务器类是可调用的,因为我需要这样做来处理其他事情。我认为这与问题无关。)
我的问题是,这样的设计能如我想象的那样工作吗?在同一台机器上,multiprocessing
、threading
和socketserver
能否很好地配合?我不想使用其他机制来进行进程间通信(比如Queue
或Pipe
)。我已经有了使用这些机制的可行方案。我想知道这种方法是否可行,即使效率较低。如果可行的话,我做错了什么,导致它无法正常工作?
下面是一个简单的例子,说明了这个问题:
import uuid
import sys
import socket
import time
import threading
import collections
import SocketServer
import multiprocessing
class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, server_address, client_addresses, max_migrants=1):
SocketServer.TCPServer.__init__(self, server_address, None)
self.client_addresses = client_addresses
self.migrants = collections.deque(maxlen=max_migrants)
self.allow_reuse_address = True
t = threading.Thread(target=self.serve_forever)
t.daemon = True
t.start()
def finish_request(self, request, client_address):
try:
rbufsize = -1
wbufsize = 0
rfile = request.makefile('rb', rbufsize)
wfile = request.makefile('wb', wbufsize)
data = rfile.readline().strip()
self.migrants.append(data)
print("finish_request:: From: %d To: %d MID: %d Size: %d -- %s" % (client_address[1],
self.server_address[1],
id(self.migrants),
len(self.migrants),
data))
if not wfile.closed:
wfile.flush()
wfile.close()
rfile.close()
finally:
sys.exc_traceback = None
def __call__(self, random, population, args):
client_address = random.choice(self.client_addresses)
migrant_index = random.randint(0, len(population) - 1)
data = population[migrant_index]
data = uuid.uuid4().hex
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(client_address)
sock.send(data + '\n')
finally:
sock.close()
print(" __call__:: From: %d To: %d MID: %d Size: %d -- %s" % (self.server_address[1],
client_address[1],
id(self.migrants),
len(self.migrants),
data))
if len(self.migrants) > 0:
migrant = self.migrants.popleft()
population[migrant_index] = migrant
return population
def run_it(migrator, rand, pop):
for i in range(10):
pop = migrator(r, pop, {})
print(" run_it:: Port: %d MID: %d Size: %d" % (migrator.server_address[1],
id(migrator.migrants),
len(migrator.migrants)))
time.sleep(1)
if __name__ == '__main__':
import random
r = random.Random()
a = ('localhost', 11111)
b = ('localhost', 22222)
c = ('localhost', 33333)
am = NetworkMigrator(a, [b, c], max_migrants=11)
bm = NetworkMigrator(b, [a, c], max_migrants=22)
cm = NetworkMigrator(c, [a, b], max_migrants=33)
fun = [am, bm, cm]
pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]]
jobs = []
for f, p in zip(fun, pop):
pro = multiprocessing.Process(target=run_it, args=(f, r, p))
jobs.append(pro)
pro.start()
for j in jobs:
j.join()
am.shutdown()
bm.shutdown()
cm.shutdown()
从这个例子的输出来看,会有三种类型的打印:
run_it:: Port: 11111 MID: 3071227860 Size: 0
__call__:: From: 11111 To: 22222 MID: 3071227860 Size: 0 -- e00e0891e0714f99b86e9ad743731a00
finish_request:: From: 60782 To: 22222 MID: 3071227972 Size: 10 -- e00e0891e0714f99b86e9ad743731a00
“MID”是该实例中migrants
双端队列的id
。 “From”和“To”是发送/接收传输的端口。我现在只是将数据设置为一个随机的十六进制字符串,以便跟踪每个传输。
我不明白为什么,即使MID相同,有时它会显示大小非零,而在稍后的时间又会显示大小为0。我觉得这可能与调用是多线程的有关。如果用这些行替代最后两个for
循环,系统就会按我预期的方式工作:
for _ in range(10):
for f, p in zip(fun, pop):
f(r, p, {})
time.sleep(1)
那么,使用多进程版本时发生了什么,导致它无法正常工作呢?
1 个回答
当我们创建3个新的NetworkMigrator对象时,会启动3个新线程,每个线程都在监听新的TCP连接。之后,我们又启动了3个新的进程来运行run_it这个函数。总的来说,我们有4个进程,其中第一个进程包含4个线程(1个主线程 + 3个服务器线程)。现在的问题是,其他3个进程无法访问由监听服务器线程所做的对象更改。这是因为进程默认情况下不共享内存。
所以,如果你启动3个新线程而不是进程,你会发现区别:
pro = threading.Thread(target=run_it,args=(f,r,p))
还有一个小问题。线程之间的共享也不是完全安全的。每当我们改变对象的状态时,最好使用锁。建议在finish_request和call方法中做类似下面的操作。
lock = Lock()
...
lock.acquire()
self.migrants.append(data)
lock.release()
如果你对多线程不满意,并且确实想使用多进程,那么你可以使用代理对象,具体说明可以参考这里: http://docs.python.org/library/multiprocessing.html#proxy-objects
至于对象ID相同,这并不意外。新的进程在那个时刻会接收到对象的状态(包括对象ID)。新的进程会保留这些对象ID,但我们这里讨论的是两个完全不同的内存空间,因为它们是不同的进程。所以,主进程所做的任何更改都不会反映在创建的子进程中。