Java R2DBC connection closed exception

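Please help me understand and resolve the error I get when I use R2DBC against an MS SQL Server database to fetch only the first of the many records that a SELECT statement can return.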

My Gradle dependencies:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.security:spring-security-oauth2-client'
    implementation 'io.springfox:springfox-boot-starter:3.0.0'
    implementation 'org.springframework.data:spring-data-r2dbc'
    implementation 'io.r2dbc:r2dbc-mssql'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    annotationProcessor 'org.projectlombok:lombok'
}

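My code (sensitive content masked with xxxx): [Please understand that this is not production-ready code. It is part of a POC at a very early stage, so most of it does not meet production standards. Please keep that in mind.]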

@Configuration
@EnableR2dbcRepositories(basePackages="com.xxxx.admin.db.repo")
public class DatabaseConfiguration extends AbstractR2dbcConfiguration
{
    
    @Autowired
    private Environment env;

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        return new MssqlConnectionFactory(MssqlConnectionConfiguration.builder().host("xxxxxx")
                .database("xxxx").username(env.getProperty("xxxxx")).password(env.getProperty("xxxxxx")).build());
    }

}


@RestController
@RequestMapping("/api/v1/admin")
public class AdminController  {
    @GetMapping("/junk")
    public Mono<Map<String, Object>> getUser() {
        return tmoUserRepo.getUser("XXXXXXXXXXX");      
    }
}


@Repository
public class TmoUserRepo {
    
private static final String GET_USER_SQL = "SELECT xxxxxxx WHERE ID=:userId ORDER BY XXX DESC, YYY ASC, ZZZ ASC;" ; 

    @Autowired
    private DatabaseClient dbClient;
    public Mono<Map<String, Object>> getUser(String userId) {
        dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
        return null;
    }
}

Scenario 1: this produces an error (dbClient is an instance of DatabaseClient):

dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);

Scenario 2: this completes successfully:

dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().all(System.out::println);

=========================

What happens in Scenario 1: the following is printed twice in the log:

2020-12-04 18:37:38.569 DEBUG 18636 --- [actor-tcp-nio-1] o.s.d.r2dbc.core.DefaultDatabaseClient   : Executing SQL statement [SELECT usr.name_first AS namefirst,    
    usr.name_last AS namelast,
<<.... rest of SQL...>>

Error message:

2020-12-04 18:37:40.278 ERROR 18636 --- [actor-tcp-nio-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
    at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2020-12-04 18:37:40.280 ERROR 18636 --- [actor-tcp-nio-1] r.n.channel.ChannelOperationsHandler     : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] Error was received while reading the incoming data. The connection will be closed.

reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
    at reactor.core.Exceptions.bubble(Exceptions.java:173) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:635) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:95) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:151) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:406) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:196) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:247) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
    at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
    ... 32 common frames omitted

2020-12-04 18:37:40.280  WARN 18636 --- [actor-tcp-nio-1] reactor.netty.channel.FluxReceive        : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] An exception has been observed post termination, use DEBUG level to see the full stack: reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed

1 answer

  1. # Answer 1

    Short answer

    Your problem is probably here:

    dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
    return null;
    

    You probably need to change it to:

    return dbClient.execute(GET_USER_SQL)
                   .bind("userId",userId)
                   .fetch()
                   .first();
    

    Long answer

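    Reactive programming is very different from regular Java programming, and if you try to write it like regular Java you will run into trouble.

    First, null values are not allowed in reactive programming, so never return null.

    Reactive programming works with producers and consumers. Your application is the producer, and the calling client (web app, mobile app, Postman, curl, etc.) is the consumer. Whoever initiates the call is usually the consumer. The consumer starts by subscribing to the producer, so the client subscribes to the server.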

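    When that happens, the server starts at the end of your code and traverses upward until it finds something that produces values (the database). Along the way it wires up a kind of callback chain. (The Reactor documentation calls declaring the chain assembly time, and this upward traversal subscription time.) Once that is done, your application emits one value (if it is a Mono) or many values (if it is a Flux).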
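The upward traversal described above can be sketched with the JDK's own `java.util.concurrent.Flow` interfaces. This is only an analogy, not Reactor or Spring code: the class `SubscriptionOrderDemo`, its `source` and `map` helpers, and the event strings are all hypothetical names made up for illustration. It records the order in which things happen when a client subscribes to a two-step chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical demo class; a simplified analogy, not part of Reactor or Spring.
class SubscriptionOrderDemo {

    // Shared event log so the order of events can be inspected.
    static final List<String> events = new ArrayList<>();

    // A cold source (the "database"): does nothing until it is subscribed to and asked for data.
    static Flow.Publisher<String> source() {
        return subscriber -> {
            events.add("source subscribed");
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;
                @Override public void request(long n) {
                    if (done || n <= 0) return;
                    done = true;
                    events.add("source emits");
                    subscriber.onNext("row");
                    subscriber.onComplete();
                }
                @Override public void cancel() { done = true; }
            });
        };
    }

    // An operator: wraps the upstream publisher and transforms each value on its way down.
    static Flow.Publisher<String> map(Flow.Publisher<String> upstream, Function<String, String> fn) {
        events.add("map assembled");
        return downstream -> {
            events.add("map subscribed");
            upstream.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
                @Override public void onNext(String item) { downstream.onNext(fn.apply(item)); }
                @Override public void onError(Throwable t) { downstream.onError(t); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        };
    }

    // Declares the chain, then subscribes as the "client" and returns the recorded events.
    static List<String> run() {
        events.clear();
        Flow.Publisher<String> chain = map(source(), String::toUpperCase);
        events.add("chain declared");
        chain.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { events.add("client got " + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the subscription reaches `map` first and only then the source, while the value travels in the opposite direction, from the source through `map` to the client. Nothing is emitted at the point where the chain is merely declared.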

    So nothing happens until someone subscribes.

    // Only a declaration, nothing happens
    Mono.just("Foobar");
    

    Whereas:

    // Someone subscribes, chain is built and value is produced
    Mono.just("Foobar").subscribe(s -> System.out.println(s));
    

    This also applies to methods:

    public Mono<String> getFoobar() {
        return Mono.just("Foobar");
    }
    
    getFoobar().subscribe(s -> System.out.println(s));
    

    But what if you don't want to return anything:

    public Mono<Void> getFoobar() {
        return Mono.just("Foobar").then(); 
        // then will throw away whatever is returned from the previous and instead return a Mono<Void> that will signal to the next part in the chain that it is done.
    }
    
    getFoobar().subscribe();
    

    Or you can return a Mono.empty(), which will translate to a Mono<Void>:

    public Mono<Void> getFoobar() {
        return Mono.just("Foobar").flatMap(s -> {
            // Return an empty, which will translate to a Mono<Void>
            return Mono.empty();
        });
    }
    
    getFoobar().subscribe();
    

    So, if we look at your two scenarios:

    dbClient.execute(GET_USER_SQL)
            .bind("userId",userId)
            .fetch()
            .first()
            .subscribe(System.out::println);
    

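    Here you subscribe yourself, so the query runs, but the method then returns null, so it breaks when the calling client wants to subscribe.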

    dbClient.execute(GET_USER_SQL)
            .bind("userId",userId)
            .fetch()
            .first()
            .all(System.out::println);
    

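    This works, but whether the value is sent to the calling client depends on whether you return it. You need to remove the all so that the value produced by first can be returned all the way out to the client.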
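The advice for both scenarios comes down to the same thing: build the chain, return it, and let the caller be the only subscriber. Below is a minimal sketch of that idea using only the JDK's `java.util.concurrent.Flow` interfaces as a stand-in. It is not Spring Data R2DBC or Reactor code; `ReturnTheChainDemo`, `findFirst`, `peek`, and `serve` are hypothetical names, and `serve` merely imitates what a web framework would do with the returned publisher. Printing is done through a side-effect step inside the chain (in Reactor the comparable operator is `doOnNext`) rather than through a separate `subscribe` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-ins built on the JDK's Flow API; not Spring or Reactor code.
class ReturnTheChainDemo {

    // Counts how often the simulated query actually runs.
    static final AtomicInteger queryRuns = new AtomicInteger();

    // Stand-in for the database call: cold, so it runs once per subscription.
    static Flow.Publisher<String> findFirst(String userId) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;
            @Override public void request(long n) {
                if (done || n <= 0) return;
                done = true;
                queryRuns.incrementAndGet();
                subscriber.onNext("user-" + userId);
                subscriber.onComplete();
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Side-effect operator: observes each value and passes it on unchanged.
    static Flow.Publisher<String> peek(Flow.Publisher<String> upstream, Consumer<String> action) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(String item) { action.accept(item); downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Repository method: builds the chain and returns it without subscribing.
    static Flow.Publisher<String> getUser(String userId, List<String> log) {
        return peek(findFirst(userId), value -> log.add("logged " + value));
    }

    // Stand-in for the web framework: the only subscriber, acting on behalf of the client.
    static String serve(Flow.Publisher<String> response) {
        List<String> body = new ArrayList<>();
        response.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { body.add(item); }
            @Override public void onError(Throwable t) { body.add("error"); }
            @Override public void onComplete() { }
        });
        return body.isEmpty() ? null : body.get(0);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(serve(getUser("42", log)));
        System.out.println(log);
        System.out.println("query runs: " + queryRuns.get());
    }
}
```

Because the source in this sketch is cold, calling `getUser` alone does not run the simulated query; it runs once, when `serve` subscribes. A second subscription on the same chain would run it a second time, which is one more reason not to subscribe inside the repository method.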

    I have tried to explain this as best I can; all of it can be read in the excellent Reactor Documentation.