我正在尝试实现基本的多处理,我遇到了一个问题。python脚本附在下面。
import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency
append_queue = Queue(10)
database = FrequencyStore()
def add_to_append_queue(_list):
append_queue.put(_list)
def process_append_queue():
while True:
item = append_queue.get()
database.append(item)
print("Appended to database in %.4f seconds" % database.append_time)
append_queue.task_done()
return
def main():
database.load_db()
print("Database loaded in %.4f seconds" % database.load_time)
append_queue_process = Process(target=process_append_queue)
append_queue_process.daemon = True
append_queue_process.start()
#t = threading.Thread(target=process_append_queue)
#t.daemon = True
#t.start()
while True:
path = raw_input("file: ")
if path == "exit":
break
a = AnalyzeFrequency(path)
a.analyze()
print("Analyzed file in %.4f seconds" % a._time)
add_to_append_queue(a.get_results())
append_queue.join()
#append_queue_process.join()
database.save_db()
print("Database saved in %.4f seconds" % database.save_time)
sys.exit(0)
if __name__=="__main__":
main()
AnalyzeFrequency分析文件中单词的频率,get_results()
返回所述单词和频率的排序列表。这个单子很大,大概有一万件。
然后将此列表传递给add_to_append_queue
方法,该方法将其添加到队列中。process_append_队列逐个获取项目并将频率添加到“数据库”中。这个操作比main()
中的实际分析要长一点,因此我尝试对这个方法使用单独的进程。当我尝试对线程模块执行此操作时,一切都工作得很好,没有错误。当我尝试使用Process时,脚本挂在item = append_queue.get()
。
有人能解释一下这里发生了什么事吗,也许能帮我找个解决办法?
感谢所有答案!
更新
泡菜错误是我的错,只是打字错误。现在我在多处理中使用Queue类,但是append_Queue.get()方法仍然挂起。 新代码
import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency
append_queue = Queue()
database = FrequencyStore()
def add_to_append_queue(_list):
append_queue.put(_list)
def process_append_queue():
while True:
database.append(append_queue.get())
print("Appended to database in %.4f seconds" % database.append_time)
return
def main():
database.load_db()
print("Database loaded in %.4f seconds" % database.load_time)
append_queue_process = Process(target=process_append_queue)
append_queue_process.daemon = True
append_queue_process.start()
#t = threading.Thread(target=process_append_queue)
#t.daemon = True
#t.start()
while True:
path = raw_input("file: ")
if path == "exit":
break
a = AnalyzeFrequency(path)
a.analyze()
print("Analyzed file in %.4f seconds" % a._time)
add_to_append_queue(a.get_results())
#append_queue.join()
#append_queue_process.join()
print str(append_queue.qsize())
database.save_db()
print("Database saved in %.4f seconds" % database.save_time)
sys.exit(0)
if __name__=="__main__":
main()
更新2
这是数据库代码:
class FrequencyStore:
def __init__(self):
self.sorter = Sorter()
self.db = {}
self.load_time = -1
self.save_time = -1
self.append_time = -1
self.sort_time = -1
def load_db(self):
start_time = time.time()
try:
file = open("results.txt", 'r')
except:
raise IOError
self.db = {}
for line in file:
word, count = line.strip("\n").split("=")
self.db[word] = int(count)
file.close()
self.load_time = time.time() - start_time
def save_db(self):
start_time = time.time()
_db = []
for key in self.db:
_db.append([key, self.db[key]])
_db = self.sort(_db)
try:
file = open("results.txt", 'w')
except:
raise IOError
file.truncate(0)
for x in _db:
file.write(x[0] + "=" + str(x[1]) + "\n")
file.close()
self.save_time = time.time() - start_time
def create_sorted_db(self):
_temp_db = []
for key in self.db:
_temp_db.append([key, self.db[key]])
_temp_db = self.sort(_temp_db)
_temp_db.reverse()
return _temp_db
def get_db(self):
return self.db
def sort(self, _list):
start_time = time.time()
_list = self.sorter.mergesort(_list)
_list.reverse()
self.sort_time = time.time() - start_time
return _list
def append(self, _list):
start_time = time.time()
for x in _list:
if x[0] not in self.db:
self.db[x[0]] = x[1]
else:
self.db[x[0]] += x[1]
self.append_time = time.time() - start_time
评论建议您尝试在Windows上运行此操作。正如我在评论中所说
这里详细说明了您需要做些什么才能使其可移植,尽管我删除了所有数据库内容,因为它与您目前描述的问题无关。我还去掉了摆弄,因为那通常是一种懒散的方式,避免把东西干干净净地关上,而且通常不会回来咬你:
输出是“显而易见”的东西:
注意:由于Windows没有(不能)
fork()
,工作进程不可能继承Windows上的任何Python对象。每个进程从一开始就运行整个程序。这就是为什么原始程序不能工作:每个进程都创建了自己的Queue
,与另一个进程中的Queue
完全无关。在上面所示的方法中,只有主进程创建一个Queue
,主进程将它(作为参数)传递给工作进程。queue.Queue
是线程安全的,但不能跨进程工作。不过,这很容易解决。而不是:你想要:
相关问题 更多 >
编程相关推荐