Unable to apply a UDF in PySpark

Posted 2024-05-14 08:45:16


I am trying to apply a Python function to a DataFrame using pandas_udf. This is the function:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            # Note: must go through self, otherwise `h2t` is an undefined name
            texto = self.h2t.handle(raw_text)
        except Exception:
            texto = "PARSE HTML ERROR"
        return texto

I apply the function extract_text to each series inside the UDF like this:

import pyspark.sql.functions as f
import pyspark.sql.types as t

udfs = Udfs()
extract_text_udf = f.pandas_udf(lambda s: s.apply(udfs.extract_text), t.StringType())
df = df.withColumn("texto", extract_text_udf(f.col("html_raw")))

Then I get the following error:

Traceback (most recent call last):
  File "process_info.py", line 70, in <module>
    row_count = info_df.count()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 523, in count
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o123.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2.0 (TID 2327, ip-10-2-6-163.eu-west-1.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 254, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

How can I apply the function extract_text to my DataFrame using pandas_udf?
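For reference, the per-batch logic that the pandas_udf wraps can be prototyped and tested locally with plain pandas before involving Spark at all. The sketch below is a minimal, self-contained version: `strip_html` is a hypothetical stand-in for `html2text.HTML2Text().handle()` (a naive regex tag-stripper, used only so the example runs without the html2text package), and `extract_text_batch` has the exact shape a `SCALAR` pandas_udf expects, one `pd.Series` in, one `pd.Series` of strings out:

```python
import re

import pandas as pd


def strip_html(raw_text):
    # Hypothetical stand-in for html2text's handle(): strips <...> tags.
    # A None/non-string input raises TypeError, mirroring the original
    # try/except fallback to a sentinel string.
    try:
        return re.sub(r"<[^>]+>", "", raw_text).strip()
    except TypeError:
        return "PARSE HTML ERROR"


def extract_text_batch(s: pd.Series) -> pd.Series:
    # This is the callable a scalar pandas_udf receives: it must return a
    # pandas Series of the declared type (StringType -> str). Forcing the
    # dtype with astype(str) ensures every element is a Python string,
    # which is what Arrow expects when serializing back to the JVM.
    return s.apply(strip_html).astype(str)


batch = pd.Series(["<p>hello <b>world</b></p>", None])
print(extract_text_batch(batch).tolist())
# → ['hello world', 'PARSE HTML ERROR']
```

If this local version behaves correctly, the same function can be registered with `f.pandas_udf(extract_text_batch, t.StringType())`; any remaining failure then points at the Spark/Arrow layer (for instance a pyarrow version incompatible with the installed Spark) rather than at the transformation itself.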


