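Python threads and shared variables
How can I update a variable that is shared between different threads in Python?
Suppose I have 5 threads processing a queue (Queue.Queue()). Once the queue has been processed, I want to perform another action, but I want that action to run only once.
Can I share and update a variable between these threads? That is, the event fires when the queue is empty (Queue.empty() returns True), but if one thread is already performing the action, I don't want the other threads to perform it too, because that would produce wrong results.
Additional details
I have a queue that mirrors files on the filesystem. The threads upload the files to a website, and as each thread uploads a file it updates a set of keywords that I extract from the files.
When the queue is empty, I need to contact the website and tell it to update the keyword counts. Right now every thread does this, so I get one update per thread, which is bad. I also tried clearing the set, but that did not work.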
keywordset = set()
hkeywordset = set()

def worker():
    while queue:
        if queue.empty():
            if len(keywordset) or len(hkeywordset):
                # as soon as the queue is empty we send the keywords and hkeywords to the
                # imageapp so it can start updating
                apiurl = update_cols_url
                if apiurl[-1] != '/':
                    apiurl = apiurl+'/'
                try:
                    keywords = []
                    data = dict(keywords=list(keywordset), hkeywords=list(hkeywordset))
                    post = dict(data=simplejson.dumps(data))
                    post = urllib.urlencode(post)
                    urllib2.urlopen(apiurl, post)
                    hkeywordset.clear()
                    keywordset.clear()
                    print 'sent keywords and hkeywords to imageapp...'
                except Exception, e:
                    print e
        # we get the task from the Queue and process the file based on the action
        task = queue.get()
        print str(task)
        try:
            reindex = task['reindex']
        except:
            reindex = False
        data = updater.process_file(task['filename'], task['action'], task['fnamechange'], reindex)
        # we parse the images keywords and hkeywords and add them to the sets above for later
        # processing
        try:
            for keyword in data['keywords']:
                keywordset.add(keyword)
        except:
            pass
        try:
            for hkw in data['hkeywords']:
                hkeywordset.add(hkw)
        except:
            pass
        queue.task_done()

for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()

while 1:
    line = raw_input('type \'q\' to stop filewatcher... or \'qq\' to force quit...\n').strip()
This is basically what I tried. But of course the queue.empty() part gets executed multiple times, once per thread.
3 Answers
0
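You could create a second queue, put this event on it, and handle it once the first queue is empty.
Alternatively, you could start a dedicated thread just for this event.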
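A minimal sketch of the dedicated-thread idea, assuming Python 3 (where the module is named queue rather than Queue) and assuming all tasks are enqueued before the workers start. The names run_with_finalizer, process, and finalize are hypothetical and do not come from the question; finalize stands in for the call that tells the website to update the keyword counts.

```python
import queue
import threading


def run_with_finalizer(items, process, finalize, num_workers=5):
    """Process items on worker threads, then call finalize() exactly once."""
    tasks = queue.Queue()
    for item in items:
        tasks.put(item)

    def worker():
        while True:
            item = tasks.get()
            try:
                process(item)
            finally:
                # Always mark the task as done so join() can return.
                tasks.task_done()

    def finalizer():
        # Blocks until every queued item has been marked done,
        # then runs the final step in this one thread only.
        tasks.join()
        finalize()

    for _ in range(num_workers):
        # Daemon workers do not keep the program alive once the work is done.
        threading.Thread(target=worker, daemon=True).start()

    closer = threading.Thread(target=finalizer)
    closer.start()
    closer.join()
```

Because only the finalizer thread waits on join(), the workers never need to check whether the queue is empty, so the final step cannot run once per worker.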
0
Why can't you simply add the final step to the queue?
0
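If you use a queue to feed a pool of worker threads, the queue itself is thread-safe: each item is handed to exactly one worker, so access to the queue cannot race. Other state shared between the threads is not protected automatically, so a shared variable that several threads update should be guarded by a lock. The simplest way to make the final step run only once is to perform it in the main thread after queue.join() returns.
Edit: here is something similar to what you are trying to do; I hope it helps answer the question :)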
import Queue
import threading
import ftplib
import os


class SendFileThread(threading.Thread):
    """Thread that sends files to the FTP server."""

    # The set of keywords is a class variable shared by all threads.
    Keywords = set()
    # Lock that guards updates to the shared set.
    KeywordsLock = threading.Lock()

    def __init__(self, queue, conn):
        threading.Thread.__init__(self)
        self.conn = conn
        self.queue = queue

    def run(self):
        while True:
            # Grab a file from the queue.
            file_name = self.queue.get()
            try:
                with open(file_name, 'rb') as f:
                    # Suppose that the keywords are on the first line.
                    # Read them before uploading, then rewind the file.
                    keywords = f.readline().split()
                    f.seek(0)
                    # Send the file to the FTP server.
                    self.conn.storbinary('STOR ' + os.path.basename(file_name), f)
                # Update the shared set of keywords.
                with SendFileThread.KeywordsLock:
                    SendFileThread.Keywords.update(keywords)
            except Exception as e:
                print e
            finally:
                # Signal to the queue that the job is done, even on error.
                self.queue.task_done()


def connect():
    # ftplib does not document FTP objects as thread-safe,
    # so every thread gets its own connection.
    conn = ftplib.FTP('ftp_uri')
    conn.login()
    return conn


def main():
    # Files to send (os.listdir returns bare names, so join the folder back on).
    folder = '/tosend'
    files = [os.path.join(folder, name) for name in os.listdir(folder)]
    queue = Queue.Queue()

    # Create 5 threads that will handle the files to send.
    for i in range(5):
        t = SendFileThread(queue, connect())
        # Daemon threads do not keep the program alive after main() returns.
        t.daemon = True
        t.start()

    # Fill the queue with the files to be sent.
    for file_name in files:
        queue.put(file_name)

    # Wait until the threads have processed every file.
    queue.join()

    # Send the keywords to the FTP server. This runs once, in the main thread.
    # I didn't quite understand the "update keywords count" part or
    # how this count is stored, so here I just send the keywords
    # to the FTP server.
    with open('keywords', 'w') as keywords_file:
        keywords_file.write(';'.join(SendFileThread.Keywords))
    conn = connect()
    with open('keywords', 'rb') as keywords_file:
        conn.storbinary('STOR keywords', keywords_file)
    conn.close()


if __name__ == '__main__':
    main()
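If the workers themselves have to trigger the final step instead of the main thread, one option is a lock-protected counter of outstanding tasks, so that only the worker that finishes the last task runs it. This is a rough sketch for Python 3 under the assumption that the number of files is known before the workers start; PendingCounter, upload_all, upload, and send_keywords are hypothetical names, with upload standing in for the real upload call and returning the keywords found in a file.

```python
import queue
import threading


class PendingCounter:
    """Counts outstanding tasks and reports when the last one finishes."""

    def __init__(self, total):
        self._lock = threading.Lock()
        self._remaining = total

    def task_finished(self):
        """Return True for exactly one caller: the one finishing last."""
        with self._lock:
            self._remaining -= 1
            return self._remaining == 0


def upload_all(file_names, upload, send_keywords, num_workers=5):
    tasks = queue.Queue()
    for name in file_names:
        tasks.put(name)
    pending = PendingCounter(len(file_names))
    keywords = set()
    keywords_lock = threading.Lock()

    def worker():
        while True:
            name = tasks.get()
            try:
                found = upload(name)
                with keywords_lock:
                    keywords.update(found)
                # Only the worker that completes the final task sees True,
                # and by then every other worker has merged its keywords.
                if pending.task_finished():
                    send_keywords(sorted(keywords))
            finally:
                tasks.task_done()

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()
    # Returns after the final step, since it runs before the last task_done().
    tasks.join()
```

The decrement and the comparison happen under one lock, so two workers can never both observe zero. If an upload raises, the counter never reaches zero and the final step is skipped, so real code would need to decide how failures should be counted.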