Converting a Spark Structured Streaming DataFrame to a pandas DataFrame

Published 2024-06-15 15:28:25


I built a Spark Streaming application that ingests data from a Kafka topic. I need to call an API that accepts a pandas DataFrame, but when I try to convert the streaming DataFrame I get this:

: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2832)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2809)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

Here is my Python code:

(the original code snippet was not preserved by the site's formatting)

Now, I am aware that I am creating another instance of a streaming DataFrame, but no matter where I try to add start() and awaitTermination(), I get the same error.

Any ideas?


1 Answer

Posted 2024-06-15 15:28:25

TL;DR: An operation like this simply cannot work.

Now I am aware I am creating another instance of a streaming Dataframe

Well, the thing is, you really aren't. toPandas, called on a DataFrame, creates a plain, local, non-distributed pandas DataFrame in the memory of the driver node.

Not only does it have no place in distributed Spark, but as an abstraction it is inherently incompatible with Structured Streaming: a pandas DataFrame represents a fixed set of tuples, whereas Structured Streaming represents an unbounded stream of tuples.

It is not clear what you are trying to achieve here, and this may well be an XY problem, but if you really need to use pandas within Structured Streaming, you can try pandas_udf — the SCALAR and {} variants are compatible at least with basic time-based triggers (other variants may be supported as well, although some combinations clearly make no sense, and I am not aware of any official compatibility matrix).
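As a sketch of that suggestion: the pandas logic can be written as an ordinary function over a pandas DataFrame (which keeps it easy to test) and then wrapped in a grouped-map pandas_udf inside the streaming query. All names below (`key`, `score`, `normalize`, `stream_df`) are illustrative, not from the original post, and the PySpark wiring is shown as comments since it needs a running streaming source:

```python
import pandas as pd

def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    """Center the score column within one group -- plain pandas logic."""
    out = pdf.copy()
    out["score"] = out["score"] - out["score"].mean()
    return out

# Hypothetical wiring into a streaming query (Spark 2.x-style API):
#
# from pyspark.sql.functions import pandas_udf, PandasUDFType
#
# @pandas_udf("key string, score double", PandasUDFType.GROUPED_MAP)
# def normalize_udf(pdf):
#     return normalize(pdf)
#
# query = (stream_df.groupBy("key")
#                   .apply(normalize_udf)
#                   .writeStream.format("console")
#                   .start())
# query.awaitTermination()

print(normalize(pd.DataFrame({"key": ["a", "a"], "score": [1.0, 3.0]}))["score"].tolist())
# [-1.0, 1.0]
```

The key difference from toPandas is scope: the pandas function only ever sees one group of one micro-batch at a time, so it never has to materialize the unbounded stream.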
