多线程Python套接字发送器/客户端
我有一个使用Twisted框架的应用程序,它在监听Int32StringReceiver消息,然后把这些消息转发给另一个应用。简单来说,它就像一个路由器,但它有一些智能,可以分析数据的去向。
我遇到的问题是在发送消息的过程中,出现了很多错误信息等等。
接收部分是一个叫Receiver的类,继承自Int32StringReceiver:
def doActualForwarding(self, data):
self.stats.recvBits += 8 * (4 + len(data))
self.stats.recvMsgs += 1
dlen = len(data)
if dlen > 1024*256:
self.logger.info("router.Receiver.doActualForwarding(): data len: %s" % (dlen))
self.router.forward(data)
def stringReceived(self, data):
d = threads.deferToThread(self.doActualForwarding, data)
d.addCallback(self.forwardingDoneOkay)
d.addErrback(self.forwardingDoneError)
self.router是一个实例对象,需要通过socket通信把这些消息以相同的格式发送出去。所以,它在Router类中就这样做:
def connect(self):
if self.sock:
try:
self.sock.close()
except:
pass
try:
self.stats.connectAttempts += 1
self.sock = socket.socket()
self.sock.settimeout(self.CONNECT_TIMEOUT)
self.sock.connect(self.destination)
self.sock.settimeout(self.SEND_TIMEOUT)
self.set_keepalive_linux(self.sock)
self.connected = True
self.log.info("connected to %s" % (self.destination,))
self.stats.reconnects += 1
self.stats.connectCompletes += 1
return True
except Exception, e:
self.connected = False
if not self.drop_ok:
self.log.error("connect %s: %s" % (self.destination, e))
return False
def send(self, msg):
trynum = 0
while trynum < self.MAX_SEND_ATTEMPTS:
self.logSent()
if not self.connected:
if not self.connect():
self.stats.badSends += 1
time.sleep(self.DELAY_BEFORE_RECONNECT)
continue
try:
if ((time.time() - self.lastReconnectTime) > self.RECONNECT_EVERY):
self.lastReconnectTime = time.time()
assert False, "Reconnecting with destination to redistribute load."
self.sock.sendall(msg)
#self.closeSocket()
self.stats.events += 1
return True
except Exception, e:
whichKind = None
if 'Broken pipe' in str(e):
self.stats.brokenPipe += 1
elif 'Resource temporarily unavilable' in str(e):
self.stats.resourceTempUnavail += 1
elif 'Bad file descriptor' in str(e):
self.stats.badFileDescriptor += 1
self.log.error("send: %s %s" % (str(self.destination), str(e)))
try:
self.sock.close()
except:
pass
self.connected = False
self.stats.badSends += 1
trynum += 1
if trynum == 1:
self.stats.eventsWithRetry += 1
if trynum > 1:
self.log.warning("recon_sender.send(): Trynum non-singular, was: %s" % (trynum))
return False
def __del__(self):
try:
self.sock.close()
except:
pass
问题:
Python的Socket库是线程安全的吗?也就是说,两个或更多的线程都指向Router这个对象。两个线程都在调用self.sock.sendall(msg),我担心它们会互相干扰。
一个症状是,可能连续的消息会被拼接在一起。我不太确定,但看起来是这样的。
我看到很多资源临时不可用(意思是目标忙),还有差不多数量的断管,以及少量的坏文件描述符。
- [Errno 9] 坏文件描述符
- [Errno 11] 资源暂时不可用
- [Errno 32] 断管
这些错误信息大约占通过这个程序的消息总数的0.5%(0.005)。
我尝试让每次发送都进行连接/发送全部/关闭,但这导致出现了很多关于“连接被对方重置”的消息。
大家似乎都专注于处理多线程接收socket消息的代码,但很少有人讨论多线程发送socket消息的问题。
我还尝试使用(可能不太正确):
import threading self.lock = threading.Lock() with self.lock: sock.sendall(msg)
但这导致出现了关于超时的错误信息(真糟糕)。
有人能给我指个方向,推荐一些好的例子(或者直接提供一些?)来演示多线程socket的sendall()吗?
1 个回答
我想说,如果这些进程之间不需要相互沟通,最好的办法就是为每个进来的连接创建一个新的进程。这样你就不用担心锁的问题,因为每个连接都是单独处理的。
一个简单的实现方式是:
import socket
import multiprocessing
import pdb
import random
from pycurl import Curl
import os
import time
import re
class query(object):
pid, addr, conn, url, ua, ref = [None for i in range(6)]
compression = True
def __init__(self, conn, addr):
self.pid = addr[1]
self.addr = addr
self.conn = conn
self.process()
def process(self):
#do your socket stuff here
class ProxyServer(object):
def __init__(self, host, port):
self.host = host
self.port = port
def start(self):
logging.info("Server started on %s:%i" % (self.host, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind((self.host, self.port))
self.sock.listen(0)
while True:
conn, addr = self.sock.accept()
logging.info('Connection made from %s' % conn)
proc = multiprocessing.Process(target=query, args=(conn, addr))
proc.daemon = True
proc.start()
logging.info('Started processing query %r for %s' % (proc, addr))
if __name__ == "__main__":
serv = ProxyServer(host, port)
try:
serv.start()
except:
finally:
for proc in multiprocessing.active_children():
proc.terminate()
proc.join()
请记住,这只是我从旧的概念验证代码中剪切下来的例子,你需要在它准备好投入使用之前稍微调整一下。