Skip to content

Commit

Permalink
connection loadbalance sse
Browse files Browse the repository at this point in the history
  • Loading branch information
Linyuzai committed Feb 26, 2024
1 parent bc3aeec commit 93ca3f4
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.linyuzai.connection.loadbalance.core.subscribe;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;

public abstract class ProtocolConnectionSubscriber<T extends Connection> extends ServerInstanceConnectionSubscriber<T> {

public abstract void setProtocol(String protocol);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.linyuzai.connection.loadbalance.core.subscribe;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public abstract class ProtocolConnectionSubscriberFactory<T extends Connection> extends AbstractConnectionSubscriberFactory {

private String protocol;

@Override
public ConnectionSubscriber create(String scope) {
ProtocolConnectionSubscriber<T> subscriber = doCreate(scope);
if (protocol != null && !protocol.isEmpty()) {
subscriber.setProtocol(protocol);
}
return subscriber;
}

public abstract ProtocolConnectionSubscriber<T> doCreate(String scope);
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,22 @@ public Connection getSubscriberConnection(ConnectionServer server, ConnectionLoa
return null;
}

public abstract void doSubscribe(ConnectionServer server, ConnectionLoadBalanceConcept concept,
/*public abstract void doSubscribe(ConnectionServer server, ConnectionLoadBalanceConcept concept,
Consumer<T> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete);*/

public void doSubscribe(ConnectionServer server, ConnectionLoadBalanceConcept concept,
Consumer<T> onSuccess, Consumer<Throwable> onError,
Runnable onComplete) {
URI uri = getUri(server);
doSubscribe(uri, concept, connection -> {
connection.getMetadata().put(ConnectionServer.class, server);
onSuccess.accept(connection);
}, onError, onComplete);
}

public abstract void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept,
Consumer<T> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.github.linyuzai.connection.loadbalance.sse.concept;

import com.github.linyuzai.connection.loadbalance.core.subscribe.ProtocolConnectionSubscriber;
import lombok.Getter;
import lombok.Setter;

/**
* SSE 连接订阅者。
* <p>
* SSE connection subscriber.
*/
@Getter
@Setter
public abstract class SseConnectionSubscriber<T extends SseConnection>
extends ProtocolConnectionSubscriber<T> {

private String protocol = "http";

@Override
public String getEndpoint() {
return SseLoadBalanceConcept.SUBSCRIBER_ENDPOINT;
}

public abstract String getType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.github.linyuzai.connection.loadbalance.sse.concept;

import com.github.linyuzai.connection.loadbalance.core.subscribe.ProtocolConnectionSubscriberFactory;

/**
* SSE 连接订阅者工厂抽象类。
* <p>
* Abstract class of SSE connection subscriber factory.
*/
public abstract class SseConnectionSubscriberFactory<T extends SseConnection>
extends ProtocolConnectionSubscriberFactory<T> {

public SseConnectionSubscriberFactory() {
addScopes(SseScoped.NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ public class SseLoadBalanceConcept extends AbstractConnectionLoadBalanceConcept

public static final String ID = "sse";

/**
* 服务间订阅端点
*/
public static final String SUBSCRIBER_ENDPOINT = "/concept-sse-subscriber";

@Override
public String getId() {
return ID;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.linyuzai.connection.loadbalance.sse.reactive;

import org.springframework.web.reactive.function.client.WebClient;

public interface ReactiveSseClientFactory {

WebClient create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.github.linyuzai.connection.loadbalance.sse.reactive;

import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.sse.concept.SseConnectionSubscriber;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.web.reactive.function.client.WebClient;

import java.net.URI;
import java.util.function.Consumer;

@Getter
@RequiredArgsConstructor
public class ReactiveSseConnectionSubscriber extends SseConnectionSubscriber<ReactiveSseConnection> {

private final ReactiveSseClientFactory sseClientFactory;

@Override
public void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept, Consumer<ReactiveSseConnection> onSuccess, Consumer<Throwable> onError, Runnable onComplete) {
WebClient webClient = sseClientFactory.create();
webClient.get()
.uri(uri).retrieve()
.bodyToFlux(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
}, onError, onComplete);
}

@Override
public String getType() {
return "reactive";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.linyuzai.connection.loadbalance.sse.reactive;

import com.github.linyuzai.connection.loadbalance.sse.concept.SseConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.sse.concept.SseConnectionSubscriberFactory;
import lombok.Getter;
import lombok.Setter;

/**
* Reactive SSE 连接订阅器工厂。
* <p>
* Reactive SSE connection subscriber factory.
*/
@Getter
@Setter
public class ReactiveSseConnectionSubscriberFactory extends SseConnectionSubscriberFactory<ReactiveSseConnection> {

private ReactiveSseClientFactory sseClientFactory;

@Override
public SseConnectionSubscriber<ReactiveSseConnection> doCreate(String scope) {
return new ReactiveSseConnectionSubscriber(sseClientFactory);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package com.github.linyuzai.connection.loadbalance.websocket.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ServerInstanceConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ProtocolConnectionSubscriber;
import lombok.Getter;
import lombok.Setter;

import java.net.URI;
import java.util.function.Consumer;

/**
* ws 连接订阅者。
* <p>
Expand All @@ -17,26 +12,10 @@
@Getter
@Setter
public abstract class WebSocketConnectionSubscriber<T extends WebSocketConnection>
extends ServerInstanceConnectionSubscriber<T> {
extends ProtocolConnectionSubscriber<T> {

private String protocol = "ws";

@Override
public void doSubscribe(ConnectionServer server, ConnectionLoadBalanceConcept concept,
Consumer<T> onSuccess, Consumer<Throwable> onError,
Runnable onComplete) {
URI uri = getUri(server);
doSubscribe(uri, concept, connection -> {
connection.getMetadata().put(ConnectionServer.class, server);
onSuccess.accept(connection);
}, onError, onComplete);
}

public abstract void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept,
Consumer<T> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete);

@Override
public String getEndpoint() {
return WebSocketLoadBalanceConcept.SUBSCRIBER_ENDPOINT;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,16 @@
package com.github.linyuzai.connection.loadbalance.websocket.concept;

import com.github.linyuzai.connection.loadbalance.core.subscribe.AbstractConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber;
import lombok.Getter;
import lombok.Setter;
import org.springframework.util.StringUtils;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ProtocolConnectionSubscriberFactory;

/**
* ws 连接订阅者工厂抽象类。
* <p>
* Abstract class of ws connection subscriber factory.
*/
@Getter
@Setter
public abstract class WebSocketConnectionSubscriberFactory<T extends WebSocketConnection>
extends AbstractConnectionSubscriberFactory {

private String protocol;
extends ProtocolConnectionSubscriberFactory<T> {

public WebSocketConnectionSubscriberFactory() {
addScopes(WebSocketScoped.NAME);
}

@Override
public ConnectionSubscriber create(String scope) {
WebSocketConnectionSubscriber<T> subscriber = doCreate(scope);
if (StringUtils.hasText(protocol)) {
subscriber.setProtocol(protocol);
}
return subscriber;
}

public abstract WebSocketConnectionSubscriber<T> doCreate(String scope);
}

0 comments on commit 93ca3f4

Please sign in to comment.