Getting a response from a Netty server in a non-Netty Java client

This is my first question on StackOverflow, and I hope I'm following the expected conventions.

I've taken over some code from someone who no longer works here, and I'm pretty much stuck. I've searched and asked a few colleagues (who unfortunately don't have much Java experience), but nobody seems able to help, and searching hasn't gotten me far either.

I'm sending a JSON request to a Netty server from a client that deliberately does not use Netty. At the moment it is just a plain Java socket, but the intention is for a Flask client to send requests to the Netty server. The requests arrive (both with the Java socket and with Python Flask) and are processed correctly in the pipeline, but I also want to send a response back to the client. I have a suspicion about where in the code the response should be sent, but I'm obviously missing something, because I never get a response. Any suggestions?

The Java socket client (note that the json1 and json2 strings are omitted from this snippet because they are rather long, but they are correctly formatted). The request is posted using a socket and its output stream. The response part (the input stream of the same socket) is just a test I have doubts about, but I'm not sure how to do it otherwise (which is why I've left it in). I've seen plenty of examples where the client implements a Netty interface, and that seems to work fine, but as I said, I'd like a client that doesn't use Netty to be able to receive the response as well (if that's possible).

String serverResponse;

for (int j = 0; j < 100; j++) {
    for (int i = 0; i < 1000; i++) {
        try {
            Socket s = new Socket("localhost", 12000);
            PrintWriter out = new PrintWriter(s.getOutputStream(), true);
            out.write(json1 + i + json2);
            out.flush();

            // Testing only - trying to get the response back from the server.
            // NB: readLine() blocks until a line terminator or EOF arrives.
            BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
            while (true) {
                if ((serverResponse = in.readLine()) != null) {
                    log.info("server says: {}", serverResponse);
                    break;
                }
            }

            out.close();
            s.close();
            Thread.sleep(1000);

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    Thread.sleep(2000); // NB: the enclosing method must declare or handle InterruptedException
}

MFTcpServer.java

/**
 * Abstract TCP server class. This class should be extended by a subclass that implements an actual server.
 *
 * @param <R> the data to be read from the socket.
 * @param <W> the data to be written (in case of duplex) to the socket.
 */

public abstract class MFTcpServer<R, W> {

    protected final AtomicBoolean started;

    protected MFTcpServer() {
        this.started = new AtomicBoolean();
    }

    /**
     * Start the server.
     *
     * @param initializer the channel initializer; it will be called when a new client connects to the server.
     * @return instance of tcp server
     */
    public final MFTcpServer<R, W> start(ChannelInitializer<Channel> initializer) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Server already started");
        }

        doStart(initializer);
        return this;
    }

    /**
     * Start the server and wait for all the threads to be finished before shutdown.
     * @param initializer the channel initializer; it will be called when a new client connects to the server.
     */
    public final void startAndAwait(ChannelInitializer<Channel> initializer) {
        start(initializer);
        awaitShutdown();
    }

    /**
     * Shutdown the server
     * @return true if successfully shutdown.
     */
    public final boolean shutdown() {
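        // If the server is not running, report success immediately; otherwise perform the actual shutdown.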
        return !started.compareAndSet(true, false) || doShutdown();
    }

    /**
     * Wait for all the threads to be finished before shutdown.
     */
    public abstract void awaitShutdown();

    /**
     * Do the shutdown now.
     * @return true if successfully shutdown
     */
    public abstract boolean doShutdown();

    /**
     * Start the server.
     * @param initializer the channel initializer; it will be called when a new client connects to the server.
     * @return instance of tcp server
     */
    public abstract MFTcpServer<R, W> doStart(ChannelInitializer<Channel> initializer);

    /**
     *
     * @return the port where the server is running.
     */
    public abstract int getPort();
}

MFNetty4TcpServer.java: the actual server implementation

public class MFNetty4TcpServer<R, W> extends MFTcpServer<R, W> {

    private static final Logger logger = LoggerFactory.getLogger(MFNetty4TcpServer.class);
    private static final int BOSS_THREAD_POOL_SIZE = 2;

    private int port;
    private ServerBootstrap bootstrap;
    private ChannelFuture bindFuture;

    /**
     * The constructor.
     *
     * @param port port where to listen
     */
    protected MFNetty4TcpServer(int port) {
        this.port = port;
        final NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
                (BOSS_THREAD_POOL_SIZE));
        final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
                (JsonProducerConfig.THREAD_POOL_SIZE));

        bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class);
    }

    @Override
    public MFNetty4TcpServer<R, W> doStart(ChannelInitializer<Channel> initializer) {
        bootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {

                if (initializer != null) {
                    ch.pipeline().addLast(initializer);
                }
            }
        });

        try {
            bindFuture = bootstrap.bind(port).sync();
            if (!bindFuture.isSuccess()) {
                // Connection not successful
                throw new RuntimeException(bindFuture.cause());
            }
            SocketAddress localAddress = bindFuture.channel().localAddress();
            if (localAddress instanceof InetSocketAddress) {
                port = ((InetSocketAddress) localAddress).getPort();
                logger.info("Started server at port: " + port);
            }

        } catch (InterruptedException e) {
            logger.error("Error waiting for binding server port: " + port, e);
        }

        return this;
    }

    @Override
    public void awaitShutdown() {
        try {
            bindFuture.channel().closeFuture().await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupted status
            logger.error("Interrupted while waiting for the server socket to close.", e);
        }
    }

    @Override
    public boolean doShutdown() {
        try {
            bindFuture.channel().close().sync();
            return true;
        } catch (InterruptedException e) {
            logger.error("Failed to shutdown the server.", e);
            return false;
        }
    }

    @Override
    public int getPort() {
        return port;
    }

    /**
     * Creates a tcp server at the defined port.
     *
     * @param port port to listen to
     * @param <R>  data to be read
     * @param <W>  data to be written back. Only in case of duplex connection.
     * @return instance of tcp server.
     */
    public static <R, W> MFTcpServer<R, W> create(int port) {
        return new MFNetty4TcpServer<>(port);
    }

}

JsonProducerConfig.java: the pipeline is set up here

/**
 * Spring Configuration class of the application.
 */
@Configuration
@Import({DatabusConfig.class})
public class JsonProducerConfig {

    private static final Logger log = LoggerFactory.getLogger(JsonProducerConfig.class);

    public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    public static final String TCP_SERVER = "tcpServer";
    public static final String CHANNEL_PIPELINE_INITIALIZER = "channel_initializer";
    public static final String MF_KAFKA_PRODUCER = "mf_kafka_producer";
    public static final String JSON_AVRO_CONVERTOR = "jsonAvroConvertor";

    @Value("#{systemProperties['tcpserver.port']?:'12000'}")
    private String tcpServerPort;

    @Bean(name = TCP_SERVER)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public MFTcpServer nettyTCPServer() {
        return MFNetty4TcpServer.create(Integer.parseInt(tcpServerPort));
    }

    @Bean(name = MF_KAFKA_PRODUCER)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public MFKafkaProducer pushToKafka() {
        return new MFKafkaProducer();
    }

    @Bean(name = JSON_AVRO_CONVERTOR)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public JsonAvroConvertor jsonAvroConvertor() {
        return new JsonAvroConvertor();
    }

    /**
     * This is where the pipeline is set for processing of events.
     *
     * @param jsonAvroConvertor converts json to avro
     * @param kafkaProducer     pushes to kafka
     * @return channel initializer pipeline.
     */
    @Bean(name = CHANNEL_PIPELINE_INITIALIZER)
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public ChannelInitializer<Channel> channelInitializers(JsonAvroConvertor jsonAvroConvertor,
                                                           MFKafkaProducer kafkaProducer) {
        return new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {

                if (log.isInfoEnabled())
                    log.info("initChannel - initing channel...");

                // NB: passing only an EventExecutorGroup (no handlers) to addLast() registers nothing in the pipeline.
                channel.pipeline().addLast(new NioEventLoopGroup(0, new DefaultEventExecutorGroup(THREAD_POOL_SIZE)));
                channel.pipeline().addLast(new JsonObjectDecoder(1048576));
                channel.pipeline().addLast(jsonAvroConvertor);
                channel.pipeline().addLast(kafkaProducer);

                if (log.isInfoEnabled())
                    log.info("channel = " + channel.toString());
            }
        };
    }

}
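
For reference: this pipeline registers only inbound handlers, so anything a handler writes back to the client must already be a ByteBuf (by default, Netty's socket channels accept only ByteBuf and FileRegion messages on write). A minimal sketch of an alternative initializer, assuming Netty's stock StringEncoder, that would let handlers respond with plain Strings (the class name is hypothetical):

import java.nio.charset.StandardCharsets;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Hypothetical variant of the pipeline: the outbound StringEncoder converts
 * Strings written by inbound handlers into ByteBufs on their way back to the
 * client, so a handler could call ctx.writeAndFlush("some response\n") directly.
 */
public class EncodingChannelInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel channel) {
        // Outbound path: String -> ByteBuf (only affects writes, not reads).
        channel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
        // Inbound path: split the raw byte stream into individual JSON objects.
        channel.pipeline().addLast(new JsonObjectDecoder(1048576));
        // ... the jsonAvroConvertor and kafkaProducer handlers would follow here.
    }
}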

JsonProducer.java: the main program

public class JsonProducer {

    private static final Logger log = LoggerFactory.getLogger(JsonProducer.class);

    private static MFTcpServer tcpServer;

    /**
     * Main startup method
     *
     * @param args not used
     */
    public static void main(String[] args) {
        System.setProperty("solschema", "false");

        try {

            // the shutdown hook.
            Runtime.getRuntime().addShutdownHook(new Thread(
                    () -> {
                        if (tcpServer != null) {
                            tcpServer.shutdown();
                        }
                    }
            ));

            AnnotationConfigApplicationContext context = new
                    AnnotationConfigApplicationContext(JsonProducerConfig.class);

            tcpServer = (MFTcpServer) context.getBean(JsonProducerConfig.TCP_SERVER);

            ChannelInitializer<Channel> channelInitializer = (ChannelInitializer<Channel>) context.
                    getBean(JsonProducerConfig.CHANNEL_PIPELINE_INITIALIZER);

            tcpServer.startAndAwait(channelInitializer);

        } catch (Exception t) {
            log.error("Error while starting JsonProducer ", t);
            System.exit(-1);
        }
    }
}

MFKafkaProducer.java: the last handler in the pipeline. Note the ctx.writeAndFlush(msg) in the channelRead method; as I understand it, that is where the response should be initiated. But what then? When operating on the resulting ChannelFuture, isSuccess() evaluates to false. The response object is an attempt to create a string response.

@ChannelHandler.Sharable
public class MFKafkaProducer extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(MFKafkaProducer.class);

    @Resource
    ApplicationContext context;

    @Resource(name = DatabusConfig.ADMIN)
    Admin admin;

    private Map<String, IProducer> streams = new HashMap<>();

    @PreDestroy
    public void stop() {
        removeAllStreams(); // then stop writing to producers
    }

    /**
     * @param clickRecord the record to be pushed to kafka
     * @throws Exception
     */
    public void handle(GenericRecord clickRecord) throws Exception {
        Utf8 clientId = null;
        try {
            clientId = (Utf8) clickRecord.get(SchemaUtil.APP_ID);
            stream(producer(clientId.toString()), clickRecord);
        } catch (Exception e) {
            String message = "Could not push click data for clientId:" + clientId;
            log.warn("handle - " + message + "!!!", e);
            assert clientId != null;
            removeStream(clientId.toString());
        }
    }

    /**
     * removes all the streams
     */
    private void removeAllStreams() {
        Set<String> strings = streams.keySet();

        for (String clientId : strings) {
            removeStream(clientId);
        }
    }

    /**
     * removes a particular stream
     *
     * @param clientId the stream to be removed
     */
    private void removeStream(String clientId) {
        Assert.notEmpty(streams);
        IProducer producer = streams.get(clientId);
        producer.stopProducer();
        streams.remove(clientId);
    }

    /**
     * @param producer    the producer where data needs to be written
     * @param clickRecord the record to be written
     */
    private void stream(IProducer producer, GenericRecord clickRecord) {
        producer.send(clickRecord);
    }

    /**
     * This will create a producer in case it is not already created.
     * If already created return the already present one
     *
     * @param clientId stream id
     * @return the producer instance
     */
    private IProducer producer(String clientId) {
        if (streams.containsKey(clientId)) {
            return streams.get(clientId);
        } else {
            IProducer producer = admin.createKeyTopicProducer(SchemaUtil.APP_ID, "test_" + clientId, new ICallback() {
                @Override
                public void onSuccess(long offset) {
                    if (log.isInfoEnabled())
                        log.info("onSuccess - Data at offset:" + offset + " send.");
                }

                @Override
                public void onError(long offset, Exception ex) {
                    if (log.isInfoEnabled())
                        log.info("onError - Data at offset:" + offset + " failed. Exception: ", ex);
                }

                @Override
                public void onStreamClosed() {
                    log.warn("onStreamClosed - Stream:" + clientId + " closed.");
                    removeStream(clientId);
                }
            });
            producer.startProducer();
            streams.put(clientId, producer);
            return producer;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.debug("KafkaProducer - channelRead() called with " + "ctx = [" + ctx + "], msg = [" + msg + "]");

        if (msg instanceof GenericRecord) {
            GenericRecord genericRecord = (GenericRecord) msg;
            try {
                handle(genericRecord);
                log.debug("channelRead sending response");
                Charset charset = Charset.defaultCharset();
                ByteBuf response = Unpooled.copiedBuffer("Just a response", charset);
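                // NB: 'response' is built above but never used; the next line writes 'msg' (the GenericRecord) instead.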
                ChannelFuture future = ctx.writeAndFlush(msg);
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess())
                            log.info("channelRead - future.operationComplete - Response has been delivered to all channels");
                        else
                            log.info("channelRead - future.operationComplete - Response has NOT been delivered to all channels");
                    }
                });
            } catch (Exception ex) {
                log.error("Something went wrong processing the generic record: " + msg + "\n ", ex);
            }
        } else {
            log.debug("KafkaProducer - msg not of Type Generic Record !!! " + msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        log.error("Something went wrong writing to Kafka: \n", cause);
        ctx.close();
    }

}

1 Answer

  1. Answer #1

    Using ChannelFuture#cause(), I noticed that I was not serializing a ByteBuf object but a GenericRecord. With

    ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString(), charset); 
    ChannelFuture future = ctx.writeAndFlush(response);

    the GenericRecord is converted to a ByteBuf, and the response is sent with the writeAndFlush method.
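
    A minimal sketch of the corrected channelRead, assuming the fix above (the trailing newline is an assumption, so that line-based clients such as the BufferedReader.readLine() loop in the question can detect the end of the response):

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof GenericRecord) {
            GenericRecord genericRecord = (GenericRecord) msg;
            try {
                handle(genericRecord);
                // Write a ByteBuf, not the GenericRecord itself: with no outbound
                // encoder in the pipeline, Netty only accepts ByteBuf messages here.
                ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString() + "\n",
                        Charset.defaultCharset());
                ctx.writeAndFlush(response);
            } catch (Exception ex) {
                log.error("Something went wrong processing the generic record: " + msg, ex);
            }
        }
    }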

    The test client implemented with a plain Socket somehow never actually received the response, but that was also solved by using a SocketChannel.
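
    For completeness, a minimal sketch of such a SocketChannel-based test client (the class name, placeholder payload, and hard-coded host/port are assumptions based on the question):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    public class TcpTestClient {

        public static void main(String[] args) throws IOException {
            try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 12000))) {
                // Send the JSON request (placeholder payload; the real client sends json1 + i + json2).
                channel.write(ByteBuffer.wrap("{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8)));

                // Blocking read: wait for the server's response, then decode and print it.
                ByteBuffer buffer = ByteBuffer.allocate(4096);
                if (channel.read(buffer) > 0) {
                    buffer.flip();
                    System.out.println("server says: " + StandardCharsets.UTF_8.decode(buffer));
                }
            }
        }
    }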