Skip to content

Commit

Permalink
connection
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Apr 26, 2022
1 parent 331172a commit 54ce5e2
Show file tree
Hide file tree
Showing 30 changed files with 190 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
@Setter
public abstract class AbstractConnection implements Connection {

private final Map<Object, Object> metadata = new LinkedHashMap<>();
protected final Map<Object, Object> metadata = new LinkedHashMap<>();

@Setter(AccessLevel.PRIVATE)
private String type;
protected String type;

@NonNull
private MessageEncoder messageEncoder;
protected MessageEncoder messageEncoder;

@NonNull
private MessageDecoder messageDecoder;
protected MessageDecoder messageDecoder;

@NonNull
private ConnectionLoadBalanceConcept concept;
protected ConnectionLoadBalanceConcept concept;

public AbstractConnection(String type) {
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,23 @@ public AbstractConnectionLoadBalanceConcept(ConnectionServerProvider connectionS
MessageCodecAdapter messageCodecAdapter,
List<MessageFactory> messageFactories,
ConnectionEventPublisher eventPublisher) {
this.connectionServerProvider = connectionServerProvider;
this.connectionSubscriber = connectionSubscriber;
this.connectionFactories = connectionFactories;
this.connectionSelectors = connectionSelectors;
this.messageCodecAdapter = messageCodecAdapter;
this.messageFactories = messageFactories;
this.eventPublisher = eventPublisher;
this.connectionServerProvider = applyAware(connectionServerProvider);
this.connectionSubscriber = applyAware(connectionSubscriber);
this.connectionFactories = applyAware(connectionFactories);
this.connectionSelectors = applyAware(connectionSelectors);
this.messageCodecAdapter = applyAware(messageCodecAdapter);
this.messageFactories = applyAware(messageFactories);
this.eventPublisher = applyAware(eventPublisher);
}

public <T> T applyAware(T o) {
if (o instanceof ConnectionLoadBalanceConceptAware) {
((ConnectionLoadBalanceConceptAware) o).setConnectionLoadBalanceConcept(this);
}
if (o instanceof Collection) {
((Collection<?>) o).forEach(this::applyAware);
}
return o;
}

@Override
Expand Down Expand Up @@ -95,7 +105,7 @@ public Connection open(Object o, Map<Object, Object> metadata) {

@Override
public void open(Connection connection) {
//TODO Aware
applyAware(connection);
String type = connection.getType();
if (type == null) {
throw new NoConnectionTypeException(connection);
Expand Down Expand Up @@ -301,7 +311,7 @@ public Message createMessage(Object msg) {
if (message == null) {
throw new ConnectionLoadBalanceException("Message can not be created with " + msg);
}
return message;
return applyAware(message);
}

public MessageFactory getMessageFactory(Object msg) {
Expand All @@ -315,7 +325,7 @@ public MessageFactory getMessageFactory(Object msg) {

@Override
public void publish(Object event) {
eventPublisher.publish(event);
eventPublisher.publish(applyAware(event));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder;

import java.net.URI;
import java.util.Map;

public interface Connection {

String URI = "uri";

Object getId();

String getType();

URI getUri();

Map<Object, Object> getMetadata();

MessageEncoder getMessageEncoder();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.linyuzai.connection.loadbalance.core.concept;

import com.github.linyuzai.connection.loadbalance.core.message.MessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.linyuzai.connection.loadbalance.core.concept;

public interface ConnectionLoadBalanceConceptAware<T extends ConnectionLoadBalanceConcept> {

void setConnectionLoadBalanceConcept(T concept);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import lombok.Getter;
import lombok.NonNull;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

@Getter
Expand Down Expand Up @@ -37,10 +39,16 @@ public String getType() {
return connection == null ? null : connection.getType();
}

@Override
public URI getUri() {
Connection connection = get();
return connection == null ? null : connection.getUri();
}

@Override
public Map<Object, Object> getMetadata() {
Connection connection = get();
return connection == null ? null : connection.getMetadata();
return connection == null ? Collections.emptyMap() : connection.getMetadata();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import lombok.Getter;

import java.net.URI;

@Getter
public class IdConnection extends AbstractConnection {

Expand All @@ -28,6 +30,11 @@ public void doSend(Object message) {

}

@Override
public URI getUri() {
return null;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public MessageEncoder getMessageEncoder(String type) {
case Connection.Type.CLIENT:
return getClientMessageEncoder();
case Connection.Type.SUBSCRIBER:
return getSubscriberMessageEncoder();
return getSubscribeMessageEncoder();
case Connection.Type.OBSERVABLE:
return getObservableMessageEncoder();
return getForwardMessageEncoder();
default:
return getUndefinedTypeMessageEncoder(type);
}
Expand All @@ -26,9 +26,9 @@ public MessageDecoder getMessageDecoder(String type) {
case Connection.Type.CLIENT:
return getClientMessageDecoder();
case Connection.Type.SUBSCRIBER:
return getSubscriberMessageDecoder();
return getForwardMessageDecoder();
case Connection.Type.OBSERVABLE:
return getObservableMessageDecoder();
return getSubscribeMessageDecoder();
default:
return getUndefinedTypeMessageDecoder(type);
}
Expand All @@ -49,32 +49,32 @@ public MessageDecoder getMessageDecoder(String type) {
public abstract MessageDecoder getClientMessageDecoder();

/**
* 订阅者发送服务信息的消息编码器
* 订阅时发送服务信息的消息编码器
*
* @return 消息编码器
*/
public abstract MessageEncoder getSubscriberMessageEncoder();
public abstract MessageEncoder getSubscribeMessageEncoder();

/**
* 订阅者接收消息转发的消息解码器
* 订阅时接收服务信息的消息解码器
*
* @return 消息解码器
*/
public abstract MessageDecoder getSubscriberMessageDecoder();
public abstract MessageDecoder getSubscribeMessageDecoder();

/**
* 被观察者转发消息的消息编码器
* 转发消息的消息编码器
*
* @return 消息编码器
*/
public abstract MessageEncoder getObservableMessageEncoder();
public abstract MessageEncoder getForwardMessageEncoder();

/**
* 被观察者接收服务信息的消息解码器
* 接收消息转发的消息解码器
*
* @return 消息解码器
*/
public abstract MessageDecoder getObservableMessageDecoder();
public abstract MessageDecoder getForwardMessageDecoder();

public MessageEncoder getUndefinedTypeMessageEncoder(String type) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.linyuzai.connection.loadbalance.core.select;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.Connections;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

@Getter
@AllArgsConstructor
public abstract class MessageHeaderSelector extends AbstractConnectionSelector {

private String name;

@Override
public boolean support(Message message) {
return message.getHeaders().containsKey(name);
}

@Override
public Connection doSelect(Message message, Collection<Connection> connections) {
String header = message.getHeaders().get(name);
List<Connection> list = connections.stream()
.filter(it -> match(it, header))
.collect(Collectors.toList());
return Connections.of(list);
}

public abstract boolean match(Connection connection, String header);
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@
package com.github.linyuzai.connection.loadbalance.core.select;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.Connections;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Getter
@AllArgsConstructor
public class MetadataSelector extends AbstractConnectionSelector {
public class MetadataSelector extends MessageHeaderSelector {

private String name;

@Override
public boolean support(Message message) {
return message.getHeaders().containsKey(name);
public MetadataSelector(String name) {
super(name);
}

@Override
public Connection doSelect(Message message, Collection<Connection> connections) {
String header = message.getHeaders().get(name);
List<Connection> list = connections.stream()
.filter(it -> match(it.getMetadata().get(name), header))
.collect(Collectors.toList());
return Connections.of(list);
}

public boolean match(Object metadata, String header) {
return Objects.equals(metadata, header);
public boolean match(Connection connection, String header) {
return Objects.equals(connection.getMetadata().get(getName()), header);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.linyuzai.connection.loadbalance.core.select;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;

import java.util.Objects;

public class UriPathSelector extends MessageHeaderSelector {

public static final String KEY = "uri_path";

public UriPathSelector() {
super(KEY);
}

@Override
public boolean match(Connection connection, String header) {
return Objects.equals(connection.getUri().getPath(), header);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.github.linyuzai.connection.loadbalance.websocket.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.AbstractConnection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConceptAware;
import com.github.linyuzai.connection.loadbalance.core.message.MessageCodecAdapter;

import java.util.Map;

public abstract class WebSocketConnection extends AbstractConnection {
public abstract class WebSocketConnection extends AbstractConnection
implements ConnectionLoadBalanceConceptAware<WebSocketLoadBalanceConcept> {

public WebSocketConnection(String type) {
super(type);
Expand All @@ -13,4 +16,12 @@ public WebSocketConnection(String type) {
public WebSocketConnection(String type, Map<Object, Object> metadata) {
super(type, metadata);
}

@Override
public void setConnectionLoadBalanceConcept(WebSocketLoadBalanceConcept concept) {
MessageCodecAdapter adapter = concept.getMessageCodecAdapter();
setMessageEncoder(adapter.getMessageEncoder(getType()));
setMessageDecoder(adapter.getMessageDecoder(getType()));
setConcept(concept);
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,7 @@
package com.github.linyuzai.connection.loadbalance.websocket.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.AbstractConnectionFactory;
import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.message.MessageCodecAdapter;

import java.util.Map;

public abstract class WebSocketConnectionFactory<T extends WebSocketConnection>
extends AbstractConnectionFactory<T, WebSocketLoadBalanceConcept> {

@Override
public T doCreate(Object o, Map<Object, Object> metadata, WebSocketLoadBalanceConcept concept) {
T connection = doCreate(o, metadata);
MessageCodecAdapter adapter = concept.getMessageCodecAdapter();
connection.setMessageEncoder(adapter.getMessageEncoder(Connection.Type.CLIENT));
connection.setMessageDecoder(adapter.getMessageDecoder(Connection.Type.CLIENT));
connection.setConcept(concept);
return connection;
}

public abstract T doCreate(Object o, Map<Object, Object> metadata);
}
Loading

0 comments on commit 54ce5e2

Please sign in to comment.