我想自动测试一台仪器,并编写了一个小型服务器程序来模拟仪器,它将发回命令,除非它收到一个特殊命令“*IDN?”。当我用自己的脚本直接运行echo服务器,然后单独运行客户端脚本时,一切都很好,我得到了预期的结果。现在我想直接从测试脚本运行服务器。所以我想我应该用多重处理来启动它。但问题似乎是,当服务器套接字到达s.accept()行时,它只是在那里等待,永远不会返回。那么,如果我不能在与测试函数相同的代码中运行此服务器,如何实现自动测试呢
import socket
import multiprocessing as mp
import time,sys
HOST = '127.0.0.1' # Standard loopback interface address (localhost),
PORT = 65432 # Port to listen on (non-privileged ports are > 1023),
FTP_PORT = 63217 # Port for ftp testing, change to 21 for device
def handle_connection(conn,addr):
with conn:
conn.send('Connected by', addr)
print("Got connection")
data = conn.recv(1024)
if not data:
return 'Nodata'
elif (data == b'*IDN?\n'):
print('SONY/TEK,AWG520,0,SCPI:95.0 OS:3.0 USR:4.0\n')
conn.sendall(b'SONY/TEK,AWG520,0,SCPI:95.0 OS:3.0 USR:4.0\n')
return 'IDN'
else:
conn.sendall(data)
return 'Data'
def echo_server(c_conn,host=HOST,port=PORT):
# this server simulates the AWG command protocol, simply echoing the command back except for IDN?
p = mp.current_process()
print('Starting echo server:', p.name, p.pid)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((host, port))
s.listen()
try:
while True:
print("Waiting for connection...")
c_conn.send('waiting for connection...')
conn, addr = s.accept()
handle_connection(conn,addr)
c_conn.send('serving client...')
finally:
conn.close()
c_conn.send('done')
time.sleep(2)
print('Exiting echo server:', p.name, p.pid)
sys.stdout.flush()
def test_echo_server():
print("entering client part")
with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as mysock:
mysock.connect((HOST,PORT))
mysock.sendall('test\n'.encode())
data = mysock.recv(1024)
print('received:',repr(data))
if __name__ == '__main__':
parent_conn, child_conn = mp.Pipe()
echo_demon = mp.Process(name='echo', target=echo_server(child_conn, ))
echo_demon.daemon = True
echo_demon.start()
time.sleep(1)
echo_demon.join(1)
test_echo_server()
if parent_conn.poll(1):
print(parent_conn.recv())
else:
print('Waiting for echo server')
我设法用我在Romano、Baka和Phillips的《Python入门》一书中找到的一些代码片段解决了我自己的问题。以下是服务器的代码:
下面是测试文件的代码,它在一个单独的进程中运行此服务器,还有几个简单的测试:
将主机设置为环回IP,将端口设置为>;1024
相关问题 更多 >
编程相关推荐