在一个Flask/mod_wsgi应用程序中,在所有mod廕wsgi进程之间共享全局数据

2024-04-27 11:10:29 发布

您现在位置:Python中文网/ 问答频道 /正文

代码说明:

我正在尝试从我的Flask web应用程序启动和管理一些进程。我希望能够启动一个进程并能够终止/终止它。在

在我的例子中,测试脚本.sh只是睡了10秒。在

我的类processManager实现了Borg设计模式,试图解决我的问题,但它没有改变任何东西。(之前,我的进程、锁和线程变量只是全局变量,而processManager根本不是一个类。)

“进程”属性是所有已启动进程的列表。在

addProcess启动新进程,将其添加到进程中。它检查轮询线程是否正在运行,如果没有,则启动它。在

轮询线程轮询进程列表中的所有进程,并检查它们的退出状态是否已更改。如果所有进程都已完成,线程将停止。在

stopproces循环处理进程以找到正确的进程并终止它。在

class Borg:
  _shared_state = {}
  def __init__(self):
    self.__dict__ = self._shared_state
# Pokes the processes to see their exit status. Stops when there is no running thread left.
class processManager(Borg):
class pollProcesses (threading.Thread):
    def __init__(self, pManager):
        threading.Thread.__init__(self)
        self.pManager = pManager
    def run(self):
        while True:
            #poll exit codes:
            #None   Running
            #0      Finished nicely 
            #-n     terminated by signal n
            time.sleep(1)
            pprint.pprint("-----------")
            self.pManager.lock.acquire()
            stop = True
            for p in self.pManager.processes:
                status = p.poll()
                pprint.pprint(str(p.jobid)+" "+str(p.pid)+" "+str(status))
                if status is None:
                    stop = False
                                    #else log process status somewhere

            self.pManager.lock.release()
            if stop:
                pprint.pprint("-----------")
                pprint.pprint("No process running, stopping scan.")
                break;

def __init__(self):
    Borg.__init__(self)
    pprint.pprint("New instance!")
    if not hasattr(self, "processes"):
        pprint.pprint("FIRST instance!")
        self.processes = []
    if not hasattr(self, "lock"):
        self.lock = threading.Lock()
    if not hasattr(self, "thread"):
        self.thread = None

def addProcess(self, job):
    pprint.pprint("-----------")
    path = os.path.realpath(__file__)

    pprint.pprint("Adding new process")
    p = Popen(["/path/to/testscript.sh"], shell=True)
    p.jobid = job['id']

    # Lock the processes list before adding data
    self.lock.acquire()
    self.processes.append(p)

    #If thread is finished, start it up
    if self.thread is None or not self.thread.isAlive():
        pprint.pprint("Starting thread")
        self.thread = None
        self.thread = self.pollProcesses(self)
        self.thread.start()
        pprint.pprint(self.thread)
    self.lock.release()
    return

def stopProcess(self, jobid):
    pprint.pprint("STOP job"+str(jobid))
    self.lock.acquire()
    pprint.pprint(self.thread)
    pprint.pprint("Jobs in processes:")
    for p in self.processes:
        pprint.pprint(p.jobid)
        if p.jobid == jobid:
            p.terminate()
    self.lock.release()
    return

在flask应用程序中,我基本上是这样做的:

^{pr2}$

和停止工作很相似。在

现在,如果我启动一个作业并试图从Flask应用程序中停止它,有时进程列表会创建一个全新的Borg/processManager(打印“FIRST instance!”)在

从那时起,一切都变得不可预测:

即使来自两个线程和两个不同的进程列表,我的进程状态仍在更新。这不是我的目标,但它奏效了。在

我的问题是:

但是如果我想停止一个进程,stopProcess函数可能是“错误的”,因为现在有多个processmanager具有不同的进程列表,我无法知道调用的stopProcess函数是否访问了我希望停止的特定进程。在

我认为多线程管理器的运行机制可能完全不同。在

我希望processManager的进程列表、锁和线程都是相同的,不管是哪个mod\wsgi进程访问它。在

我对python还相当陌生,所以我的问题可能完全是另外一回事。感谢任何帮助。在

请记住,这是代码简化,它仍然复制问题,但它真的没有多大作用。在


Tags: selflock列表if进程initdef线程
1条回答
网友
1楼 · 发布于 2024-04-27 11:10:29

一般来说,从web应用程序创建子进程不是一个好主意,因为根据所使用的web服务器,它可能会导致各种问题。由于使用多进程web服务器配置、从父级继承奇数信号掩码等,可能会出现问题

如果您想创建作业来执行某些任务,那么最好使用专门构建的任务队列系统,如Celery、Redis Queue(RQ)、gearman或类似系统。在

相关问题 更多 >