尽管知道内存泄漏来源,仍无法修复生成器内存泄漏

0 投票
1 回答
68 浏览
提问于 2025-04-12 22:15

需要用到的压缩文件可以在这里下载:

https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/BTCUSDT/

下面是一个简单的示例。首先,下载三个或更多的压缩文件,并获取每个压缩文件的路径。然后在下面的代码中,把我写的路径替换成你的路径:

import os
import re
from zipfile import ZipFile

import numpy as np
import pandas as pd
import psutil

def print_memory_usage(msg):
    """Print *msg*, then the system-wide used memory in GB (via psutil)."""
    used_gb = psutil.virtual_memory().used / 1024 ** 3
    print(msg)
    print(f"Used: {used_gb:.2f} GB")

def batch_generator(files: list):
    """Lazily yield one DataFrame per zip-archive path in *files*.

    Each archive is read only when its batch is requested, so at most one
    batch's DataFrame needs to be alive at a time.

    :param files: list of paths to Binance aggTrades ``.zip`` archives.
    :yields: the ``pd.DataFrame`` parsed from each archive, in order.
    """
    # The original wrapped _read_file in a needless inner _create_batch
    # function and named the file path "batch"; yield directly instead.
    for file in files:
        yield _read_file(file)

def _read_file(file):
    """Open one aggTrades zip archive and parse its CSV member.

    The CSV member name mirrors the archive name with ``.zip`` replaced by
    ``.csv`` (e.g. ``BTCUSDT-aggTrades-2020-01-01.zip`` ->
    ``BTCUSDT-aggTrades-2020-01-01.csv``).

    :param file: path to the ``.zip`` archive.
    :return: the ``pd.DataFrame`` produced by :func:`read_aggtrades`.
    :raises Exception: if the CSV cannot be parsed (original error chained).
    """
    with ZipFile(file) as zipfile:
        # os.path handles any platform's separators; the original
        # re.split(r'/', ...)[-1][:-4] broke on Windows-style paths.
        csv_filename = os.path.splitext(os.path.basename(file))[0] + ".csv"
        with zipfile.open(csv_filename) as f:
            try:
                return read_aggtrades(f)
            except Exception as e:
                print(e)
                # Chain explicitly so the original traceback is preserved.
                raise Exception(f"Error occurred reading file: {csv_filename}") from e

def read_aggtrades(file) -> pd.DataFrame:
    """Parse a Binance aggTrades CSV stream into a DataFrame.

    Keeps only the columns ``a`` (aggregate trade id), ``price``, ``q``
    (quantity) and ``t`` (transact time); ``price``/``q`` are kept as
    strings rather than floats.

    :param file: readable, seekable file-like object (text or binary),
        e.g. a member opened from a ``ZipFile``.
    :return: DataFrame with columns ``['a', 'price', 'q', 't']``.
    """
    # EX: 578304464,17085,0.01449000,684164672,684164672,14,True,True
    columns = ['a', 'price', 'q', 'first_trade_id', 'last_trade_id', 't', 'was_the_buyer_maker']
    usecols = ['a', 'price', 'q', 't']
    dtype = {'a': np.int64, 'price': str, 'q': str, 't': np.int64}

    def peek_line(f):
        # Read one line, then rewind so the caller sees an untouched stream.
        pos = f.tell()
        line = f.readline()
        f.seek(pos)

        # Zip members yield bytes; normalise to str for startswith() below.
        # isinstance (not type(...) ==) also accepts bytes subclasses.
        if isinstance(line, bytes):
            return line.decode()

        return line

    def _read_csv(f1):
        # 99.9% files don't have headers, some do. Discard it if we encounter it by reading header line.
        first_line = peek_line(f1)
        if first_line.startswith('agg_trade_id'):
            f1.readline()
        return pd.read_csv(f1,
                         sep=',',
                         header=None,
                         names=columns,
                         usecols=usecols,
                         dtype=dtype)
    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")

    return df

# Paths to the downloaded aggTrades archives -- adjust to your machine.
file_list = [
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2019-12-31.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-01.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-02.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-03.zip",
]
generator = batch_generator(file_list)

# Drain the generator, printing a 1-based counter for each batch.
for i, batch in enumerate(generator, start=1):
    print(i)

当你运行这个代码时,你会看到输出显示内存使用量在不断增加:

Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.31 GB
1
Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.32 GB
2
Before _read_csv
Used: 10.32 GB
After _read_csv
Used: 10.33 GB
3
Before _read_csv
Used: 10.33 GB
After _read_csv
Used: 10.35 GB
4

我不知道该怎么办。我试着在打印语句后面加上 gc.collect() 和 del(batch),但这并没有帮助。

1 个回答

-1

使用不同的方法来收集数据似乎会有所帮助。

这里没有发现内存泄漏的迹象。

import zipfile
from pathlib import Path
import psutil
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Running extremes of observed memory use, updated by print_memory_usage().
maxmem, minmem = float("-inf"), float("inf")

def print_memory_usage(msg):
    """Print current used memory (GB) and track the min/max values seen."""
    global maxmem, minmem
    used_gb = psutil.virtual_memory().used / (1024 ** 3)
    if used_gb > maxmem:
        maxmem = used_gb
    if used_gb < minmem:
        minmem = used_gb
    print(f"{msg}: {used_gb:.2f} GB")


def read_aggtrades(file) -> pd.DataFrame:
    """Load one Binance aggTrades CSV stream into a typed DataFrame.

    Only the id, price, quantity and transact-time columns are kept.
    """
    columns = [
        "agg_trade_id",
        "price",
        "quantity",
        "first_trade_id",
        "last_trade_id",
        "transact_time",
        "is_buyer_maker",
    ]
    usecols = ["agg_trade_id", "price", "quantity", "transact_time"]
    dtype = {
        "agg_trade_id": np.int64,
        "price": np.float64,
        "quantity": np.float64,
        "transact_time": np.int64,
    }

    def _read_csv(stream):
        # Probe the first field: data rows start with an integer trade id,
        # header rows don't. Rewind only when the probe consumed real data;
        # a header line is deliberately left consumed.
        first_field = stream.readline().split(",")[0]
        try:
            int(first_field)
        except ValueError:
            pass
        else:
            stream.seek(0)
        return pd.read_csv(
            stream, sep=",", header=None, names=columns, usecols=usecols, dtype=dtype
        )

    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")

    return df


def read_file(filename):
    """Extract *filename*'s first zip member and parse it with read_aggtrades.

    Assumes the archive's first member is the CSV -- true for Binance
    aggTrades archives, which contain a single file. TODO confirm for
    other archive layouts.

    :param filename: path to a ``.zip`` archive.
    :return: the ``pd.DataFrame`` parsed from the extracted CSV.
    """
    with zipfile.ZipFile(filename) as z:
        zf, *_ = z.filelist
        z.extract(zf, DOWNLOADS)
        extracted = DOWNLOADS / zf.filename
        try:
            with open(extracted) as data:
                return read_aggtrades(data)
        finally:
            # Fix: the original left every extracted CSV on disk, so
            # repeated runs accumulated files in DOWNLOADS.
            extracted.unlink()


def batch_generator(files):
    """Yield DataFrames for *files*, reading up to 8 archives concurrently."""
    with ThreadPoolExecutor(8) as pool:
        for frame in pool.map(read_file, files):
            yield frame


DOWNLOADS = Path("/Users/SIGHUP/Downloads")

dfs = []

# Consume every batch: print its index, keep the DataFrame, show its head.
batches = batch_generator(DOWNLOADS.glob("BTCUSDT*zip"))
for index, frame in enumerate(batches):
    print(index)
    dfs.append(frame)
    print(frame.head())

print(f"{maxmem=:.2f}GB {minmem=:.2f}GB")

平台:

macOS 14.4
Python 3.12.2

撰写回答