这是我的测试代码,我根本不明白为什么我不能用DSE运行它,但是没有它似乎不是一个问题。在
这是我的python代码:
from future import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if name == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
当我运行他这样的代码时,它就是不起作用。我不明白为什么? 这是我的DSE错误代码:
^{pr2}$编辑:
在使用了您的建议之后,现在似乎出现了一个no class def错误:
java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetPublicMethods(Class.java:2902)
at java.lang.Class.getMethods(Class.java:1615)
at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:365)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:317)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:251)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 12 more
谢谢你的帮助!在
你看到的错误是它的核心
这让我们知道Spark已经在您的类路径上找到了Hadoop2库。DSE4.8不支持Hadoop2,只有hadoop1库。在
你对卡夫卡的使用让我怀疑你在你的CP中包含了
^{pr2}$spark-kafka-assembly
这个jar中包含Hadoop2库,将导致CP问题。尝试对
spark-kafka
使用非汇编jar,一切都会顺利进行。在http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.4.1
相关问题 更多 >
编程相关推荐