A Java graphql-spring-boot-starter application that uses WebSocket only

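I am building a GraphQL application with spring-boot-starter-webflux 2.5.6 and com.graphql-java-kickstart:graphql-spring-boot-starter:12.0.0. So far the application works well, because com.graphql-java-kickstart makes it easy to get started. Over HTTP I can run queries and mutations, and I can even create subscriptions and receive updates over WebSocket.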

For my application, however, queries and mutations must also run over WebSocket.

It seems that com.graphql-java-kickstart:graphql-spring-boot-starter only lets you configure the subscription endpoint as a WebSocket. Adding an extra WebSocket via "extends Endpoint" and "@ServerEndpoint" has no effect at all.

I also tried adding my own HandlerMapping:

@PostConstruct
public void init()
{
    Map<String, Object> map = new HashMap<String, Object>(
            ((SimpleUrlHandlerMapping) webSocketHandlerMapping).getUrlMap());
    
    map.put("/mysocket", myWebSocketHandler);
    //map.put("/graphql", myWebSocketHandler);

    ((SimpleUrlHandlerMapping) webSocketHandlerMapping).setUrlMap(map);
    ((SimpleUrlHandlerMapping) webSocketHandlerMapping).initApplicationContext();

}

This seems to work for the /mysocket path, but how do I enable it for /graphql? A handler already seems to be listening there:

WARN 12168 --- [ctor-http-nio-2] notprivacysafe.graphql.GraphQL : Query failed to parse : ''

And how do I connect the WebSocket to a GraphQLMutationResolver?


1 answer

  1. # Answer 1

    My starting point for solving this was to create a RestController that hands the ServerWebExchange and a WebSocketHandler to the WebSocketService, like this:

    @RestController
    @RequestMapping("/")
    public class WebSocketController
    {
    
        private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);
    
        private final GraphQLObjectMapper objectMapper;
    
        private final GraphQLInvoker graphQLInvoker;
    
        private final GraphQLSpringInvocationInputFactory invocationInputFactory;
    
        private final WebSocketService service;
    
        @Autowired
        public WebSocketController(GraphQLObjectMapper objectMapper, GraphQLInvoker graphQLInvoker,
                GraphQLSpringInvocationInputFactory invocationInputFactory, WebSocketService service)
        {
            this.objectMapper = objectMapper;
            this.graphQLInvoker = graphQLInvoker;
            this.invocationInputFactory = invocationInputFactory;
            this.service = service;
        }
    
        @GetMapping("${graphql.websocket.path:graphql-ws}")
        public Mono<Void> getMono(ServerWebExchange exchange)
        {
            logger.debug("New connection via GET");
            return service.handleRequest(exchange,
                    new GraphQLWebsocketMessageConsumer(exchange, objectMapper, graphQLInvoker, invocationInputFactory));
        }
    
        @PostMapping("${graphql.websocket.path:graphql-ws}")
        public Mono<Void> postMono(ServerWebExchange exchange)
        {
            ...
        }
    
    }
    

    In this prototype state, the WebSocketHandler also implements Consumer, which is called to handle each WebSocketMessage:

    public class GraphQLWebsocketMessageConsumer implements Consumer<String>, WebSocketHandler
    {
    
        private static final Logger logger = LoggerFactory.getLogger(GraphQLWebsocketMessageConsumer.class);
    
        private final ServerWebExchange swe;
    
        private final GraphQLObjectMapper objectMapper;
    
        private final GraphQLInvoker graphQLInvoker;
    
        private final GraphQLSpringInvocationInputFactory invocationInputFactory;
    
        private final Sinks.Many<String> publisher;
    
        public GraphQLWebsocketMessageConsumer(ServerWebExchange swe, GraphQLObjectMapper objectMapper,
                GraphQLInvoker graphQLInvoker, GraphQLSpringInvocationInputFactory invocationInputFactory)
        {
            ...
            publisher = Sinks.many().multicast().directBestEffort();
        }
    
        @Override
        public Mono<Void> handle(WebSocketSession webSocketSession)
        {
            Mono<Void> input = webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(this).then();
            Mono<Void> sender = webSocketSession.send(publisher.asFlux().map(webSocketSession::textMessage));
            return Mono.zip(input, sender).then();
        }
    
        @Override
        public void accept(String body)
        {
            try
            {
                String query = extractQuery(body);
                if(query == null)
                {
                    return;
                }
                GraphQLRequest request = objectMapper.readGraphQLRequest(query);
                GraphQLSingleInvocationInput invocationInput = invocationInputFactory.create(request, swe);
                Mono<ExecutionResult> executionResult = Mono.fromCompletionStage(graphQLInvoker.executeAsync(invocationInput));
                Mono<String> jsonResult = executionResult.map(objectMapper::serializeResultAsJson);
                jsonResult.subscribe(publisher::tryEmitNext);
            } catch (Exception e)
            {
                ...
            }
        }
    
        @SuppressWarnings("unchecked")
        private String extractQuery(final String query) throws Exception
        {
            Map<String, Object> map = (Map<String, Object>) objectMapper.getJacksonMapper().readValue(query, Map.class);
            ...
            return queryPart;
        }
    
        @Override
        public List<String> getSubProtocols()
        {
            logger.debug("getSubProtocols called");
            return Collections.singletonList("graphql-ws");
        }
    
    }
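
The handler above advertises the "graphql-ws" subprotocol, so each incoming frame is presumably a JSON envelope rather than a bare query. The following is a minimal sketch of how such an envelope could be unwrapped and how a result could be wrapped for the reply, assuming the client speaks the legacy Apollo subscriptions-transport-ws message format (`type`, `id`, `payload`). The class `GraphQLWsEnvelope` and its methods are hypothetical and not part of graphql-java-kickstart; they are not the elided body of `extractQuery`, and they operate on a map that has already been parsed (for example by Jackson, as in the code above).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of graphql-java-kickstart. */
final class GraphQLWsEnvelope {

    private GraphQLWsEnvelope() {
    }

    /**
     * Returns the "payload" object of a "start" message, or empty for any
     * other message type (for example "connection_init" or "stop").
     */
    @SuppressWarnings("unchecked")
    static Optional<Map<String, Object>> startPayload(Map<String, Object> message) {
        if (!"start".equals(message.get("type"))) {
            return Optional.empty();
        }
        Object payload = message.get("payload");
        if (payload instanceof Map) {
            return Optional.of((Map<String, Object>) payload);
        }
        return Optional.empty();
    }

    /** Wraps an execution result in a "data" message that echoes the operation id. */
    static Map<String, Object> dataMessage(Object id, Object executionResult) {
        Map<String, Object> response = new LinkedHashMap<>();
        response.put("type", "data");
        response.put("id", id);
        response.put("payload", executionResult);
        return response;
    }
}
```

Echoing the `id` in the reply lets a client match each result to the operation that produced it, which matters once several queries or mutations share one socket.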
    

    This solution does not yet address security aspects such as authentication or session handling.
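
One possible direction for the missing authentication, sketched under clear assumptions: a per-connection gate that only lets "start" messages through after a "connection_init" message carried a valid token. The class `ConnectionGate`, the `Decision` enum, and the `authToken` payload key are all hypothetical names chosen for illustration; how the token is actually validated is left to the supplied predicate.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical per-connection gate; create one instance per WebSocket session. */
final class ConnectionGate {

    enum Decision { ACK, REJECT, EXECUTE, IGNORE }

    private final Predicate<String> tokenValidator;
    private volatile boolean authenticated;

    ConnectionGate(Predicate<String> tokenValidator) {
        this.tokenValidator = tokenValidator;
    }

    /** Decides what to do with one parsed message of the session. */
    Decision onMessage(Map<String, Object> message) {
        Object type = message.get("type");
        if ("connection_init".equals(type)) {
            Object payload = message.get("payload");
            // "authToken" is an assumed key; clients may send credentials differently.
            Object token = payload instanceof Map ? ((Map<?, ?>) payload).get("authToken") : null;
            authenticated = token instanceof String && tokenValidator.test((String) token);
            return authenticated ? Decision.ACK : Decision.REJECT;
        }
        if ("start".equals(type)) {
            // Only execute operations on connections that passed the init check.
            return authenticated ? Decision.EXECUTE : Decision.REJECT;
        }
        return Decision.IGNORE;
    }
}
```

In the handler, `ACK` would map to sending a "connection_ack" message, `REJECT` to an error reply or closing the session, and `EXECUTE` to the existing invocation path.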