Dataflow pipeline raises PicklingError when a ParDo returns an instance of one of my own classes

Posted 2024-05-16 13:46:52


I have a pipeline that looks like this:

import base64
import gzip
import logging
import apache_beam as beam

import data.build.common.v1.common_pb2 as common_pb2
from data.pipeline.steps.console_proto_list import CONSOLE_PROTO_LIST
from google.protobuf.message import DecodeError

class GetClearMessage(beam.DoFn):
    def process(self, element, **kwargs):
        """ Parse encoded proto 
        Returns an instance of EntryPoint decoded.
        """
        logging.info('Unserializing data')
        logging.info(element)
        batch_entry_point = common_pb2.BatchEntryPoint()
        data = element.data
        logging.info(data)
        try:
            batch_entry_point.ParseFromString(data)
        except DecodeError:
            unziped = gzip.decompress(data)
            batch_entry_point.ParseFromString(unziped)
        logging.info(batch_entry_point)
        return [batch_entry_point]

def batch_pipeline(pipeline):
    console_message = (
        pipeline
        | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
            subscription='projects/production-213911/subscriptions/ps-to-bq-console',
            with_attributes=True)
    )

    clear_message = console_message | beam.ParDo(GetClearMessage())
    gcloud_id = console_message | beam.ParDo(GetGcloudId())
    registry = console_message | beam.ParDo(GetTableData())
    #clear_message | beam.ParDo(Test())

I removed some classes, as they are not needed to understand the problem.

When running the pipeline on Dataflow, I regularly get the following error:

Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-4468']

See the full stack trace below.

    Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed

During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
        response = task()
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
        lambda: self.create_worker().do_instruction(request), request)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
        getattr(request, request_type), request.instruction_id)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
        bundle_processor.process_bundle(instruction_id))
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
        element.data)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
        self.output(decoded_value)
      File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
        raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
      File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
      File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
      File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
      File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
      File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
        lambda x: dumps(x, protocol), pickle.loads)
    _pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-314']

But as you can see in GetClearMessage, I log some data, and when I look at my logs for this particular step everything seems fine: batch_entry_point is logged correctly. batch_entry_point is an instance of BatchEntryPoint, the class causing the problem.

Any idea what is causing this behavior?


EDIT

I tried to add a step on the result of ParDo(GetClearMessage), but that step is never reached. So I guess the pickling error occurs because I am returning an instance of BatchEntryPoint.

I don't understand this behavior. Do you know how to fix it?
Thanks


1 Answer

I did not solve the problem, but I found a workaround: instead of returning the BatchEntryPoint, I return each element inside it, like so:

        for i in batch_entry_point.entrypoints:
            logging.info(i)
            obj['proto'] = i  # obj is a dict built earlier in process()
            yield obj

The next step of the pipeline then handles each of these elements.
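For reference, a commonly suggested alternative to unpacking the message is to register Beam's ProtoCoder for the type, so elements are encoded with protobuf serialization instead of falling back to the pickle-based coder. This is only a sketch, not something tested against this pipeline, and it assumes the generated module is importable on the workers (e.g. shipped as a package via `--setup_file`):

```python
import apache_beam as beam
import data.build.common.v1.common_pb2 as common_pb2

# Encode BatchEntryPoint with proto serialization rather than letting
# Beam fall back to the pickle-based FastPrimitivesCoder.
beam.coders.registry.register_coder(common_pb2.BatchEntryPoint,
                                    beam.coders.ProtoCoder)

# Coder inference works off type hints, so declare the output type:
clear_message = (
    console_message
    | beam.ParDo(GetClearMessage()).with_output_types(common_pb2.BatchEntryPoint)
)
```

Without the output type hint, Beam may still pick the fallback coder for the step, hence the with_output_types call.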
