Multiprocessing Queue.get() hangs

Published 2024-03-28 11:56:21


I am trying to implement some basic multiprocessing and I have run into an issue. The Python script is attached below.

import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

AnalyzeFrequency analyzes the frequencies of words in a file, and get_results() returns a sorted list of those words and their frequencies. The list is very large, roughly 10,000 items.

This list is then passed to the add_to_append_queue method, which adds it to the queue. process_append_queue takes the items one by one and adds the frequencies to the "database". This operation takes a bit longer than the actual analysis in main(), so I tried to use a separate process for this method. When I did this with the threading module instead, everything worked perfectly with no errors. When I try to use Process, the script hangs at item = append_queue.get().
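The behavior reproduces without FrequencyAnalysis at all. A minimal sketch (Python 2, matching the script above; q, worker and the timeout are just illustrative names):

# Minimal sketch: a module-level Queue.Queue is visible to a thread in the
# same process, but a separate Process works on its own copy, so its get()
# blocks forever no matter what the parent puts.
import threading
from multiprocessing import Process
from Queue import Queue  # Python 2 name; "queue" in Python 3

q = Queue()

def worker():
    print("got %r" % q.get())

if __name__ == "__main__":
    t = threading.Thread(target=worker)
    t.start()
    q.put("hello")        # the thread prints: got 'hello'
    t.join()

    p = Process(target=worker)
    p.start()
    q.put("hello again")  # the process never sees this item
    p.join(3)             # still blocked in q.get() after 3 seconds
    p.terminate()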

Can someone explain what is happening here, and perhaps point me toward a fix?

All answers appreciated!

UPDATE

The pickle error was my fault, just a typo. I am now using the Queue class from multiprocessing, but the append_queue.get() method still hangs. New code:

import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

UPDATE 2

Here is the database code:

class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()

        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError

        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()

        _list = self.sorter.mergesort(_list)
        _list.reverse()

        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()

        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]

        self.append_time = time.time() - start_time

2 Answers

The comments suggest you are trying to run this on Windows. As I said in a comment:

If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.

Here, spelled out, is what you need to do to make it portable, although I removed all the database stuff because it is irrelevant to the problem you have described so far. I also removed the daemon fiddling, because that is usually just a lazy way to avoid shutting things down cleanly, and it will often come back to bite you later:

def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp

    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()
    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__=="__main__":
    main()

The output is the "obvious" stuff:

processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done

Note: because Windows does not have (cannot have) fork(), worker processes cannot inherit any Python objects on Windows. Each process runs the entire program from scratch. That is why your original program could not work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and it passes that Queue (as an argument) to the worker process.
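Applied to your program, the same pattern would look roughly like this (a sketch only, assuming FrequencyStore and AnalyzeFrequency behave as shown above; note that the worker owns the database here, since on Windows the two processes cannot share a module-level FrequencyStore either, for the same reason they cannot share a module-level Queue):

# Sketch: the queue is created in main() and passed to the worker; a None
# sentinel tells the worker there is no more work, so it can save and exit.
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

def process_append_queue(append_queue):
    database = FrequencyStore()
    database.load_db()
    while True:
        item = append_queue.get()
        if item is None:              # sentinel: no more work coming
            break
        database.append(item)
    database.save_db()

def main():
    append_queue = Queue(10)
    worker = Process(target=process_append_queue, args=(append_queue,))
    worker.start()
    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        append_queue.put(a.get_results())
    append_queue.put(None)            # tell worker we're done
    worker.join()                     # wait for the database to be saved

if __name__ == "__main__":
    main()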

Queue.Queue is thread-safe, but it does not work across processes. That is easy to fix, though. Instead of:

from multiprocessing import Process
from Queue import Queue

you want:

from multiprocessing import Process, Queue
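One caveat worth adding: multiprocessing.Queue has no task_done() or join() methods, so the append_queue.join()/task_done() calls from the first version of the script would raise AttributeError on it. If you want those semantics across processes, multiprocessing.JoinableQueue provides them. A minimal sketch:

# Minimal sketch: JoinableQueue supports task_done()/join() across processes.
from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        item = q.get()
        if item is None:   # sentinel: acknowledge it and stop
            q.task_done()
            break
        print("processed %r" % item)
        q.task_done()

if __name__ == "__main__":
    q = JoinableQueue()
    p = Process(target=worker, args=(q,))
    p.start()
    for i in range(5):
        q.put(i)
    q.put(None)
    q.join()               # returns once every put() has been task_done()
    p.join()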
