我正在创建一个spark数据帧,通过使用spark(批处理,而不是流)从kafka主题读取数据spark.数据帧是如下所示的字符串格式。你知道吗
根 |--值:字符串(nullable=true)
+--------------------+
|value |
+--------------------+
|"1,Visa,6574" |
|"3,Visa,6574" |
|"4,MasterCard,6574" |
|"5,MasterCard,6574" |
|"8,Maestro,8372" |
+--------------------+
我尝试使用','分隔符分割数据帧记录,并形成新的数据帧,我可以将数据传输给cassandra。你知道吗
创建sparkDF如下。你知道吗
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "earliest") \
.load()
df2=df.selectExpr("CAST(value AS STRING)")
df2.printSchema()
我试着用','分割数据。你知道吗
split_col=split(df2['value'],',')
df3=df2.withColumn('Name1',split_col.getItem(0))
df3=df2.withColumn('Name2',split_col.getItem(1))
df3=df2.withColumn('Name3',split_col.getItem(2))
上面的代码没有给出预期的结果,我被放在像
根 |--值:字符串(nullable=true) |--Name3:字符串(nullable=true)
+-------------------+-----+
|value |Name3|
+-------------------+-----+
|"1,Visa,6574" |6574"|
|"3,Visa,6574" |6574"|
|"4,MasterCard,6574"|6574"|
|"5,MasterCard,6574"|6574"|
|"8,Maestro,8372" |8372"|
+-------------------+-----+
我想得到这样的结果:
+-------------------+----------+------+
|Name1 |Name2 |Name3 |
+-------------------+----------+------+
| 1 |Visa |6574 |
| 3 |Visa |6574 |
| 4 |MasterCard|6574 |
| 5 |MasterCard|6574 |
| 8 |Maestro |8372 |
+-------------------+----------+------+
请帮忙!!你知道吗
你的解决方案很好。唯一的问题是在执行拆分并用于下一步之后
df2
和df3
的赋值。在执行第一次拆分后,您将分配给df3
,但对于后续拆分,您仅使用df2
。因此,spark只对第三个split语句求值。你知道吗解决方案是在最后一次拆分之前不要赋给新变量
或者在下一次拆分中使用指定的变量(除非必要,否则不鼓励使用这种方式)
相关问题 更多 >
编程相关推荐