使用foreach rdd和foreach在pysp中迭代rdd

2024-04-29 08:29:38 发布

您现在位置:Python中文网/ 问答频道 /正文

Spark 1.6.1的问题,pyspark

我有流数据

{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}

我有一个名为writeToHBase(rdd)的函数,它向HBase写入数据,要求rdd具有以下结构中的元组:

(rowkey, [rowkey, column-family, key, value])

从输入格式中可以看到,我必须获取原始数据集并遍历所有键,使用send函数调用发送每个键/值对。

阅读spark流式编程指南中的“使用foreachRDD的设计模式”一节http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13

似乎建议在数据集外部执行操作时使用foreachRDD。在我的例子中,我想通过网络向HBase写入数据,因此我对流数据使用foreachRDD并调用处理发送数据的函数:

stream.foreachRDD(lambda k: process(k))

我现在对spark函数的理解非常有限,所以我无法找到在原始数据集上迭代以使用write函数的方法。如果是python iterable,我可以这样做:

def process(rdd):
    for key, value in my_rdd.iteritems():
        writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))

在rdd中找到rowkey可以获得的位置

rdd.map(lambda x: x['rowkey'])

如何完成process()在pyspark中的作用?我看到一些例子使用foreach,但我不能让它做我想做的事情。


Tags: 数据key函数原始数据valuecolumnfamilyprocess
1条回答
网友
1楼 · 发布于 2024-04-29 08:29:38

为什么要在writeToHBase函数需要rdd作为参数时在rdd上迭代。只需在流程函数中调用writeToHBase(rdd),就这样。

如果您需要从rdd中获取所有记录,可以调用

def processRecord(record):
        print(record)   
rdd.foreach(processRecord)

在process record函数中,您将得到要处理的单个记录。

相关问题 更多 >