java r2dbc连接关闭异常
请帮助我理解并解决我在mssqlserver db上使用r2dbc获取SELECT SQL语句可以获取的许多记录中的第一条记录时,获取错误的原因
我的gradle相关性:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.security:spring-security-oauth2-client'
implementation 'io.springfox:springfox-boot-starter:3.0.0'
implementation 'org.springframework.data:spring-data-r2dbc'
implementation 'io.r2dbc:r2dbc-mssql'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
annotationProcessor 'org.projectlombok:lombok'
}
我的代码(用xxxx屏蔽的敏感内容):[请理解,这不是生产准备好的代码,是处于非常初级阶段的POC的一部分。因此,大多数代码不符合生产准备好的标准。因此,请注意这一点。]
@Configuration
@EnableR2dbcRepositories(basePackages="com.xxxx.admin.db.repo")
public class DatabaseConfiguration extends AbstractR2dbcConfiguration
{
@Autowired
private Environment env;
@Bean
@Override
public ConnectionFactory connectionFactory() {
return new MssqlConnectionFactory(MssqlConnectionConfiguration.builder().host("xxxxxx")
.database("xxxx").username(env.getProperty("xxxxx")).password(env.getProperty("xxxxxx")).build());
}
}
@RestController
@RequestMapping("/api/v1/admin")
public class AdminController {
@GetMapping("/junk")
public Mono<Map<String, Object>> getUser() {
return tmoUserRepo.getUser("XXXXXXXXXXX");
}
}
@Repository
public class TmoUserRepo {
private static final String GET_USER_SQL = "SELECT xxxxxxx WHERE ID=:userId ORDER BY XXX DESC, YYY ASC, ZZZ ASC;" ;
@Autowired
private DatabaseClient dbClient;
public Mono<Map<String, Object>> getUser(String userId) {
dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
return null;
}
}
场景1这会产生错误(dbClient是DatabaseClient的实例):
dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
场景2这成功地完成了:
dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().all(System.out::println);
=========================
场景1发生的情况:以下内容在日志中打印两次:
2020-12-04 18:37:38.569 DEBUG 18636 --- [actor-tcp-nio-1] o.s.d.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT usr.name_first AS namefirst,
usr.name_last AS namelast,
<<.... rest of SQL...>>
错误消息:
2020-12-04 18:37:40.278 ERROR 18636 --- [actor-tcp-nio-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2020-12-04 18:37:40.280 ERROR 18636 --- [actor-tcp-nio-1] r.n.channel.ChannelOperationsHandler : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] Error was received while reading the incoming data. The connection will be closed.
reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
at reactor.core.Exceptions.bubble(Exceptions.java:173) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:635) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:95) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:151) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:406) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:196) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:247) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
... 32 common frames omitted
2020-12-04 18:37:40.280 WARN 18636 --- [actor-tcp-nio-1] reactor.netty.channel.FluxReceive : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] An exception has been observed post termination, use DEBUG level to see the full stack: reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
# 1 楼答案
简短回答
你的问题可能在这里:
您可能需要将其更改为:
长话短说
做反应式编程与常规Java编程有很大不同,如果你尝试做常规Java编程,你会遇到困难
首先,在反应式编程中不允许使用
null
值,因此永远不要返回null
反应式编程适用于生产者和消费者。您的应用程序是生产者,而呼叫客户端(web应用程序、移动应用程序、邮递员、curl等)是消费者。发起呼叫的人通常是消费者。消费者从向生产者订阅开始,因此客户端订阅服务器
当这种情况发生时,服务器将从代码的末尾开始,向上遍历,直到找到生成值的内容(数据库)。在这期间,它将组装一种回调链。该零件称为装配阶段。当这个阶段完成时,你的应用程序将输出一个(如果是a
Mono
)或多个(如果是aFlux
)值所以除非有人订阅,否则什么都不会发生
而:
这也适用于以下功能:
但如果你不想退货怎么办:
或者您可以返回一个
Mono.empty()
,它将转换为Mono<Void>
所以,如果我们看看你的两个场景:
在这里,你正在订阅,所以它可以工作,但是当调用客户端想要订阅时,它就中断了
这是可行的,但取决于是否返回,值将被发送到调用客户端。需要删除
all
,这样就可以将first
生成的值一直返回给客户端我已经尽我所能地解释了这一点,所有这些都可以在优秀的Reactor Documentation中阅读