如何使用Python在Spark中执行两个RDD表的基本连接?

2024-04-23 15:23:22 发布

您现在位置:Python中文网/ 问答频道 /正文

使用python如何在Spark中执行基本连接?在R中,可以使用merg()来执行此操作。spark上使用python的语法是什么:

  1. 内部连接
  2. 左外连接
  3. 交叉连接

有两个表(RDD),每个表中有一个具有公共键的列。

RDD(1):(key,U)
RDD(2):(key,V)

我认为内部连接是这样的:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

对吗?我在网上搜索了一下,找不到一个很好的连接示例。提前谢谢。


Tags: key示例map语法ls交叉sparkcase
1条回答
网友
1楼 · 发布于 2024-04-23 15:23:22

它可以使用PairRDDFunctions或Spark数据帧来完成。由于数据帧操作从Catalyst Optimizer中受益,第二个选项值得考虑。

假设您的数据如下所示:

rdd1 =  sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 =  sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

带pairdds:

内部连接:

rdd1.join(rdd2)

左外部连接:

rdd1.leftOuterJoin(rdd2)

笛卡尔积(不需要RDD[(T, U)]):

rdd1.cartesian(rdd2)

广播连接(不需要RDD[(T, U)]):

最后还有cogroup,它没有直接的SQL等价物,但在某些情况下可能有用:

cogrouped = rdd1.cogroup(rdd2)

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

带Spark数据帧

可以使用SQL DSL,也可以使用sqlContext.sql执行原始SQL。

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary tables to be able to use sqlContext.sql
df1.createTempView('df1')
df2.createTempView('df2')

内部连接:

# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner') 
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')

左外部连接:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

交叉连接(Spark中需要显式交叉连接或配置更改。2.0-spark.sql.crossJoin.enabled for Spark 2.x):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')

df1.join(df2)
sqlContext.sql('SELECT * FROM df JOIN df2')

因为1.6(Scala中是1.5),所以每一个都可以与broadcast函数组合:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

执行广播连接。另请参见Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark

相关问题 更多 >