Py4JJavaError calling None.org.apache.spark.streaming.api.java.JavaStreamingContext when ingesting Kafka data into HBase via PySpark

Posted 2024-06-01 05:08:14


I am trying to set up real-time Kafka data ingestion into HBase via PySpark, following this tutorial. The problem is with Spark Streaming; specifically, I get the following error:

Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.NullPointerException
    at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

spark --version returns version 2.4.0 Using Scala version 2.11.12. Let me know if any other information is needed. My source code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /my/path/spark/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar pyspark-shell'

import findspark
findspark.init()
import pyspark
import random
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import *
# from pyspark_ext import *
import happybase

appName = "Kafka_MapR-Streams_to_HBase"
config = SparkConf().setAppName(appName)  

# Note: rememberDuration, batchDuration, and streaming.timeout are not
# built-in Spark settings; the SparkConf here doubles as a key/value
# store that the code reads back below.
props = []
props.append(("spark.rememberDuration", "10"))
props.append(("spark.batchDuration", "10"))
props.append(("spark.eventLog.enabled", "true"))
props.append(("spark.streaming.timeout", "30"))
props.append(("spark.ui.enabled", "true"))

config = config.setAll(props)

# NOTE (see the update below): the second sc.stop() leaves the context dead,
# so the JVM-side JavaStreamingContext constructor hits the
# NullPointerException when the StreamingContext is created.
sc.stop()
sc = SparkContext(conf=config)
sc.stop()
ssc = StreamingContext(sc, int(config.get("spark.batchDuration")))

def runApplication(ssc, config):
  ssc.start()
  if config.get("spark.streaming.timeout") == '':
    ssc.awaitTermination()
    stopped = True  # awaitTermination() only returns once the context has stopped
  else:
    stopped = ssc.awaitTerminationOrTimeout(int(config.get("spark.streaming.timeout")))
  if not stopped:
    print("Stopping streaming context after timeout...")
    ssc.stop(True)
    print("Streaming context stopped.")

hbase_table = 'clicks'
hconn = happybase.Connection('hostname')  
ctable = hconn.table(hbase_table)
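
The snippet stops before the actual stream wiring. As a rough sketch of that missing piece (hypothetical — the broker address, topic name, and column family cf are placeholder assumptions, not taken from the tutorial), the Kafka DStream could be consumed with the 0-8 direct API and written to HBase per partition:

# Hypothetical continuation (not in the original post): consume the Kafka
# topic and write each (key, value) record into HBase via happybase.
# Broker, topic, and column names below are placeholder assumptions.
kafka_params = {"metadata.broker.list": "broker-host:9092"}
stream = KafkaUtils.createDirectStream(ssc, ["clicks_topic"], kafka_params)

def save_partition(records):
  # happybase connections are not serializable, so open one per partition
  # instead of reusing the driver-side hconn.
  conn = happybase.Connection('hostname')
  table = conn.table(hbase_table)
  for key, value in records:
    table.put(str(key).encode(), {b'cf:value': str(value).encode()})
  conn.close()

stream.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))

runApplication(ssc, config)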

Update

I think the problem is related to sc.stop(). Removing it and changing sc = SparkContext(conf=config) to SparkContext.getOrCreate(conf=config) seems to have solved the problem.
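
For reference, a minimal sketch of the corrected initialization described in this update, reusing the config object built earlier:

# Corrected setup per the update: reuse (or lazily create) a single
# SparkContext instead of stopping it, so the StreamingContext is built
# on a live context and the NullPointerException goes away.
sc = SparkContext.getOrCreate(conf=config)
ssc = StreamingContext(sc, int(config.get("spark.batchDuration")))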

