Python 子进程在守护进程服务器中死锁

1 投票
2 回答
757 浏览
提问于 2025-04-17 05:08

我正在尝试为dar设置一个远程备份服务器,大致是这个方向。如果可以的话,我真的希望能用python来处理所有的管道,但我已经问了一个单独的问题关于这个。

我在使用subprocess.Popen(cmd, shell=True)中的netcat时,成功做了一个增量备份,就像dar网站上的例子那样。现在只有两个问题:

  1. 我不知道怎么动态分配端口号
  2. 如果我在后台执行服务器,它就会失败。为什么呢?

更新:这似乎和netcat无关;即使不使用netcat,它也会卡住。

这是我的代码:

from socket import socket, AF_INET, SOCK_STREAM
import os, sys
import SocketServer
import subprocess

class DarHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        print('entering handler')
        data = self.request.recv(1024).strip()
        print('got: ' + data)
        if data == 'xform':
            cmd1 = 'nc -dl 41201 | dar_slave archives/remotehost | nc -l 41202'
            print(cmd1)
            cmd2 = 'nc -dl 41200 | dar_xform -s 10k - archives/diffbackup'
            print(cmd2)
            proc1 = subprocess.Popen(cmd1, shell=True)
            proc2 = subprocess.Popen(cmd2, shell=True)
            print('sending port number')
            self.request.send('41200')
            print('waiting')
            result = str(proc1.wait())
            print('nc-dar_slave-nc returned ' + result)
            result = str(proc2.wait())
            print('nc-dar_xform returned ' + result)
        else:
            result = 'bad request'
        self.request.send(result)
        print('send result, exiting handler')

myaddress = ('localhost', 18010)
def server():
    server = SocketServer.TCPServer(myaddress, DarHandler)
    print('listening')
    server.serve_forever()

def client():
    sock = socket(AF_INET, SOCK_STREAM)
    print('connecting')
    sock.connect(('localhost', 18010))
    print('connected, sending request')
    sock.send('xform')
    print('waiting for response')
    port = sock.recv(1024)
    print('got: ' + port)
    try:
        os.unlink('toslave')
    except:
        pass
    os.mkfifo('toslave')
    cmd1 = 'nc -w3 localhost 41201 < toslave'
    cmd2 = 'nc -w3 localhost 41202 | dar -B config/test.dcf -A - -o toslave -c - | nc -w3 localhost ' + port
    print(cmd2)
    proc1 = subprocess.Popen(cmd1, shell=True)
    proc2 = subprocess.Popen(cmd2, shell=True)
    print('waiting')
    result2 = proc2.wait()
    result1 = proc1.wait()
    print('nc<fifo returned: ' + str(result1))
    print('nc-dar-nc returned: ' + str(result2))
    result = sock.recv(1024)
    print('received: ' + result)
    sock.close()
    print('socket closed, exiting')

if __name__ == "__main__":
    if sys.argv[1].startswith('serv'):
        server()
    else:
        client()

这是服务器上发生的事情:

$ python clientserver.py serve &
[1] 4651
$ listening
entering handler
got: xform
nc -dl 41201 | dar_slave archives/remotehost | nc -l 41202
nc -dl 41200 | dar_xform -s 10k - archives/diffbackup
sending port number
waiting

[1]+  Stopped                 python clientserver.py serve

这是客户端上发生的事情:

$ python clientserver.py client
connecting
connected, sending request
waiting for response
got: 41200
nc -w3 localhost 41202 | dar -B config/test.dcf -A - -o toslave -c - | nc -w3 localhost 41200
waiting
FATAL error, aborting operation
Corrupted data read on pipe
nc<fifo returned: 1
nc-dar-nc returned: 1

客户端也会卡住,我不得不用键盘中断来强制结束它。

2 个回答

1

我建议放弃现在的做法,重新开始。这个解决方案太复杂,而且不太好用。其实在这个领域有很多现成的解决方案:

如果你想简单点,可以试试Fwbackups;如果你想要更强大的功能,可以用rsync加ssh。

1
  1. 使用 Popen.communicate() 而不是 Popen.wait()

    根据 Python的文档wait() 的使用有个警告:

    警告:如果子进程产生的输出太多,导致它阻塞在等待操作系统的缓冲区接受更多数据时,就会出现死锁。为了避免这种情况,应该使用 communicate()

  2. 如果 dar 及其相关的可执行文件不是在交互模式下运行,应该加上 -Q 选项。

  3. 在同步多个进程时,确保先在“最弱的环节”上调用 communicate():也就是说,先在 dar_slave 上调用,然后是 dar_xform,最后是 dar 再到 cat。这个顺序在问题中已经做得很好,但值得注意。

  4. 清理共享资源。客户端进程仍然保持着一个套接字,而 dar_xform 还在从中读取数据。在 dar 和其他进程完成后,如果不关闭这个套接字就尝试在这个套接字上发送或接收数据,会导致死锁。

这里有一个有效的示例,它没有使用 shell=True 或 netcat。这样做的一个好处是我可以动态分配次要端口,因此可以同时为多个备份客户端提供服务。

撰写回答