在Dataflow上运行Apache Beam Python时出现奇怪的 pickling(序列化)错误

2024-05-16 22:57:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我们运行的是一个相当简单的作业:读取JSON,进行一些处理,然后输出JSON。但不知为何,它经常失败,并报出一个非常奇怪的 pickling(序列化)错误:

PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed [while running 'map to user_activity']

它几乎在处理第一行数据时就立即失败。前一阶段会输出一个元组(String, [])。在“map to user_activity”阶段运行时,只要尝试迭代其中的列表 [],就会失败。

代码中没有使用 lambda(lambda 似乎是这类 pickling 错误的常见来源)。我们把问题范围缩小到了“迭代输入元组中的列表 []”这一步:如果不迭代,作业就能“正常运行”;一旦像下面这样迭代:

(原帖此处的代码片段在抓取时丢失,仅剩占位符 `^{pr2}$`;根据上下文,它是一个遍历输入元组中列表的 for 循环。)

作业就会立即失败。

**更新**:事实证明,关键并不在于迭代输入元组。Map 函数中的任何 for 循环都会导致崩溃,即使是下面这样的代码:

q=[1,2,3,4,5,6]
for a in q:
  pass

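以下是错误的完整堆栈跟踪。注意,异常并不是在循环内部抛出的。它是在 worker 对“map to user_activity”步骤的输出值进行编码时抛出的:FastPrimitivesCoderImpl 回退到基于 pickle 的编码器,而被编码的输出值是(或包含)一个无法被 pickle 的生成器(generator)对象。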

An exception was raised when trying to execute the workitem 4985068250752295797 : Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:3175)
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:3079)
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:2994)
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:2938)
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output (apache_beam/runners/worker/operations.c:5783)
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive (apache_beam/runners/worker/operations.c:3622)
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11089)
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11043)
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156)
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458)
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11363)
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371)
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process (apache_beam/runners/common.c:6270)
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs (apache_beam/runners/common.c:12500)
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive (apache_beam/runners/worker/operations.c:3622)
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11089)
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11043)
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156)
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458)
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11673)
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371)
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process (apache_beam/runners/common.c:6270)
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs (apache_beam/runners/common.c:12500)
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive (apache_beam/runners/worker/operations.c:3588)
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start (apache_beam/runners/worker/operations.c:3808)
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 62, in apache_beam.runners.worker.opcounters.OperationCounters.update_from (apache_beam/runners/worker/opcounters.c:2396)
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 80, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample (apache_beam/runners/worker/opcounters.c:3017)
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables (apache_beam/coders/coder_impl.c:22968)
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables (apache_beam/coders/coder_impl.c:22687)
    self._value_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 260, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables (apache_beam/coders/coder_impl.c:9578)
    self.encode_to_stream(value, out, nested)
  File "apache_beam/coders/coder_impl.py", line 298, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream (apache_beam/coders/coder_impl.c:10416)
    self.fallback_coder_impl.encode_to_stream(value, stream, nested)
  File "apache_beam/coders/coder_impl.py", line 154, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream (apache_beam/coders/coder_impl.c:5883)
    return stream.write(self._encoder(value), nested)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/coders.py", line 437, in <lambda>
    lambda x: dumps(x, HIGHEST_PROTOCOL), pickle.loads)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed [while running 'map to user_activity']

Tags: in, py, self, value, apache, line, common, process