我正在尝试使用Spark SQL DataFrames和JDBC连接在MySql上插入和更新一些数据。
我已成功使用SaveMode.append插入新数据。有没有办法从Spark SQL更新MySql表中已经存在的数据?
我要插入的代码是:
myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)
如果我更改为SaveMode.Overwrite,它将删除完整的表并创建一个新表,我将查找类似MySql中的“ON DUPLICATE KEY UPDATE”之类的内容
这是不可能的。目前(Spark 1.6.0/2.2.0快照)Spark
DataFrameWriter
只支持四种写入模式:例如,您可以使用
mapPartitions
手动插入(因为您希望UPSERT操作应该是等幂的并且易于实现)、写入临时表并手动执行UPSERT或使用触发器。一般来说,为批处理操作实现upsert行为并保持良好的性能绝非易事。您必须记住,在一般情况下会有多个并发事务(每个分区一个),因此必须确保不会发生写冲突(通常是使用特定于应用程序的分区)或提供适当的恢复过程。实际上,最好执行临时表的批写操作,并直接在数据库中解析upsert部分。
覆盖
org.apache.spark.sql.execution.datasources.jdbc
JdbcUtils.scala
insert into
到replace into
用法:
注意死锁,不要频繁更新数据,只是在紧急情况下重新运行,我想这就是为什么spark不支持这个官方。
遗憾的是,对于像upserting这样非常常见的情况,Spark中没有
SaveMode.Upsert
模式。一般来说,zero322是正确的,但我认为应该有可能(在性能上有所妥协)提供这样的替换功能。
我还想为这个案例提供一些java代码。 当然,它并不像spark内置的那样出色,但它应该是满足您需求的良好基础。根据您的需要修改它:
相关问题 更多 >
编程相关推荐