如何在后台运行过程并持续检查输入 - 线程?
我有一个小的服务器和客户端的Python脚本,客户端发送一个字符串,服务器则返回这个字符串的反转。当客户端输入一个退出的字符串时,客户端会退出,然后服务器也会退出。
我想让服务器的“接收、反转、发送”这个过程在后台运行,同时程序要不断检查输入,看有没有退出的字符串。
我试过使用threading
(线程),但是因为很多socket调用会导致阻塞,所以它没有正常工作。
为了让你了解我已经做了什么。
server.py:
import socket
from time import sleep
sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept()
print "Client connected"
while True:
m = conn[0].recv(4096)
if m == "exit":
sleep(1)
break
else:
conn[0].send(m[::-1])
sock.shutdown(socket.SHUT_RDWR)
sock.close()
client.py:
import socket
sock = socket.socket()
sock.connect(("127.0.0.1",12346))
while True:
s = raw_input("message: ")
sock.send(s)
if s == "exit":
print "Quitting"
break
print sock.recv(4096)
sock.shutdown(socket.SHUT_RDWR)
sock.close()
3 个回答
我之前发布过一个关于如何用Python构建预先分叉的JSON-RPC服务器的代码片段,这次我对代码进行了修改,以解决这个问题。代码片段在这里:https://gist.github.com/matthewstory/4547282
$ python server.py localhost 9999 5
exit
$
接下来,我们来看看为什么这个方法有效。主进程会创建很多子进程(在上面的例子中是5
个),每个子进程都会进入一个接收请求的循环:
# simple pre-fork server, fork before accept
for i in range(int(argv[2])):
# fork our current process
pid = os.fork()
# if we are the child fork ...
if 0 == pid:
# die without unhandled exception
for signum in ( signal.SIGINT, signal.SIGTERM, ):
signal.signal(signum, _gogentle)
# under the hood, this calls `socket.accept`
s.serve_forever()
os._exit(0)
# if we are the papa fork
else:
_PIDS.append(pid)
这些子进程会处理所有发往localhost:9999
的请求。然后,主进程会进入一个选择/等待的组合循环:
# setup signal relaying for INT and TERM
for signum in ( signal.SIGINT, signal.SIGTERM, ):
signal.signal(signum, _kronos)
# wait on the kids
while len(_PIDS):
# 1s timeout here means we're checking for exiting children at most
# 1x per second, prevents a busy loop
reads, _, _ = select.select([sys.stdin], [], [], 1)
if sys.stdin in reads:
# blocking, read 1 line
cmd = sys.stdin.readline()
# kill ourselves ... kronos will propegate
if cmd.strip() == 'exit':
os.kill(os.getpid(), signal.SIGTERM)
# check for exited children, non-blocking
while True:
pid, rc = os.waitpid(-1, os.WNOHANG)
if not pid:
break
_PIDS.remove(pid)
这个select
会检查标准输入(stdin
)是否准备好可以读取,如果准备好了,我们就会从标准输入读取一行;如果没有准备好,它最多会等待1秒
,然后直接检查有没有子进程结束(使用os.waitpid
和WNOHANG
标志)。
这是一个非阻塞套接字接收的例子。如果没有数据可接收,套接字会抛出一个异常。
import sys
import socket
import fcntl, os
import errno
from time import sleep
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1',9999))
fcntl.fcntl(s, fcntl.F_SETFL, os.O_NONBLOCK)
while True:
try:
msg = s.recv(4096)
except socket.error, e:
err = e.args[0]
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
sleep(1)
print 'No data available'
continue
else:
# a "real" error occurred
print e
sys.exit(1)
else:
# got a message, do something :)
这里是一个非阻塞标准输入读取的例子:
import sys
import select
# If there's input ready, do something, else do something
# else. Note timeout is zero so select won't block at all.
while sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
line = sys.stdin.readline()
if line:
something(line)
else: # an empty line means stdin has been closed
print('eof')
exit(0)
else:
something_else()
基本上,你想把它们结合起来,并可能添加一些超时设置,以便在有很多连接的情况下,定期强制读取标准输入。
因为你希望服务器在处理客户端的同时,还能从服务器的 stdin
接收输入,所以你可以把当前的服务器代码放到一个 Thread
里,然后等待从 stdin
输入。
import socket
from time import sleep
import threading
def process():
sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept()
print "Client connected"
while True:
m = conn[0].recv(4096)
conn[0].send(m[::-1])
sock.shutdown(socket.SHUT_RDWR)
sock.close()
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
while True:
exit_signal = raw_input('Type "exit" anytime to stop server\n')
if exit_signal == 'exit':
break
这样的话,你可以去掉客户端的“退出”检查。
在这个代码中,服务器在客户端断开后会什么都不做,只是等待在 stdin
输入“退出”。你可能想要扩展这个代码,让服务器能够接受新的客户端连接,因为你不希望客户端有能力关闭服务器。在这种情况下,你可以在 conn = sock.accept()
和 sock.close()
之间再加一个 while
循环。
正如 @usmcs 提到的,如果你没有其他命令要发送给服务器,使用 CTRL-C (KeyboardInterrupt
) 会更好,这样你就不需要线程了,同时它也能优雅地结束服务器(意思是不会因为 CTRL-C 而报错),可以用以下代码实现:
import socket
from time import sleep
import threading
sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept()
print "Client connected"
while True:
try:
m = conn[0].recv(4096)
conn[0].send(m[::-1])
except KeyboardInterrupt:
break
sock.close()