java在非Netty客户端中从Netty服务器获取响应
这是我关于StackOverflow的第一个问题,我希望我遵守了预期的标准
我已经从不再在这里工作的其他人那里接管了一些代码,我几乎被困在这里。我搜索并询问了一些同事(不幸的是,没有太多Java经验),但似乎没有人能帮助我。搜索对我也没什么帮助
我从一个客户机向Netty服务器发送Json请求,该客户机故意不使用Netty实现。目前,它只是一个简单的Javasocket,但目的是让Flask客户端向Netty服务器发送请求。请求到达(使用Javasocket和Python Flask),并在管道中得到正确处理,但我想向客户机发送响应,尽管我怀疑在代码中发送响应的位置,但我显然遗漏了一些东西,因为我没有得到任何响应。有什么建议吗
Javasocket客户端(请注意,此处代码段中省略了json1和json2字符串,因为它们相当长,但格式正确)。使用socket和相关输出流发布请求。响应部分(同一个socket的输入流)只是一些我有疑问的测试,但不确定如何做,否则(这就是为什么我把它保留在这里)。我已经看到了很多客户机实现Netty接口的例子,这似乎很好,但正如我所说的,我希望不使用Netty的客户机也能够收到响应(如果可能的话)
String serverResponse;
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 1000; i++) {
try {
Socket s = new Socket("localhost", 12000);
PrintWriter out = new PrintWriter(s.getOutputStream(), true);
out.write(json1 + i + json2);
out.flush();
// Testing only - trying to get the response back from the server
BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
while(true) {
if ((serverResponse = in.readLine()) != null) {
log.info("server says", serverResponse);
break;
}
}
out.close();
s.close();
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
Thread.sleep(2000);
}
MCTcpServer。java
/**
* Abstract TCP Server class. this class should be implemented in the subclass to implement an actual server.
*
* @param <R> The data to be read from the socket.
* @param <W> data to be written (in case of duplex) from the socket.
*/
public abstract class MFTcpServer<R, W> {
protected final AtomicBoolean started;
protected MFTcpServer() {
this.started = new AtomicBoolean();
}
/**
* Start the server.
*
* @param initializer the channel initializers. they will be called when a new client connects to the server.
* @return instance of tcp server
*/
public final MFTcpServer<R, W> start(ChannelInitializer<Channel> initializer) {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Server already started");
}
doStart(initializer);
return this;
}
/**
* Start the server and wait for all the threads to be finished before shutdown.
* @param initializer the channel initializers. they will be called when a new client connects to the server.
*/
public final void startAndAwait(ChannelInitializer<Channel> initializer) {
start(initializer);
awaitShutdown();
}
/**
* Shutdown the server
* @return true if successfully shutdown.
*/
public final boolean shutdown() {
return !started.compareAndSet(true, false) || doShutdown();
}
/**
* Wait for all the threads to be finished before shutdown.
*/
public abstract void awaitShutdown();
/**
* Do the shutdown now.
* @return true if successfully shutdown
*/
public abstract boolean doShutdown();
/**
* start the server
* @param initializer the channel initializers. they will be called when a new client connetcs to the server.
* @return instance of tcp server
*/
public abstract MFTcpServer<R, W> doStart(ChannelInitializer<Channel> initializer);
/**
*
* @return the port where the server is running.
*/
public abstract int getPort();
mfnetty4tcserver。java实际的服务器实现
public class MFNetty4TcpServer<R, W> extends MFTcpServer<R, W> {
private static final Logger logger = LoggerFactory.getLogger(MFNetty4TcpServer.class);
private static final int BOSS_THREAD_POOL_SIZE = 2;
private int port;
private ServerBootstrap bootstrap;
private ChannelFuture bindFuture;
/**
* The constructor.
*
* @param port port where to listen
*/
protected MFNetty4TcpServer(int port) {
this.port = port;
final NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
(BOSS_THREAD_POOL_SIZE));
final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
(JsonProducerConfig.THREAD_POOL_SIZE));
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class);
}
@Override
public MFNetty4TcpServer<R, W> doStart(ChannelInitializer<Channel> initializer) {
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
if (initializer != null) {
ch.pipeline().addLast(initializer);
}
}
});
try {
bindFuture = bootstrap.bind(port).sync();
if (!bindFuture.isSuccess()) {
// Connection not successful
throw new RuntimeException(bindFuture.cause());
}
SocketAddress localAddress = bindFuture.channel().localAddress();
if (localAddress instanceof InetSocketAddress) {
port = ((InetSocketAddress) localAddress).getPort();
logger.info("Started server at port: " + port);
}
} catch (InterruptedException e) {
logger.error("Error waiting for binding server port: " + port, e);
}
return this;
}
@Override
public void awaitShutdown() {
try {
bindFuture.channel().closeFuture().await();
} catch (InterruptedException e) {
Thread.interrupted(); // Reset the interrupted status
logger.error("Interrupted while waiting for the server socket to close.", e);
}
}
@Override
public boolean doShutdown() {
try {
bindFuture.channel().close().sync();
return true;
} catch (InterruptedException e) {
logger.error("Failed to shutdown the server.", e);
return false;
}
}
@Override
public int getPort() {
return port;
}
/**
* Creates a tcp server at the defined port.
*
* @param port port to listen to
* @param <R> data to be read
* @param <W> data to be written back. Only in case of duplex connection.
* @return instance of tcp server.
*/
public static <R, W> MFTcpServer<R, W> create(int port) {
return new MFNetty4TcpServer<>(port);
}
}
JsonProducerConfig。java管道在这里设置
/**
* Spring Configuration class of the application.
*/
@Configuration
@Import({DatabusConfig.class})
public class JsonProducerConfig {
private static final Logger log = LoggerFactory.getLogger(JsonProducerConfig.class);
public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
public static final String TCP_SERVER = "tcpServer";
public static final String CHANNEL_PIPELINE_INITIALIZER = "channel_initializer";
public static final String MF_KAFKA_PRODUCER = "mf_kafka_producer";
public static final String JSON_AVRO_CONVERTOR = "jsonAvroConvertor";
@Value("#{systemProperties['tcpserver.port']?:'12000'}")
private String tcpServerPort;
@Bean(name = TCP_SERVER)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public MFTcpServer nettyTCPServer() {
return MFNetty4TcpServer.create(Integer.parseInt(tcpServerPort));
}
@Bean(name = MF_KAFKA_PRODUCER)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public MFKafkaProducer pushToKafka() {
return new MFKafkaProducer();
}
@Bean(name = JSON_AVRO_CONVERTOR)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public JsonAvroConvertor jsonAvroConvertor() {
return new JsonAvroConvertor();
}
/**
* This is where the pipeline is set for processing of events.
*
* @param jsonAvroConvertor converts json to avro
* @param kafkaProducer pushes to kafka
* @return chanenl initializers pipeline.
*/
@Bean(name = CHANNEL_PIPELINE_INITIALIZER)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public ChannelInitializer<Channel> channelInitializers(JsonAvroConvertor jsonAvroConvertor,
MFKafkaProducer kafkaProducer) {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
if (log.isInfoEnabled())
log.info("initChannel - initing channel...");
channel.pipeline().addLast(new NioEventLoopGroup(0, new DefaultEventExecutorGroup(THREAD_POOL_SIZE)));
channel.pipeline().addLast(new JsonObjectDecoder(1048576));
channel.pipeline().addLast(jsonAvroConvertor);
channel.pipeline().addLast(kafkaProducer);
if (log.isInfoEnabled())
log.info("channel = " + channel.toString());
}
};
}
}
JsonProducer。java主程序
public class JsonProducer {
private static final Logger log = LoggerFactory.getLogger(JsonProducer.class);
private static MFTcpServer tcpServer;
/**
* Main startup method
*
* @param args not used
*/
public static void main(String[] args) {
System.setProperty("solschema", "false");
try {
// the shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
if (tcpServer != null) {
tcpServer.shutdown();
}
}
));
AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext(JsonProducerConfig.class);
tcpServer = (MFTcpServer) context.getBean(JsonProducerConfig.TCP_SERVER);
ChannelInitializer<Channel> channelInitializer = (ChannelInitializer<Channel>) context.
getBean(JsonProducerConfig.CHANNEL_PIPELINE_INITIALIZER);
tcpServer.startAndAwait(channelInitializer);
} catch (Exception t) {
log.error("Error while starting JsonProducer ", t);
System.exit(-1);
}
}
}
MFKafkaProducer。java作为管道中的最后一个通道。注意ctx。在channelRead方法中使用writeAndFlush(msg),我知道应该在该方法中启动响应。但那之后呢。在未来经营这个渠道时。isSuccess()的计算结果为false。response对象试图创建字符串响应
@ChannelHandler.Sharable
public class MFKafkaProducer extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(MFKafkaProducer.class);
@Resource
ApplicationContext context;
@Resource(name = DatabusConfig.ADMIN)
Admin admin;
private Map<String, IProducer> streams = new HashMap<>();
@PreDestroy
public void stop() {
removeAllStreams(); // then stop writing to producers
}
/**
* @param clickRecord the record to be pushed to kafka
* @throws Exception
*/
public void handle(GenericRecord clickRecord) throws Exception {
Utf8 clientId = null;
try {
clientId = (Utf8) clickRecord.get(SchemaUtil.APP_ID);
stream(producer(clientId.toString()), clickRecord);
} catch (Exception e) {
String message = "Could not push click data for clientId:" + clientId;
log.warn("handle - " + message + "!!!", e);
assert clientId != null;
removeStream(clientId.toString());
}
}
/**
* removes all the streams
*/
private void removeAllStreams() {
Set<String> strings = streams.keySet();
for (String clientId : strings) {
removeStream(clientId);
}
}
/**
* removes a particular stream
*
* @param clientId the stream to be removed
*/
private void removeStream(String clientId) {
Assert.notEmpty(streams);
IProducer producer = streams.get(clientId);
producer.stopProducer();
streams.remove(clientId);
}
/**
* @param producer the producer where data needs to be written
* @param clickRecord teh record to be written
*/
private void stream(IProducer producer, GenericRecord clickRecord) {
producer.send(clickRecord);
}
/**
* This will create a producer in case it is not already created.
* If already created return the already present one
*
* @param clientId stream id
* @return the producer instance
*/
private IProducer producer(String clientId) {
if (streams.containsKey(clientId)) {
return streams.get(clientId);
} else {
IProducer producer = admin.createKeyTopicProducer(SchemaUtil.APP_ID, "test_" + clientId, new ICallback() {
@Override
public void onSuccess(long offset) {
if (log.isInfoEnabled())
log.info("onSuccess - Data at offset:" + offset + " send.");
}
@Override
public void onError(long offset, Exception ex) {
if (log.isInfoEnabled())
log.info("onError - Data at offset:" + offset + " failed. Exception: ", ex);
}
@Override
public void onStreamClosed() {
log.warn("onStreamClosed - Stream:" + clientId + " closed.");
removeStream(clientId);
}
});
producer.startProducer();
streams.put(clientId, producer);
return producer;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.debug("KafkaProducer - channelRead() called with " + "ctx = [" + ctx + "], msg = [" + msg + "]");
if (msg instanceof GenericRecord) {
GenericRecord genericRecord = (GenericRecord) msg;
try {
handle(genericRecord);
log.debug("channelRead sending response");
Charset charset = Charset.defaultCharset();
ByteBuf response = Unpooled.copiedBuffer("Just a response", charset);
ChannelFuture future = ctx.writeAndFlush(msg);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess())
log.info("channelRead - future.operationComplete - Response has been delivered to all channels");
else
log.info("channelRead - future.operationComplete - Response has NOT been delivered to all channels");
}
});
} catch (Exception ex) {
log.error("Something went wrong processing the generic record: " + msg + "\n ", ex);
}
} else {
log.debug("KafkaProducer - msg not of Type Generic Record !!! " + msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
log.error("Something went wrong writing to Kafka: \n", cause);
ctx.close();
}
}
# 1 楼答案
使用ChannelFuture#cause()时,我注意到我并没有序列化ByteBuf对象,而是序列化了一个GenericRecord。使用
GenericRecord将转换为ButeBuf,并使用writeAndFlush方法发送响应
使用Socket实现的测试客户端不知何故从未真正收到响应,但通过使用SocketChannel,这也得到了解决