将函数传递给spark,spark使用pysp读取S3文件

2024-04-16 19:35:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我在s3中有GBs的数据,并试图在读取代码时通过引用下面的Link来实现并行性。你知道吗

我使用以下代码作为示例,但在运行时,会出现以下错误:

任何帮助这是非常感谢,因为我是非常新的火花。你知道吗

编辑:我必须使用并行性来读取我的s3文件,这在任何帖子中都没有解释。请先阅读问题。

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    def decode_module(msg):
        df=spark.read.json(msg)
        return df

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(['s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json'])
        d = a.map(lambda x: telco_cn.decode_module(x)).collect()
        print (d)
if __name__ == "__main__":
    cn = telco_cn(sc)
    cn.consumer_input(sc, '')

Tags: 代码selfthats3ondefnotmsg
1条回答
网友
1楼 · 发布于 2024-04-16 19:35:08

您正试图从RDD上的map操作中调用spark.read.json。由于此映射操作将在Spark的executor/worker节点上执行,因此不能在映射中引用SparkContext/SparkSession变量(在Spark驱动程序上定义)。这就是错误消息试图告诉您的内容。你知道吗

为什么不直接打电话给df=spark.read.json('s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json')?你知道吗

相关问题 更多 >