How to programmatically load and stream a Kafka topic into a PySpark DataFrame


There are many ways to read and write Spark DataFrames to and from Kafka. I am trying to read messages from a Kafka topic and build a DataFrame out of them. I can fetch the messages from the topic, but I cannot convert them into a DataFrame. Any suggestions would be helpful.

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

consumer = KafkaConsumer('Jim_Topic')

for message in consumer:
    data = message
    print(data)  # the messages print correctly
    df = data.map  # but I am unable to convert them to a DataFrame

I have also tried the following approach:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

but it fails with the error below:

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;


1 Answer

Depending on your use case, you can either:

  1. create a Kafka source for streaming queries, or
  2. create a Kafka source for batch queries.

For streaming queries:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Cast the binary key and value columns to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
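
Note that a streaming DataFrame produces no output until you start a query with writeStream. As a minimal sketch (the console sink and append mode here are illustrative assumptions, not part of the original answer):

# Start the query and print each micro-batch to the console (sink choice is an assumption)
query = df.writeStream \
  .format("console") \
  .outputMode("append") \
  .start()

query.awaitTermination()  # block until the stream is stopped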

For batch queries:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Cast the binary key and value columns to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
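
The batch read returns an ordinary DataFrame, so the usual actions apply, for example:

# Inspect the fetched records with standard DataFrame actions
df.printSchema()
df.show(truncate=False)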

Also make sure you add the required dependency:

org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2

(Replace the version with the one matching your Spark and Scala build; the coordinate above is for Spark 2.0.2.)
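
A common way to supply this package is either on the spark-submit command line or via the session builder before the JVM starts. A sketch follows; the file name your_app.py and the app name kafka-example are placeholders, and the coordinate mirrors the version quoted above:

# Option 1: pass the connector at submit time
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2 your_app.py

# Option 2: configure it when building the session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafka-example") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2") \
    .getOrCreate()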
