Skip to content

Commit

Permalink
sse load balance reactive
Browse files Browse the repository at this point in the history
  • Loading branch information
Linyuzai committed Apr 30, 2024
1 parent 7160774 commit da2bcfe
Show file tree
Hide file tree
Showing 38 changed files with 193 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.kafka.KafkaTopicConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.rabbitmq.RabbitFanoutConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.rabbitmq.RabbitMessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redis.reactive.ReactiveRedisMessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redis.reactive.ReactiveRedisTopicConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redis.RedisMessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redis.RedisTopicConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redis.reactive.ReactiveRedisMessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redis.reactive.ReactiveRedisTopicConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redisson.RedissonTopicConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redisson.reactive.ReactiveRedissonTopicConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.MasterSlave;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.AbstractMasterSlaveConnectionSubscriber;
import lombok.*;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.linyuzai.connection.loadbalance.autoconfigure.subscribe.redisson;

import com.github.linyuzai.connection.loadbalance.core.concept.AliveForeverConnection;

import lombok.Getter;
import lombok.Setter;
import org.redisson.api.RTopic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.AbstractMasterSlaveConnectionSubscriber;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.redisson.api.*;
import org.redisson.api.RShardedTopicReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import reactor.core.Disposable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import lombok.Getter;
import lombok.Setter;
import org.redisson.RedissonReactive;
import org.redisson.api.*;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServerManager;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServerManagerFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.*;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.*;
import lombok.Getter;
import lombok.NonNull;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.github.linyuzai.connection.loadbalance.core.concept;

import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.retry.MessageRetryStrategy;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendInterceptor;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder;
import com.github.linyuzai.connection.loadbalance.core.message.retry.MessageRetryStrategy;
import lombok.NonNull;

import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.github.linyuzai.connection.loadbalance.core.extension;

import com.github.linyuzai.connection.loadbalance.core.select.filter.FilterConnectionSelector;
import com.github.linyuzai.connection.loadbalance.core.select.MetadataSelector;
import com.github.linyuzai.connection.loadbalance.core.select.filter.FilterConnectionSelector;

/**
* 分组连接选择器。
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.github.linyuzai.connection.loadbalance.core.extension;

import com.github.linyuzai.connection.loadbalance.core.select.filter.FilterConnectionSelector;
import com.github.linyuzai.connection.loadbalance.core.select.MetadataSelector;
import com.github.linyuzai.connection.loadbalance.core.select.filter.FilterConnectionSelector;

/**
* 用户连接选择器。
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.linyuzai.connection.loadbalance.core.message;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.event.ErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.event.TimestampEvent;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.message.*;
import com.github.linyuzai.connection.loadbalance.core.message.BinaryMessage;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.TextMessage;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionLoadBalanceConceptDestroyEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionLoadBalanceConceptInitializeEvent;
import com.github.linyuzai.connection.loadbalance.core.scope.AbstractScoped;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscribeErrorEvent;
import lombok.Getter;
import lombok.Setter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.repository.ConnectionRepository;
import com.github.linyuzai.connection.loadbalance.core.scope.Scoped;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionCloseInterceptor;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.message.*;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendInterceptor;
import com.github.linyuzai.connection.loadbalance.core.message.PingMessage;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder;
import com.github.linyuzai.connection.loadbalance.core.message.retry.MessageRetryStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.extension.GroupSelector;
import io.netty.channel.*;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import com.github.linyuzai.connection.loadbalance.netty.concept.NettyLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.netty.websocket.WebSocketNettyLoadBalanceHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package com.github.linyuzai.connection.loadbalance.sse.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.AbstractConnection;
import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.message.PingMessage;
import com.github.linyuzai.connection.loadbalance.core.message.PongMessage;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import lombok.Getter;
import lombok.Setter;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Consumer;

import static com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer.LB_HOST_PORT;

@Getter
@Setter
public abstract class SseConnection extends AbstractConnection {
Expand All @@ -32,4 +38,50 @@ public void doPing(PingMessage message, Runnable onSuccess, Consumer<Throwable>
public void doPong(PongMessage message, Runnable onSuccess, Consumer<Throwable> onError, Runnable onComplete) {
onComplete.run();
}

public void closeObservable() {
if (isObservableType()) {
String lbHostPost = (String) getMetadata().get(LB_HOST_PORT);
if (lbHostPost == null) {
return;
}
concept.onClose(this, Close.NOT_ALIVE);
Collection<Connection> connections = concept.getConnectionRepository()
.select(Type.SUBSCRIBER);
for (Connection connection : connections) {
ConnectionServer server = (ConnectionServer) connection.getMetadata().get(ConnectionServer.class);
if (server == null) {
continue;
}
String url = ConnectionServer.url(server);
if (Objects.equals(url, lbHostPost)) {
concept.onClose(connection, Close.NOT_ALIVE);
break;
}
}
}
}

public void closeSubscriber() {
if (isSubscriberType()) {
ConnectionServer server = (ConnectionServer) getMetadata().get(ConnectionServer.class);
if (server == null) {
return;
}
concept.onClose(this, Close.NOT_ALIVE);
Collection<Connection> connections = concept.getConnectionRepository()
.select(Type.OBSERVABLE);
for (Connection connection : connections) {
String lbHostPost = (String) getMetadata().get(LB_HOST_PORT);
if (lbHostPost == null) {
continue;
}
String url = ConnectionServer.url(server);
if (Objects.equals(url, lbHostPost)) {
concept.onClose(connection, Close.NOT_ALIVE);
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package com.github.linyuzai.connection.loadbalance.sse.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEstablishEvent;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ProtocolConnectionSubscriber;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer.LB_HOST_PORT;

Expand All @@ -27,7 +23,7 @@ public abstract class SseConnectionSubscriber<T extends SseConnection>

private String protocol = "http";

private Map<String, Boolean> connectingServers = new ConcurrentHashMap<>();
//private Map<String, Boolean> connectingServers = new ConcurrentHashMap<>();

/*@Override
public boolean interceptSubscribe(ConnectionServer server, ConnectionLoadBalanceConcept concept) {
Expand All @@ -44,6 +40,7 @@ public boolean interceptSubscribe(ConnectionServer server, ConnectionLoadBalance
}
}*/

@Override
public Map<String, String> getParams(ConnectionLoadBalanceConcept concept) {
ConnectionServer local = concept.getConnectionServerManager().getLocal();
String params = ConnectionServer.url(local);
Expand All @@ -59,7 +56,7 @@ public String getEndpoint() {

public abstract String getType();

@Deprecated
/*@Deprecated
@Getter
@RequiredArgsConstructor
public class ConnectingListener implements SseEventListener {
Expand All @@ -79,5 +76,5 @@ public void onEvent(Object event, ConnectionLoadBalanceConcept concept) {
}
}
}
}
}*/
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.github.linyuzai.connection.loadbalance.sse.reactive;

import com.github.linyuzai.connection.loadbalance.core.message.MessageTransportException;
import com.github.linyuzai.connection.loadbalance.sse.concept.SseConnection;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.publisher.FluxSink;

import java.util.function.Consumer;
Expand All @@ -17,17 +19,32 @@ public class ReactiveSseConnection extends SseConnection {
@SuppressWarnings("unchecked")
@Override
public void doSend(Object message, Runnable onSuccess, Consumer<Throwable> onError, Runnable onComplete) {
if (message instanceof ServerSentEvent) {
fluxSink.next((ServerSentEvent<Object>) message);
} else {
//fluxSink.next()
try {
if (message instanceof ServerSentEvent) {
fluxSink.next((ServerSentEvent<Object>) message);
onSuccess.run();
} else {
throw new IllegalArgumentException("Message is not a ServerSentEvent");
}
} catch (WebClientException e) {
closeObservable();
onError.accept(new MessageTransportException(e));
} catch (Throwable e) {
onError.accept(e);
} finally {
onComplete.run();
}
}

@Override
public void doClose(Object reason, Runnable onSuccess, Consumer<Throwable> onError, Runnable onComplete) {
fluxSink.complete();
//onSuccess.run();
//onComplete.run();
try {
fluxSink.complete();
onSuccess.run();
} catch (Throwable e) {
onError.accept(e);
} finally {
onComplete.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ public void doSubscribe(URI uri, ConnectionLoadBalanceConcept concept, Consumer<
.retrieve()
.bodyToFlux(String.class)
.doOnSubscribe(subscription -> onSuccess.accept(connection))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
}, onError, onComplete);
.subscribe(message -> concept.onMessage(connection, message), e -> {
connection.closeSubscriber();
onError.accept(e);
}, onComplete);
connection.setDisposable(disposable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.Map;

@Getter
@Setter
@RestController
Expand All @@ -26,13 +31,14 @@ public class ReactiveSseLoadBalanceEndpoint {
private final SseLoadBalanceConcept concept;

@GetMapping
public Flux<ServerSentEvent<Object>> loadBalanceEndpoint() {
Object id = sseIdGenerator.generateId(null);
public Flux<ServerSentEvent<Object>> loadBalanceEndpoint(@RequestParam Map<Object, Object> params) {
Object id = sseIdGenerator.generateId(params);
return sseFluxFactory.create(Flux.create((FluxSink<ServerSentEvent<Object>> fluxSink) -> {
ReactiveSseCreateRequest request = new ReactiveSseCreateRequest(id, SseLoadBalanceConcept.SUBSCRIBER_ENDPOINT, fluxSink);
ReactiveSseConnection connection = new ReactiveSseConnection(fluxSink);
connection.setCreateRequest(request);
connection.setType(Connection.Type.OBSERVABLE);
connection.getMetadata().putAll(params);
concept.onEstablish(connection);
}))
.doOnError(it -> concept.onError(id, Connection.Type.OBSERVABLE, it))
Expand Down
Loading

0 comments on commit da2bcfe

Please sign in to comment.