Python应用在Pandas合并时经常OOM崩溃
我有一个轻量级的Python应用程序,它应该完成一个非常简单的任务,但由于内存不足(OOM)而不断崩溃。
应用程序的功能
- 从
.parquet
文件加载数据到数据框(dataframe) - 使用
stockstats
包计算指标 - 将新计算的数据合并到原始数据框中,以便在一个数据框中同时包含OHCL和SUPERTREND -> 在这里崩溃
- 将数据框存储为
.parquet
文件
崩溃的位置
df = pd.merge(df, st, on=['datetime'])
使用的技术
- Python
3.10
pandas~=2.1.4
stockstats~=0.4.1
- Kubernetes
1.28.2-do.0
(在Digital Ocean上运行)
奇怪的是,数据框非常小(df.size
是208446
,文件大小是1.00337 MB
,内存使用量是1.85537 MB
)。
测量结果
import os
file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024) # 1.00337 MB
df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6 # 1.85537 MB
df_size = dataframe.size # 208446
部署信息
应用程序通过Helm部署到Kubernetes,设置了以下资源
resources:
limits:
cpu: 1000m
memory: 6000Mi
requests:
cpu: 1000m
memory: 1000Mi
我使用的是4个虚拟CPU和8GB内存的节点,并且节点没有性能压力。我创建了一个专用节点池,配置为8个虚拟CPU和16GB的节点,但问题依旧。
kubectl top node test-pool
NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
test-pool-j8t3y 38m 0% 2377Mi 17%
Pod信息
kubectl describe pod xxx
...
State: Waiting
Reason: CrashLoopBackOff
Last State: Terminated
Reason: OOMKilled
Exit Code: 137
Started: Sun, 24 Mar 2024 16:08:56 +0000
Finished: Sun, 24 Mar 2024 16:09:06 +0000
...
这是来自Grafana的CPU和内存消耗情况。我知道很短的内存或CPU峰值很难看到,但从长期来看,应用程序并没有消耗很多RAM。另一方面,根据我的经验,我们在内存更少的容器上使用相同的pandas
操作,数据框要大得多,但没有问题。
我该如何解决这个问题? 还有什么我应该调试的,以防止内存不足?
数据和代码示例
原始数据框(命名为df
)
datetime open high low close volume
0 2023-11-14 11:15:00 2.185 2.187 2.171 2.187 19897.847314
1 2023-11-14 11:20:00 2.186 2.191 2.183 2.184 8884.634728
2 2023-11-14 11:25:00 2.184 2.185 2.171 2.176 12106.153954
3 2023-11-14 11:30:00 2.176 2.176 2.158 2.171 22904.354082
4 2023-11-14 11:35:00 2.171 2.173 2.167 2.171 1691.211455
新数据框(命名为st
)。
注意:如果trend_orientation = 1
=> st_lower = NaN
,如果-1 => st_upper = NaN
datetime supertrend_ub supertrend_lb trend_orientation st_trend_segment
0 2023-11-14 11:15:00 0.21495 NaN -1 1
1 2023-11-14 11:20:00 0.21495 NaN -10 1
2 2023-11-14 11:25:00 0.21495 NaN -11 1
3 2023-11-14 11:30:00 0.21495 NaN -12 1
4 2023-11-14 11:35:00 0.21495 NaN -13 1
代码示例
import pandas as pd
import multiprocessing
import numpy as np
import stockstats
def add_supertrend(market):
try:
# Read data from file
df = pd.read_parquet(market, engine="fastparquet")
# Extract date columns
date_column = df['datetime']
# Convert to stockstats object
st_a = stockstats.wrap(df.copy())
# Generate supertrend
st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]
# Add back datetime columns
st_a.insert(0, "datetime", date_column)
# Add trend orientation using conditional columns
conditions = [
st_a['supertrend_ub'] == st_a['supertrend'],
st_a['supertrend_lb'] == st_a['supertrend']
]
values = [-1, 1]
st_a['trend_orientation'] = np.select(conditions, values)
# Remove not required supertrend values
st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN
# Unwrap back to dataframe
st = stockstats.unwrap(st_a)
# Ensure correct date types are used
st = st.astype({
'supertrend': 'float32',
'supertrend_ub': 'float32',
'supertrend_lb': 'float32',
'trend_orientation': 'int8'
})
# Add trend segments
st_to = st[['trend_orientation']]
st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
# Remove trend value
st.drop(columns=['supertrend'], inplace=True)
# Merge ST with DF
df = pd.merge(df, st, on=['datetime'])
# Write back to parquet
df.to_parquet(market, compression=None)
except Exception as e:
# Using proper logger in real code
print(e)
pass
def main():
# Using fixed market as example, in real code market is fetched
market = "BTCUSDT"
# Using multiprocessing to free up memory after each iteration
p = multiprocessing.Process(target=add_supertrend, args=(market,))
p.start()
p.join()
if __name__ == "__main__":
main()
Dockerfile
FROM python:3.10
ENV PYTHONFAULTHANDLER=1 \
PYTHONHASHSEED=random \
PYTHONUNBUFFERED=1 \
PYTHONPATH=.
# Adding vim
RUN ["apt-get", "update"]
# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt
# Copy main app
ADD . .
CMD main.py
可能的解决方案/尝试的方法
- ❌: 尝试过;没有成功
- : 一个我打算测试的想法
- : 没有完全解决问题,但对解决方案有所帮助
- ✅: 可行的解决方案
Lukasz Tracewski的建议
使用节点压力驱逐来测试Pod是否能够在节点上分配足够的内存
我已经做了:
- 创建了新的节点池:
8vCPU + 16 GB RAM
- 确保只有我的Pod(和一些系统Pod)会部署在这个节点上(使用容忍和亲和性)
- 进行了压力测试,没有出现内存不足或其他错误
...
image: "polinux/stress"
command: ["stress"]
args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
...
kubectl top node test-pool-j8t3y
NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
test-pool-j8t3y 694m 8% 7557Mi 54%
节点描述
Namespace Name CPU Requests CPU Limits Memory Requests Memory Limits Age
--------- ---- ------------ ---------- --------------- ------------- ---
kube-system cilium-24qxl 300m (3%) 0 (0%) 300Mi (2%) 0 (0%) 43m
kube-system cpc-bridge-proxy-csvvg 100m (1%) 0 (0%) 75Mi (0%) 0 (0%) 43m
kube-system csi-do-node-tzbbh 0 (0%) 0 (0%) 0 (0%) 0 (0%) 43m
kube-system disable-systemd-upgrade-timer-mqjsk 0 (0%) 0 (0%) 0 (0%) 0 (0%) 43m
kube-system do-node-agent-dv2z2 102m (1%) 0 (0%) 80Mi (0%) 300Mi (2%) 43m
kube-system konnectivity-agent-wq5p2 0 (0%) 0 (0%) 0 (0%) 0 (0%) 43m
kube-system kube-proxy-gvfrv 0 (0%) 0 (0%) 125Mi (0%) 0 (0%) 43m
scanners data-gw-enrich-d5cff4c95-bkjkc 100m (1%) 1 (12%) 1000Mi (7%) 6000Mi (43%) 2m33s
Pod没有因为内存不足而崩溃。所以很可能问题出在代码的某个地方。
详细的内存监控
我在多个点插入了内存测量。我使用psutil
测量数据框的大小和内存使用情况。
import psutil
total = round(psutil.virtual_memory().total / 1000 / 1000, 4)
used = round(psutil.virtual_memory().used / 1000 / 1000, 4)
pct = round(used / total * 100, 1)
logger.info(f"[Current memory usage is: {used} / {total} MB ({pct} %)]")
内存使用情况
- 在从文件读取数据之前
- RAM:
938.1929 MB
- RAM:
- 在数据框加载后
- df_mem_usage:
1.947708 MB
- RAM:
954.1181 MB
- df_mem_usage:
- 在生成ST之后
- ST数据框的df_mem_usage:
1.147757 MB
- RAM:
944.9226 MB
- ST数据框的df_mem_usage:
- 在数据框合并之前
- df_mem_usage:
945.4223 MB
- df_mem_usage:
❌ 不使用multiprocessing
为了“重置”每次迭代的内存,我使用了multiprocessing
。但是我想确保这不会造成问题。我已经去掉了它,直接调用add_supertrend
。但结果还是内存不足,所以我认为这不是问题所在。
真实数据
根据Lukasz Tracewski的建议,我分享了导致内存不足崩溃的真实数据。由于它们是parquet
格式,我无法使用像pastebin这样的服务,所以我使用了GDrive。这个文件夹将用于分享与这个问题相关的其他内容。
❌ 升级pandas到2.2.1
有时候简单的包升级可能会有所帮助,所以我决定尝试将pandas升级到2.2.1
,同时将fastparquet
升级到2024.2.0
(更新的pandas需要更新的fastparquet)。pyarrow
也升级到了15.0.0
。
在最初的几次迭代中似乎有效,但之后又崩溃了,出现内存不足。
❌ 使用Dask
我记得在处理复杂的数据框操作时,我曾使用过Dask。所以我也尝试在这种情况下使用它。但没有成功,还是内存不足。使用的版本是dask
2024.3.1
。
import dask.dataframe as dd
# mem usage 986.452 MB
ddf1 = dd.from_pandas(df)
# mem usage 1015.37 MB
ddf2 = dd.from_pandas(st)
# mem usage 1019.50 MB
df_dask = dd.merge(ddf1, ddf2, on='datetime')
# mem usage 1021.56 MB
df = df_dask.compute() <- here it crashes ¯\_(ツ)_/¯
重复的时间戳
在用Dask调查数据时,我注意到datetime
列中有重复记录。这显然是不对的,时间戳必须是唯一的。我认为这可能导致了问题。我会进一步调查。
df.tail(10)
datetime open high low close volume
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
我在准备数据的其他组件中实现了一个修复,去除了重复记录。修复的代码如下,我将监控这是否会有所帮助。
# Append gathered data to df and write to file
df = pd.concat([df, fresh_data])
# Drop duplicates
df = df.drop_duplicates(subset=["datetime"])
2 个回答
stockstats.StockDataFrame 是 pd.DataFrame 的一个子类。这意味着最后不需要进行合并操作。
在确保原始数据框(df)中的 "datetime" 列没有重复值后,你可以这样定义 st
数据框:
st = stockstats.wrap(df, index_column="datetime")
所以,直接传入 df
而不是 df.copy()
,并使用现有的 datetime
列作为索引。如果不这样做,stockstats
会去找一个名为 date
的列,但这里并没有这个列。这样它就会默认使用数据框的当前行索引,这可能不是你想要的,可能会导致一些难以发现的错误。
也不需要单独创建一个 st_a
变量。所有的数据框操作都可以在 st
上进行(完成后可以删除不需要的列)。
最后,不需要进行昂贵的合并操作,甚至不需要解包 st
数据框,因为它已经是 pd.DataFrame 的一个实例(包含了原始数据),所以你可以直接调用:
st.reset_index().to_parquet(market, compression=None)
# reset_index since you do want the `datetime` column back as regular column
唯一的不同是,st
数据框的所有列名都会变成小写。这对你的示例数据框来说不是问题,但如果这不是你想要的,你可以通过创建一个小的映射或者其他方式来恢复原来的列名(参考:stockstats.py#retype)。
在你原来的代码中,你明确地复制了一次 df,如果你使用 stocktrace.unwrap
,你又会隐式地创建一个新的完整副本。而且同时使用 st_a
和 st
意味着你隐式地处理了两个额外的完整副本。
使用合并(在原始代码中使用的 on=...
的方式)的问题在于,你实际上是将一个数据框与它自己(或接近副本的东西)合并——除了 on
列之外,所有原始列都会在合并的结果中重复,这可能不是你想要的。这也可能加重内存问题。
话虽如此,遇到如此小的数据框出现内存问题确实很神秘。我在一个模拟的数据框上运行了你的代码(相同的列,200,000 行),并没有在交互式 Python 会话中遇到任何问题(可用内存 32 GB,但峰值内存使用大约在 100MB 左右)(python 3.12/3.10,pandas 2.1.4,stockstats 0.6.2 和 0.4.1)。我没有使用多进程——其实我不明白你为什么要使用这个,因为在这种情况下它只会增加复杂性,至少会加倍内存压力,而没有真正的额外功能或好处。
最后一点评论。不知道你是否意识到,以下代码:
# Remove not required supertrend values
st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN
并不会删除那些行,它只是将一些值设置为 NaN。要真正删除这些行,你需要在之后调用 st.dropna(inplace=True)
。
我修改后的代码如下:
def add_supertrend(market):
try:
# Read data from file
df = pd.read_parquet(market, engine="fastparquet") # consider using pyarrow
# Convert to stockstats object
st = stockstats.wrap(df, index_column="datetime")
# Add trend orientation using conditional columns
# The 'supertrend' related columns are magically added by stockstats
_ = st["supertrend"]
conditions = [
st.supertrend_ub == st.supertrend,
st.supertrend_lb == st.supertrend
]
values = [-1, 1]
st['trend_orientation'] = np.select(conditions, values)
# Remove not required supertrend values
st.loc[st.trend_orientation < 0, 'st_lower'] = np.NaN
st.loc[st.trend_orientation > 0, 'st_upper'] = np.NaN
# st.dropna(inplace=True) if you really want to remove those rows
# Ensure correct date types are used
st = st.astype({
'supertrend': 'float32',
'supertrend_ub': 'float32',
'supertrend_lb': 'float32',
'trend_orientation': 'int8'
})
# Add trend segments
st_to = st.trend_orientation
st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
# Remove trend value
st.drop(columns=['supertrend'], inplace=True)
# Write back to parquet
st.reset_index(inplace=True)
st.to_parquet(market, compression=None)
except Exception as e:
# Using proper logger in real code
print(e)
你的Python应用在使用Pandas进行合并操作时遇到了内存不足(OOM)崩溃的问题。
+---------------+ +-------------+ +--------------+ +-------------------+
| Load .parquet | --> | Calculate | ---> | Merge Data | --X--> | Store .parquet |
| to DataFrame | | Indicator | | Frames | | (Crashes here) |
+---------------+ +-------------+ +--------------+ +-------------------+
虽然这些数据框相对较小,但崩溃表明在合并操作时内存使用突然增加。根据Grafana的CPU和内存使用情况,正常情况下资源使用是可以接受的,但在合并操作时可能会出现超过可用内存的情况。
你可以尝试将合并操作分成更小的部分,以减少内存使用。如果有临时的数据框或变量不再使用,记得明确删除它们,以释放内存。
更一般来说,可以逐步监控内存使用情况,以准确找出内存使用高峰出现的时刻。同时,实施日志记录,以捕捉进程的状态:
import pandas as pd
import multiprocessing
import numpy as np
import stockstats
import os
# Add a memory usage logger function:
def log_memory_usage(df, step):
mem = df.memory_usage(deep=True).sum() / (1024 * 1024) # in MB
print(f'Memory usage after {step}: {mem:.2f} MB')
def add_supertrend(market):
try:
df = pd.read_parquet(market, engine="fastparquet")
log_memory_usage(df, 'loading dataframe')
# Perform the rest of the operations as before
#
# After generating supertrend data
log_memory_usage(st, 'after generating supertrend')
# Before merge operation
log_memory_usage(df, 'before merge')
df = pd.merge(df, st, on=['datetime'])
# After merge operation
log_memory_usage(df, 'after merge')
# Save the result
df.to_parquet(market, compression=None)
except Exception as e:
print(e)
# main() and if __name__ == "__main__": block remains the same
通过在每个步骤添加日志记录,你可以查看控制台输出,看看崩溃发生前的内存使用情况。
正如Lukasz Tracewski在评论中所建议的,进行一次合理性检查可以确保Kubernetes环境和应用的配置能够按预期处理内存分配。
你可以分配较大的内存(5 GB)来看看Kubernetes环境是否能正常处理。如果测试成功且没有发生OOM崩溃,那么问题可能不在Kubernetes配置上,而是Python应用本身的内存处理方式或是如何执行
创建一个名为memory-stress-test.yaml
的文件,并运行kubectl apply -f memory-stress-test.yaml
。
apiVersion: v1
kind: Pod
metadata:
name: memory-stress-test
spec:
containers:
- name: memory-stress-test
image: polinux/stress
resources:
limits:
memory: "6000Mi"
requests:
memory: "5000Mi"
command: ["stress"]
args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
然后使用kubectl get pod memory-stress-test
监控Pod的状态,并使用kubectl describe pod memory-stress-test
查看任何事件。
如果环境通过了这个测试,问题很可能出在代码或数据处理上,而不是容器或节点配置上。
节点资源不足:内存。阈值数量:100Mi, 可用:88864Ki。容器data-gw-enrich使用了6023732Ki,请求为4000Mi,内存消耗过大。
新的错误信息和驱逐状态表明,调度Pod的Kubernetes节点内存不足,触发了节点压力驱逐。data-gw-enrich
容器的内存消耗远大于4000 MiB的请求,这表明要么是容器的内存需求被低估,要么是应用存在效率低下或内存泄漏的问题。
Kubernetes节点压力驱逐文档中的信息表明,kubelet可能会主动终止Pods以回收节点上的资源。当Pod因节点压力被驱逐时,是因为kubelet判断节点没有足够的可用内存来维持系统稳定。
根据你最后的错误信息:
- 节点上的可用内存约为86.78 MiB,低于定义的阈值。
- 容器使用了大约5882.55 MiB,超过了4000 MiB的请求。
如果你的工作负载通常需要更多内存,而你的节点接近内存限制,考虑为你的集群添加更多内存容量更大的节点,或者调整现有节点的大小。
如果你添加了一个内存更大的新节点,可以使用节点亲和性将你的Pod专门调度到该节点上。这样,你的内存密集型Pod就不会影响集群中的其他Pod。
给你的节点打标签(kubectl label nodes <node-name> high-memory=true
),并更新你的Pod配置以使用节点亲和性:
apiVersion: v1
kind: Pod
metadata:
name: memory-intensive-pod
spec:
containers:
- name: app-container
image: your-image
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: high-memory
operator: In
values:
- "true"
为了保留系统资源,你需要调整kubelet配置,指定--system-reserved
和--kube-reserved
。这可以在kubelet配置文件中完成,或者通过命令行参数设置。
# /var/lib/kubelet/config.yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
systemReserved:
cpu: "500m"
memory: "1Gi"
kubeReserved:
cpu: "500m"
memory: "1Gi"