有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

`OneErrorContinue(…)`的java限制在变化中?

我有一个(可能是无限的)Flux源,它应该首先存储每条消息(例如,存储到数据库中),然后异步转发消息(例如,使用Spring WebClient

发生故障时的转发应该记录错误,而不完成源Flux

然而,我意识到流中的forward(flatMap(...))会在导致异常(例如reactor.retry.RetryExhaustedException)的256条消息之后阻止源Flux的执行

由于仅处理256条消息,因此断言失败的典型示例:

@Test
@SneakyThrows
public void sourceBlockAfter256Exceptions() {
    int numberOfRequests = 500;
    Set<Integer> sink = new HashSet<>();
    Flux
            .fromStream(IntStream.range(0, numberOfRequests).boxed())
            .map(sink::add)
            .flatMap(i -> Mono
                    // normally the forwards are contained here e.g. by means of Mono.when(...).thenReturn(...).retryWhen(...):
                    .error(new Exception("any"))
            )
            .onErrorContinue((throwable, o) -> log.error("Error", throwable))
            .subscribe();

    Thread.sleep(3000);
    Assertions.assertEquals(numberOfRequests, sink.size());
}

subscribe(...)内进行转发不会阻塞源Flux,但这肯定不是解决方案,因为我不可能希望丢失消息

问题:

  • 这里发生了什么事?(可能与存储在一位中的某些状态有关)
  • 如何才能正确地执行此操作

编辑:

根据下面的讨论,我构建了一个使用FluxMessageChannel的示例(据我所知,该示例适用于无限流,并且在256个错误后肯定不会阻塞),并且具有完全相同的行为:

@Test
@SneakyThrows
public void maxConnectionWithChannelTest() {
    int numberOfRequests = 500;
    Set<Integer> sink = new HashSet<>();

    FluxMessageChannel fluxMessageChannel = MessageChannels.flux().get();

    fluxMessageChannel.subscribeTo(
            Flux
                    .fromStream(IntStream
                            .range(0, numberOfRequests).boxed()
                            .map(i -> MessageBuilder.withPayload(i).build())
                    )
                    .map(Message::getPayload)
                    .map(sink::add)
                    .flatMap(i -> Mono.error(new Exception("whatever")))
    );

    Flux
            .from(fluxMessageChannel)
            .subscribe();

    Thread.sleep(3000);
    Assert.assertEquals(numberOfRequests, sink.size());
}

编辑:

我刚刚在反应堆核心项目中提出了一个问题:https://github.com/reactor/reactor-core/issues/2011


共 (0) 个答案