在Django中,如何调用启动时间慢的子进程

4 投票
3 回答
3492 浏览
提问于 2025-04-15 14:21

假设你在Linux上运行Django,并且你有一个视图(view),你希望这个视图能返回一个叫cmd的子进程处理的数据,这个子进程会处理一个视图创建的文件,比如这样:

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

现在,假设cmd启动得很慢,但运行得很快,而且它本身没有守护进程模式。我想改善这个视图的响应时间。

我希望通过启动多个cmd的实例在一个工作池中,让它们等待输入,然后让call_process请求其中一个工作池的进程来处理数据,从而让整个系统运行得更快。

这实际上分为两个部分:

第一部分是一个函数,它调用cmd,并且cmd会等待输入。这可以通过管道来实现,也就是:

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

第二部分是一组在后台运行的工作者,它们在等待数据。也就是说,我们想扩展上面的内容,让子进程已经在运行,比如当Django实例初始化时,或者当第一次调用call_process时,创建一组这样的工作者。

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

应该在某个地方初始化工作者,像这样:

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

现在,我上面提到的内容变成了这样:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

现在,有几个问题:

  1. 这样做可行吗?(我刚刚在StackOverflow上随便写的,所以肯定有问题,但从概念上讲,这样做可以吗)

  2. 需要注意哪些问题?

  3. 有没有更好的替代方案?比如线程能否同样有效(这是Debian Lenny Linux)?有没有处理并行进程工作池的库?

  4. 与Django的交互中,有哪些我需要注意的地方?

感谢你的阅读!希望你觉得这个问题和我一样有趣。

布莱恩

3 个回答

0

你可以试试用 python-daemon 或者它的后续版本 grizzled 来“守护”子进程的调用。

3

Issy已经提到过Celery,但由于评论区不太适合放代码示例,我就以回答的形式来说明。

你应该尝试用Celery进行同步操作,并使用AMQP结果存储。这样你可以把实际的执行过程分配到另一个进程,甚至是另一台机器上。在Celery中进行同步执行是很简单的,比如:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

AMQP结果存储可以非常快速地返回结果,但这个功能目前只在当前的开发版本中可用(正在冻结代码,准备发布0.8.0版本)。

3

可能你会觉得我在推销这个产品,因为这是我第二次推荐它了。

但看起来你需要一个消息队列服务,特别是一个分布式的消息队列。

它的工作原理是这样的:

  1. 你的Django应用请求一个命令(CMD)
  2. 这个命令会被放到一个队列里
  3. 命令会被分发给几个工作者
  4. 工作者执行命令,并把结果返回给上游

大部分代码已经存在,你不需要自己去搭建一个系统。

可以看看Celery,它最初就是为Django开发的。

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/

撰写回答