有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

使用Java流使用数据库游标

我想使用Java流使用数据库游标。我希望Java流能够根据需要获取和处理行,并避免先在内存中加载所有500万行,然后再处理它们

是否可以在不将整个表加载到RAM的情况下使用它

到目前为止,我的代码看起来像:

Cursor<Product> products = DAO.selectCursor(...);

// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
  Product p = it.next();
  // 2. Processing each row
  ...
}
// 3. Concluding (processing totals, stats, etc.)
double avg = total / count;
...

它确实工作得很好,但有点麻烦,我想利用流API


共 (1) 个答案

  1. # 1 楼答案

    首先,我们必须讨论如何从数据库中获取数据。如果您的目的是查看大量记录,并且不想在内存中同时加载所有记录,那么有两种选择:

    1. 将结果分页
    2. 让你的司机把结果分页

    如果已经有一个基于Cursor的迭代器,可以根据需要检索分页数据,那么可以使用JDK API中的SpliteratorsStreamSupport实用程序类将其转换为Stream

    Stream<Product> products = StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(cursor.iterator(),
                            Spliterator.NONNULL |
                                    Spliterator.ORDERED |
                                    Spliterator.IMMUTABLE), false)
    

    否则,你将不得不建立自己的东西

    驱动程序分页

    如果JDBC驱动程序支持fetch size属性,则可以执行以下操作:

    Connection con = ds.getConnection();
    con.setAutoCommit(false);
    PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
    stm.setFetchSize(1000);
    ResultSet rs = stm.executeQuery();
    

    此时,rs包含1000条记录的第一次获取,在您阅读上一页之前,它不会从数据库中获取更多记录

    所有这一切中最棘手的部分是,在读取完所有记录之前,您无法关闭任何资源(即连接、预处理语句和结果集),而且由于我们想要构建的流在默认情况下是惰性的,这意味着我们必须在处理完流之前保持所有这些资源的打开状态

    也许最简单的方法是围绕这个逻辑构建一个迭代器,当迭代器实际到达所有数据的末尾时,您可以关闭所有资源(即!rs.next()),或者另一种方法是在流关闭时完成所有工作(Stream.onClose()

    一旦我们有了迭代器,使用JDK API中的SpliteratorsStreamSupport实用程序类从迭代器中构建流就非常简单了

    我的基本实现看起来有点像这样。这只是为了举例说明。你可能想对你的特殊情况给予更多的关爱

    public Stream<String> getUsers() {
        DataSource ds = jdbcTemplate.getDataSource();
        try {
            Connection conn = ds.getConnection();
            conn.setAutoCommit(false);
            PreparedStatement stm = conn.prepareStatement("SELECT id FROM users", ResultSet.TYPE_FORWARD_ONLY);
            //fetch size is what guarantees only 1000 records at the time
            stm.setFetchSize(1000);
            ResultSet rs = stm.executeQuery();
    
            Iterator<String> sqlIter = new Iterator<>() {
                @Override
                public boolean hasNext() {
                    try {
                        return rs.next();
                    } catch (SQLException e) {
                        closeResources(conn, stm, rs);
                        throw new RuntimeException("Failed to read record from ResultSet", e);
                    }
                }
    
                @Override
                public String next() {
                    try {
                        return rs.getString("id");
                    } catch (SQLException e) {
                        closeResources(conn, stm, rs);
                        throw new RuntimeException("Failed to read record from ResultSet", e);
                    }
                }
            };
    
            //turn iterator into a stream
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(sqlIter,
                            Spliterator.NONNULL |
                                    Spliterator.ORDERED |
                                    Spliterator.IMMUTABLE), false
            ).onClose(() -> {
                //make sure to close resources when done with the stream
                closeResources(conn, stm, rs);
            });
    
    
        } catch (SQLException e) {
            logger.error("Failed to process data", e);
            throw new RuntimeException(e);
        }
    }
    
    private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
        try (conn; ps; rs) {
            logger.info("Resources successfully closed");
        } catch (SQLException e) {
            logger.warn("Failed to properly close database sources", e);
        }
    }
    

    这里的关键点是要注意,我们返回的流应该运行一些onClose逻辑,因此,当我们使用流时,必须确保在使用完它后执行stream.close(),以确保我们关闭到目前为止保持活动的所有资源(即connstmrs

    最好的方法可能是使用资源进行尝试,这样尝试将负责关闭流

    try(Stream<String> users = userRepo.getUsers()){
        //print users to the main output retrieving 1K at the time
        users.forEach(System.out::println);
    }
    

    手动分页

    另一种方法是自己对结果进行分页,这取决于数据库,但使用诸如limit和offset之类的select子句,可以请求特定的记录页,处理它们,然后检索更多的记录

    select id from users LIMIT 1000 OFFSET 5
    

    在这种情况下,迭代器将消耗所有页面,完成后,请求下一个页面,直到在最后一个页面中找不到更多记录

    另一种方法的优点是,可以立即在迭代器中控制资源

    我不会开发一个这样的例子,留给你们去尝试