<p>I am trying to integrate Apache Kafka with Apache Spark Streaming using Python (I am new to all of this).</p>
<p>To do so, I completed the following steps:</p>
<ol>
<li>Started ZooKeeper</li>
<li>Started Apache Kafka</li>
<li>Added a topic in Apache Kafka</li>
<li>Successfully listed the available topics with this command</li>
</ol>
<blockquote>
<p>bin/kafka-topics.sh --list --zookeeper localhost:2181</p>
</blockquote>
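<p>For reference, steps 1–4 roughly correspond to commands like the following, run from the Kafka installation directory (the topic name <code>test</code> and the config file paths are assumptions, not taken from the question):</p>
<pre><code># Start ZooKeeper (the instance bundled with Kafka) in one terminal
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker in another terminal
bin/kafka-server-start.sh config/server.properties

# Create a topic ("test" is just an example name)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

# Verify the topic is listed
bin/kafka-topics.sh --list --zookeeper localhost:2181
</code></pre>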
<ol start="5">
<li>I took the Kafka word count code from here</li>
</ol>
<p><a href="https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py" rel="nofollow noreferrer">https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py</a></p>
<p>The code is</p>
<pre><code>from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py &lt;zk&gt; &lt;topic&gt;", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
</code></pre>
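<p>The streaming transformations above apply the classic word count to each micro-batch. The same <code>flatMap</code> / <code>map</code> / <code>reduceByKey</code> logic can be sketched on a plain Python list (no Spark required) to show what each batch computes:</p>
<pre><code># One micro-batch: each element is a message value (x[1] in the stream)
lines = ["hello spark", "hello kafka"]

# flatMap: split each line into words
words = [word for line in lines for word in line.split(" ")]

# map: pair each word with a count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: sum the counts per word
counts = {}
for word, n in pairs:
    counts[word] = counts.get(word, 0) + n

print(counts)  # {'hello': 2, 'spark': 1, 'kafka': 1}
</code></pre>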
<ol start="6">
<li>I executed the code with the command</li>
</ol>
<blockquote>
<p>./spark-submit /root/girish/python/kafkawordcount.py localhost:2181
</p>
</blockquote>
<p>I got an error.</p>
<ol start="7">
<li>I updated the execution command using the answer to this question</li>
</ol>
<p><a href="https://stackoverflow.com/questions/29485175/spark-submit-failed-with-spark-streaming-workdcount-python-code">spark submit failed with spark streaming workdcount python code</a></p>
<p>to</p>
<pre><code> ./spark-submit --jars /root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/lib/spark-streaming-kafka_2.10-1.3.1.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/kafka_2.10-0.8.1.2.2.0.0-2041.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/zkclient-0.3.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/metrics-core-2.2.0.jar /root/girish/python/kafkawordcount.py localhost:2181 <topic name>
</code></pre>
<p>Now I am getting this error</p>
<pre><code>File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 67, in createStream
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'
</code></pre>
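<p>The traceback ends inside py4j's argument marshalling: <code>get_command_part</code> expects each argument to be either a primitive it knows how to encode or a Java object proxy exposing <code>_get_object_id</code>, and a plain Python <code>dict</code> is neither. A minimal sketch of that failure mode (the function below is a hypothetical mimic, not py4j's actual code) reproduces the same <code>AttributeError</code>:</p>
<pre><code># Hypothetical mimic of py4j's marshalling step: anything that is not a
# recognized primitive is assumed to be a Java proxy with _get_object_id.
def get_command_part(arg):
    if isinstance(arg, (str, int, float, bool)):
        return str(arg)
    return arg._get_object_id()  # a plain dict has no such attribute

try:
    get_command_part({"my-topic": 1})  # the {topic: 1} dict from createStream
except AttributeError as e:
    print(e)  # 'dict' object has no attribute '_get_object_id'
</code></pre>
<p>This suggests the topics dict reached py4j without being converted to a Java map first, which may point to a version mismatch between the pyspark code and the <code>spark-streaming-kafka</code> jar (the jar above is 1.3.1 while the Spark install is 1.2).</p>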
<p>Please help me resolve this issue.</p>
<p>Thanks in advance.</p>
<p>PS: I am using Apache Spark 1.2</p>