给出以下apachespark(Python)代码(它正在工作):
import sys
from random import random
from operator import add
import sqlite3
from datetime import date
from datetime import datetime
from pyspark import SparkContext
def agePartition(recs):
gconn = sqlite3.connect('/home/chris/test.db')
myc = gconn.cursor()
today = date.today()
return_part = []
for rec in recs:
sql = "select birth_date from peeps where name = '{n}'".format(n=rec[0])
myc.execute(sql)
bdrec = myc.fetchone()
born = datetime.strptime(bdrec[0], '%Y-%m-%d')
return_part.append( (rec[0], today.year - born.year - ((today.month, today.day) < (born.month, born.day))) )
gconn.close()
return iter(return_part)
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
sc = SparkContext(appName="PythonDBTEST")
print('starting...')
data = [('Chris', 1), ('Amanda', 2), ('Shiloh', 2), ('Sammy', 2), ('Tim', 1)]
rdd = sc.parallelize(data,5)
rslt_collect = rdd.mapPartitions(agePartition).collect()
for x in rslt_collect:
print("{n} is {a}".format(n=x[0], a=x[1]))
sc.stop()
在总共有8个CPU的两个计算/从节点设置中,是否将每个分区创建为一个任务并分配给2个节点,以便所有5个分区并行运行?如果没有,还需要做些什么来确保这一点?你知道吗
这里的目的是测试保持每个从属工作进程的全局数据库连接处于活动状态,这样就不必为RDD中处理的每个记录重新打开数据库连接。我在本例中使用的是SQLite,但它将是一个SQLCipher数据库,在数据库连接上打开要花费更多的时间。你知道吗
假设集群中有8个可用插槽(CPU)。最多可以同时处理8个分区。在您的例子中,您有5个分区,因此它们都应该并行处理。这将是5个到数据库的并发连接。你知道吗
在您的情况下,它将是每个分区。如果您有20个分区和8个核心,那么仍然会创建20次连接。你知道吗
相关问题 更多 >
编程相关推荐