There are many ways to read/write a Spark DataFrame from/to Kafka. I am trying to read messages from a Kafka topic and create a DataFrame from them. I can fetch the messages from the topic, but I am unable to convert them into a DataFrame. Any suggestions would be helpful.
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

consumer = KafkaConsumer('Jim_Topic')
for message in consumer:
    data = message
    print(data)       # Printing the messages properly
    df = data.map     # am unable to convert it to a dataframe.
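One way to get the consumed messages into a DataFrame is to decode each message value, wrap it in a `Row`, and pass the collected rows to `spark.createDataFrame`. A minimal sketch — the topic name and broker address come from the question, while the `consumer_timeout_ms` value, the `value string` schema, and the function names are assumptions added here so the iterator terminates instead of blocking forever:

```python
def decode_value(raw):
    # Kafka delivers payloads as raw bytes; decode to str (assumes UTF-8).
    return raw.decode("utf-8")

def topic_to_dataframe(topic, bootstrap_servers="localhost:9092"):
    # Imports are kept inside the function so decode_value stays importable
    # even without pyspark / kafka-python installed.
    from kafka import KafkaConsumer
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.appName("kafka-to-df").getOrCreate()

    # consumer_timeout_ms makes the iterator stop once no new message
    # arrives for 5 seconds, unlike the endless loop in the question.
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset="earliest",
                             consumer_timeout_ms=5000)

    rows = [Row(value=decode_value(m.value)) for m in consumer]
    return spark.createDataFrame(rows, schema="value string")

# usage (requires a running broker):
# df = topic_to_dataframe("Jim_Topic")
# df.show()
```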
I also tried the approach below:
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .load()

# selectExpr returns a new DataFrame, so the result must be assigned
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
which fails with the error below:
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
Depending on your use case, you can use either a streaming query or a batch query.
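As a sketch of what the two variants look like — the format and option names follow the batch snippet in the question, `spark.readStream` is the streaming counterpart of `spark.read`, and the helper names and default broker address are assumptions:

```python
def kafka_stream_df(spark, topic, servers="localhost:9092"):
    # Streaming query: readStream produces an unbounded DataFrame that
    # must be consumed with writeStream (e.g. a console or file sink).
    return (spark.readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", servers)
                 .option("subscribe", topic)
                 .load())

def kafka_batch_df(spark, topic, servers="localhost:9092"):
    # Batch query: read produces a bounded DataFrame holding the
    # topic's current contents, usable with show(), count(), etc.
    return (spark.read
                 .format("kafka")
                 .option("kafka.bootstrap.servers", servers)
                 .option("subscribe", topic)
                 .load())
```

Both return DataFrames with the Kafka source's binary `key`/`value` columns, so the `CAST(... AS STRING)` step from the question still applies afterwards.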
Also make sure to add the required dependency (replace 2.0.2 — the Spark version mentioned above — with your own Spark version).
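For example, the Kafka source package can be pulled in via spark-submit's `--packages` flag; a sketch assuming Scala 2.11 and the Spark 2.0.2 version mentioned above (`your_script.py` is a placeholder for your application):

```shell
# Adds the Structured Streaming Kafka source to the classpath at submit time.
# Replace 2.11 / 2.0.2 with your Scala and Spark versions.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2 \
  your_script.py
```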