我正在尝试用python构建一个流束管道,它应该捕获来自kafka的消息,然后执行从其他源获取数据和聚合的进一步阶段。 到目前为止,我所建立的一步一步的过程是:
在本地主机上运行Kafka实例:9092
./bin/kafka-server-start.sh ./config/server.properties
使用docker运行beam flink作业服务器
docker run --net=host apache/beam_flink1.10_job_server:latest
下梁式卡夫卡管道
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
if __name__ == '__main__':
options = PipelineOptions([
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK",
"--streaming",
"--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
])
options = options.view_as(StandardOptions)
options.streaming = True
pipeline = beam.Pipeline(options=options)
result = (
pipeline
| "Read from kafka" >> ReadFromKafka(
consumer_config={
"bootstrap.servers": 'localhost:9092',
},
topics=['mytopic'],
expansion_service='localhost:8097',
)
| beam.Map(print)
)
pipeline.run()
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>tryme
发布此试用消息后,beam管道会感知到该消息,但由于出现以下错误而崩溃:
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:138)
at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:84)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:516)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForElementAndRestriction(FnApiDoFnRunner.java:838)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSizedElementAndRestriction(FnApiDoFnRunner.java:808)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$200(FnApiDoFnRunner.java:132)
at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:226)
at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:223)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:204)
at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:155)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
如果您尚未解决此问题,则发布了一条带有null键值的消息,这会导致Kafka抛出异常并导致作业失败
可以使用kafka命令行工具将消息发布为键值对,如下所示:
相关问题 更多 >
编程相关推荐