How do I fetch multiple file chunks concurrently with threads and write them to disk atomically?
I'm stuck on a tricky problem and can't figure out how to solve it. I've tried many things today and made several posts; this is not a duplicate, because I need to figure out how to use multiple threads to fetch several chunks of a file from a server at the same time, while keeping the write to disk atomic: the file-write operation should be locked so that only one thread can access it at a time, and the other threads have to wait until that lock is released.
import argparse, logging, Queue, os, requests, signal, sys, time, threading
import utils as _fdUtils
DESKTOP_PATH = os.path.expanduser("~/Desktop")
appName = 'FileDownloader'
logFile = os.path.join(DESKTOP_PATH, '%s.log' % appName)
_log = _fdUtils.fdLogger(appName, logFile, logging.DEBUG, logging.DEBUG, console_level=logging.DEBUG)
queue = Queue.Queue()
STOP_REQUEST = threading.Event()
maxSplits = threading.BoundedSemaphore(3)
threadLimiter = threading.BoundedSemaphore(5)
lock = threading.Lock()
Update 1:
def _grabAndWriteToDisk(url, saveTo, first=None, queue=None, mode='wb', irange=None):
    """ Function to download file..

        Args:
            url(str): url of file to download
            saveTo(str): path where to save file
            first(int): starting byte of the range
            queue(Queue.Queue): queue object to set status for file download
            mode(str): mode of file to be downloaded
            irange(str): range of byte to download
    """
    fileName = url.split('/')[-1]
    filePath = os.path.join(saveTo, fileName)
    fileSize = int(_fdUtils.getUrlSizeInBytes(url))
    downloadedFileSize = 0 if not first else first
    block_sz = 8192
    irange = irange if irange else '0-%s' % fileSize

    # print mode
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    fileBuffer = resp.raw.read()

    with open(filePath, mode) as fd:
        downloadedFileSize += len(fileBuffer)
        fd.write(fileBuffer)
        status = r"%10d [%3.2f%%]" % (downloadedFileSize, downloadedFileSize * 100. / fileSize)
        status = status + chr(8) * (len(status) + 1)
        sys.stdout.write('%s\r' % status)
        time.sleep(.05)
        sys.stdout.flush()
        if downloadedFileSize == fileSize:
            STOP_REQUEST.set()
            if queue:
                queue.task_done()
            _log.info("Download Completed %s%% for file %s, saved to %s",
                      downloadedFileSize * 100. / fileSize, fileName, saveTo)
class ThreadedFetch(threading.Thread):
    """ docstring for ThreadedFetch
    """
    def __init__(self, queue):
        super(ThreadedFetch, self).__init__()
        self.queue = queue
        self.lock = threading.Lock()

    def run(self):
        threadLimiter.acquire()
        try:
            items = self.queue.get()
            url = items[0]
            saveTo = DESKTOP_PATH if not items[1] else items[1]
            split = items[-1]

            # grab split chunks in separate thread.
            if split > 1:
                maxSplits.acquire()
                try:
                    sizeInBytes = int(_fdUtils.getUrlSizeInBytes(url))
                    byteRanges = _fdUtils.getRange(sizeInBytes, split)
                    mode = 'wb'
                    # spawn one worker thread per byte range
                    for _range in byteRanges:
                        first = int(_range.split('-')[0])
                        th = threading.Thread(target=_grabAndWriteToDisk,
                                              args=(url, saveTo, first, self.queue, mode, _range))
                        _log.info("Pulling for range %s using %s", _range, th.getName())
                        th.start()
                        # _grabAndWriteToDisk(url, saveTo, first, self.queue, mode, _range)
                        mode = 'a'
                finally:
                    maxSplits.release()
            else:
                while not STOP_REQUEST.isSet():
                    self.setName("primary_%s" % url.split('/')[-1])
                    # if download whole file in single chunk no need
                    # to start a new thread, so directly download here.
                    _grabAndWriteToDisk(url, saveTo, 0, self.queue)
        finally:
            threadLimiter.release()
def main(appName, flag='with'):
    args = _fdUtils.getParser()
    urls_saveTo = {}
    if flag == 'with':
        _fdUtils.Watcher()
    elif flag != 'without':
        _log.info('unrecognized flag: %s', flag)
        sys.exit()

    # spawn a pool of threads, and pass them queue instance
    # each url will be downloaded concurrently
    for i in xrange(len(args.urls)):
        t = ThreadedFetch(queue)
        t.daemon = True
        t.start()

    split = 4
    try:
        for url in args.urls:
            # TODO: put split as value of url as tuple with saveTo
            urls_saveTo[url] = args.saveTo

        # populate queue with data
        for url, saveTo in urls_saveTo.iteritems():
            queue.put((url, saveTo, split))

        # wait on the queue until everything has been processed
        queue.join()
        _log.info('Finished all downloads.')
    except (KeyboardInterrupt, SystemExit):
        _log.critical('! Received keyboard interrupt, quitting threads.')
I expected multiple threads to grab the chunks, but in the terminal I see the same thread pulling every byte range:

INFO - Pulling for range 0-25583 using Thread-1
INFO - Pulling for range 25584-51166 using Thread-1
INFO - Pulling for range 51167-76748 using Thread-1
INFO - Pulling for range 76749-102331 using Thread-1
INFO - Download Completed 100.0% for file 607800main_kepler1200_1600-1200.jpg

What I expected instead was:

INFO - Pulling for range 0-25583 using Thread-1
INFO - Pulling for range 25584-51166 using Thread-2
INFO - Pulling for range 51167-76748 using Thread-3
INFO - Pulling for range 76749-102331 using Thread-4

Please don't mark this as a duplicate before understanding the situation... If there is a better way to achieve what I'm trying to do, please suggest one.

Thanks!
1 Answer
If you want the download and the file write to run concurrently while keeping the file write atomic (i.e. not interrupted), then don't hold the lock around the whole download-and-write process; hold it only around the write itself.
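For example, a minimal sketch of that idea (the names fetch_range and write_lock are hypothetical, not taken from the question's code, and it assumes the target file was pre-created): the range request runs outside the lock, and only the seek-and-write step is serialized.

import threading
import requests

write_lock = threading.Lock()

def fetch_range(url, filePath, first, last):
    # Network I/O happens outside the lock, so chunk downloads overlap freely.
    resp = requests.get(url, headers={'Range': 'bytes=%d-%d' % (first, last)}, stream=True)
    data = resp.raw.read()
    # Only the disk write is serialized; 'r+b' assumes the file already exists,
    # e.g. created once up front with open(filePath, 'wb').close().
    with write_lock:
        with open(filePath, 'r+b') as fd:
            fd.seek(first)      # jump to this chunk's offset
            fd.write(data)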
That said, since each thread has its own file object to write with, I don't think you need to lock that access at all. You do need to make sure each thread writes to the correct position, so call seek() to move to the right offset before writing the data. Otherwise you would have to write the chunks in file order, which makes things more complicated.
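A minimal sketch of that approach (download, fetch_and_write, total_size, and split are hypothetical names used for illustration): pre-create the file at its full size, then let each worker open its own handle, seek() to its start offset, and write its range without any lock, since the ranges never overlap.

import threading
import requests

def fetch_and_write(url, filePath, first, last):
    resp = requests.get(url, headers={'Range': 'bytes=%d-%d' % (first, last)}, stream=True)
    with open(filePath, 'r+b') as fd:   # each thread gets its own file object
        fd.seek(first)                  # position at this chunk's start offset
        fd.write(resp.raw.read())       # no lock: the ranges do not overlap

def download(url, filePath, total_size, split=4):
    # Pre-create the file at its final size so every worker can open it in 'r+b' mode.
    with open(filePath, 'wb') as fd:
        fd.truncate(total_size)
    chunk = total_size // split
    workers = []
    for i in range(split):
        first = i * chunk
        last = total_size - 1 if i == split - 1 else first + chunk - 1
        t = threading.Thread(target=fetch_and_write, args=(url, filePath, first, last))
        t.start()                       # one thread per byte range
        workers.append(t)
    for t in workers:
        t.join()

Because a separate thread is started for each range here, the log would also show a different thread name per chunk, which is the behaviour the question expects.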