如何正确使用KeyboardInterrupt与UDP socket.recv()?
我现在在用 socket.socketpair()
和 signal.set_wakeup_fd()
这两个函数,目的是让 socket.recv()
在 Windows 上能和 KeyboardInterrupt
一起工作,特别是在使用 UDP 的时候——下面有个例子。
我也知道还有一种比较流行的解决办法,就是设置一个忙循环,配合 timeout=
来定期检查程序是否被中断。
不过这两种方法看起来都像是一些不太优雅的解决方案。有没有什么“正确”的做法呢?
import select
import signal
import socket
def _udp_listen(address, family=socket.AF_INET, flags=0, sockopts=frozenset()):
    """Yield UDP datagrams received on *address* while staying interruptible.

    A socket pair is installed as the signal wakeup fd, so any signal
    (e.g. Ctrl-C) makes ``select.select()`` return. Python then gets the
    chance to run its signal handlers and raise ``KeyboardInterrupt``
    instead of staying blocked in ``recv()``.

    Args:
        address: Address passed to ``sock.bind()``.
        family: Address family for the UDP socket.
        flags: Flags forwarded to ``sock.recv()``.
        sockopts: Iterable of argument tuples, each unpacked into
            ``sock.setsockopt()`` before binding.

    Yields:
        The bytes returned by each ``sock.recv(bufsize, flags)`` call, where
        ``bufsize`` is the value returned by ``getpagesize()``.

    Raises:
        RuntimeError: Propagated from ``_wakeup_fd_ctx`` when a wakeup fd is
            already installed (it is called with ``strict=True``).
    """
    bufsize = getpagesize()
    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        for sockopt in sockopts:
            sock.setsockopt(*sockopt)
        sock.bind(address)
        _coalmine, _canary = socket.socketpair()
        with _canary, _coalmine:
            # The signal.set_wakeup_fd() documentation requires the fd to be
            # non-blocking, so that the signal handler can never block on it.
            _coalmine.setblocking(False)
            with _wakeup_fd_ctx(_coalmine.fileno(), strict=True, warn_on_full_buffer=False):
                while True:
                    ready = select.select((sock, _canary), (), ())[0]
                    if _canary in ready:
                        # There's no need to raise any error ourselves: Python
                        # raises KeyboardInterrupt on its own if needed. The
                        # wakeup bytes must be drained, though; otherwise
                        # _canary stays readable and select() returns
                        # immediately on every later iteration (busy loop).
                        _canary.recv(bufsize)
                    if sock in ready:
                        yield sock.recv(bufsize, flags)
# -----
from contextlib import contextmanager
try:
    # Preferred source of the page size when the `resource` module exists.
    from resource import getpagesize
except ImportError:
    # `resource` could not be imported (e.g. on Windows); fall back to the
    # page-size constant exposed by `mmap`.
    import mmap

    def getpagesize():
        """Return ``mmap.PAGESIZE`` as a stand-in for ``resource.getpagesize()``."""
        return mmap.PAGESIZE
@contextmanager
def _wakeup_fd_ctx(fd, strict=True, **k):
    """Install *fd* as the signal wakeup fd for the duration of the context.

    Args:
        fd: File descriptor handed to ``signal.set_wakeup_fd()``.
        strict: When true, refuse to run if another wakeup fd was already
            installed; when false, put the previous fd back and use it.
        **k: Extra keyword arguments forwarded to ``signal.set_wakeup_fd()``
            for the initial installation only.

    Yields:
        *fd* when no wakeup fd was installed before; otherwise (non-strict
        mode only) the previously installed fd, which is left in place.

    Raises:
        RuntimeError: In strict mode, when a wakeup fd was already installed.
            The previous fd is restored before the error propagates.
    """
    _orig_wakeup_fd = signal.set_wakeup_fd(fd, **k)
    restore_on_exit = True
    try:
        if _orig_wakeup_fd != -1:
            # Installing `fd` displaced an existing wakeup fd.
            if strict:
                raise RuntimeError(f'wakeup fd already occupied ({_orig_wakeup_fd}). Not sure what to do about that.')
            # Non-strict: hand control back to the previous fd right away,
            # so nothing is left to undo on exit.
            signal.set_wakeup_fd(_orig_wakeup_fd)
            restore_on_exit = False
            active_fd = _orig_wakeup_fd
        else:
            active_fd = fd
        yield active_fd
    finally:
        if restore_on_exit:
            signal.set_wakeup_fd(_orig_wakeup_fd)
1 个回答
0
以下内容是否有帮助呢?基本的思路是把你要读取的套接字放到一个列表里,对这个列表调用 select(),检查哪些套接字有数据可读,然后处理这些数据。
#!/usr/bin/python3
import time
import socket
import select
import threading
# Control channel: a UDP socket bound to an OS-chosen port on localhost, plus
# a second socket connected to it. The main thread writes commands to the
# write end; the socket thread reads them from the read end.
glob_control_read_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
glob_control_read_socket.bind(('localhost', 0))
glob_control_write_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
glob_control_write_socket.connect(('localhost', glob_control_read_socket.getsockname()[1]))

# The UDP socket the server receives datagrams on.
glob_server_read_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
glob_server_read_socket.bind(('localhost', 12000))
print(f"Server running on port {glob_server_read_socket.getsockname()[1]}")
def sockets_shutdown():
    """Close the control sockets and the server socket, then report shutdown."""
    # Same closing order as before: control write end, control read end, server.
    for sock in (glob_control_write_socket, glob_control_read_socket, glob_server_read_socket):
        sock.close()
    print("Graceful shutdown")
def handle_server_read():
    """Receive up to 100 bytes from the server socket and print them as UTF-8 text."""
    payload = glob_server_read_socket.recv(100)
    text = payload.decode("utf-8")
    # Trailing whitespace (e.g. the newline a client sends) is stripped for display.
    print(f"Server got: {text.rstrip()}")
def sockets_thread_main():
    """Serve the control and server sockets until a ``b"QUIT"`` command arrives.

    Blocks in ``select.select()`` on both sockets. Datagrams on the server
    socket go to ``handle_server_read()``. A ``b"QUIT"`` datagram on the
    control socket triggers ``sockets_shutdown()`` and ends the function.
    Any exception raised while handling an iteration is printed and the
    loop continues.
    """
    watched = [glob_control_read_socket, glob_server_read_socket]
    while True:
        try:
            readable = select.select(watched, [], [])[0]
            for sock in readable:
                if sock is glob_control_read_socket:
                    # Only an exact b"QUIT" stops the loop; any other control
                    # datagram is read and discarded.
                    if sock.recv(32) == b"QUIT":
                        sockets_shutdown()
                        return
                elif sock is glob_server_read_socket:
                    handle_server_read()
        except Exception as exc:
            print(str(exc))
# Run the select() loop in a worker thread so the main thread stays free to
# receive KeyboardInterrupt.
sockets_thread = threading.Thread(target=sockets_thread_main)
try:
    sockets_thread.start()
    while True:
        time.sleep(100000)  # Placeholder, do something real here
except KeyboardInterrupt:
    # Ctrl-C: ask the worker to shut down via the control socket, then wait
    # for it to finish.
    glob_control_write_socket.send(b"QUIT")
    sockets_thread.join()
在最近的 Windows 10 上用 ncat 测试过(有趣的是,如果我用 ncat 连接 localhost,连接会失败,并显示“远程主机强制关闭了一个现有连接。”;我需要改用 127.0.0.1)。
补充:socketpair()在Windows上可用吗?