Spark 1.6.1的问题,pyspark
我有流数据
{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}
我有一个名为writeToHBase(rdd)的函数,它向HBase写入数据,要求rdd具有以下结构中的元组:
(rowkey, [rowkey, column-family, key, value])
从输入格式中可以看到,我必须获取原始数据集并遍历所有键,使用send函数调用发送每个键/值对。
阅读spark流式编程指南中的“使用foreachRDD的设计模式”一节http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13
似乎建议在数据集外部执行操作时使用foreachRDD。在我的例子中,我想通过网络向HBase写入数据,因此我对流数据使用foreachRDD并调用处理发送数据的函数:
stream.foreachRDD(lambda k: process(k))
我现在对spark函数的理解非常有限,所以我无法找到在原始数据集上迭代以使用write函数的方法。如果是python iterable,我可以这样做:
def process(rdd):
for key, value in my_rdd.iteritems():
writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))
在rdd中找到rowkey可以获得的位置
rdd.map(lambda x: x['rowkey'])
如何完成process()在pyspark中的作用?我看到一些例子使用foreach,但我不能让它做我想做的事情。
为什么要在writeToHBase函数需要rdd作为参数时在rdd上迭代。只需在流程函数中调用
writeToHBase(rdd)
,就这样。如果您需要从rdd中获取所有记录,可以调用
在process record函数中,您将得到要处理的单个记录。
相关问题 更多 >
编程相关推荐