有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

两个Quarkus服务之间的非阻塞数据流(Java中带有Mutiny的Vert.x)

更新

在解决了一些与主要问题无关的问题后,我修复了示例代码中的小错误,这些问题仍然是关于服务之间的非阻塞流

背景信息

我正在Quarkus下移植SpringWebFlux服务。该服务在多个大型数据集上运行长时间搜索,并在流量(文本/事件流)中返回可用的部分结果

问题

现在我正试图使用叛变多与垂直。Quarkus下的x,但无法计算消费者服务如何在不阻塞的情况下接收此流

在所有示例中,消费者要么是JS前端页面,要么生产者的内容类型是application/json,在多个json对象中发送之前(在我的应用程序中没有任何意义),它似乎会一直闪烁,直到多个json对象完成为止

问题

  1. 如何使用Mutiny风格的Vert接收文本/事件流。x网络客户端
  2. 如果问题是WebClient无法接收连续数据流:在两个Quarkus服务之间传输数据的标准方式是什么

这里是一个简化的示例

测试实体

public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString, getters and setters
    
}

制作人1。简单无限流->;挂起

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2)              .onItem().transform(n -> new SearchResult(n.toString()));
}

制作人2。Vertx路径无限流->;挂起

@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
        log.info("routed run");
        return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(n -> new SearchResult(n.toString()));
}

制作人3。简单有限流->;阻塞直至完成

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
        .transform().byTakingFirstItems(5)
        .onItem().transform(n -> new SearchResult(n.toString()));
}

消费者

在生产者和消费者双方都尝试了多种不同的解决方案,但在每种情况下,流都会阻塞,直到完成或无限期挂起,而不传输无限流的数据。我用httpie得到了相同的结果。以下是最新的迭代:

WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
        
client.get("/string")
                .send()
                .onFailure().invoke(resp -> log.error("error: " + resp))
                .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
                .toMulti()
                .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));

共 (1) 个答案

  1. # 1 楼答案

    维特。x Web客户端不能与SSE一起工作(没有配置)。 从https://vertx.io/docs/vertx-web-client/java/

    Responses are fully buffered, use BodyCodec.pipe to pipe the response to a write stream

    它会等待响应完成。您可以使用原始垂直。x HTTP客户端或使用pipe编解码器。例子在https://vertx.io/docs/vertx-web-client/java/#_decoding_responses上给出

    或者,您可以使用SSE客户端,例如: https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34