From 4ec9bc894e704f014e9d7e74f3c1416f227fd261 Mon Sep 17 00:00:00 2001 From: tanghanzheng Date: Mon, 30 Oct 2023 16:21:31 +0800 Subject: [PATCH] netty ws --- .../netty/sample/NettySampleClient.java | 10 +-- .../netty/sample/NettySampleServer.java | 14 ++-- .../sample/WebSocketNettySampleServer.java | 61 ++++++++++++++++ ...ebSocketFrameNettyMessageCodecAdapter.java | 70 +++++++++++++++++++ .../websocket/WebSocketNettyConnection.java | 46 ++++++++++++ .../WebSocketNettyLoadBalanceHandler.java | 36 ++++++++++ 6 files changed, 227 insertions(+), 10 deletions(-) create mode 100644 concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/WebSocketNettySampleServer.java create mode 100644 concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketFrameNettyMessageCodecAdapter.java create mode 100644 concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyConnection.java create mode 100644 concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyLoadBalanceHandler.java diff --git a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleClient.java b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleClient.java index 6c009f624..5111938ef 100644 --- a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleClient.java +++ b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleClient.java @@ -9,6 +9,7 @@ import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; import java.util.function.Consumer; @@ -29,7 +30,7 @@ public void send(String msg) { } } - public void connect(String host, int port) { + public void connect(String host, int port) throws InterruptedException { EventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -37,8 +38,9 @@ public void connect(String host, int port) { .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { + @Override - protected void initChannel(SocketChannel channel) throws Exception { + protected void initChannel(@NonNull SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(1024)); pipeline.addLast(new StringEncoder()); @@ -48,8 +50,6 @@ protected void initChannel(SocketChannel channel) throws Exception { }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); - } catch (InterruptedException e) { - e.printStackTrace(); } finally { worker.shutdownGracefully(); } @@ -63,7 +63,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception { if (consumer != null) { consumer.accept(String.valueOf(msg)); } diff --git a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleServer.java b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleServer.java index afdf0c223..a36182747 100644 --- a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleServer.java +++ b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/NettySampleServer.java @@ -11,6 +11,7 @@ import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.Getter; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -24,10 +25,14 @@ public class NettySampleServer { private String group; public void send(String msg) { - concept.send(msg); + if (msg.endsWith("\n")) { + concept.send(msg); + } else { + concept.send(msg + "\n"); + } } - public void start(int port) { + public void start(int port) throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); try { @@ -36,8 +41,9 @@ public void start(int port) { .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { + @Override - protected void initChannel(SocketChannel channel) throws Exception { + protected void initChannel(@NonNull SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(1024)); pipeline.addLast(new StringEncoder()); @@ -51,8 +57,6 @@ protected void initChannel(SocketChannel channel) throws Exception { }); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); - } catch (Throwable e) { - e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); diff --git a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/WebSocketNettySampleServer.java b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/WebSocketNettySampleServer.java new file mode 100644 index 000000000..397e828a4 --- /dev/null +++ b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/sample/WebSocketNettySampleServer.java @@ -0,0 +1,61 @@ +package com.github.linyuzai.connection.loadbalance.netty.sample; + +import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceConcept; +import com.github.linyuzai.connection.loadbalance.netty.websocket.WebSocketNettyLoadBalanceHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + +@Getter +@RequiredArgsConstructor +public class WebSocketNettySampleServer { + + private final NettyLoadBalanceConcept concept; + + @Setter + private String group; + + public void send(String msg) { + concept.send(msg); + } + + public void start(String path, int port) throws InterruptedException { + EventLoopGroup boss = new NioEventLoopGroup(1); + EventLoopGroup worker = new NioEventLoopGroup(); + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(boss, worker) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(@NonNull SocketChannel channel) { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(65536)); + //WebSocketServerCompressionHandler + pipeline.addLast(new WebSocketServerProtocolHandler(path)); + if (group == null) { + pipeline.addLast(new WebSocketNettyLoadBalanceHandler(concept)); + } else { + pipeline.addLast(new WebSocketNettyLoadBalanceHandler(concept, group)); + } + } + }); + ChannelFuture future = bootstrap.bind(port).sync(); + future.channel().closeFuture().sync(); + } finally { + boss.shutdownGracefully(); + worker.shutdownGracefully(); + } + } +} diff --git a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketFrameNettyMessageCodecAdapter.java b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketFrameNettyMessageCodecAdapter.java new file mode 100644 index 000000000..759aab3ad --- /dev/null +++ b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketFrameNettyMessageCodecAdapter.java @@ -0,0 +1,70 @@ +package com.github.linyuzai.connection.loadbalance.netty.websocket; + +import com.github.linyuzai.connection.loadbalance.core.concept.Connection; +import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept; +import com.github.linyuzai.connection.loadbalance.core.message.Message; +import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder; +import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder; +import com.github.linyuzai.connection.loadbalance.netty.concept.NettyMessageCodecAdapter; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +public class WebSocketFrameNettyMessageCodecAdapter extends NettyMessageCodecAdapter { + + @Override + public MessageEncoder getClientMessageEncoder(MessageEncoder encoder) { + return new WebSocketFrameMessageEncoder(encoder); + } + + @Override + public MessageDecoder getClientMessageDecoder(MessageDecoder decoder) { + return new WebSocketFrameMessageDecoder(decoder); + } + + @Getter + @RequiredArgsConstructor + public static class WebSocketFrameMessageEncoder implements MessageEncoder { + + private final MessageEncoder encoder; + + @Override + public Object encode(Message message, Connection connection, ConnectionLoadBalanceConcept concept) { + if (connection instanceof WebSocketNettyConnection) { + Object encoded = encoder.encode(message, connection, concept); + if (encoded instanceof String) { + return new TextWebSocketFrame((String) encoded); + } else if (encoded instanceof byte[]) { + return new BinaryWebSocketFrame(Unpooled.wrappedBuffer((byte[]) encoded)); + } + } + return encoder.encode(message, connection, concept); + } + } + + @Getter + @RequiredArgsConstructor + public static class WebSocketFrameMessageDecoder implements MessageDecoder { + + private final MessageDecoder decoder; + + @Override + public Message decode(Object message, Connection connection, ConnectionLoadBalanceConcept concept) { + if (connection instanceof WebSocketNettyConnection) { + if (message instanceof WebSocketFrame) { + if (message instanceof BinaryWebSocketFrame) { + return decoder.decode(((BinaryWebSocketFrame) message).content().array(), + connection, concept); + } else if (message instanceof TextWebSocketFrame) { + return decoder.decode(((TextWebSocketFrame) message).text(), + connection, concept); + } + } + } + return decoder.decode(message, connection, concept); + } + } +} diff --git a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyConnection.java b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyConnection.java new file mode 100644 index 000000000..b63008316 --- /dev/null +++ b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyConnection.java @@ -0,0 +1,46 @@ +package com.github.linyuzai.connection.loadbalance.netty.websocket; + +import com.github.linyuzai.connection.loadbalance.core.message.PingMessage; +import com.github.linyuzai.connection.loadbalance.core.message.PongMessage; +import com.github.linyuzai.connection.loadbalance.netty.concept.NettyConnection; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; + +import java.util.function.Consumer; + +public class WebSocketNettyConnection extends NettyConnection { + + public WebSocketNettyConnection(NettyConnection connection) { + super(connection.getChannel()); + } + + @Override + public void doPing(PingMessage message, Runnable onSuccess, Consumer onError, Runnable onComplete) { + getChannel().writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(message.getPayload()))) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + onSuccess.run(); + } else { + Throwable cause = future.cause(); + onError.accept(cause); + } + onComplete.run(); + }); + } + + @Override + public void doPong(PongMessage message, Runnable onSuccess, Consumer onError, Runnable onComplete) { + getChannel().writeAndFlush(new PongWebSocketFrame(Unpooled.wrappedBuffer(message.getPayload()))) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + onSuccess.run(); + } else { + Throwable cause = future.cause(); + onError.accept(cause); + } + onComplete.run(); + }); + } +} diff --git a/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyLoadBalanceHandler.java b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyLoadBalanceHandler.java new file mode 100644 index 000000000..1b0cc7d3d --- /dev/null +++ b/concept-connection-loadbalance/concept-connection-loadbalance-netty/src/main/java/com/github/linyuzai/connection/loadbalance/netty/websocket/WebSocketNettyLoadBalanceHandler.java @@ -0,0 +1,36 @@ +package com.github.linyuzai.connection.loadbalance.netty.websocket; + +import com.github.linyuzai.connection.loadbalance.core.concept.Connection; +import com.github.linyuzai.connection.loadbalance.netty.concept.NettyConnection; +import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceConcept; +import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceHandler; +import io.netty.channel.ChannelHandlerContext; + +import java.util.Map; + +public class WebSocketNettyLoadBalanceHandler extends NettyLoadBalanceHandler { + + public WebSocketNettyLoadBalanceHandler(NettyLoadBalanceConcept concept, Map metadata) { + super(concept, metadata); + } + + public WebSocketNettyLoadBalanceHandler(NettyLoadBalanceConcept concept) { + super(concept); + } + + public WebSocketNettyLoadBalanceHandler(NettyLoadBalanceConcept concept, String group) { + super(concept, group); + } + + @Override + protected Connection create(ChannelHandlerContext ctx, Map metadata) { + Connection connection = super.create(ctx, metadata); + return new WebSocketNettyConnection((NettyConnection) connection); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //TODO ContinuationWebSocketFrame + super.channelRead(ctx, msg); + } +}