Skip to content

Commit

Permalink
netty ws
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Oct 30, 2023
1 parent d9db72d commit 4ec9bc8
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;

import java.util.function.Consumer;
Expand All @@ -29,16 +30,17 @@ public void send(String msg) {
}
}

public void connect(String host, int port) {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel channel) throws Exception {
protected void initChannel(@NonNull SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringEncoder());
Expand All @@ -48,8 +50,6 @@ protected void initChannel(SocketChannel channel) throws Exception {
});
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
Expand All @@ -63,7 +63,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception {
if (consumer != null) {
consumer.accept(String.valueOf(msg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

Expand All @@ -24,10 +25,14 @@ public class NettySampleServer {
private String group;

public void send(String msg) {
concept.send(msg);
if (msg.endsWith("\n")) {
concept.send(msg);
} else {
concept.send(msg + "\n");
}
}

public void start(int port) {
public void start(int port) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
Expand All @@ -36,8 +41,9 @@ public void start(int port) {
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel channel) throws Exception {
protected void initChannel(@NonNull SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringEncoder());
Expand All @@ -51,8 +57,6 @@ protected void initChannel(SocketChannel channel) throws Exception {
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Throwable e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.github.linyuzai.connection.loadbalance.netty.sample;

import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.netty.websocket.WebSocketNettyLoadBalanceHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

@Getter
@RequiredArgsConstructor
public class WebSocketNettySampleServer {

private final NettyLoadBalanceConcept concept;

@Setter
private String group;

public void send(String msg) {
concept.send(msg);
}

public void start(String path, int port) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(@NonNull SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
//WebSocketServerCompressionHandler
pipeline.addLast(new WebSocketServerProtocolHandler(path));
if (group == null) {
pipeline.addLast(new WebSocketNettyLoadBalanceHandler(concept));
} else {
pipeline.addLast(new WebSocketNettyLoadBalanceHandler(concept, group));
}
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.github.linyuzai.connection.loadbalance.netty.websocket;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder;
import com.github.linyuzai.connection.loadbalance.netty.concept.NettyMessageCodecAdapter;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

public class WebSocketFrameNettyMessageCodecAdapter extends NettyMessageCodecAdapter {

@Override
public MessageEncoder getClientMessageEncoder(MessageEncoder encoder) {
return new WebSocketFrameMessageEncoder(encoder);
}

@Override
public MessageDecoder getClientMessageDecoder(MessageDecoder decoder) {
return new WebSocketFrameMessageDecoder(decoder);
}

@Getter
@RequiredArgsConstructor
public static class WebSocketFrameMessageEncoder implements MessageEncoder {

private final MessageEncoder encoder;

@Override
public Object encode(Message message, Connection connection, ConnectionLoadBalanceConcept concept) {
if (connection instanceof WebSocketNettyConnection) {
Object encoded = encoder.encode(message, connection, concept);
if (encoded instanceof String) {
return new TextWebSocketFrame((String) encoded);
} else if (encoded instanceof byte[]) {
return new BinaryWebSocketFrame(Unpooled.wrappedBuffer((byte[]) encoded));
}
}
return encoder.encode(message, connection, concept);
}
}

@Getter
@RequiredArgsConstructor
public static class WebSocketFrameMessageDecoder implements MessageDecoder {

private final MessageDecoder decoder;

@Override
public Message decode(Object message, Connection connection, ConnectionLoadBalanceConcept concept) {
if (connection instanceof WebSocketNettyConnection) {
if (message instanceof WebSocketFrame) {
if (message instanceof BinaryWebSocketFrame) {
return decoder.decode(((BinaryWebSocketFrame) message).content().array(),
connection, concept);
} else if (message instanceof TextWebSocketFrame) {
return decoder.decode(((TextWebSocketFrame) message).text(),
connection, concept);
}
}
}
return decoder.decode(message, connection, concept);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.github.linyuzai.connection.loadbalance.netty.websocket;

import com.github.linyuzai.connection.loadbalance.core.message.PingMessage;
import com.github.linyuzai.connection.loadbalance.core.message.PongMessage;
import com.github.linyuzai.connection.loadbalance.netty.concept.NettyConnection;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;

import java.util.function.Consumer;

public class WebSocketNettyConnection extends NettyConnection {

public WebSocketNettyConnection(NettyConnection connection) {
super(connection.getChannel());
}

@Override
public void doPing(PingMessage message, Runnable onSuccess, Consumer<Throwable> onError, Runnable onComplete) {
getChannel().writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(message.getPayload())))
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
onSuccess.run();
} else {
Throwable cause = future.cause();
onError.accept(cause);
}
onComplete.run();
});
}

@Override
public void doPong(PongMessage message, Runnable onSuccess, Consumer<Throwable> onError, Runnable onComplete) {
getChannel().writeAndFlush(new PongWebSocketFrame(Unpooled.wrappedBuffer(message.getPayload())))
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
onSuccess.run();
} else {
Throwable cause = future.cause();
onError.accept(cause);
}
onComplete.run();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.github.linyuzai.connection.loadbalance.netty.websocket;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.netty.concept.NettyConnection;
import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceHandler;
import io.netty.channel.ChannelHandlerContext;

import java.util.Map;

public class WebSocketNettyLoadBalanceHandler extends NettyLoadBalanceHandler {

public WebSocketNettyLoadBalanceHandler(NettyLoadBalanceConcept concept, Map<Object, Object> metadata) {
super(concept, metadata);
}

public WebSocketNettyLoadBalanceHandler(NettyLoadBalanceConcept concept) {
super(concept);
}

public WebSocketNettyLoadBalanceHandler(NettyLoadBalanceConcept concept, String group) {
super(concept, group);
}

@Override
protected Connection create(ChannelHandlerContext ctx, Map<Object, Object> metadata) {
Connection connection = super.create(ctx, metadata);
return new WebSocketNettyConnection((NettyConnection) connection);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//TODO ContinuationWebSocketFrame
super.channelRead(ctx, msg);
}
}

0 comments on commit 4ec9bc8

Please sign in to comment.