如何将python代码转换为Spark兼容代码(pyspark)?

2024-04-18 17:41:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个pyspark代码可以从文本。这个代码给了我结果,但是处理我的大数据需要花费大量的时间,因为它的某些部分是更多的Python。请求帮助您将其转换为更多pyspark方式以提高效率(spark环境新手)

articles=sc.textFile("file:///home//XXX//articles.csv").map(lambda line: line.split(","))
articles_ls=list(articles.map(lambda x: [x[0].lower(),x[1].lower(),x[2].lower(),x[3].lower().strip()]).collect())

                 #Function which needs to be optimized to run faster
def mapper(f):
    article_list=[]    
    list1=[]
    list2=[]
    list3=[]
    list4=[]
    list5=[]
    list6=[]
    list7=[]
    for i in range(len(articles_ls)):
        for j in range(len(articles_ls[i])-1):
            comment=re.split(r'\W+', f.lower().strip())
            if articles_ls[i][j] in comment:
                if articles_ls[i][j]:

                    if articles_ls[i][3] == 'typea':
                        if articles_ls[i][j] not in list1:
                            list1.append(articles_ls[i][0])

                    if articles_ls[i][3] == 'typeb':
                        if articles_ls[i][j] not in list2:
                            list2.append(articles_ls[i][0])

                    if articles_ls[i][3] == 'typec':
                        if articles_ls[i][j] not in list3:
                            list3.append(articles_ls[i][0])

                    if articles_ls[i][3] == 'typed':
                        if articles_ls[i][j] not in list4:
                            list4.append(articles_ls[i][0])

                    if articles_ls[i][3] == 'typee':
                        if articles_ls[i][j] not in list5:
                            list5.append(articles_ls[i][0])

                    if articles_ls[i][3] == 'typef':
                        if articles_ls[i][j] not in list6:
                            list6.append(articles_ls[i][0])

                    if articles_ls[i][3] == 'typeg':
                        if articles_ls[i][j] not in list7:
                            list7.append(articles_ls[i][0])

    list1 = list(set(list1))
    list2 = list(set(list2))
    list3 = list(set(list3))
    list4 = list(set(list4))
    list5 = list(set(list5))
    list6 = list(set(list6))
    list7 = list(set(list7))

    article_list.append([("ProductA:".split())+list1]+[("ProductB:".split())+list2]+[("ProductC:".split())+list3]+\
                         [("ProductD:".split())+list4]+[("ProductE:".split())+list5]+[("ProductF:".split())+list6]+\
                         [("ProductG:".split())+list7])
    return article_list

lines = sc.textFile("file:///home//XXX//data.csv").map(lambda line: line.split(",")).map(lambda x: (x[0],x[1],x[2].encode("ascii", "ignore")))
articles_all = (lines.map(lambda x: (x[0],x[1],x[2],(mapper(x[2].lower())))))

Tags: inifnotlowerlsarticleslistsplit
1条回答
网友
1楼 · 发布于 2024-04-18 17:41:30

当我将数据集加载到spark数据帧而不是将其附加到列表中时,我发现spark的运行速度要快得多。然后,我更倾向于使用功能性更强的样式,而不是for循环。在

相关问题 更多 >