polars:高效应用函数过滤字符串列的方法

3 投票
2 回答
178 浏览
提问于 2025-04-13 03:04

我有一列很长的字符串(像句子一样),我想对它们做以下几件事:

  1. 替换某些字符
  2. 创建一个剩余字符串的列表
  3. 如果一个字符串全是文字,看看它是否在字典里,如果在就保留它
  4. 如果一个字符串全是数字,就保留它
  5. 如果一个字符串是数字和文字的混合,找出数字和字母的比例,如果比例高于某个阈值就保留它

我现在是这样做的:

            for memo_field in self.memo_columns:
                data = data.with_columns(
                    pl.col(memo_field).map_elements(
                        lambda x: self.filter_field(text=x, word_dict=word_dict))
                    )

filter_field 方法使用的是普通的 Python,所以:

  • text_sani = re.sub(r'[^a-zA-Z0-9\s\_\-\%]', ' ', text) 用来替换字符
  • text_sani = text_sani.split(' ') 用来分割字符串
  • len(re.findall(r'[A-Za-z]', x)) 用来找出 text_sani 列表中每个元素的字母数量(数字的数量也是类似的),比例是字母数量和总字符数的差值除以总字符数
  • 使用列表推导和 if 来过滤单词列表

其实这样做还不错,处理 1.28 亿行数据大约需要 10 分钟。不过,未来的文件会大得多。在处理大约 3 亿行的文件时,这种方法会逐渐增加内存消耗,直到操作系统(Ubuntu)杀掉这个进程。而且,所有的处理似乎都是在一个核心上进行的。

我开始尝试使用 Polars 的字符串表达式,下面提供了代码和一个简单的示例

目前看来,我唯一的选择是调用一个函数来完成剩下的工作。我的问题是:

  1. 在我最初的方法中,内存消耗增长是正常的吗?map_elements 是否会创建原始序列的副本,从而消耗更多内存?
  2. 我最初的方法正确吗?还是有更好的方法,比如我刚开始了解 Polars 中的 struct
  3. 是否可以仅使用 Polars 表达式来实现我想要的?

更新

来自 @Hericks 和 @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ 的答案中的代码示例已经应用,并在很大程度上解决了我的第三个问题。实现 Polars 表达式大大减少了运行时间,有两个观察点:

  1. 在我的使用案例中,memo 字段的复杂性对运行时间有很大影响。主要挑战是字典中项目的查找;一个大的字典和许多有效的单词在 memo 字段中会严重影响运行时间;
  2. 当我使用 pl.DataFrame 保存为 .parquet 格式时,遇到了许多段错误。当使用 pl.LazyFramesink_parquet 时没有错误,但运行时间大大延长(驱动器是 NVME SSD,速度为 2000MB/s)

示例代码/数据:

玩具数据:

temp = pl.DataFrame({"foo": ['COOOPS.autom.SAPF124',
                            'OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023',
                            'ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO',
                            'OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023']
                    })

代码函数:

def num_dec(x):
    return len(re.findall(r'[0-9_\/]', x))

def num_letter(x):
    return len(re.findall(r'[A-Za-z]', x))

def letter_dec_ratio(x):
    if len(x) == 0:
        return None
    nl = num_letter(x)
    nd = num_dec(x)
    if (nl + nd) == 0:       
        return None
    ratio = (nl - nd)/(nl + nd)
    return ratio

def filter_field(text=None, word_dict=None):

    if type(text) is not str or word_dict is None:
        return 'no memo and/or dictionary'

    if len(text) > 100:
        text = text[0:101]
    print("TEXT: ",text)
    text_sani = re.sub(r'[^a-zA-Z0-9\s\_\-\%]', ' ', text) # parse by replacing most artifacts and symbols with space 

    words = text_sani.split(' ') # create words separated by spaces
    print("WORDS: ",words)

    kept = []
    ratios = [letter_dec_ratio(w) for w in words]
    [kept.append(w.lower()) for i, w in enumerate(words) if ratios[i] is not None and ((ratios[i] == -1 or (-0.7 <= ratios[i] <= 0)) or (ratios[i] == 1 and w.lower() in word_dict))]
    print("FINAL: ",' '.join(kept))

    return ' '.join(kept)

当前实现的代码:

temp.with_columns(
                pl.col("foo").map_elements(
                    lambda x: filter_field(text=x, word_dict=['cost','atm'])).alias('clean_foo') # baseline
                )

使用 Polars 的部分尝试代码:

这让我得到了正确的 WORDS(见下一个代码块)

temp.with_columns(
    (
        pl.col(col)
        .str.replace_all(r'[^a-zA-Z0-9\s\_\-\%]',' ')
        .str.split(' ')
    )
)

预期结果(在每一步,见上面的 print 语句):

TEXT:  COOOPS.autom.SAPF124
WORDS:  ['COOOPS', 'autom', 'SAPF124']
FINAL:  
TEXT:  OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023
WORDS:  ['OSS', 'REEE', 'PAAA', 'comp', '', 'BEEE', '', 'atm', '6079', '19000000070', '04-04-2023']
FINAL:  atm 6079 19000000070 04-04-2023
TEXT:  ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO
WORDS:  ['ABCD', '600000000397', '7667896-6', 'REG', 'REF', 'REE', 'PREPREO', 'HMO']
FINAL:  600000000397 7667896-6
TEXT:  OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023
WORDS:  ['OSS', 'REFF', 'pagopago', 'cost', '', 'Becf', '', 'atm', '9682', '50012345726', '10-04-2023']
FINAL:  cost atm 9682 50012345726 10-04-2023

2 个回答

3

让我来看看能否帮你解决性能方面的问题。也许有办法让你的算法在你的系统上以可接受的性能运行。

为了下面的基准测试,我创建了一个包含超过十亿条记录的数据集,并添加了一些额外的列(模拟除了正在处理的字符串列以外的数据)。

shape: (1_073_741_824, 6)
┌───────────────────────────────────┬────────┬───────┬─────┬─────────────────────┬─────────────────────┐
│ foo                               ┆ string ┆ float ┆ int ┆ datetime            ┆ date                │
│ ---                               ┆ ---    ┆ ---   ┆ --- ┆ ---                 ┆ ---                 │
│ str                               ┆ str    ┆ f32   ┆ i32 ┆ datetime[μs]        ┆ datetime[μs]        │
╞═══════════════════════════════════╪════════╪═══════╪═════╪═════════════════════╪═════════════════════╡
│ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REEE PAAA comp. BEEE  atm 60… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ …                                 ┆ …      ┆ …     ┆ …   ┆ …                   ┆ …                   │
│ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REEE PAAA comp. BEEE  atm 60… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
└───────────────────────────────────┴────────┴───────┴─────┴─────────────────────┴─────────────────────┘

为了模拟你描述的情况,我将数据框保存为一个默认压缩的parquet文件。这个parquet文件随后被用作下面代码中的输入(input_filepath)。

作为参考,上面的数据集在完全加载到内存中时消耗了77 GB的内存(根据Polars的estimated_size方法)。我在一台有32个核心的Threadripper Pro上运行这些基准测试,系统可用内存为504 GB。

使用 collect

使用正常的collect方法和默认选项,我运行了以下从@Hericks的优秀回答中复制的代码:

from time import perf_counter


start = perf_counter()
word_list = ["cost", "atm"]

num_dec_expr = pl.element().str.count_matches(r"[0-9_\/]").cast(pl.Int32)
num_letter_expr = pl.element().str.count_matches(r"[A-Za-z]").cast(pl.Int32)
ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)

(
    pl.scan_parquet(input_filepath)
    .with_columns(
        pl.col("foo")
        .str.to_lowercase()
        .str.replace_all(r"[^a-z0-9\s\_\-\%]", " ")
        .str.split(" ")
        .list.eval(
            pl.element().filter(
                pl.element().str.len_chars() > 0,
                pl.element().is_in(word_list)
                | num_letter_expr.eq(0)
                | ratio_expr.is_between(-0.7, 0),
            )
        )
        .list.join(" ")
        .alias("foo_clean")
    )
    .collect()
)

print("Elapsed time: ", perf_counter() - start)

运行性能如你所描述的那样。算法很快就回到了单线程的行为,并且根据top命令分配了过多的内存(250 GB)。我等了超过15分钟后杀掉了这个进程。

使用 collect(streaming=True, comm_subplan_elim=False)

将collect语句替换为collect(streaming=True, comm_subplan_elim=False)后,上面的代码仅用167秒就完成了。而且top显示算法将我Threadripper Pro系统的所有64个逻辑核心都推到了100%。

不过,算法在运行时仍然消耗了大量内存:大约160 GB,从top命令上看。不过,这确实比使用默认选项的collect时的250 GB要好。

使用 sink_parquet

collect方法替换为sink_parquet方法,直接将结果保存到磁盘,算法运行了459秒(大约7.5分钟)。而在处理文件时,内存最高只用了8GB。

我想提醒一点,运行时的表现显示它使用了我系统的所有64个逻辑核心,但并没有达到100%。这可能是由于I/O瓶颈,但我对此表示怀疑。(我的系统有四个Gen4 1TB的NVME硬盘以RAID 0方式作为工作项目存储。)因此,我建议在I/O瓶颈更明显的系统上使用sink_parquet可能会花更长时间。

诚然,直接将结果流式传输到磁盘可能不是你想要的。但这可能是你在处理过程中迈过这一步所需要的,以获得可接受的运行时间和内存占用。

如果这些内容对你有帮助,那就太好了。同时,请继续给@Hericks的接受回答以应有的认可。他的回答在如何使用Polars的原生字符串处理能力方面非常准确。

4

可以使用polars的原生表达式API来实现过滤,下面是具体的做法。我从问题中的简单实现中提取了正则表达式。

word_list = ["cost", "atm"]

# to avoid long expressions in ``pl.Expr.list.eval``
num_dec_expr = pl.element().str.count_matches(r'[0-9_\/]').cast(pl.Int32)
num_letter_expr = pl.element().str.count_matches(r'[A-Za-z]').cast(pl.Int32)
ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)

(
    df
    .with_columns(
        pl.col("foo")
        # convert to lowercase
        .str.to_lowercase()
        # replace special characters with space
        .str.replace_all(r"[^a-z0-9\s\_\-\%]", " ")
        # split string at spaces into list of words
        .str.split(" ")
        # filter list of words
        .list.eval(
            pl.element().filter(
                # only keep non-empty string...
                pl.element().str.len_chars() > 0,
                # ...that either 
                # - are in the list of words,
                # - consist only of characters related to numbers,
                # - have a ratio between -0.7 and 0
                pl.element().is_in(word_list) | num_letter_expr.eq(0) | ratio_expr.is_between(-0.7, 0)
            )
        )
        # join list of words into string
        .list.join(" ")
        .alias("foo_clean")
    )
)
shape: (4, 2)
┌───────────────────────────────────────────────────────────────┬──────────────────────────────────────┐
│ foo                                                           ┆ foo_clean                            │
│ ---                                                           ┆ ---                                  │
│ str                                                           ┆ str                                  │
╞═══════════════════════════════════════════════════════════════╪══════════════════════════════════════╡
│ COOOPS.autom.SAPF124                                          ┆                                      │
│ OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023     ┆ atm 6079 19000000070 04-04-2023      │
│ ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO           ┆ 600000000397 7667896-6               │
│ OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023 ┆ cost atm 9682 50012345726 10-04-2023 │
└───────────────────────────────────────────────────────────────┴──────────────────────────────────────┘

撰写回答