<p>Let's start with some example data:</p>
<pre><code>df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5")],
    ("a", "cnt", "major"))
</code></pre>
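<p>Note that I've renamed <code>count</code> to <code>cnt</code>. In most SQL dialects <code>COUNT</code> is a reserved keyword (or at least a built-in function name), which makes it a poor choice for a column name.</p>
<p>There are at least two ways to reshape this data:</p>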
<ul>
<li><p>Aggregating over the DataFrame</p>
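<pre><code># Note: this import shadows Python's built-in max within this snippet
from pyspark.sql.functions import col, when, max

# Collect the distinct majors. Going through .rdd keeps this working on
# Spark 2.x and later, where DataFrame no longer exposes map().
majors = sorted(df.select("major")
    .distinct()
    .rdd
    .map(lambda row: row[0])
    .collect())

# One column per major: cnt where the major matches, NULL otherwise
cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m)
    for m in majors]
# max ignores NULLs, so each group keeps its non-NULL value
# (the largest one, if there are several)
maxs = [max(col(m)).alias(m) for m in majors]

reshaped1 = (df
    .select(col("a"), *cols)
    .groupBy("a")
    .agg(*maxs)
    .na.fill(0))  # majors missing for a group become 0

reshaped1.show()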
## +---+---+---+---+---+---+
## | a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## | a| 1| 1| 2| 3| 0|
## | b| 4| 1| 2| 0| 0|
## | c| 3| 0| 4| 5| 0|
## | d| 6| 1| 2| 3| 4|
## | e| 4| 5| 1| 1| 1|
## +---+---+---+---+---+---+
</code></pre></li>
<li><p><code>groupBy</code> over the RDD</p>
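<pre><code>from pyspark.sql import Row

# Key each record by "a". Going through .rdd keeps this working on
# Spark 2.x and later, where DataFrame no longer exposes map().
grouped = (df
    .rdd
    .map(lambda row: (row.a, (row.major, row.cnt)))
    .groupByKey())

def make_row(kv):
    k, vs = kv
    # The (major, cnt) pairs for this key, plus the key itself
    tmp = dict(list(vs) + [("a", k)])
    # majors is the sorted list collected in the previous snippet;
    # majors missing for this key default to 0
    return Row(**{name: tmp.get(name, 0) for name in ["a"] + majors})

reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))
reshaped2.show()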
## +---+---+---+---+---+---+
## | a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## | a| 1| 1| 2| 3| 0|
## | e| 4| 5| 1| 1| 1|
## | c| 3| 0| 4| 5| 0|
## | b| 4| 1| 2| 0| 0|
## | d| 6| 1| 2| 3| 4|
## +---+---+---+---+---+---+
</code></pre></li>
</ul>
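<p>To make the intended result concrete without a Spark cluster, here is a minimal plain-Python sketch of the same long-to-wide reshape. It is not Spark code. The helper name <code>pivot_rows</code> is hypothetical, and the sketch assumes that a repeated (<code>a</code>, <code>major</code>) pair should keep the largest <code>cnt</code>, mirroring the <code>max</code> aggregation used in the first approach.</p>

```python
from collections import defaultdict

# Same sample rows as the example data: (a, cnt, major)
rows = [
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5"),
]


def pivot_rows(rows):
    """Hypothetical helper: one wide dict per key, one entry per major."""
    majors = sorted({major for _, _, major in rows})
    grouped = defaultdict(dict)
    for key, cnt, major in rows:
        # Assumption: keep the largest cnt if a (key, major) pair repeats
        grouped[key][major] = max(cnt, grouped[key].get(major, cnt))
    wide = []
    for key in sorted(grouped):
        record = {"a": key}
        # Majors missing for this key default to 0
        record.update({m: grouped[key].get(m, 0) for m in majors})
        wide.append(record)
    return wide


wide = pivot_rows(rows)
for record in wide:
    print(record)
```

<p>Both Spark approaches should produce the same values per key as this sketch, although the row order of <code>show()</code> is not guaranteed.</p>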