polars:高效应用函数过滤字符串列的方法
我有一列很长的字符串(像句子一样),我想对它们做以下几件事:
- 替换某些字符
- 创建一个剩余字符串的列表
- 如果一个字符串全是文字,看看它是否在字典里,如果在就保留它
- 如果一个字符串全是数字,就保留它
- 如果一个字符串是数字和文字的混合,找出数字和字母的比例,如果比例高于某个阈值就保留它
我现在是这样做的:
for memo_field in self.memo_columns:
data = data.with_columns(
pl.col(memo_field).map_elements(
lambda x: self.filter_field(text=x, word_dict=word_dict))
)
filter_field 方法使用的是普通的 Python,所以:
text_sani = re.sub(r'[^a-zA-Z0-9\s\_\-\%]', ' ', text)
用来替换字符text_sani = text_sani.split(' ')
用来分割字符串len(re.findall(r'[A-Za-z]', x))
用来找出 text_sani 列表中每个元素的字母数量(数字的数量也是类似的),比例是字母数量和总字符数的差值除以总字符数- 使用列表推导和
if
来过滤单词列表
其实这样做还不错,处理 1.28 亿行数据大约需要 10 分钟。不过,未来的文件会大得多。在处理大约 3 亿行的文件时,这种方法会逐渐增加内存消耗,直到操作系统(Ubuntu)杀掉这个进程。而且,所有的处理似乎都是在一个核心上进行的。
我开始尝试使用 Polars 的字符串表达式,下面提供了代码和一个简单的示例。
目前看来,我唯一的选择是调用一个函数来完成剩下的工作。我的问题是:
- 在我最初的方法中,内存消耗增长是正常的吗?
map_elements
是否会创建原始序列的副本,从而消耗更多内存? - 我最初的方法正确吗?还是有更好的方法,比如我刚开始了解 Polars 中的
struct
? - 是否可以仅使用 Polars 表达式来实现我想要的?
更新
来自 @Hericks 和 @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ 的答案中的代码示例已经应用,并在很大程度上解决了我的第三个问题。实现 Polars 表达式大大减少了运行时间,有两个观察点:
- 在我的使用案例中,
memo
字段的复杂性对运行时间有很大影响。主要挑战是字典中项目的查找;一个大的字典和许多有效的单词在memo
字段中会严重影响运行时间; - 当我使用
pl.DataFrame
保存为.parquet
格式时,遇到了许多段错误。当使用pl.LazyFrame
和sink_parquet
时没有错误,但运行时间大大延长(驱动器是 NVME SSD,速度为 2000MB/s)
示例代码/数据:
玩具数据:
temp = pl.DataFrame({"foo": ['COOOPS.autom.SAPF124',
'OSS REEE PAAA comp. BEEE atm 6079 19000000070 04-04-2023',
'ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO',
'OSS REFF pagopago cost. Becf atm 9682 50012345726 10-04-2023']
})
代码函数:
def num_dec(x):
return len(re.findall(r'[0-9_\/]', x))
def num_letter(x):
return len(re.findall(r'[A-Za-z]', x))
def letter_dec_ratio(x):
if len(x) == 0:
return None
nl = num_letter(x)
nd = num_dec(x)
if (nl + nd) == 0:
return None
ratio = (nl - nd)/(nl + nd)
return ratio
def filter_field(text=None, word_dict=None):
if type(text) is not str or word_dict is None:
return 'no memo and/or dictionary'
if len(text) > 100:
text = text[0:101]
print("TEXT: ",text)
text_sani = re.sub(r'[^a-zA-Z0-9\s\_\-\%]', ' ', text) # parse by replacing most artifacts and symbols with space
words = text_sani.split(' ') # create words separated by spaces
print("WORDS: ",words)
kept = []
ratios = [letter_dec_ratio(w) for w in words]
[kept.append(w.lower()) for i, w in enumerate(words) if ratios[i] is not None and ((ratios[i] == -1 or (-0.7 <= ratios[i] <= 0)) or (ratios[i] == 1 and w.lower() in word_dict))]
print("FINAL: ",' '.join(kept))
return ' '.join(kept)
当前实现的代码:
temp.with_columns(
pl.col("foo").map_elements(
lambda x: filter_field(text=x, word_dict=['cost','atm'])).alias('clean_foo') # baseline
)
使用 Polars 的部分尝试代码:
这让我得到了正确的 WORDS
(见下一个代码块)
temp.with_columns(
(
pl.col(col)
.str.replace_all(r'[^a-zA-Z0-9\s\_\-\%]',' ')
.str.split(' ')
)
)
预期结果
(在每一步,见上面的 print
语句):
TEXT: COOOPS.autom.SAPF124
WORDS: ['COOOPS', 'autom', 'SAPF124']
FINAL:
TEXT: OSS REEE PAAA comp. BEEE atm 6079 19000000070 04-04-2023
WORDS: ['OSS', 'REEE', 'PAAA', 'comp', '', 'BEEE', '', 'atm', '6079', '19000000070', '04-04-2023']
FINAL: atm 6079 19000000070 04-04-2023
TEXT: ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO
WORDS: ['ABCD', '600000000397', '7667896-6', 'REG', 'REF', 'REE', 'PREPREO', 'HMO']
FINAL: 600000000397 7667896-6
TEXT: OSS REFF pagopago cost. Becf atm 9682 50012345726 10-04-2023
WORDS: ['OSS', 'REFF', 'pagopago', 'cost', '', 'Becf', '', 'atm', '9682', '50012345726', '10-04-2023']
FINAL: cost atm 9682 50012345726 10-04-2023
2 个回答
让我来看看能否帮你解决性能方面的问题。也许有办法让你的算法在你的系统上以可接受的性能运行。
为了下面的基准测试,我创建了一个包含超过十亿条记录的数据集,并添加了一些额外的列(模拟除了正在处理的字符串列以外的数据)。
shape: (1_073_741_824, 6)
┌───────────────────────────────────┬────────┬───────┬─────┬─────────────────────┬─────────────────────┐
│ foo ┆ string ┆ float ┆ int ┆ datetime ┆ date │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f32 ┆ i32 ┆ datetime[μs] ┆ datetime[μs] │
╞═══════════════════════════════════╪════════╪═══════╪═════╪═════════════════════╪═════════════════════╡
│ COOOPS.autom.SAPF124 ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REEE PAAA comp. BEEE atm 60… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REFF pagopago cost. Becf at… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ COOOPS.autom.SAPF124 ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ OSS REFF pagopago cost. Becf at… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ COOOPS.autom.SAPF124 ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REEE PAAA comp. BEEE atm 60… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REFF pagopago cost. Becf at… ┆ string ┆ 1.0 ┆ 1 ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
└───────────────────────────────────┴────────┴───────┴─────┴─────────────────────┴─────────────────────┘
为了模拟你描述的情况,我将数据框保存为一个默认压缩的parquet文件。这个parquet文件随后被用作下面代码中的输入(input_filepath
)。
作为参考,上面的数据集在完全加载到内存中时消耗了77 GB的内存(根据Polars的estimated_size
方法)。我在一台有32个核心的Threadripper Pro上运行这些基准测试,系统可用内存为504 GB。
使用 collect
使用正常的collect
方法和默认选项,我运行了以下从@Hericks的优秀回答中复制的代码:
from time import perf_counter
start = perf_counter()
word_list = ["cost", "atm"]
num_dec_expr = pl.element().str.count_matches(r"[0-9_\/]").cast(pl.Int32)
num_letter_expr = pl.element().str.count_matches(r"[A-Za-z]").cast(pl.Int32)
ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)
(
pl.scan_parquet(input_filepath)
.with_columns(
pl.col("foo")
.str.to_lowercase()
.str.replace_all(r"[^a-z0-9\s\_\-\%]", " ")
.str.split(" ")
.list.eval(
pl.element().filter(
pl.element().str.len_chars() > 0,
pl.element().is_in(word_list)
| num_letter_expr.eq(0)
| ratio_expr.is_between(-0.7, 0),
)
)
.list.join(" ")
.alias("foo_clean")
)
.collect()
)
print("Elapsed time: ", perf_counter() - start)
运行性能如你所描述的那样。算法很快就回到了单线程的行为,并且根据top
命令分配了过多的内存(250 GB)。我等了超过15分钟后杀掉了这个进程。
使用 collect(streaming=True, comm_subplan_elim=False)
将collect语句替换为collect(streaming=True, comm_subplan_elim=False)
后,上面的代码仅用167秒就完成了。而且top
显示算法将我Threadripper Pro系统的所有64个逻辑核心都推到了100%。
不过,算法在运行时仍然消耗了大量内存:大约160 GB,从top
命令上看。不过,这确实比使用默认选项的collect
时的250 GB要好。
使用 sink_parquet
将collect
方法替换为sink_parquet
方法,直接将结果保存到磁盘,算法运行了459秒(大约7.5分钟)。而在处理文件时,内存最高只用了8GB。
我想提醒一点,运行时的表现显示它使用了我系统的所有64个逻辑核心,但并没有达到100%。这可能是由于I/O瓶颈,但我对此表示怀疑。(我的系统有四个Gen4 1TB的NVME硬盘以RAID 0方式作为工作项目存储。)因此,我建议在I/O瓶颈更明显的系统上使用sink_parquet
可能会花更长时间。
诚然,直接将结果流式传输到磁盘可能不是你想要的。但这可能是你在处理过程中迈过这一步所需要的,以获得可接受的运行时间和内存占用。
如果这些内容对你有帮助,那就太好了。同时,请继续给@Hericks的接受回答以应有的认可。他的回答在如何使用Polars的原生字符串处理能力方面非常准确。
可以使用polars的原生表达式API来实现过滤,下面是具体的做法。我从问题中的简单实现中提取了正则表达式。
word_list = ["cost", "atm"]
# to avoid long expressions in ``pl.Expr.list.eval``
num_dec_expr = pl.element().str.count_matches(r'[0-9_\/]').cast(pl.Int32)
num_letter_expr = pl.element().str.count_matches(r'[A-Za-z]').cast(pl.Int32)
ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)
(
df
.with_columns(
pl.col("foo")
# convert to lowercase
.str.to_lowercase()
# replace special characters with space
.str.replace_all(r"[^a-z0-9\s\_\-\%]", " ")
# split string at spaces into list of words
.str.split(" ")
# filter list of words
.list.eval(
pl.element().filter(
# only keep non-empty string...
pl.element().str.len_chars() > 0,
# ...that either
# - are in the list of words,
# - consist only of characters related to numbers,
# - have a ratio between -0.7 and 0
pl.element().is_in(word_list) | num_letter_expr.eq(0) | ratio_expr.is_between(-0.7, 0)
)
)
# join list of words into string
.list.join(" ")
.alias("foo_clean")
)
)
shape: (4, 2)
┌───────────────────────────────────────────────────────────────┬──────────────────────────────────────┐
│ foo ┆ foo_clean │
│ --- ┆ --- │
│ str ┆ str │
╞═══════════════════════════════════════════════════════════════╪══════════════════════════════════════╡
│ COOOPS.autom.SAPF124 ┆ │
│ OSS REEE PAAA comp. BEEE atm 6079 19000000070 04-04-2023 ┆ atm 6079 19000000070 04-04-2023 │
│ ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO ┆ 600000000397 7667896-6 │
│ OSS REFF pagopago cost. Becf atm 9682 50012345726 10-04-2023 ┆ cost atm 9682 50012345726 10-04-2023 │
└───────────────────────────────────────────────────────────────┴──────────────────────────────────────┘