有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

SpringWebFlux中的java实现“有效”,但我试图理解“为什么?”

“您认为是命令式的,第一行将执行第二行,这在webflux中并非如此。您必须考虑事件回调。”

我同意这一评估(我有很多“必须”做事情的经验),希望大家能帮助我纠正我如何看待解决方案空间。我发布了同一“功能”的三个不同版本,其中只有一个有效(并且,我愿意就如何修改该版本以更好地适应反应式/功能性实现发表评论)

在进行引用评估的人员的指导/帮助下,我能够获得DemoPOJOHandler。添加(服务器请求)“工作。代码以及调试级别的输出如下所示。我注意到的是,就在映射到mil.navy的httppost”/v2/DemoPOJO“和*”之后行,有一个来自反应器的条目。内蒂。频道FluxReceive声明“订阅入站接收器..”。这似乎是我在其他两次尝试中错过的关键动作

我的具体(尽管是“冗长的”)问题是:

我“明白”了,认为语句#1将被执行,然后语句#2等是解决方案空间的“命令式”视图。但是,在下面的例子中,这似乎是正在发生的行为。logger语句在08:38:34.217执行,随后是08:38:34.251的订阅,然后在08:39:34.267实例化DemoPOJO,然后一切都“正常”

但是,request.bodyToMono()...序列中的链接与命令式代码中的方法链接(例如,'Integer.toString())没有明显区别。indexOf()”),但lambda除外(或者,lambda的存在是“事物变化”的原因吗?)。因此,如果request.bodyToMono()...序列在理论上不需要“.then()”或“.switchIfEmpty()”,那么为什么核心request.bodyToMono()...序列不执行“^{”?我知道Mono没有被认购,但为什么要进行认购和将POJO添加到回购协议中似乎需要链中的附加声明

此代码成功执行

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
                                                 .then(ServerResponse.ok().build())
                                                 .switchIfEmpty(ServerResponse.badRequest()
                                                                              .contentType(MediaType.APPLICATION_JSON)
                                                                              .build());
    }
}


2019-07-25 08:38:34.144 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory     : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7a8a4d6a
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers  : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.ratio: 8
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Increasing pending responses, now 1
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@579c20c6
2019-07-25 08:38:34.195 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5f552130] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping      : [5f552130] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda$258/1123559518@22a8277c
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler   : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:38:34.251 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive        : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.DemoPOJO( 666, foo_666, 10666 )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.toString()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonDecoder  : [5f552130] Decoded [666 :: foo_666 :: 10666]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOService   : DemoPOJOService.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo      : DemoPOJORepo.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo      : DemoPOJORepo.add( DemoPOJO ) -> adding for id 666
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.272 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5f552130] Completed 200 OK
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP response frame
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:38:34.274 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Decreasing pending responses, now 0
2019-07-25 08:38:34.275 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP packet was sent, terminating the channel
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: (port 62610) connection closed
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: close connection

此代码执行时“没有错误”,但从不订阅,因此从不执行链的“doOnSuccess(...)”部分。但是,似乎应该这样做?链接单独的返回的“魔力”是什么<将eem>语句添加到请求中。bodyToMono(…)'<带有“”的eem>语句,然后(…)

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
        return  ServerResponse.ok().build();
    }
}


2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: (port 62661) connection closed
2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: close connection
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory     : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3860465a
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] New http connection, requesting read
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] New http connection, requesting read
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers  : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.ratio: 8
2019-07-25 08:40:18.285 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Increasing pending responses, now 1
2019-07-25 08:40:18.289 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@7fa4fcbc
2019-07-25 08:40:18.297 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [51900c31] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:40:18.315 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping      : [51900c31] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda$262/1446001495@27a07cfc
2019-07-25 08:40:18.316 DEBUG 17064 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler   : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:40:18.358 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [51900c31] Completed 200 OK
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP response frame
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:40:18.360 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Decreasing pending responses, now 0
2019-07-25 08:40:18.361 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP packet was sent, terminating the channel
2019-07-25 08:40:18.366 DEBUG 17064 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No ChannelOperation attached. Dropping: 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{.    "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |.    "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",.    "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d             |ue": 10666.}    |
+--------+-------------------------------------------------+----------------+

这段代码只是用一个NPE来完成的。我失败的逻辑是“好吧,如果”doOnSuccess(…)“之所以没有发生,是因为没有订阅Mono,然后是“订阅”。显然,这不是解决方案。不太明显的是(对我来说,此时此刻),“为什么?“

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
                                          .subscribe();
        return  ServerResponse.ok().build();
    }
}


reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at mil.navy.demo.DemoPOJO.DemoPOJOHandler.lambda$add$2(DemoPOJOHandler.java:73) ~[classes/:na]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:311) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:155) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:390) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:197) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:338) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:350) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:399) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpServerOperations.cleanHandlerTerminate(HttpServerOperations.java:519) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpTrafficHandler.operationComplete(HttpTrafficHandler.java:314) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]

    (... lots of stuff deleted to fit posting constraints ...)

2019-07-25 10:44:43.212 DEBUG 10544 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0xa62e89df, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:64515] No ChannelOperation attached. Dropping: 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{.    "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |.    "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",.    "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d             |ue": 10666.}    |
+--------+-------------------------------------------------+----------------+
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(3)-169.254.211.161: (port 64485) connection closed
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(3)-169.254.211.161: close connection

共 (1) 个答案

  1. # 1 楼答案

    因为我是你们在顶部引用的那个人,所以我将尝试回答你们的问题

    首先,我们需要讨论“非阻塞”。什么是“非阻塞”?非阻塞是基于事件的。底层服务器Netty不负责为每个请求分配一个线程,而是负责处理事件链和事件队列

    因此,当有人订阅时,netty将设置一个基本的事件队列(排序),该队列的基本工作方式如下:

    x <- y <- z
    

    为了得到x,我们需要解y,但是为了得到y,我们需要解z。这就是人们在这种编程中通常所说的“函数”部分

    当人们开始使用反应式编程时,我看到的一个最常见的错误是他们不理解subscriber是调用client。spring应用程序是publisher,调用服务的每个client都是subscriber

    您不应在应用程序中订阅

    为什么您的发布应用程序要自行订阅?当你以人们通常理解的方式解释它时

    让我们看一下你们的例子,我将按相反的顺序:

    示例3:

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");
    
        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
                                          .subscribe();
        return  ServerResponse.ok().build();
    }
    

    在这里,我们以命令式的方式输入方法,我们给它一个请求,ServerRequest是一个具体的对象,但是一旦你做了bodyToMono,你就会返回一个Mono<DemoPOJO>,它反过来是一个包装的CompletableFuture,里面有一个计算(在请求中获取主体并将其放入dto)

    一旦完成此计算,Mono将进入success状态并触发链中后面的内容,因此doOnSuccess将被触发。当doOnSuccess完成时,它将返回Mono<Void>

    这就是你的问题所在,当doOnSuccess完成时,你subscribe。因此,您在这里所做的是,一旦有人将ServerRequest发布到您的应用程序,Netty(服务器)将建立一个事件链,并且在这个事件链中,应用程序将订阅自己

    在这里,链是由订阅自身的应用程序完成的。因此,应用程序是它自己的客户机

    示例2:

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");
    
        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
        return  ServerResponse.ok().build();
    }
    

    这里我们做的与例3相同,只是当事件链被设置时,请求被映射到DTO,然后我们在doOnSuccess中做一些事情,但随后该链被破坏doOnSuccess表示它已完成,但监听后没有任何内容

    所以这里的事件链是断裂的,它是不完整的。在你订阅之前什么都不会发生,但是由于链断了,没有人可以subscribe,因此什么都不会发生

    示例1:

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");
    
        return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
                                                 .then(ServerResponse.ok().build())
                                                 .switchIfEmpty(ServerResponse.badRequest()
                                                                              .contentType(MediaType.APPLICATION_JSON)
                                                                              .build());
    }
    

    在这里,链条是完整的。某物在发出某物的信号,即发出某物的信号,当某物完成时,下一件事将触发,下一件,下一件,下一件,下一件

    调用客户端发布数据,服务器设置eventchain,该链完成后,客户端订阅客户端订阅,然后事件链启动并触发所有回调并返回数据

    Flux<T>Mono<T>都是围绕monadCompletableFuture<T>的包装类Optional<T>Stream<T>也是单子,单子来自函数世界,就像编程语言Haskell一样。了解单子的工作原理的一个好方法是更多地了解单子

    如果你想对monads有更多的了解,我会无耻地插入我自己的文章:

    Write a Monad, in Java, seriously?

    我很好的阅读也是Intro to reactive programming我建议通读他们所有的例子