Integrating Apache Kafka with Apache Spark Streaming Using Python

I am trying to integrate Apache Kafka with Apache Spark Streaming using Python (I am new to all of this).

To do this, I completed the following steps:

  1. Started ZooKeeper
  2. Started Apache Kafka
  3. Created a topic in Apache Kafka (see the command sketch after step 4)
  4. Successfully listed the available topics with this command

bin/kafka-topics.sh --list --zookeeper localhost:2181
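
For reference, steps 1-3 are normally done with the scripts that ship with Kafka. A minimal sketch, assuming a Kafka 0.8-era distribution with its default config files; the topic name "test" is illustrative:

# Start ZooKeeper (step 1)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker (step 2)
bin/kafka-server-start.sh config/server.properties

# Create a topic (step 3); the name "test" is illustrative
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# Optionally feed some test messages into the topic so the word count has input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test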

  5. I took the Kafka word count example from here:

https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py

The code is:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    # One-second batch interval
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    # Receiver-based stream: connect through ZooKeeper and read the topic
    # with a single consumer thread ({topic: 1})
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])  # records are (key, message) pairs; keep the message
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
  6. I executed the code with this command:

./spark-submit /root/girish/python/kafkawordcount.py localhost:2181

I got this error:

[error output missing from the original post]
  7. I updated the execution command based on the answer to this question:

spark submit failed with spark streaming workdcount python code

 ./spark-submit --jars /root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/lib/spark-streaming-kafka_2.10-1.3.1.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/kafka_2.10-0.8.1.2.2.0.0-2041.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/zkclient-0.3.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/metrics-core-2.2.0.jar  /root/girish/python/kafkawordcount.py localhost:2181 <topic name>

Now I get this error:

File "/root/girish/python/kafkawordcount.py", line 28, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 67, in createStream
    jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
  File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'

Please help me resolve this issue.

Thanks in advance.

PS: I am using Apache Spark 1.2


2 Answers

Solved this by using Apache Spark 1.3, which has better Python support than the 1.2 release.
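
For reference, the usage note at the top of the Spark 1.3 kafka_wordcount.py example submits the job together with the Kafka assembly jar. A minimal sketch, assuming a Spark source build; the jar path is illustrative and depends on your layout:

bin/spark-submit --jars external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
  examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test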

Faced the same issue; solved it by adding the Kafka assembly package:

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 ~/py/sparkjob.py

Use the coordinate that matches your Spark and Kafka versions.
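
As an illustration of matching versions: the Maven coordinate encodes the Kafka API flavor, the Scala build, and the Spark version. Before Spark 2.0 the artifact had no -0-8 suffix, so a Spark 1.6.3 / Scala 2.10 setup would look roughly like this (same illustrative script path as above):

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 ~/py/sparkjob.py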
