如何使用pyspark连接到运行在gcp上的kerberossecured kafka集群?

2024-04-16 11:47:40 发布

您现在位置:Python中文网/ 问答频道 /正文

所以在我开始之前,我想说,我知道这个问题可能看起来是重复或重复的,但现有的答案与我的要求无关。它们主要用于java和scala,但对于python只有一点点。你知道吗

所以我有一个kerberos安全的kafka集群在gcp上运行。我用kafkapython包创建了producer和consumer,检查了两次,运行得非常好。你知道吗

但是当我尝试使用pyspark用我的spark应用程序连接到集群时,它不起作用。我的spark应用程序看起来像本文件:你知道吗

def application(topic, batchTime, appName, **kwargs):
    import os

    try:
        os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars <full-path-to-spark>/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar pyspark-shell'

        conf = SparkConf().setAppName(appName).setMaster('local[*]') 
        sc = SparkContext(conf=conf)

        stream_context = StreamingContext(sparkContext=sc, batchDuration=batchTime)
        kafka_stream = KafkaUtils.createDirectStream(ssc=stream_context, topics=[topic], 
                                            kafkaParams={"metadata.broker.list":"broker1:9092",
                                                    "ssl.context": 'context',
                                                    'sasl.plain.username': '****',
                                                    'sasl.plain.password': '*********',
                                                    'sasl.mechanism': 'PLAIN',
                                                    'security.protocol': "SASL_PLAINTEXT"})

        lines = kafka_stream.map(lambda x: json.loads(x[1]))
        final_obj = lines.map(lambda line: SparkHelper.get_app_type(line, line['app_type']))
        final_obj.foreachRDD(handler)

        final_obj.pprint()

当我运行这个时,它显示错误例如:你知道吗

19/11/26 19:11:59 WARN Utils: Your hostname, openstack-inspiron-3543 resolves to a loopback address: 127.0.1.1; using 10.10.0.25 instead (on interface wlp6s0)
19/11/26 19:11:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/26 19:11:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/11/26 19:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.mechanism is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.plain.password is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.plain.username is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property security.protocol is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property ssl.context is not valid

控制台在那里卡住了。它不会终止也不会运行任何东西。是的,只有当本地卡夫卡集群运行时才会发生这种情况。如果我关闭它并运行spark应用程序,它会显示“NoBrokerAvailable”。这不应该发生对吗?当我尝试连接到不同的集群时,本地kafka集群是否正在运行并不重要。你知道吗

我试着安装一个不同的spark stream kafka程序集jar文件,但没有一个能正常工作。我从https://jar-download.com/?search_box=spark-streaming-kafka-assembly下载了jar文件。你知道吗

我目前使用的版本是。。。你知道吗

火花:2.4.4, Hadoop:2.7版本, api\ U版本:0.10

我正在使用confluent platform 5.3.1运行kafka集群。你知道吗

我不明白问题出在哪里。请指出任何错误,我正在做或如果没有什么我应该改变,使这项工作。感谢您提前回复!!你知道吗


Tags: kafkatostreamiscontextnot集群property