为什么我不能用datastaxenterprise运行spark python脚本?

2024-04-25 07:01:11 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我的测试代码,我根本不明白为什么我不能用DSE运行它,但是没有它似乎不是一个问题。在

这是我的python代码:

from future import print_function
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if name == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

当我运行他这样的代码时,它就是不起作用。我不明白为什么? 这是我的DSE错误代码:

^{pr2}$

编辑:

在使用了您的建议之后,现在似乎出现了一个no class def错误:

java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
 at java.lang.Class.getDeclaredMethods0(Native Method)
 at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
 at java.lang.Class.privateGetPublicMethods(Class.java:2902)
 at java.lang.Class.getMethods(Class.java:1615)
 at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:365)
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:317)
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
 at py4j.Gateway.invoke(Gateway.java:251)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 12 more

谢谢你的帮助!在


Tags: kafkalambdafromimportlangsysjavaat
1条回答
网友
1楼 · 发布于 2024-04-25 07:01:11

你看到的错误是它的核心

Caused by: java.lang.NoSuchFieldException: SHUTDOWN_HOOK_PRIORITY

这让我们知道Spark已经在您的类路径上找到了Hadoop2库。DSE4.8不支持Hadoop2,只有hadoop1库。在

你对卡夫卡的使用让我怀疑你在你的CP中包含了spark-kafka-assembly

^{pr2}$

这个jar中包含Hadoop2库,将导致CP问题。尝试对spark-kafka使用非汇编jar,一切都会顺利进行。在

http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.4.1

相关问题 更多 >

    热门问题