无法从Databricks笔记本在ADLS中执行并行写操作。OSError: [Errno 107] 传输端点未连接
我在使用 foreach()
对一个数据框进行写入操作时遇到了这个错误。之前这段代码运行了超过三个月都没问题,但从上周开始就出错了。
为了让大家更明白,我有一个数据框叫做 extract_df,里面有两列,分别是 xml_full_name 和 content。我用下面的代码把这些记录写成 XML 文件,存到 ADLS 的一个目标文件夹里。
extract_df.foreach(write_file)
write_file
的定义如下:
def write_file(row):
with open(row.extract_path, "wb") as f:
f.write(row.content)
这个笔记本还使用了 spark.write
命令来写一些 parquet 文件,这部分运行得很好。
经过进一步调查,我发现这个问题可能和并行处理有关。作为一个临时解决办法,我尝试了下面的代码:
for row in extract_df.collect():
with open(row.extract_path, "wb") as f:
f.write(row.content)
这个方法有效,说明连接没有问题,但并行处理没有正常工作。不过,这个方法不能作为长期解决方案,因为会影响性能。
这里有没有人遇到过类似的问题?如果这是和某些配置有关的话,有什么建议可以检查的地方吗?欢迎大家提供意见。谢谢。
1 个回答
0
我尝试了下面的代码:
import os
def write_file_partition(rows):
for row in rows:
with open(row.xml_full_name, "w") as f:
f.write(row.content)
extract_df.foreachPartition(write_file_partition)
write_file_partition(extract_df.collect())
结果:
在上面的代码中,使用了 foreachPartition()
方法,这个方法的作用是对每个数据分区执行一个函数。
我定义了一个函数 write_file_partition()
,这个函数会接收一个数据分区(来自 extract_df 数据框),然后把内容写入文件。
接下来,你可以在 extract_df 数据框上调用 foreachPartition()
方法,并把 write_file_partition()
函数作为参数传进去。这样就会并行地对数据框的每个分区应用这个函数。