Skip to content

Commit

Permalink
新增(Servlet/Reactive)WebSocketClientFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Oct 8, 2023
1 parent 4b429f8 commit 9bda3b5
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public Message decode(Object message, Connection connection, ConnectionLoadBalan
} else if (message instanceof ByteBuffer) {
return new BinaryMessage((ByteBuffer) message);
} else if (message instanceof byte[]) {
return new BinaryMessage(ByteBuffer.wrap((byte[]) message));
return new BinaryMessage((byte[]) message);
} else {
throw new MessageDecodeException(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.linyuzai.connection.loadbalance.websocket.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.AbstractConnectionLoadBalanceConcept;
import lombok.Getter;

/**
* ws 负载均衡概念。
Expand Down Expand Up @@ -33,12 +34,9 @@ public static String formatPrefix(String prefix) {
return builder.toString();
}

@Getter
private static WebSocketLoadBalanceConcept instance;

public static WebSocketLoadBalanceConcept getInstance() {
return instance;
}

public void holdInstance() {
instance = this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.github.linyuzai.connection.loadbalance.websocket.reactive;

import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketLoadBalanceException;
import lombok.SneakyThrows;
import org.springframework.util.ClassUtils;
import org.springframework.web.reactive.socket.client.*;
import org.xnio.OptionMap;
import org.xnio.Xnio;
import org.xnio.XnioWorker;

public class DefaultReactiveWebSocketClientFactory implements ReactiveWebSocketClientFactory {

private static final boolean tomcatPresent;

private static final boolean jettyPresent;

private static final boolean undertowPresent;

private static final boolean reactorNettyPresent;

static {
ClassLoader loader = ReactiveWebSocketConnectionSubscriber.class.getClassLoader();
tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.WsWebSocketContainer", loader);
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.client.WebSocketClient", loader);
undertowPresent = ClassUtils.isPresent("io.undertow.websockets.core.WebSocketChannel", loader);
reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.websocket.WebsocketInbound", loader);
}

@SneakyThrows
@Override
public WebSocketClient create() {
if (reactorNettyPresent) {
return new ReactorNettyWebSocketClient();
} else if (undertowPresent) {
XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.builder().getMap());
return new UndertowWebSocketClient(worker);
} else if (jettyPresent) {
return new JettyWebSocketClient();
} else if (tomcatPresent) {
return new TomcatWebSocketClient();
} else {
throw new WebSocketLoadBalanceException("No suitable client found");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.linyuzai.connection.loadbalance.websocket.reactive;

import org.springframework.web.reactive.socket.client.WebSocketClient;

public interface ReactiveWebSocketClientFactory {

WebSocketClient create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@
import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketLoadBalanceException;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.util.ClassUtils;
import lombok.*;
import org.springframework.web.reactive.socket.client.*;
import org.xnio.OptionMap;
import org.xnio.Xnio;
import org.xnio.XnioWorker;

import java.net.URI;
import java.util.function.Consumer;
Expand All @@ -20,56 +14,29 @@
* <p>
* {@link ReactiveWebSocketConnection} connection subscriber.
*/
@NoArgsConstructor
@Getter
@RequiredArgsConstructor
public class ReactiveWebSocketConnectionSubscriber extends
WebSocketConnectionSubscriber<ReactiveWebSocketConnection> {

private static final boolean tomcatPresent;

private static final boolean jettyPresent;

private static final boolean undertowPresent;

private static final boolean reactorNettyPresent;

static {
ClassLoader loader = ReactiveWebSocketConnectionSubscriber.class.getClassLoader();
tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.WsWebSocketContainer", loader);
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.client.WebSocketClient", loader);
undertowPresent = ClassUtils.isPresent("io.undertow.websockets.core.WebSocketChannel", loader);
reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.websocket.WebsocketInbound", loader);
}
private final ReactiveWebSocketClientFactory webSocketClientFactory;

@Override
public void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept,
Consumer<ReactiveWebSocketConnection> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete) {
WebSocketClient client = newWebSocketClient();
WebSocketClient client = webSocketClientFactory.create();
ReactiveWebSocketSubscriberHandler handler =
new ReactiveWebSocketSubscriberHandler(concept, (session, sink) -> {
ReactiveWebSocketConnection connection = new ReactiveWebSocketConnection(session, sink);
connection.setType(Connection.Type.SUBSCRIBER);
onSuccess.accept(connection);
});
client.execute(uri, handler).subscribe(unused -> {
}, onError, onComplete);
}

@SneakyThrows
public WebSocketClient newWebSocketClient() {
if (reactorNettyPresent) {
return new ReactorNettyWebSocketClient();
} else if (undertowPresent) {
XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.builder().getMap());
return new UndertowWebSocketClient(worker);
} else if (jettyPresent) {
return new JettyWebSocketClient();
} else if (tomcatPresent) {
return new TomcatWebSocketClient();
} else {
throw new WebSocketLoadBalanceException("No suitable client found");
}
client.execute(uri, handler)
.doOnError(onError)
.doOnTerminate(onComplete)
.subscribe();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketConnectionSubscriberFactory;
import lombok.Getter;
import lombok.Setter;

/**
* Reactive WebSocket 连接订阅器工厂。
* <p>
* Reactive WebSocket connection subscriber factory.
*/
@Getter
@Setter
public class ReactiveWebSocketConnectionSubscriberFactory extends WebSocketConnectionSubscriberFactory<ReactiveWebSocketConnection> {

private ReactiveWebSocketClientFactory webSocketClientFactory;

@Override
public WebSocketConnectionSubscriber<ReactiveWebSocketConnection> doCreate(String scope) {
return new ReactiveWebSocketConnectionSubscriber();
return new ReactiveWebSocketConnectionSubscriber(webSocketClientFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.github.linyuzai.connection.loadbalance.websocket.servlet;

import org.springframework.util.ClassUtils;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

public class DefaultServletWebSocketClientFactory implements ServletWebSocketClientFactory {

private static final boolean jettyPresent;

static {
ClassLoader loader = ServletWebSocketConnectionSubscriber.class.getClassLoader();
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.client.WebSocketClient", loader);
}

@Override
public WebSocketClient create() {
if (jettyPresent) {
return new JettyWebSocketClient();
} else {
return new StandardWebSocketClient();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.linyuzai.connection.loadbalance.websocket.servlet;

import org.springframework.web.socket.client.WebSocketClient;

public interface ServletWebSocketClientFactory {

WebSocketClient create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.websocket.javax.ContainerWebSocketConnectionSubscriber;
import lombok.NoArgsConstructor;
import org.springframework.util.ClassUtils;
import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketConnectionSubscriber;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.WebSocketConnectionManager;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

import java.net.URI;
import java.util.function.Consumer;
Expand All @@ -18,24 +16,20 @@
* <p>
* {@link ServletWebSocketConnection} connection subscriber.
*/
@NoArgsConstructor
@Getter
@RequiredArgsConstructor
public class ServletWebSocketConnectionSubscriber extends
ContainerWebSocketConnectionSubscriber<ServletWebSocketConnection> {
WebSocketConnectionSubscriber<ServletWebSocketConnection> {

private static final boolean jettyPresent;

static {
ClassLoader loader = ServletWebSocketConnectionSubscriber.class.getClassLoader();
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.client.WebSocketClient", loader);
}
private final ServletWebSocketClientFactory webSocketClientFactory;

@Override
public void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept,
Consumer<ServletWebSocketConnection> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete) {
try {
WebSocketClient client = newWebSocketClient();
WebSocketClient client = webSocketClientFactory.create();
ServletWebSocketSubscriberHandler handler = new ServletWebSocketSubscriberHandler(concept, session -> {
ServletWebSocketConnection connection = new ServletWebSocketConnection(session);
connection.setType(Connection.Type.SUBSCRIBER);
Expand All @@ -50,14 +44,6 @@ public void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept,
}
}

public WebSocketClient newWebSocketClient() {
if (jettyPresent) {
return new JettyWebSocketClient();
} else {
return new StandardWebSocketClient(getContainer());
}
}

@Override
public String getType() {
return "servlet";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.websocket.concept.WebSocketConnectionSubscriberFactory;
import lombok.Getter;
import lombok.Setter;

/**
* {@link ServletWebSocketConnection} 订阅者工厂。
* <p>
* {@link ServletWebSocketConnection} subscriber factory.
*/
@Getter
@Setter
public class ServletWebSocketConnectionSubscriberFactory extends WebSocketConnectionSubscriberFactory<ServletWebSocketConnection> {

private ServletWebSocketClientFactory webSocketClientFactory;

@Override
public WebSocketConnectionSubscriber<ServletWebSocketConnection> doCreate(String scope) {
return new ServletWebSocketConnectionSubscriber();
return new ServletWebSocketConnectionSubscriber(webSocketClientFactory);
}
}
Loading

0 comments on commit 9bda3b5

Please sign in to comment.