正在分析的pyspark udf打印行

2024-04-20 07:49:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我在pyspark udf函数中遇到了一个问题,我想打印产生问题的行的编号。在

我试图使用Python中“static variable”的等价物来计算行数,这样当用新行调用udf时,计数器就会递增。但是,它不起作用:

import pyspark.sql.functions as F
def myF(input):
    myF.lineNumber += 1
    if (somethingBad):
        print(myF.lineNumber)
    return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

如何计算调用udf的次数,以便找到在pyspark中生成问题的行数?在


Tags: 函数importinputsqldefas计数器static
1条回答
网友
1楼 · 发布于 2024-04-20 07:49:12

udf是在worker上执行的,因此它们内部的print语句不会显示在输出(来自驱动程序)中。处理UDF问题的最佳方法是将UDF的返回类型更改为结构或列表,并将错误信息与返回的输出一起传递。在下面的代码中,我只是将错误信息添加到您最初返回的字符串res中。在

import pyspark.sql.functions as F
def myF(input):
  myF.lineNumber += 1
  if (somethingBad):
    res += 'Error in line {}".format(myF.lineNumber)
  return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

相关问题 更多 >