Skip to content

Commit

Permalink
注释补充
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Jul 17, 2023
1 parent 0509b33 commit 13f21a4
Show file tree
Hide file tree
Showing 24 changed files with 235 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import java.util.Objects;
import java.util.function.Consumer;

/**
* 连接订阅者抽象类。
* <p>
* Abstract class of {@link ConnectionSubscriber}.
*/
public abstract class AbstractConnectionSubscriber implements ConnectionSubscriber {

public static final String DELIMITER = "_";
Expand All @@ -23,6 +28,7 @@ public void subscribe(Consumer<Connection> onSuccess,
ConnectionLoadBalanceConcept concept) {
ConnectionServer local = concept.getConnectionServerManager().getLocal();
//单体应用不需要转发
//Single application does not need to forward
if (local == null) {
onComplete.run();
return;
Expand All @@ -39,9 +45,11 @@ public void subscribe(Consumer<Connection> onSuccess,
if (existSubscriber != null) {
if (existSubscriber.isAlive()) {
//如果连接还存活则直接返回
//If the connection is still alive, just return
return;
} else {
//否则关闭连接
//Otherwise, close the connection
existSubscriber.close(Connection.Close.NOT_ALIVE);
}
}
Expand All @@ -60,9 +68,11 @@ public void subscribe(Consumer<Connection> onSuccess,
if (existObservable != null) {
if (existObservable.isAlive()) {
//如果连接还存活则直接返回
//If the connection is still alive, just return
return;
} else {
//否则关闭连接
//Otherwise, close the connection
existObservable.close(Connection.Close.NOT_ALIVE);
}
}
Expand All @@ -78,7 +88,7 @@ public void subscribe(Consumer<Connection> onSuccess,
}

/**
* LBConnection_[websocket/netty]_${serviceId}_${host:port}_[redisson/redis/rabbit/kakfa]
* LBConnection_[websocket/netty]_${serviceId}_${host:port}_[redisson/redis/rabbit/kafka]
*/
protected String getId(String topic, String from, ConnectionServer subscribe) {
return topic + DELIMITER + from + DELIMITER + subscribe.getServiceId();
Expand Down Expand Up @@ -108,15 +118,30 @@ protected MessageIdempotentVerifier getMessageIdempotentVerifier(ConnectionLoadB
return concept.getMessageIdempotentVerifier();
}

/**
* 创建订阅者连接。
* <p>
* Create subscriber connection.
*/
protected abstract Connection createSubscriber(String id,
String topic,
Map<Object, Object> context,
ConnectionLoadBalanceConcept concept);

/**
* 创建可观察者连接。
* <p>
* Create observable connection.
*/
protected abstract Connection createObservable(String id,
String topic,
Map<Object, Object> context,
ConnectionLoadBalanceConcept concept);

/**
* 获取订阅者连接服务。
* <p>
* Get subscriber connection server.
*/
protected abstract ConnectionServer getSubscribeServer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import com.github.linyuzai.connection.loadbalance.core.scope.AbstractScopedFactory;

/**
* 连接订阅者工厂。
* <p>
* Factory of {@link ConnectionSubscriber}.
*/
public abstract class AbstractConnectionSubscriberFactory extends AbstractScopedFactory<ConnectionSubscriber>
implements ConnectionSubscriberFactory {

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import lombok.Getter;

/**
* 连接订阅异常。
* <p>
* Connection subscribe exception.
*/
@Getter
public class ConnectionServerSubscribeException extends ConnectionLoadBalanceException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import com.github.linyuzai.connection.loadbalance.core.event.ErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.event.TimestampEvent;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* 连接订阅异常事件
* 连接订阅异常事件。
* <p>
* Event will be published when subscribe error.
*/
@Getter
@AllArgsConstructor
@RequiredArgsConstructor
public class ConnectionSubscribeErrorEvent extends TimestampEvent implements ErrorEvent {

private final Throwable error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;

/**
* 连接订阅处理器
* 连接订阅处理器。
* 当接收到服务实例信息后,对该服务实例反向连接。
* <p>
* 当接收到服务实例信息后
* <p>
* 对该服务实例反向连接
* Connection subscribe handler.
* When receive server instance info, reverse connect to the server instance.
*/
public class ConnectionSubscribeHandler extends AbstractScoped implements MessageReceiveEventListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;

/**
* 连接订阅日志
* 连接订阅日志。
* <p>
* Connection subscribe logger.
*/
public class ConnectionSubscribeLogger extends AbstractScoped implements ConnectionEventListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,77 @@
import java.util.function.Consumer;

/**
* 连接订阅者
* 连接订阅者。
* 可以理解为服务实例对其他服务的消息进行监听。
* <p>
* 可以理解为服务实例对其他服务的消息进行监听
* <p>
* 或是监听 Redis 和 MQ
* Subscriber of connection.
* It can be understood that the service instance listens to the messages of other services.
*/
public interface ConnectionSubscriber {

/**
* 订阅成功。
* <p>
* Subscribe success.
*/
static Consumer<Connection> onSubscribeSuccess(ConnectionLoadBalanceConcept concept) {
return concept::onEstablish;
}

/**
* 订阅失败。
* <p>
* Subscribe error.
*/
static Consumer<Throwable> onSubscribeError(ConnectionLoadBalanceConcept concept) {
return e -> concept.getEventPublisher().publish(new ConnectionSubscribeErrorEvent(e));
}

/**
* 订阅。
* <p>
* Subscribe.
*/
default void subscribe(ConnectionLoadBalanceConcept concept) {
subscribe(onSubscribeSuccess(concept), onSubscribeError(concept), () -> {
}, concept);
}

/**
* 订阅。
* <p>
* Subscribe.
*/
default void subscribe() {
subscribe(null);
}

/**
* 订阅。
* <p>
* Subscribe.
*/
void subscribe(Consumer<Connection> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete,
ConnectionLoadBalanceConcept concept);

/**
* 订阅。
* <p>
* Subscribe.
*/
default void subscribe(Consumer<Connection> onSuccess,
Consumer<Throwable> onError,
Runnable onComplete) {
subscribe(onSuccess, onError, onComplete, null);
}

/**
* 连接订阅器代理。
* <p>
* Delegate of connection subscriber.
*/
@Getter
@RequiredArgsConstructor
class Delegate implements ConnectionSubscriber {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import com.github.linyuzai.connection.loadbalance.core.scope.ScopedFactory;

/**
* 连接订阅者工厂。
* <p>
* Factory of {@link ConnectionSubscriber}.
*/
public interface ConnectionSubscriberFactory extends ScopedFactory<ConnectionSubscriber> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

import java.util.function.Consumer;

/**
* 空连接订阅者工厂。
* <p>
* Factory of {@link EmptyConnectionSubscriber}.
*/
public class EmptyConnectionSubscriberFactory extends AbstractScopedFactory<ConnectionSubscriber>
implements ConnectionSubscriberFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import java.nio.ByteBuffer;

/**
* 连接订阅消息解码器
* 连接订阅消息解码器。
* <p>
* Connection subscribe message decoder.
*/
@Getter
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import java.util.function.Consumer;

/**
* 来接订阅者的抽象类
* 服务实例连接订阅者。
* <p>
* Subscriber of connection of service instance.
*/
public abstract class ServerInstanceConnectionSubscriber<T extends Connection> implements ConnectionSubscriber {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import lombok.NoArgsConstructor;

/**
* 订阅消息
* 订阅消息。
* 用于发送服务信息和反向连接。
* <p>
* 用于发送服务信息和反向连接
* Message of subscribe.
* Used to send server info and reverse connection.
*/
@NoArgsConstructor
public class SubscribeMessage extends AbstractMessage<ConnectionServer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import lombok.Getter;
import lombok.Setter;

/**
* 主从连接订阅者抽象类。
* <p>
* Abstract master slave connection subscriber.
*/
@Getter
@Setter
public abstract class AbstractMasterSlaveConnectionSubscriber extends AbstractConnectionSubscriber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
import java.util.Map;
import java.util.function.Consumer;

/**
* 主固定连接订阅者。
* <p>
* Master fixed connection subscriber.
*/
@Getter
@RequiredArgsConstructor
public class MasterFixedConnectionSubscriber implements ConnectionSubscriber {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave;

/**
* 主从。
* <p>
* Master slave.
*/
public enum MasterSlave {

MASTER, SLAVE
Expand Down
Loading

0 comments on commit 13f21a4

Please sign in to comment.