基于Sp中另一个RDD的过滤器

2024-05-13 05:02:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我只保留第二个表中引用的具有部门ID的员工。

Employee table
LastName    DepartmentID
Rafferty    31
Jones   33
Heisenberg  33
Robinson    34
Smith   34

Department table
DepartmentID
31  
33  

我尝试了以下不起作用的代码:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

有什么想法吗?我在Python中使用Spark 1.1.0。不过,我会接受Scala或Python的答案。


Tags: idtable员工employee部门departmentsmithsc
3条回答

我终于用join实现了一个解决方案。为了避免Spark出现异常,我必须给部门加上一个0值:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))

employee.join(department).map(lambda e: (e[1][0], e[0])).collect()

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]

在多个列中筛选多个值:

如果要从数据库中提取数据(本例中是Hive或SQL类型的db),并且需要在多个列上进行筛选,则可能更容易使用第一个筛选加载表,然后通过RDD迭代筛选(鼓励使用多个小迭代的Spark编程方法):

{
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)")
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20")
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500")

}

当然,为了筛选正确的值,您需要稍微了解数据,但这是分析过程的一部分。

在这种情况下,您希望实现的是在每个分区上使用department表中包含的数据进行筛选: 这将是基本的解决方案:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}

如果您的部门数据很大,广播变量将通过将数据一次传递到所有节点来提高性能,而不必将其与每个任务序列化

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}

尽管使用join可以工作,但这是一个非常昂贵的解决方案,因为它需要分布式数据洗牌(byKey)来实现join。考虑到需求是一个简单的过滤器,将数据发送到每个分区(如上所示)将提供更好的性能。

相关问题 更多 >