ReadFromKafka throws ValueError: Unsupported signal: 2

Posted 2024-06-16 11:30:40


At the moment I'm trying to get the hang of Apache Beam together with Apache Kafka.

The Kafka service is running (locally), and I wrote some test messages with the Kafka console producer.

First, I wrote this Java snippet to test Apache Beam in a language I'm familiar with. It works as expected:

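```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main {

  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create();

    // Emits KafkaRecord<Long, String> elements (key/value plus topic, partition, offset).
    Read<Long, String> kafkaReader = KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("beam-test")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);

    // Note: withoutMetadata() returns a new transform rather than modifying kafkaReader,
    // so calling it without using the result has no effect. The DoFn below therefore
    // still receives KafkaRecord elements.

    pipeline
        .apply("Kafka", kafkaReader)
        .apply("Extract words", ParDo.of(new DoFn<KafkaRecord<Long, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Print the key and value of each record read from the topic.
            System.out.println("Key:" + c.element().getKV().getKey()
                + " | Value: " + c.element().getKV().getValue());
          }
        }));

    pipeline.run();
  }
}
```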

My goal is to write the same thing in Python, and this is what I have so far:

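```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipe():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'Kafka Unbounded' >> ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092'}, topics=['beam-test'])
         | 'Test Print' >> beam.Map(print)
        )


if __name__ == '__main__':
    run_pipe()
```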

Now to the problem. When I try to run the Python code, I get the following error:

```
(app) λ python ArghKafkaExample.py 
Traceback (most recent call last):
  File "ArghKafkaExample.py", line 22, in <module>
    run_pipe()
  File "ArghKafkaExample.py", line 10, in run_pipe
    (p
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 1028, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 572, in __ror__
    result = p.apply(self, pvalueish, label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 648, in apply
    return self.apply(transform, pvalueish)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 691, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 198, in apply
    return m(transform, input, options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 228, in apply_PTransform
    return transform.expand(input)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 322, in expand
    self._expanded_components = self._resolve_artifacts(
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 120, in __exit__
    next(self.gen)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 372, in _service
    yield stub
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 523, in __exit__
    self._service_provider.__exit__(*args)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 74, in __exit__
    self.stop()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 133, in stop
    self.stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 179, in stop_process
    return super(JavaJarServer, self).stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 143, in stop_process
    self._process.send_signal(signal.SIGINT)
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\subprocess.py", line 1434, in send_signal
    raise ValueError("Unsupported signal: {}".format(sig))
ValueError: Unsupported signal: 2
```

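From googling I found out that this has something to do with the signals used to stop a program (signal 2 is SIGINT, the signal sent by Ctrl+C), but overall I have no idea what the problem is.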

Any advice would be helpful.

Greetings, Pascal


1 answer
Anonymous user
#1 · Posted 2024-06-16 11:30:40

Your pipeline code looks correct. The problem comes from the requirements of Kafka IO in the Python SDK. From the module documentation:

These transforms are currently supported by Beam portable runners (for example, portable Flink and Spark) as well as Dataflow runner.

Transforms provided in this module are cross-language transforms implemented in the Beam Java SDK. During the pipeline construction, Python SDK will connect to a Java expansion service to expand these transforms. To facilitate this, a small amount of setup is needed before using these transforms in a Beam Python pipeline.

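In Python, Kafka IO is implemented as a cross-language transform backed by Java. Your pipeline fails because you haven't set up your environment to run cross-language transforms. In layman's terms, a cross-language transform means the Kafka transform is actually executed by the Java SDK rather than the Python SDK, so it can reuse the existing Kafka code in Java.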
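Because the Kafka reading happens in Java, what reaches the Python side are plain Python values. The following is a minimal sketch of a Python counterpart to the Java DoFn in the question. It makes two assumptions. First, `ReadFromKafka` is left at its default `ByteArrayDeserializer` for key and value, so each element arrives as a `(bytes, bytes)` tuple. Second, keys were written by Kafka's `LongSerializer` (8 bytes, big-endian), as the Java version assumes. `decode_long_key` and `format_record` are hypothetical helper names:

```python
from typing import Optional, Tuple


def decode_long_key(raw: Optional[bytes]) -> Optional[int]:
    """Decode a key the way Kafka's LongDeserializer does: 8 bytes, big-endian, signed.

    A missing key (e.g. console-producer messages without a key) stays None.
    """
    if raw is None:
        return None
    if len(raw) != 8:
        raise ValueError("expected 8 bytes for a long key, got {}".format(len(raw)))
    return int.from_bytes(raw, byteorder="big", signed=True)


def format_record(kv: Tuple[Optional[bytes], Optional[bytes]]) -> str:
    """Format a (key, value) tuple like the Java DoFn's println."""
    key, value = kv
    text = value.decode("utf-8") if value is not None else None
    return "Key:{} | Value: {}".format(decode_long_key(key), text)
```

In the pipeline this could sit before the print step, e.g. `| 'Format' >> beam.Map(format_record) | 'Test Print' >> beam.Map(print)`.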

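Two obstacles keep the pipeline from working. The easier one is that only the runners mentioned in the quote above support cross-language transforms. If you run this pipeline with the Direct runner it won't work, so you need to switch to the Flink or Spark runner in local mode.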
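Here is a minimal sketch of the question's pipeline switched to the portable Flink runner in local mode. The flags `--runner=FlinkRunner`, `--environment_type=LOOPBACK` and `--streaming` are standard Beam Python pipeline options. The sketch assumes that, with no `--flink_master` given, the FlinkRunner starts an embedded local Flink cluster; check this against your Beam version. Beam is imported inside `run_pipe` only so that the hypothetical `FLINK_LOCAL_ARGS` list can be inspected without Beam installed. Switching runners alone does not remove the need for an expansion service.

```python
from typing import List, Optional

# Pipeline options for local execution on the portable Flink runner.
# Assumption: with no --flink_master, FlinkRunner starts an embedded local Flink cluster.
FLINK_LOCAL_ARGS: List[str] = [
    "--runner=FlinkRunner",         # portable runner that supports cross-language transforms
    "--environment_type=LOOPBACK",  # run the Python SDK worker inside this process
    "--streaming",                  # Kafka is an unbounded source
]


def run_pipe(pipeline_args: Optional[List[str]] = None) -> None:
    # Imported lazily so FLINK_LOCAL_ARGS can be inspected without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(pipeline_args or FLINK_LOCAL_ARGS)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Kafka Unbounded' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092'},
             topics=['beam-test'])
         | 'Test Print' >> beam.Map(print))
```

Call `run_pipe()` from the script's `__main__` block as before. Passing a different argument list, for example one for the Spark runner, leaves the pipeline code itself unchanged.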

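The trickier obstacle is that you need to start an expansion service before you can add external transforms to your pipeline. The stack trace you got happens because Beam tries to expand the transform, cannot connect to the expansion service, and the expansion fails.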
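The final frames of the traceback show where the `ValueError` itself is raised. While shutting down the Java expansion-service process, Beam's `subprocess_server` calls `send_signal(signal.SIGINT)`. The error comes from the Windows implementation of `subprocess.Popen.send_signal` (the paths in the traceback are Windows paths). The sketch below is a simplified re-creation of that method's checks, not the CPython source itself. On Windows it accepts only `SIGTERM`, `CTRL_C_EVENT` and `CTRL_BREAK_EVENT`, so `SIGINT` (2) is rejected. The `WIN_*` constant names are hypothetical; their values are the Windows signal numbers, hard-coded so the sketch runs on any OS:

```python
# Windows values of the relevant signal constants (hard-coded so this runs on any OS).
WIN_CTRL_C_EVENT = 0
WIN_CTRL_BREAK_EVENT = 1
WIN_SIGINT = 2
WIN_SIGTERM = 15


def windows_send_signal(sig: int) -> str:
    """Simplified re-creation of the checks in CPython's Windows Popen.send_signal.

    Returns a description of what would happen instead of signalling a real process.
    """
    if sig == WIN_SIGTERM:
        return "terminate()"  # SIGTERM is handled as Popen.terminate()
    if sig == WIN_CTRL_C_EVENT:
        return "os.kill(pid, CTRL_C_EVENT)"
    if sig == WIN_CTRL_BREAK_EVENT:
        return "os.kill(pid, CTRL_BREAK_EVENT)"
    # Anything else, including SIGINT (2), is rejected with the error from the traceback.
    raise ValueError("Unsupported signal: {}".format(sig))


try:
    windows_send_signal(WIN_SIGINT)
except ValueError as err:
    print(err)  # Unsupported signal: 2
```

This is why the message you see is about the signal used to stop the helper Java process rather than about Kafka itself.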

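If you still want to try running this with cross-language transforms despite the extra setup, the documentation I linked contains instructions for running the expansion service. At the time of writing, this feature is still new and the documentation may have gaps. If you run into problems, I encourage you to ask on the Apache Beam users mailing list or the Apache Beam Slack channel.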
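If you start the expansion service yourself by following the linked documentation, a quick TCP connection check can confirm that something is listening at its address before you build the pipeline. This is a minimal sketch using only the standard library. The address `localhost:8097` and the helper name `expansion_service_reachable` are assumptions for illustration:

```python
import socket


def expansion_service_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only shows that some process is listening on the port; it does not
    verify that the process is a Beam expansion service.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timeout, unresolvable host, ...
        return False


# Assumed address of a locally started expansion service.
EXPANSION_SERVICE = ("localhost", 8097)

if __name__ == "__main__":
    print(expansion_service_reachable(*EXPANSION_SERVICE))
```

If the check succeeds, the address can be passed through `ReadFromKafka`'s `expansion_service` argument, e.g. `ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092'}, topics=['beam-test'], expansion_service='localhost:8097')`. Beam then uses that service instead of starting its own Java process.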
