Transparent background file downloads from S3

Asked 2025-04-17 11:07

I'm trying to let a Python application access arbitrary locations within a multi-gigabyte file stored in S3. I'd like to build a drop-in, file-like object that intelligently downloads chunks of data from S3 in a separate thread to satisfy seek() and read() calls.

Is there a simple data structure I can use to store arbitrary intervals of the file?

The structure has to support O(log n) lookup and O(n) insertion (where n is the number of chunks, not the size of the file). It also needs fast queries for "gaps", so the loading thread can efficiently find the next chunk it should download. Something like SortedCollection doesn't support this, which suggests I may end up using bisect_* by hand inside a new container; a rough sketch of the kind of container I have in mind follows.
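
Roughly, I picture something like the sketch below (purely illustrative, the class name is made up): a sorted list of downloaded byte ranges maintained with bisect, giving O(log n) lookup, O(n) insertion, and a simple "next gap" query for the loader thread.

import bisect

class IntervalSet(object):
    'Sorted, non-overlapping [start, end) byte ranges tracked with bisect.'
    def __init__(self):
        self._starts = []  # sorted interval start offsets
        self._ends = []    # matching (exclusive) interval end offsets

    def add(self, start, end):
        'Insert [start, end); assumes it does not overlap an existing interval.'
        i = bisect.bisect_right(self._starts, start)
        self._starts.insert(i, start)  # O(n) list insert
        self._ends.insert(i, end)

    def covers(self, pos):
        'True if pos falls inside a stored interval. O(log n).'
        i = bisect.bisect_right(self._starts, pos) - 1
        return i >= 0 and pos < self._ends[i]

    def next_gap(self, pos, limit):
        'First uncovered offset at or after pos, or None if covered up to limit.'
        i = bisect.bisect_right(self._starts, pos) - 1
        if i >= 0 and pos < self._ends[i]:
            pos = self._ends[i]  # skip the interval that covers pos
        i += 1
        # absorb intervals that start exactly where the previous one ended
        while i < len(self._starts) and self._starts[i] <= pos:
            pos = max(pos, self._ends[i])
            i += 1
        return pos if pos < limit else None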

Here's an example of how it would be used:

import os
import time
from bigfile import BigFile

chunksize = (2**20)*64 # 64MB

bf = BigFile('my_bucket', 'key_name', chunksize=chunksize)

# read from beginning (blocks until first chunk arrives)
bf.read(100)

# continues downloading subsequent chunks in background
time.sleep(10)

# seek into second chunk and read (should not block)
bf.seek(chunksize, os.SEEK_SET)
bf.read(100)

# seek far into the file
bf.seek(chunksize*100 + 54, os.SEEK_SET) # triggers chunk download starting at new location
bf.read(100) # blocks until chunk arrives

# seek back to beginning (should not block, already have this chunk)
bf.seek(0, os.SEEK_SET)
bf.read(100)

# read entire rest of file (blocks until all chunks are downloaded)
bf.read()

1 Answer


This implementation uses chunks of a fixed size and offset. If the chunks are very large and the network is slow, a read can block for a long time (consider a read that starts at the last byte of a chunk: it has to wait for that entire chunk to download, and then for the next chunk as well).

Ideally we would use chunks of arbitrary size and position, so the loader could be optimized to start at the exact read offset. Still, the code below is a decent 80% solution.

import boto
import threading
import tempfile
import os

DEFAULT_CHUNK_SIZE = 2**20 * 64 # 64 MB per request

class BigFile(object):
    def __init__(self, file_obj, file_size, chunksize=DEFAULT_CHUNK_SIZE, start=True):
        self._file_obj = file_obj
        self._file_size = file_size
        self._lock = threading.RLock()
        self._load_condition = threading.Condition(self._lock)
        self._load_run = True
        self._loc = 0
        self._chunk_size = chunksize
        chunk_count = self._file_size // self._chunk_size
        chunk_count += 1 if self._file_size % self._chunk_size else 0
        self._chunks = [None for _ in xrange(chunk_count)]
        self._load_thread = threading.Thread(target=self._load)
        if start:
            self._load_thread.start()

    def _chunk_loc(self):
        ' Returns (chunk_num, chunk_offset) for a given location in the larger file '
        return self._loc // self._chunk_size, self._loc % self._chunk_size

    def _load_chunk(self, chunk_num):
        tf = tempfile.TemporaryFile()
        start_idx = chunk_num * self._chunk_size
        self._file_obj.seek(start_idx)
        tf.write(self._file_obj.read(self._chunk_size))
        with self._lock:
            self._chunks[chunk_num] = (tf, tf.tell()) # (tempfile, size)
            self._load_condition.notify()

    def _load(self):
        while self._load_run:
            # check current chunk, load if needed
            with self._lock:
                chunk_num, _ = self._chunk_loc()
                chunk_and_size = self._chunks[chunk_num]
            if chunk_and_size is None:
                self._load_chunk(chunk_num)

            # find the next empty chunk, scanning forward from the current one
            for i in xrange(len(self._chunks)):
                cur_chunk = (chunk_num + i) % len(self._chunks)  # wrap around
                if self._chunks[cur_chunk] is None:
                    self._load_chunk(cur_chunk)
                    break
            else:
                # no empty chunks remain: everything is downloaded, stop the thread
                break

    def seek(self, loc, rel=os.SEEK_SET):
        with self._lock:
            if rel == os.SEEK_CUR:
                self._loc += loc
            elif rel == os.SEEK_SET:
                self._loc = loc
            elif rel == os.SEEK_END:
                self._loc = self._file_size + loc

    def read(self, bytes_to_read=-1):
        # bytes_to_read == -1 (the default) means "read to the end of the file"
        ret = []
        with self._lock:
            if bytes_to_read == -1:
                bytes_to_read = self._file_size - self._loc
            chunk_num, chunk_offset = self._chunk_loc()
            while bytes_to_read > 0 and chunk_num < len(self._chunks):
                while not self._chunks[chunk_num]:
                    self._load_condition.wait()  # block until the loader delivers this chunk
                chunk, size = self._chunks[chunk_num]
                cur_chunk_bytes = min(size - chunk_offset, bytes_to_read)
                chunk.seek(chunk_offset, os.SEEK_SET)
                data = chunk.read(cur_chunk_bytes)
                ret.append(data)
                bytes_to_read -= len(data)
                self._loc += len(data)
                chunk_num += 1
                chunk_offset = 0  # chunks after the first are read from their start
        return ''.join(ret)

    def start(self):
        self._load_thread.start()

    def join(self):
        self._load_thread.join()

    def stop(self):
        self._load_run = False

class S3RangeReader(object):
    def __init__(self, key_obj):
        self._key_obj = key_obj
        self.size = self._key_obj.size
        self._pos = 0

    def __len__(self):
        return self.size

    def seek(self, pos, rel=os.SEEK_SET):
        if rel == os.SEEK_CUR:
            self._pos += pos
        elif rel == os.SEEK_SET:
            self._pos = pos
        elif rel == os.SEEK_END:
            self._pos = self.size + pos

    def read(self, bytes=-1):
        if bytes == 0 or self._pos >= self.size:
            return ''
        if bytes == -1:
            bytes = self.size - self._pos
        # S3 Range headers are inclusive on both ends: bytes=start-end
        headers = {'Range': 'bytes=%s-%s' % (self._pos, self._pos + bytes - 1)}
        data = self._key_obj.get_contents_as_string(headers=headers)
        self._pos += len(data)
        return data

if __name__ == '__main__':
    key = boto.connect_s3().get_bucket('mybucket').get_key('my_key')
    reader = S3RangeReader(key)
    bf = BigFile(reader, len(reader)) # download starts by default
    bf.seek(1000000)
    bf.read(100) # blocks
    bf.seek(0)
    bf.read(100) # should not block
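
For what it's worth, the code above uses the legacy boto library. If you are on boto3 instead, a roughly equivalent range reader might look like the sketch below (the boto3 calls are standard head_object/get_object; the class name and structure are just illustrative):

import os
import boto3

class Boto3RangeReader(object):
    'Rough boto3 counterpart of S3RangeReader: byte-range GETs against one key.'
    def __init__(self, bucket, key):
        self._s3 = boto3.client('s3')
        self._bucket = bucket
        self._key = key
        # HEAD request gives the total object size
        self.size = self._s3.head_object(Bucket=bucket, Key=key)['ContentLength']
        self._pos = 0

    def __len__(self):
        return self.size

    def seek(self, pos, rel=os.SEEK_SET):
        if rel == os.SEEK_CUR:
            self._pos += pos
        elif rel == os.SEEK_SET:
            self._pos = pos
        elif rel == os.SEEK_END:
            self._pos = self.size + pos

    def read(self, nbytes=-1):
        if nbytes == 0 or self._pos >= self.size:
            return b''
        if nbytes == -1:
            nbytes = self.size - self._pos
        # S3 Range headers are inclusive on both ends: bytes=start-end
        rng = 'bytes=%d-%d' % (self._pos, self._pos + nbytes - 1)
        resp = self._s3.get_object(Bucket=self._bucket, Key=self._key, Range=rng)
        data = resp['Body'].read()
        self._pos += len(data)
        return data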
