序列化Spark RDD并在Python中读取

5 投票
3 回答
11370 浏览
提问于 2025-05-10 15:44

我正在尝试通过“腌制”(也就是序列化)一个Spark的RDD,然后直接把这个腌制好的文件读入Python。

a = sc.parallelize(['1','2','3','4','5'])
a.saveAsPickleFile('test_pkl')

接着,我把这些test_pkl文件复制到我的本地电脑上。我该如何直接在Python中读取它们呢?当我尝试用普通的pickle包读取'test_pkl'的第一个腌制部分时,它失败了:

pickle.load(open('part-00000','rb'))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib64/python2.6/pickle.py", line 1370, in load
    return Unpickler(file).load()
  File "/usr/lib64/python2.6/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib64/python2.6/pickle.py", line 970, in load_string
    raise ValueError, "insecure string pickle"
ValueError: insecure string pickle

我猜Spark使用的腌制方法和Python的pickle方法不太一样(如果我错了请纠正我)。有没有办法让我从Spark中腌制数据,并直接从文件中在Python里读取这个腌制好的对象呢?

相关文章:

  • 暂无相关问题
暂无标签

3 个回答

1

问题在于这个文件格式不是一个pickle文件。它是一个包含了被“腌制”的对象的SequenceFile。这个序列文件可以在Hadoop和Spark环境中打开,但并不适合在Python中使用,并且它使用的是基于JVM的序列化方式来处理数据,在这个例子中是一个字符串列表。

1

一种更好的方法可能是把每个部分的数据进行序列化(也就是“腌制”),然后编码后写入一个文本文件:

import cPickle
import base64

def partition_to_encoded_pickle_object(partition):
    p = [i for i in partition] # convert the RDD partition to a list
    p = cPickle.dumps(p, protocol=2) # pickle the list
    return [base64.b64encode(p)] # base64 encode the list, and return it in an iterable

my_rdd.mapPartitions(partition_to_encoded_pickle_object).saveAsTextFile("your/hdfs/path/")

在你把文件下载到本地目录后,可以使用以下代码来读取它:

# you first need to download the file, this step is not shown
# afterwards, you can use 
path = "your/local/path/to/downloaded/files/"
data = []
for part in os.listdir(path):
    if part[0] != "_": # this prevents system generated files from getting read - e.g. "_SUCCESS"
        data += cPickle.loads(base64.b64decode((open(part,'rb').read())))
3

可以通过使用 sparkpickle 这个项目来实现。操作非常简单,如下所示:

with open("/path/to/file", "rb") as f:
    print(sparkpickle.load(f))

撰写回答