如何在后台运行过程并持续检查输入 - 线程?

7 投票
3 回答
11165 浏览
提问于 2025-04-17 23:54

我有一个小的服务器和客户端的Python脚本,客户端发送一个字符串,服务器则返回这个字符串的反转。当客户端输入一个退出的字符串时,客户端会退出,然后服务器也会退出。

我想让服务器的“接收、反转、发送”这个过程在后台运行,同时程序要不断检查输入,看有没有退出的字符串。

我试过使用threading(线程),但是因为很多socket调用会导致阻塞,所以它没有正常工作。

为了让你了解我已经做了什么。

server.py:

import socket
from time import sleep

sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept() 
print "Client connected"

while True:
    m = conn[0].recv(4096)
    if m == "exit":
        sleep(1)
        break
    else:
        conn[0].send(m[::-1])

sock.shutdown(socket.SHUT_RDWR)
sock.close()

client.py:

import socket

sock = socket.socket()
sock.connect(("127.0.0.1",12346))

while True:
    s = raw_input("message: ")
    sock.send(s)

    if s == "exit":
        print "Quitting"
        break

    print sock.recv(4096)

sock.shutdown(socket.SHUT_RDWR)
sock.close()

3 个回答

1

我之前发布过一个关于如何用Python构建预先分叉的JSON-RPC服务器的代码片段,这次我对代码进行了修改,以解决这个问题。代码片段在这里:https://gist.github.com/matthewstory/4547282

$ python server.py localhost 9999 5
exit
$

接下来,我们来看看为什么这个方法有效。主进程会创建很多子进程(在上面的例子中是5个),每个子进程都会进入一个接收请求的循环:

# simple pre-fork server, fork before accept
for i in range(int(argv[2])):
    # fork our current process
    pid = os.fork()

    # if we are the child fork ...
    if 0 == pid:
        # die without unhandled exception
        for signum in ( signal.SIGINT, signal.SIGTERM, ):
            signal.signal(signum, _gogentle)

        # under the hood, this calls `socket.accept`
        s.serve_forever()
        os._exit(0)

    # if we are the papa fork
    else:
        _PIDS.append(pid)

这些子进程会处理所有发往localhost:9999的请求。然后,主进程会进入一个选择/等待的组合循环:

# setup signal relaying for INT and TERM
for signum in ( signal.SIGINT, signal.SIGTERM, ):
    signal.signal(signum, _kronos)

# wait on the kids
while len(_PIDS):
    # 1s timeout here means we're checking for exiting children at most
    # 1x per second, prevents a busy loop
    reads, _, _ = select.select([sys.stdin], [], [], 1)
    if sys.stdin in reads:
        # blocking, read 1 line
        cmd = sys.stdin.readline()
        # kill ourselves ... kronos will propegate
        if cmd.strip() == 'exit':
            os.kill(os.getpid(), signal.SIGTERM)

    # check for exited children, non-blocking
    while True:
        pid, rc = os.waitpid(-1, os.WNOHANG)
        if not pid:
            break
        _PIDS.remove(pid)

这个select会检查标准输入(stdin)是否准备好可以读取,如果准备好了,我们就会从标准输入读取一行;如果没有准备好,它最多会等待1秒,然后直接检查有没有子进程结束(使用os.waitpidWNOHANG标志)。

1

这是一个非阻塞套接字接收的例子。如果没有数据可接收,套接字会抛出一个异常。

import sys
import socket
import fcntl, os
import errno
from time import sleep

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1',9999))
fcntl.fcntl(s, fcntl.F_SETFL, os.O_NONBLOCK)

while True:
    try:
        msg = s.recv(4096)
    except socket.error, e:
        err = e.args[0]
        if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
            sleep(1)
            print 'No data available'
            continue
        else:
            # a "real" error occurred
            print e
            sys.exit(1)
    else:
        # got a message, do something :)

这里是一个非阻塞标准输入读取的例子:

import sys
import select

# If there's input ready, do something, else do something
# else. Note timeout is zero so select won't block at all.
while sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
  line = sys.stdin.readline()
  if line:
    something(line)
  else: # an empty line means stdin has been closed
    print('eof')
    exit(0)
else:
  something_else()

基本上,你想把它们结合起来,并可能添加一些超时设置,以便在有很多连接的情况下,定期强制读取标准输入。

13

因为你希望服务器在处理客户端的同时,还能从服务器的 stdin 接收输入,所以你可以把当前的服务器代码放到一个 Thread 里,然后等待从 stdin 输入。

import socket
from time import sleep
import threading

def process():
    sock = socket.socket()
    sock.bind(("127.0.0.1",12346))
    sock.listen(3)
    print "Waiting on connection"
    conn = sock.accept()
    print "Client connected"

    while True:
        m = conn[0].recv(4096)
        conn[0].send(m[::-1])

    sock.shutdown(socket.SHUT_RDWR)
    sock.close()

thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
while True:
    exit_signal = raw_input('Type "exit" anytime to stop server\n')
    if exit_signal == 'exit':
        break

这样的话,你可以去掉客户端的“退出”检查。

在这个代码中,服务器在客户端断开后会什么都不做,只是等待在 stdin 输入“退出”。你可能想要扩展这个代码,让服务器能够接受新的客户端连接,因为你不希望客户端有能力关闭服务器。在这种情况下,你可以在 conn = sock.accept()sock.close() 之间再加一个 while 循环。

正如 @usmcs 提到的,如果你没有其他命令要发送给服务器,使用 CTRL-C (KeyboardInterrupt) 会更好,这样你就不需要线程了,同时它也能优雅地结束服务器(意思是不会因为 CTRL-C 而报错),可以用以下代码实现:

import socket
from time import sleep
import threading

sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept()
print "Client connected"

while True:
    try:
        m = conn[0].recv(4096)
        conn[0].send(m[::-1])
    except KeyboardInterrupt:
        break

sock.close()

撰写回答