df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use sqlContext.sql
df1.createTempView('df1')
df2.createTempView('df2')
内部连接:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
左外部连接:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
它可以使用
PairRDDFunctions
或Spark数据帧来完成。由于数据帧操作从Catalyst Optimizer中受益,第二个选项值得考虑。假设您的数据如下所示:
带pairdds:
内部连接:
左外部连接:
笛卡尔积(不需要
RDD[(T, U)]
):广播连接(不需要
RDD[(T, U)]
):最后还有
cogroup
,它没有直接的SQL等价物,但在某些情况下可能有用:带Spark数据帧
可以使用SQL DSL,也可以使用
sqlContext.sql
执行原始SQL。内部连接:
左外部连接:
交叉连接(Spark中需要显式交叉连接或配置更改。2.0-spark.sql.crossJoin.enabled for Spark 2.x):
因为1.6(Scala中是1.5),所以每一个都可以与
broadcast
函数组合:执行广播连接。另请参见Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark
相关问题 更多 >
编程相关推荐