Running kafkastream with a JAR artifact in Jupyter

Published 2024-03-29 04:49:16


I'm writing a simple Python script with pyspark to stream messages from Kafka, and I'm working in Jupyter.

I get an error saying Spark Streaming's Kafka libraries not found in class path (more details below). To get around it, I applied the solution suggested by @tshilidzi mudau in a previous post (and confirmed in the docs). What can I do to fix this error?

Following the suggestion in the error message, I downloaded the JAR of the artifact, stored it in $SPARK_HOME/jars, and referenced it in the code.
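
For reference, a variant that points --jars at the full path under $SPARK_HOME (so it does not depend on the notebook's working directory) would look roughly like this; the fallback path is only an assumption:

import os

# Build the --jars argument from $SPARK_HOME so the file is found regardless
# of the notebook's working directory. The trailing "pyspark-shell" token is
# required, otherwise PYSPARK_SUBMIT_ARGS is not honored.
spark_home = os.environ.get('SPARK_HOME', '/opt/spark')  # assumed default path
kafka_jar = os.path.join(spark_home, 'jars',
                         'spark-streaming-kafka-0-10-assembly_2.10-2.2.2.jar')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars {} pyspark-shell'.format(kafka_jar)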

Here is the code:

from __future__ import print_function  # must be the first statement in the cell

import os
import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":

    # Must be set before the SparkContext is created;
    # note that the trailing "pyspark-shell" part is very important!
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-0-10-assembly_2.10-2.2.2.jar pyspark-shell'

    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")

    # Stop any SparkContext left over from a previous run of the notebook
    try:
        sc.stop()
    except Exception:
        pass

    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)  # 1-second batch interval

    map1 = {'spark-kafka': 1}  # topic -> number of receiver threads
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)  # tried with localhost:2181 too

    print("kafkastream=", kafkaStream)
    sc.stop()
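
Once the receiver is up, the usual way to drive it would be roughly the standard start/await/stop lifecycle, for example (the 10-second timeout is only for testing in a notebook):

# Print each micro-batch, run the streaming context briefly, then shut it
# down together with the SparkContext. The timeout is just for notebook tests.
kafkaStream.pprint()
stream.start()
stream.awaitTerminationOrTimeout(10)
stream.stop(stopSparkContext=True, stopGraceFully=True)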

Here is the error:

TypeError                                 Traceback (most recent call last)
<ipython-input-9-34de7dbdfc7c> in <module>()
     13 ssc = StreamingContext(sc,1)
     14 broker = "<my_broker_ip>"
---> 15 directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"], {"metadata.broker.list": broker})
     16 directKafkaStream.pprint()
     17 ssc.start()

/opt/spark/python/pyspark/streaming/kafka.pyc in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    120             return messageHandler(m)
    121 
--> 122         helper = KafkaUtils._get_helper(ssc._sc)
    123 
    124         jfromOffsets = dict([(k._jTopicAndPartition(helper),

/opt/spark/python/pyspark/streaming/kafka.pyc in _get_helper(sc)
    193     def _get_helper(sc):
    194         try:
--> 195             return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
    196         except TypeError as e:
    197             if str(e) == "'JavaPackage' object is not callable":

TypeError: 'JavaPackage' object is not callable
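
A quick way to check whether the JAR was actually picked up, and whether the helper class from the traceback resolves at all, would be something like this (diagnostic only; the class name is copied from the traceback):

# Diagnostic only: show which extra JARs the running context knows about and
# ask the JVM to resolve the helper class. If the JAR was not picked up,
# Class.forName raises a ClassNotFoundException through py4j.
print(sc.getConf().get("spark.jars", "<not set>"))
sc._jvm.java.lang.Class.forName(
    "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")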

Tags: kafka, in, from, import, helper, conf, error, broker