How do I fetch multiple file chunks concurrently with threads and write them to disk atomically?

0 votes
1 answer
733 views
Asked 2025-04-18 13:58

I've run into a tricky problem that I can't figure out how to solve. I've tried many things today and posted several times; this question is not a duplicate, because what I need to work out is how to use multiple threads to fetch multiple chunks from a server concurrently while keeping the write to disk atomic. That is, the file-write operation should be locked so that only one thread can access it at a time, and the other threads have to wait until that lock is released.

import argparse, logging, Queue, os, requests, signal, sys, time, threading
import utils as _fdUtils

DESKTOP_PATH = os.path.expanduser("~/Desktop")

appName = 'FileDownloader'

logFile = os.path.join(DESKTOP_PATH, '%s.log' % appName)

_log = _fdUtils.fdLogger(appName, logFile, logging.DEBUG, logging.DEBUG, console_level=logging.DEBUG)

queue = Queue.Queue()
STOP_REQUEST = threading.Event()
maxSplits = threading.BoundedSemaphore(3)
threadLimiter = threading.BoundedSemaphore(5)
lock = threading.Lock()

Update 1:

def _grabAndWriteToDisk(url, saveTo, first=None, queue=None, mode='wb', irange=None):
    """ Function to download file..

        Args:
            url(str): url of file to download
            saveTo(str): path where to save file
            first(int): starting byte of the range
            queue(Queue.Queue): queue object to set status for file download
            mode(str): file open mode ('wb' for the first chunk, 'a' after)
            irange(str): byte range to download, e.g. '0-25583'

    """
    fileName = url.split('/')[-1]
    filePath = os.path.join(saveTo, fileName)
    fileSize = int(_fdUtils.getUrlSizeInBytes(url))
    downloadedFileSize = 0 if not first else first
    block_sz = 8192
    irange = irange if irange else '0-%s' % fileSize
    # print mode
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)

    fileBuffer = resp.raw.read()
    with open(filePath, mode) as fd:
        downloadedFileSize += len(fileBuffer)
        fd.write(fileBuffer)
        status = r"%10d  [%3.2f%%]" % (downloadedFileSize, downloadedFileSize * 100. / fileSize)
        status = status + chr(8)*(len(status)+1)
        sys.stdout.write('%s\r' % status)
        time.sleep(.05)
        sys.stdout.flush()
        if downloadedFileSize == fileSize:
            STOP_REQUEST.set()
            if queue:
                queue.task_done()
            _log.info("Download Completed %s%% for file %s, saved to %s",
                downloadedFileSize * 100. / fileSize, fileName, saveTo)


class ThreadedFetch(threading.Thread):
    """ docstring for ThreadedFetch
    """
    def __init__(self, queue):
        super(ThreadedFetch, self).__init__()
        self.queue = queue
        self.lock = threading.Lock()

    def run(self):
        threadLimiter.acquire()
        try:
            items = self.queue.get()
            url = items[0]
            saveTo = DESKTOP_PATH if not items[1] else items[1]
            split = items[-1]

            # grab split chunks in separate threads.
            if split > 1:
                maxSplits.acquire()
                try:
                    sizeInBytes = int(_fdUtils.getUrlSizeInBytes(url))
                    byteRanges = _fdUtils.getRange(sizeInBytes, split)
                    mode = 'wb'
                    for _range in byteRanges:
                        # each range is a 'start-end' string; its start
                        # byte is also the running download offset
                        first = int(_range.split('-')[0])
                        th = threading.Thread(target=_grabAndWriteToDisk,
                                              args=(url, saveTo, first, self.queue, mode, _range))
                        _log.info("Pulling for range %s using %s", _range, th.getName())
                        th.start()
                        # _grabAndWriteToDisk(url, saveTo, first, self.queue, mode, _range)
                        mode = 'a'
                finally:
                    maxSplits.release()

            else:
                while not STOP_REQUEST.isSet():
                    self.setName("primary_%s" % url.split('/')[-1])
                    # if downloading the whole file in a single chunk there's
                    # no need to start a new thread, so download directly here.
                    _grabAndWriteToDisk(url, saveTo, 0, self.queue)
        finally:
            threadLimiter.release()


def main(appName, flag='with'):

    args = _fdUtils.getParser()
    urls_saveTo = {}

    if flag == 'with':
        _fdUtils.Watcher()
    elif flag != 'without':
        _log.info('unrecognized flag: %s', flag)
        sys.exit()

    # spawn a pool of threads, and pass them queue instance
    # each url will be downloaded concurrently
    for _ in xrange(len(args.urls)):
        t = ThreadedFetch(queue)
        t.daemon = True
        t.start()

    split = 4
    try:
        for url in args.urls:
            # TODO: put split as value of url as tuple with saveTo
            urls_saveTo[url] = args.saveTo

        # populate queue with data 
        for url, saveTo in urls_saveTo.iteritems():
            queue.put((url, saveTo, split))

        # wait on the queue until everything has been processed
        queue.join()
        _log.info('Finished all downloads.')
    except (KeyboardInterrupt, SystemExit):
        _log.critical('! Received keyboard interrupt, quitting threads.')

I expect a different thread to grab each chunk, but in the terminal I see the same thread pulling every byte range:

INFO - Pulling for range 0-25583 using Thread-1

INFO - Pulling for range 25584-51166 using Thread-1

INFO - Pulling for range 51167-76748 using Thread-1

INFO - Pulling for range 76749-102331 using Thread-1

INFO - Download Completed 100.0% for file 607800main_kepler1200_1600-1200.jpg

But what I expect is:

INFO - Pulling for range 0-25583 using Thread-1

INFO - Pulling for range 25584-51166 using Thread-2

INFO - Pulling for range 51167-76748 using Thread-3

INFO - Pulling for range 76749-102331 using Thread-4

Please don't mark this as a duplicate before understanding the situation… and if there is a better way to achieve what I'm trying to do, please suggest it.

Thanks!

1 Answer

0

If you want the downloads and the file writes to run concurrently, and the writes to the file to be atomic (that is, uninterrupted), then don't hold the lock across the entire download-plus-write sequence; hold it only around the write itself.
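For instance, a minimal sketch of that narrower locking might look like this (assumptions: downloadChunk is a hypothetical stand-in for the question's _grabAndWriteToDisk, reusing the module-level lock and the 'wb'-then-'a' append scheme from the question's code):

def downloadChunk(url, filePath, irange, mode):
    # the network fetch runs without the lock, so all threads can
    # download their byte ranges concurrently
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    fileBuffer = resp.raw.read()

    # only the disk write is serialized: a thread holds the lock just
    # long enough to append its chunk, then releases it for the next one
    with lock:
        with open(filePath, mode) as fd:
            fd.write(fileBuffer)

Note that with append mode the chunks still have to land in file order, which is exactly the complication addressed next.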

That said, since each thread has its own file object to write with, I don't see any need to lock that access at all. What you do need to ensure is that each thread writes to the correct position, so before writing the data you should call seek() to move to the right offset in the file. Otherwise you would have to write the data in file order, which makes things considerably more complicated.
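A minimal sketch of that seek() approach, assuming hypothetical helpers downloadInChunks and _fetchRange (not part of the question's code) and 'start-end' range strings like those shown in the question's log:

import requests, threading

def _fetchRange(url, filePath, irange):
    # each thread gets its own private file object, so no lock is needed
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    data = resp.raw.read()
    with open(filePath, 'r+b') as fd:
        fd.seek(int(irange.split('-')[0]))  # jump to this chunk's offset
        fd.write(data)                      # write into a disjoint region

def downloadInChunks(url, filePath, byteRanges, totalSize):
    # pre-size the file once so every thread can seek() inside it
    with open(filePath, 'wb') as fd:
        fd.truncate(totalSize)
    threads = [threading.Thread(target=_fetchRange, args=(url, filePath, r))
               for r in byteRanges]
    for th in threads:
        th.start()
    for th in threads:
        th.join()

Since each range maps to a disjoint slice of the pre-sized file, the per-thread writes can never overlap; they stay effectively atomic without any lock and without having to arrive in order.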
