Java: iterative GraphFrames aggregateMessages hits memory limit (OOM)

I'm building a custom clustering algorithm using GraphFrames' aggregateMessages feature. I tested the algorithm on a small sample dataset (~100 items) and verified that it works. But when I run it on my real dataset of 50k items, I get an OOM error after about 10 iterations. Interestingly, the first few iterations complete within minutes and memory stays in the normal range. After the 6th iteration, memory usage gradually climbs to about 30 GB and eventually blows up. I'm running this on a 2-node cluster (16 cores, 32 GB).

Since this is an iterative algorithm, and memory only grows after each iteration, I wonder whether I need to release memory somehow. I added unpersist calls at the end of the loop, but that didn't help.

Are there other efficiencies I could apply? Are there best practices for using GraphFrames in an iterative setting?

Another thing I noticed: on the executors page of the Spark UI, only ~300 MB of "Storage Memory" is shown as used, yet the Spark process actually occupies ~30 GB. I'm not sure whether this is a memory leak. (screenshot of the Spark UI executors page)
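To make the loop structure explicit, here is a stripped-down sketch of the iterate/persist/unpersist pattern my code follows. `initial` and `step` are placeholders standing in for my real starting dataset and the aggregateMessages logic; this fragment assumes a running Spark session and is not meant to compile on its own:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;

// Sketch only: "initial" and "step" are placeholders for my real data and logic.
Dataset<Row> current = initial;
for (int iter = 0; ; iter++) {
    Dataset<Row> next = step(current)             // one aggregateMessages round
            .persist(StorageLevel.MEMORY_AND_DISK());
    long remaining = next.count();                // materialize before dropping the parent
    current.unpersist();                          // what I hoped would free memory
    if (remaining == 0) {
        break;
    }
    current = next;                               // query plan keeps growing across iterations
}
```

Each `next` is derived from the previous `current`, so unless the lineage is cut somewhere, the plan Spark has to track grows with every iteration.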

while (true) {

    System.out.println("[" + new Date() + "] Running " + i);
    Dataset<Row> lastRoutesDs = groups;
    // Unwind each group's route list so every route item becomes a vertex id
    Dataset<Row> groupUnwind = groups.withColumn("id", explode(col("routeItems")));

    GraphFrame gf = new GraphFrame(groupUnwind, edgesDs);

    // For each edge, send the destination's route state to the source,
    // unless the source id is already contained in the destination's route
    Dataset<Row> lvl1 = gf.aggregateMessages()
            .sendToSrc(when(
                    callUDF("contains_in_array_str", AggregateMessages.dst().getField("routeItems"),
                            AggregateMessages.src().getField("id")).equalTo(false),
                    struct(AggregateMessages.dst().getField("routeItems").as("routeItems"),
                            AggregateMessages.dst().getField("routeScores").as("routeScores"),
                            AggregateMessages.dst().getField("grpId").as("grpId"),
                            AggregateMessages.dst().getField("grpScore").as("grpScore"),
                            AggregateMessages.edge().getField("score").as("edgeScore"))))
            .agg(collect_set(AggregateMessages.msg()).as("incomings"))
            .withColumn("inItem", explode(col("incomings")))
            .groupBy("id", "inItem.grpId")
            .agg(first("inItem.routeItems").as("routeItems"), first("inItem.routeScores").as("routeScores"),
                    first("inItem.grpScore").as("grpScore"), collect_list("inItem.edgeScore").as("inScores"))
            // Keep only the best incoming route per group
            .groupBy("grpId")
            .agg(bestRouteAgg.apply(col("routeItems"), col("routeScores"), col("inScores"), col("grpScore"),
                    col("id"), col("grpScore")).as("best"))
            .withColumn("newScore", callUDF("calcRouteScores", expr("size(best.routeItems)+1"),
                    col("best.routeScores"), col("best.inScores")))
            .withColumn("edgeCount", expr("size(best.routeScores)"))
            .persist(StorageLevel.MEMORY_AND_DISK());

    // Groups that exceed the score threshold are final; write them out
    lvl1
            .filter("newScore > " + groupMaxScore)
            .withColumn("itr", lit(i))
            .select("grpId", "best.routeItems", "best.routeScores", "best.grpScore", "edgeCount", "itr")
            .write()
            .mode(SaveMode.Append)
            .json(workspaceDir + "clusters-rank-collect");

    if (lvl1.count() == 0) {
        System.out.println("****** End reached " + i);
        break;
    }

    // Remaining groups absorb their best incoming node and continue iterating
    Dataset<Row> newGroups = lvl1.filter("newScore <= " + groupMaxScore)
            .withColumn("routeItems_new",
                    callUDF("merge2Array", col("best.routeItems"), array(col("best.newNode"))))
            .withColumn("routeScores_new",
                    callUDF("merge2ArrayDouble", col("best.routeScores"), col("best.inScores")))
            .select(col("grpId"), col("routeItems_new").as("routeItems"),
                    col("routeScores_new").as("routeScores"), col("newScore").as("grpScore"));

    // Checkpoint every other iteration to truncate the lineage
    if (i > 0 && (i % 2) == 0) {
        newGroups = newGroups.checkpoint();
    }

    newGroups = newGroups.persist(StorageLevel.DISK_ONLY());
    System.out.println(newGroups.count());

    // Release the previous iteration's cached datasets
    groups.unpersist();
    lastRoutesDs.unpersist();
    groupUnwind.unpersist();
    lvl1.unpersist();

    groups = newGroups;

    i++;
}
    

0 Answers