2024-04-24 21:37:23 发布
网友
我知道我们可以用Window function in pyspark来计算累积和。但Window仅在HiveContext中受支持,而在SQLContext中不受支持。我需要使用SQLContext,因为HiveContext不能在多个进程中运行。
使用SQLContext计算累积和有什么有效的方法吗?一个简单的方法是将数据加载到驱动程序的内存中并使用numpy.cumsum,但问题是数据需要能够装入内存
在尝试解决类似问题的线程上登陆后,我已经使用此代码解决了我的问题。不确定我是否缺少OP的一部分,但这是对一个SQLContext列求和的一种方法:
SQLContext
from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql.context import SQLContext sc = SparkContext() sc.setLogLevel("ERROR") conf = SparkConf() conf.setAppName('Sum SQLContext Column') conf.set("spark.executor.memory", "2g") sqlContext = SQLContext(sc) def sum_column(table, column): sc_table = sqlContext.table(table) return sc_table.agg({column: "sum"}) sum_column("db.tablename", "column").show()
不确定这是否是您要查找的内容,但以下是两个如何使用sqlContext计算累积和的示例:
首先,如果要按某些类别对其进行分区:
from pyspark.sql.types import StructType, StringType, LongType from pyspark.sql import SQLContext rdd = sc.parallelize([ ("Tablet", 6500), ("Tablet", 5500), ("Cell Phone", 6000), ("Cell Phone", 6500), ("Cell Phone", 5500) ]) schema = StructType([ StructField("category", StringType(), False), StructField("revenue", LongType(), False) ]) df = sqlContext.createDataFrame(rdd, schema) df.registerTempTable("test_table") df2 = sqlContext.sql(""" SELECT category, revenue, sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum FROM test_table """)
输出:
[Row(category='Tablet', revenue=5500, cumsum=5500), Row(category='Tablet', revenue=6500, cumsum=12000), Row(category='Cell Phone', revenue=5500, cumsum=5500), Row(category='Cell Phone', revenue=6000, cumsum=11500), Row(category='Cell Phone', revenue=6500, cumsum=18000)]
当你只想取一个变量的cumsum时。将df2改为:
df2 = sqlContext.sql(""" SELECT category, revenue, sum(revenue) OVER (ORDER BY revenue, category) as cumsum FROM test_table """)
[Row(category='Cell Phone', revenue=5500, cumsum=5500), Row(category='Tablet', revenue=5500, cumsum=11000), Row(category='Cell Phone', revenue=6000, cumsum=17000), Row(category='Cell Phone', revenue=6500, cumsum=23500), Row(category='Tablet', revenue=6500, cumsum=30000)]
希望这有帮助。在收集数据后使用np.cumsum不是很有效,尤其是在数据集很大的情况下。您可以探索的另一种方法是使用简单的RDD转换,如groupByKey(),然后使用map计算每个组按某个键的累积和,然后在最后将其减少。
下面是一个简单的例子:
import pyspark from pyspark.sql import window import pyspark.sql.functions as sf sc = pyspark.SparkContext(appName="test") sqlcontext = pyspark.SQLContext(sc) data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20), ("Cam", "F", "Cambridge", 1, 25), ("Lin", "F", "Cambridge", 1, 25), ("Cat", "M", "Boston", 1, 20), ("Sara", "F", "Cambridge", 1, 15), ("Jeff", "M", "Cambridge", 1, 25), ("Bean", "M", "Cambridge", 1, 26), ("Dave", "M", "Cambridge", 1, 21),], ["name", 'gender', "city", 'donation', "age"]) data.show()
输出
+----+------+---------+--------+---+ |name|gender| city|donation|age| +----+------+---------+--------+---+ | Bob| M| Boston| 1| 20| | Cam| F|Cambridge| 1| 25| | Lin| F|Cambridge| 1| 25| | Cat| M| Boston| 1| 20| |Sara| F|Cambridge| 1| 15| |Jeff| M|Cambridge| 1| 25| |Bean| M|Cambridge| 1| 26| |Dave| M|Cambridge| 1| 21| +----+------+---------+--------+---+
定义窗口
win_spec = (window.Window .partitionBy(['gender', 'city']) .rowsBetween(window.Window.unboundedPreceding, 0))
#window.window.unboundedReceiding——组的第一行 #.rowsBetween(…,0)--0引用当前行,如果指定-2,则在当前行之前最多2行
0
-2
现在,这里有个陷阱:
temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))
有错误:
TypeErrorTraceback (most recent call last) <ipython-input-9-b467d24b05cd> in <module>() ----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec)) /Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self) 238 239 def __iter__(self): --> 240 raise TypeError("Column is not iterable") 241 242 # string methods TypeError: Column is not iterable
这是由于使用了python的sum函数而不是pyspark's。解决这个问题的方法是使用pyspark.sql.functions.sum中的sum函数:
sum
pyspark's
pyspark.sql.functions.sum
temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec)) temp.show()
将给予:
+----+------+---------+--------+---+--------------+ |name|gender| city|donation|age|CumSumDonation| +----+------+---------+--------+---+--------------+ |Sara| F|Cambridge| 1| 15| 1| | Cam| F|Cambridge| 1| 25| 2| | Lin| F|Cambridge| 1| 25| 3| | Bob| M| Boston| 1| 20| 1| | Cat| M| Boston| 1| 20| 2| |Dave| M|Cambridge| 1| 21| 1| |Jeff| M|Cambridge| 1| 25| 2| |Bean| M|Cambridge| 1| 26| 3| +----+------+---------+--------+---+--------------+
在尝试解决类似问题的线程上登陆后,我已经使用此代码解决了我的问题。不确定我是否缺少OP的一部分,但这是对一个
SQLContext
列求和的一种方法:不确定这是否是您要查找的内容,但以下是两个如何使用sqlContext计算累积和的示例:
首先,如果要按某些类别对其进行分区:
输出:
当你只想取一个变量的cumsum时。将df2改为:
输出:
希望这有帮助。在收集数据后使用np.cumsum不是很有效,尤其是在数据集很大的情况下。您可以探索的另一种方法是使用简单的RDD转换,如groupByKey(),然后使用map计算每个组按某个键的累积和,然后在最后将其减少。
下面是一个简单的例子:
输出
定义窗口
#window.window.unboundedReceiding——组的第一行 #.rowsBetween(…,0)--
0
引用当前行,如果指定-2
,则在当前行之前最多2行现在,这里有个陷阱:
有错误:
这是由于使用了python的
sum
函数而不是pyspark's
。解决这个问题的方法是使用pyspark.sql.functions.sum
中的sum
函数:将给予:
相关问题 更多 >
编程相关推荐