无法从Databricks笔记本在ADLS中执行并行写操作。OSError: [Errno 107] 传输端点未连接

0 投票
1 回答
43 浏览
提问于 2025-04-14 16:49

我在使用 foreach() 对一个数据框进行写入操作时遇到了这个错误。之前这段代码运行了超过三个月都没问题,但从上周开始就出错了。

为了让大家更明白,我有一个数据框叫做 extract_df,里面有两列,分别是 xml_full_name 和 content。我用下面的代码把这些记录写成 XML 文件,存到 ADLS 的一个目标文件夹里。

extract_df.foreach(write_file)

write_file 的定义如下:

def write_file(row):
    with open(row.extract_path, "wb") as f:
        f.write(row.content)

这个笔记本还使用了 spark.write 命令来写一些 parquet 文件,这部分运行得很好。

经过进一步调查,我发现这个问题可能和并行处理有关。作为一个临时解决办法,我尝试了下面的代码:

for row in extract_df.collect():
    with open(row.extract_path, "wb") as f:
        f.write(row.content)

这个方法有效,说明连接没有问题,但并行处理没有正常工作。不过,这个方法不能作为长期解决方案,因为会影响性能。

这里有没有人遇到过类似的问题?如果这是和某些配置有关的话,有什么建议可以检查的地方吗?欢迎大家提供意见。谢谢。

1 个回答

0

我尝试了下面的代码:

import os

def write_file_partition(rows):
    for row in rows:
        with open(row.xml_full_name, "w") as f:
            f.write(row.content)
extract_df.foreachPartition(write_file_partition)
write_file_partition(extract_df.collect())

结果:

在这里输入图片描述

在上面的代码中,使用了 foreachPartition() 方法,这个方法的作用是对每个数据分区执行一个函数。

我定义了一个函数 write_file_partition(),这个函数会接收一个数据分区(来自 extract_df 数据框),然后把内容写入文件。

接下来,你可以在 extract_df 数据框上调用 foreachPartition() 方法,并把 write_file_partition() 函数作为参数传进去。这样就会并行地对数据框的每个分区应用这个函数。

撰写回答