下面是连接到SQL server并将1个表保存到CSV格式文件的工作代码。在
conf = new SparkConf().setAppName("test").setMaster("local").set("spark.driver.allowMultipleContexts", "true");
sc = new SparkContext(conf)
sqlContext = new SQLContext(sc)
df = sqlContext.read.format("jdbc").option("url","jdbc:sqlserver://DBServer:PORT").option("databaseName","xxx").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable","xxx").option("user","xxx").option("password","xxxx").load()
df.registerTempTable("test")
df.write.format("com.databricks.spark.csv").save("poc/amitesh/csv")
exit()
我有一个场景,在这个场景中,我必须通过pyspark代码以CSV格式将同一数据库中的4个表保存在4个不同的文件中。我们能达到目标吗?或者,这些拆分是在HDFS块大小级别进行的,所以如果您有一个300mb的文件,并且HDFS块大小设置为128,那么您将分别得到128mb、128mb和44mb的3个块?在
必须为数据库中的每个表编写一个转换(读写)(使用
sqlContext.read.format
)。在特定于表的ETL管道之间的唯一区别是每个表有不同的
dbtable
选项。一旦你有了一个数据帧,保存到它自己的CSV文件。在代码可以如下所示(在Scala中,因此我将其转换为Python作为家庭练习):
对每个要保存到CSV的表重复相同的代码。在
完成!
100桌案例-公平安排
解决方案需要另一个:
位于
SparkSession
后面的SparkContext
是线程安全的,这意味着您可以从多个线程使用它。如果你考虑每个表有一个线程,这是正确的方法。在你可以生成尽可能多的线程,比如说100个,然后启动它们。然后Spark可以决定什么时候执行。在
这是Spark使用Fair Scheduler Pools做的事情。Spark的这一特性并不广为人知,但在本案例中值得考虑:
使用它,您的加载和保存管道可能会更快。在
相关问题 更多 >
编程相关推荐