Python应用在Pandas合并时经常OOM崩溃

2 投票
2 回答
185 浏览
提问于 2025-04-12 21:42

我有一个轻量级的Python应用程序,它应该完成一个非常简单的任务,但由于内存不足(OOM)而不断崩溃。

应用程序的功能

  1. .parquet文件加载数据到数据框(dataframe)
  2. 使用stockstats包计算指标
  3. 将新计算的数据合并到原始数据框中,以便在一个数据框中同时包含OHCL和SUPERTREND -> 在这里崩溃
  4. 将数据框存储为.parquet文件

崩溃的位置

df = pd.merge(df, st, on=['datetime'])

使用的技术

  • Python 3.10
  • pandas~=2.1.4
  • stockstats~=0.4.1
  • Kubernetes 1.28.2-do.0(在Digital Ocean上运行)

奇怪的是,数据框非常小(df.size208446,文件大小是1.00337 MB,内存使用量是1.85537 MB)。

测量结果

import os

file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024)  # 1.00337 MB

df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6   # 1.85537 MB

df_size = dataframe.size  # 208446

部署信息

应用程序通过Helm部署到Kubernetes,设置了以下资源

resources:
  limits:
    cpu: 1000m
    memory: 6000Mi
  requests:
    cpu: 1000m
    memory: 1000Mi

我使用的是4个虚拟CPU和8GB内存的节点,并且节点没有性能压力。我创建了一个专用节点池,配置为8个虚拟CPU和16GB的节点,但问题依旧。

kubectl top node test-pool
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   38m          0%     2377Mi          17%

Pod信息

kubectl describe pod xxx
...
    State:          Waiting
      Reason:       CrashLoopBackOff
    Last State:     Terminated
      Reason:       OOMKilled
      Exit Code:    137
      Started:      Sun, 24 Mar 2024 16:08:56 +0000
      Finished:     Sun, 24 Mar 2024 16:09:06 +0000
...

这是来自Grafana的CPU和内存消耗情况。我知道很短的内存或CPU峰值很难看到,但从长期来看,应用程序并没有消耗很多RAM。另一方面,根据我的经验,我们在内存更少的容器上使用相同的pandas操作,数据框要大得多,但没有问题。

Grafana统计

我该如何解决这个问题? 还有什么我应该调试的,以防止内存不足?

数据和代码示例

原始数据框(命名为df

              datetime   open   high    low  close        volume
0  2023-11-14 11:15:00  2.185  2.187  2.171  2.187  19897.847314
1  2023-11-14 11:20:00  2.186  2.191  2.183  2.184   8884.634728
2  2023-11-14 11:25:00  2.184  2.185  2.171  2.176  12106.153954
3  2023-11-14 11:30:00  2.176  2.176  2.158  2.171  22904.354082
4  2023-11-14 11:35:00  2.171  2.173  2.167  2.171   1691.211455

新数据框(命名为st)。
注意:如果trend_orientation = 1 => st_lower = NaN,如果-1 => st_upper = NaN

              datetime   supertrend_ub  supertrend_lb    trend_orientation    st_trend_segment
0  2023-11-14 11:15:00   0.21495        NaN              -1                   1
1  2023-11-14 11:20:00   0.21495        NaN              -10                  1
2  2023-11-14 11:25:00   0.21495        NaN              -11                  1
3  2023-11-14 11:30:00   0.21495        NaN              -12                  1
4  2023-11-14 11:35:00   0.21495        NaN              -13                  1

代码示例

import pandas as pd
import multiprocessing
import numpy as np
import stockstats


def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")

        # Extract date columns
        date_column = df['datetime']

        # Convert to stockstats object
        st_a = stockstats.wrap(df.copy())
        # Generate supertrend
        st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]

        # Add back datetime columns
        st_a.insert(0, "datetime", date_column)

        # Add trend orientation using conditional columns
        conditions = [
            st_a['supertrend_ub'] == st_a['supertrend'],
            st_a['supertrend_lb'] == st_a['supertrend']
        ]
        
        values = [-1, 1]
        st_a['trend_orientation'] = np.select(conditions, values)

        # Remove not required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN

        # Unwrap back to dataframe
        st = stockstats.unwrap(st_a)

        # Ensure correct date types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        # Add trend segments
        st_to = st[['trend_orientation']]
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)

        # Merge ST with DF
        df = pd.merge(df, st, on=['datetime'])
        
        # Write back to parquet
        df.to_parquet(market, compression=None)
    except Exception as e:
        # Using proper logger in real code
        print(e)
        pass


def main():
    # Using fixed market as example, in real code market is fetched
    market = "BTCUSDT"
    # Using multiprocessing to free up memory after each iteration
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()


if __name__ == "__main__":
    main()

Dockerfile

FROM python:3.10

ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=.

# Adding vim
RUN ["apt-get", "update"]

# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy main app
ADD . .
CMD main.py

可能的解决方案/尝试的方法

  • ❌: 尝试过;没有成功
  • : 一个我打算测试的想法
  • : 没有完全解决问题,但对解决方案有所帮助
  • ✅: 可行的解决方案

Lukasz Tracewski的建议

使用节点压力驱逐来测试Pod是否能够在节点上分配足够的内存

我已经做了:

  • 创建了新的节点池:8vCPU + 16 GB RAM
  • 确保只有我的Pod(和一些系统Pod)会部署在这个节点上(使用容忍和亲和性)
  • 进行了压力测试,没有出现内存不足或其他错误
...
          image: "polinux/stress"
          command: ["stress"]
          args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
...
kubectl top node test-pool-j8t3y
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   694m         8%     7557Mi          54%

节点描述

  Namespace                   Name                                   CPU Requests  CPU Limits  Memory Requests  Memory Limits  Age
  ---------                   ----                                   ------------  ----------  ---------------  -------------  ---
  kube-system                 cilium-24qxl                           300m (3%)     0 (0%)      300Mi (2%)       0 (0%)         43m
  kube-system                 cpc-bridge-proxy-csvvg                 100m (1%)     0 (0%)      75Mi (0%)        0 (0%)         43m
  kube-system                 csi-do-node-tzbbh                      0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 disable-systemd-upgrade-timer-mqjsk    0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 do-node-agent-dv2z2                    102m (1%)     0 (0%)      80Mi (0%)        300Mi (2%)     43m
  kube-system                 konnectivity-agent-wq5p2               0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 kube-proxy-gvfrv                       0 (0%)        0 (0%)      125Mi (0%)       0 (0%)         43m
  scanners                    data-gw-enrich-d5cff4c95-bkjkc         100m (1%)     1 (12%)     1000Mi (7%)      6000Mi (43%)   2m33s

Pod没有因为内存不足而崩溃。所以很可能问题出在代码的某个地方。

详细的内存监控

我在多个点插入了内存测量。我使用psutil测量数据框的大小和内存使用情况。

import psutil

total = round(psutil.virtual_memory().total / 1000 / 1000, 4)
used = round(psutil.virtual_memory().used / 1000 / 1000, 4)
pct = round(used / total * 100, 1)
logger.info(f"[Current memory usage is: {used} / {total} MB ({pct} %)]")

内存使用情况

  • 在从文件读取数据之前
    • RAM: 938.1929 MB
  • 在数据框加载后
    • df_mem_usage: 1.947708 MB
    • RAM: 954.1181 MB
  • 在生成ST之后
    • ST数据框的df_mem_usage: 1.147757 MB
    • RAM: 944.9226 MB
  • 在数据框合并之前
    • df_mem_usage: 945.4223 MB

❌ 不使用multiprocessing

为了“重置”每次迭代的内存,我使用了multiprocessing。但是我想确保这不会造成问题。我已经去掉了它,直接调用add_supertrend。但结果还是内存不足,所以我认为这不是问题所在。

真实数据

根据Lukasz Tracewski的建议,我分享了导致内存不足崩溃的真实数据。由于它们是parquet格式,我无法使用像pastebin这样的服务,所以我使用了GDrive。这个文件夹将用于分享与这个问题相关的其他内容。

❌ 升级pandas到2.2.1

有时候简单的包升级可能会有所帮助,所以我决定尝试将pandas升级到2.2.1,同时将fastparquet升级到2024.2.0更新的pandas需要更新的fastparquet)。pyarrow也升级到了15.0.0

在最初的几次迭代中似乎有效,但之后又崩溃了,出现内存不足。

❌ 使用Dask

我记得在处理复杂的数据框操作时,我曾使用过Dask。所以我也尝试在这种情况下使用它。但没有成功,还是内存不足。使用的版本是dask 2024.3.1

import dask.dataframe as dd
# mem usage 986.452 MB
ddf1 = dd.from_pandas(df)
# mem usage 1015.37 MB
ddf2 = dd.from_pandas(st)
# mem usage 1019.50 MB
df_dask = dd.merge(ddf1, ddf2, on='datetime')
# mem usage 1021.56 MB
df = df_dask.compute() <- here it crashes ¯\_(ツ)_/¯

重复的时间戳

在用Dask调查数据时,我注意到datetime列中有重复记录。这显然是不对的,时间戳必须是唯一的。我认为这可能导致了问题。我会进一步调查。

df.tail(10)
             datetime   open   high     low   close         volume
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408

我在准备数据的其他组件中实现了一个修复,去除了重复记录。修复的代码如下,我将监控这是否会有所帮助。

    # Append gathered data to df and write to file
    df = pd.concat([df, fresh_data])

    # Drop duplicates
    df = df.drop_duplicates(subset=["datetime"])

2 个回答

2

stockstats.StockDataFrame 是 pd.DataFrame 的一个子类。这意味着最后不需要进行合并操作。

在确保原始数据框(df)中的 "datetime" 列没有重复值后,你可以这样定义 st 数据框:

    st = stockstats.wrap(df, index_column="datetime")

所以,直接传入 df 而不是 df.copy(),并使用现有的 datetime 列作为索引。如果不这样做,stockstats 会去找一个名为 date 的列,但这里并没有这个列。这样它就会默认使用数据框的当前行索引,这可能不是你想要的,可能会导致一些难以发现的错误。

也不需要单独创建一个 st_a 变量。所有的数据框操作都可以在 st 上进行(完成后可以删除不需要的列)。

最后,不需要进行昂贵的合并操作,甚至不需要解包 st 数据框,因为它已经是 pd.DataFrame 的一个实例(包含了原始数据),所以你可以直接调用:

    st.reset_index().to_parquet(market, compression=None)
    # reset_index since you do want the `datetime` column back as regular column

唯一的不同是,st 数据框的所有列名都会变成小写。这对你的示例数据框来说不是问题,但如果这不是你想要的,你可以通过创建一个小的映射或者其他方式来恢复原来的列名(参考:stockstats.py#retype)。

在你原来的代码中,你明确地复制了一次 df,如果你使用 stocktrace.unwrap,你又会隐式地创建一个新的完整副本。而且同时使用 st_ast 意味着你隐式地处理了两个额外的完整副本。

使用合并(在原始代码中使用的 on=... 的方式)的问题在于,你实际上是将一个数据框与它自己(或接近副本的东西)合并——除了 on 列之外,所有原始列都会在合并的结果中重复,这可能不是你想要的。这也可能加重内存问题。

话虽如此,遇到如此小的数据框出现内存问题确实很神秘。我在一个模拟的数据框上运行了你的代码(相同的列,200,000 行),并没有在交互式 Python 会话中遇到任何问题(可用内存 32 GB,但峰值内存使用大约在 100MB 左右)(python 3.12/3.10,pandas 2.1.4,stockstats 0.6.2 和 0.4.1)。我没有使用多进程——其实我不明白你为什么要使用这个,因为在这种情况下它只会增加复杂性,至少会加倍内存压力,而没有真正的额外功能或好处。

最后一点评论。不知道你是否意识到,以下代码:

# Remove not required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN

并不会删除那些行,它只是将一些值设置为 NaN。要真正删除这些行,你需要在之后调用 st.dropna(inplace=True)

我修改后的代码如下:

def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")  # consider using pyarrow

        # Convert to stockstats object
        st = stockstats.wrap(df, index_column="datetime")

        # Add trend orientation using conditional columns
        # The 'supertrend' related columns are magically added by stockstats
        _ = st["supertrend"]
        
        conditions = [
            st.supertrend_ub == st.supertrend,
            st.supertrend_lb == st.supertrend
        ]
        
        values = [-1, 1]
        st['trend_orientation'] = np.select(conditions, values)

        # Remove not required supertrend values
        st.loc[st.trend_orientation < 0, 'st_lower'] = np.NaN
        st.loc[st.trend_orientation > 0, 'st_upper'] = np.NaN
        # st.dropna(inplace=True) if you really want to remove those rows
        
        # Ensure correct date types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        
        # Add trend segments
        st_to = st.trend_orientation
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)
        
        # Write back to parquet
        st.reset_index(inplace=True)
        st.to_parquet(market, compression=None)
    except Exception as e:
        # Using proper logger in real code
        print(e)
3

你的Python应用在使用Pandas进行合并操作时遇到了内存不足(OOM)崩溃的问题。

+---------------+       +-------------+        +--------------+          +-------------------+
| Load .parquet |  -->  | Calculate   |  --->  | Merge Data   |  --X-->  | Store .parquet    |
|  to DataFrame |       | Indicator   |        | Frames       |          | (Crashes here)    |
+---------------+       +-------------+        +--------------+          +-------------------+

虽然这些数据框相对较小,但崩溃表明在合并操作时内存使用突然增加。根据Grafana的CPU和内存使用情况,正常情况下资源使用是可以接受的,但在合并操作时可能会出现超过可用内存的情况。

你可以尝试将合并操作分成更小的部分,以减少内存使用。如果有临时的数据框或变量不再使用,记得明确删除它们,以释放内存。

更一般来说,可以逐步监控内存使用情况,以准确找出内存使用高峰出现的时刻。同时,实施日志记录,以捕捉进程的状态:

import pandas as pd
import multiprocessing
import numpy as np
import stockstats
import os

# Add a memory usage logger function:
def log_memory_usage(df, step):
    mem = df.memory_usage(deep=True).sum() / (1024 * 1024)  # in MB
    print(f'Memory usage after {step}: {mem:.2f} MB')

def add_supertrend(market):
    try:
        df = pd.read_parquet(market, engine="fastparquet")
        log_memory_usage(df, 'loading dataframe')

        # Perform the rest of the operations as before
        # 

        # After generating supertrend data
        log_memory_usage(st, 'after generating supertrend')

        # Before merge operation
        log_memory_usage(df, 'before merge')

        df = pd.merge(df, st, on=['datetime'])

        # After merge operation
        log_memory_usage(df, 'after merge')

        # Save the result
        df.to_parquet(market, compression=None)

    except Exception as e:
        print(e)

# main() and if __name__ == "__main__": block remains the same

通过在每个步骤添加日志记录,你可以查看控制台输出,看看崩溃发生前的内存使用情况。


正如Lukasz Tracewski评论中所建议的,进行一次合理性检查可以确保Kubernetes环境和应用的配置能够按预期处理内存分配。

你可以分配较大的内存(5 GB)来看看Kubernetes环境是否能正常处理。如果测试成功且没有发生OOM崩溃,那么问题可能不在Kubernetes配置上,而是Python应用本身的内存处理方式或是如何执行合并操作。

创建一个名为memory-stress-test.yaml的文件,并运行kubectl apply -f memory-stress-test.yaml

apiVersion: v1
kind: Pod
metadata:
  name: memory-stress-test
spec:
  containers:
  - name: memory-stress-test
    image: polinux/stress
    resources:
      limits:
        memory: "6000Mi"
      requests:
        memory: "5000Mi"
    command: ["stress"]
    args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]

然后使用kubectl get pod memory-stress-test监控Pod的状态,并使用kubectl describe pod memory-stress-test查看任何事件。

如果环境通过了这个测试,问题很可能出在代码或数据处理上,而不是容器或节点配置上。


节点资源不足:内存。阈值数量:100Mi, 可用:88864Ki。容器data-gw-enrich使用了6023732Ki,请求为4000Mi,内存消耗过大。

新的错误信息和驱逐状态表明,调度Pod的Kubernetes节点内存不足,触发了节点压力驱逐。data-gw-enrich容器的内存消耗远大于4000 MiB的请求,这表明要么是容器的内存需求被低估,要么是应用存在效率低下或内存泄漏的问题。

Kubernetes节点压力驱逐文档中的信息表明,kubelet可能会主动终止Pods以回收节点上的资源。当Pod因节点压力被驱逐时,是因为kubelet判断节点没有足够的可用内存来维持系统稳定。

根据你最后的错误信息:

  • 节点上的可用内存约为86.78 MiB,低于定义的阈值。
  • 容器使用了大约5882.55 MiB,超过了4000 MiB的请求。

如果你的工作负载通常需要更多内存,而你的节点接近内存限制,考虑为你的集群添加更多内存容量更大的节点,或者调整现有节点的大小。
如果你添加了一个内存更大的新节点,可以使用节点亲和性将你的Pod专门调度到该节点上。这样,你的内存密集型Pod就不会影响集群中的其他Pod。
给你的节点打标签(kubectl label nodes <node-name> high-memory=true),并更新你的Pod配置以使用节点亲和性:

apiVersion: v1
kind: Pod
metadata:
  name: memory-intensive-pod
spec:
  containers:
  - name: app-container
    image: your-image
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: high-memory
            operator: In
            values:
            - "true"

为了保留系统资源,你需要调整kubelet配置,指定--system-reserved--kube-reserved。这可以在kubelet配置文件中完成,或者通过命令行参数设置。

# /var/lib/kubelet/config.yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
systemReserved:
  cpu: "500m"
  memory: "1Gi"
kubeReserved:
  cpu: "500m"
  memory: "1Gi"

撰写回答