Skip to content

Commit

Permalink
Introduce the Client class (crossbario#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
om26er authored and oberstet committed Jun 17, 2017
1 parent 0d773d6 commit c735b15
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 81 deletions.
30 changes: 30 additions & 0 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.crossbar.autobahn.wamp;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.types.ExitInfo;

public class Client {
private final Session mSession;
private final List<ITransport> mTransports;
private final String mRealm;
private final List<?> mAuthenticators;

public Client(Session session, List<ITransport> transports, String realm, List<?> authenticators) {
mSession = session;
mTransports = transports;
mRealm = realm;
mAuthenticators = authenticators;
}

public CompletableFuture<ExitInfo> connect() {
CompletableFuture<ExitInfo> exitFuture = new CompletableFuture<>();
ExitInfo info = new ExitInfo();
mSession.addOnConnectListener(() -> mSession.join(mRealm, null));
mSession.addOnDisconnectListener(() -> exitFuture.complete(info));
mTransports.get(0).connect(mSession);
return exitFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import javax.net.ssl.SSLException;

Expand All @@ -11,7 +10,6 @@
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.crossbar.autobahn.wamp.types.CBORSerializer;
import io.crossbar.autobahn.wamp.messages.Hello;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -38,8 +36,10 @@ public class NettyTransport implements ITransport {

private Channel mChannel;
private ISerializer mSerializer;
private final String mUri;

public NettyTransport() {
public NettyTransport(String uri) {
mUri = uri;
mSerializer = new CBORSerializer();
}

Expand All @@ -66,21 +66,11 @@ private SslContext getSSLContext(String scheme) throws SSLException {
return null;
}

private String marshalSubProtocolsList(List<String> subProtocols) {
StringBuilder subProtoBuilder = new StringBuilder();
for (String proto : subProtocols) {
subProtoBuilder.append(proto);
subProtoBuilder.append(",");
}
String rawOutput = subProtoBuilder.toString();
return rawOutput.substring(0, rawOutput.length() - 1);
}

@Override
public void connect(String url, List<String> subProtocols, ITransportHandler transportHandler) {
public void connect(ITransportHandler transportHandler) {
URI uri;
try {
uri = new URI(url);
uri = new URI(mUri);
} catch (URISyntaxException e) {
e.printStackTrace();
return;
Expand All @@ -99,9 +89,8 @@ public void connect(String url, List<String> subProtocols, ITransportHandler tra

final NettyWebSocketClientHandler handler = new NettyWebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, marshalSubProtocolsList(subProtocols),
true, new DefaultHttpHeaders()), this, transportHandler,
mSerializer);
uri, WebSocketVersion.V13, "wamp.2.cbor",true,
new DefaultHttpHeaders()), this, transportHandler, mSerializer);

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
Expand Down Expand Up @@ -132,11 +121,9 @@ protected void initChannel(SocketChannel ch) throws Exception {

@Override
public void send(IMessage message) {
if (message instanceof Hello) {
byte[] data = mSerializer.serialize(message.marshal());
WebSocketFrame frame = new BinaryWebSocketFrame(toByteBuf(data));
mChannel.writeAndFlush(frame);
}
byte[] data = mSerializer.serialize(message.marshal());
WebSocketFrame frame = new BinaryWebSocketFrame(toByteBuf(data));
mChannel.writeAndFlush(frame);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.crossbar.autobahn.wamp.interfaces.IMessage;
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -22,12 +23,12 @@
public class NettyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

private final WebSocketClientHandshaker mHandshaker;
private final NettyTransport mTransport;
private final ITransport mTransport;
private final ISerializer mSerializer;
private ChannelPromise mHandshakeFuture;
private ITransportHandler mTransportHandler;

public NettyWebSocketClientHandler(WebSocketClientHandshaker handshaker, NettyTransport transport,
public NettyWebSocketClientHandler(WebSocketClientHandshaker handshaker, ITransport transport,
ITransportHandler transportHandler, ISerializer serializer) {
mHandshaker = handshaker;
mTransport = transport;
Expand Down Expand Up @@ -60,6 +61,7 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
if (!mHandshaker.isHandshakeComplete()) {
mHandshaker.finishHandshake(ch, (FullHttpResponse) msg);
mHandshakeFuture.setSuccess();
mTransportHandler.onConnect(mTransport);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ public Playground() {
}

public void showTransportAttachment() {
NettyTransport transport = new NettyTransport();
List<String> protocols = new ArrayList<>();
protocols.add("wamp.2.cbor");
transport.connect("ws://192.168.1.3:8080/ws", protocols, this);
NettyTransport transport = new NettyTransport("ws://192.168.1.3:8080/ws");
transport.connect(this);
mSession.addOnJoinListener( details -> System.out.println("play with join details here"));
mSession.attach(transport);
CompletableFuture<SessionDetails> joinedFuture = mSession.join("realm1", null);
}

Expand Down
60 changes: 40 additions & 20 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

import io.crossbar.autobahn.wamp.interfaces.IMessage;
import io.crossbar.autobahn.wamp.interfaces.ISession;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.crossbar.autobahn.wamp.messages.Call;
import io.crossbar.autobahn.wamp.messages.Hello;
import io.crossbar.autobahn.wamp.messages.Publish;
import io.crossbar.autobahn.wamp.messages.Register;
import io.crossbar.autobahn.wamp.messages.Subscribe;
import io.crossbar.autobahn.wamp.messages.Welcome;
import io.crossbar.autobahn.wamp.types.CallOptions;
import io.crossbar.autobahn.wamp.types.CallResult;
import io.crossbar.autobahn.wamp.types.ComponentConfig;
import io.crossbar.autobahn.wamp.messages.Hello;
import io.crossbar.autobahn.wamp.types.IEventHandler;
import io.crossbar.autobahn.wamp.types.IInvocationHandler;
import io.crossbar.autobahn.wamp.interfaces.IMessage;
import io.crossbar.autobahn.wamp.types.Publication;
import io.crossbar.autobahn.wamp.types.PublishOptions;
import io.crossbar.autobahn.wamp.types.RegisterOptions;
import io.crossbar.autobahn.wamp.types.Registration;
import io.crossbar.autobahn.wamp.types.SessionDetails;
import io.crossbar.autobahn.wamp.types.SubscribeOptions;
import io.crossbar.autobahn.wamp.types.Subscription;
import io.crossbar.autobahn.wamp.messages.Welcome;


public class Session implements ISession, ITransportHandler {

private ITransport mTransport;

private ArrayList<OnJoinListener> mOnJoinListeners;
private ArrayList<OnLeaveListener> mOnLeaveListeners;
private ArrayList<OnConnectListener> mOnConnectListeners;
private ArrayList<OnDisconnectListener> mOnDisconnectListeners;
private ArrayList<OnUserErrorListener> mOnUserErrorListeners;
private final ArrayList<OnJoinListener> mOnJoinListeners;
private final ArrayList<OnLeaveListener> mOnLeaveListeners;
private final ArrayList<OnConnectListener> mOnConnectListeners;
private final ArrayList<OnDisconnectListener> mOnDisconnectListeners;
private final ArrayList<OnUserErrorListener> mOnUserErrorListeners;

private long mSessionID;
private boolean mGoodbyeSent;
Expand Down Expand Up @@ -62,22 +67,22 @@ public void onConnect(ITransport transport) {
// throw new Exception("already connected");
}
mTransport = transport;
mOnConnectListeners.forEach(OnConnectListener::onConnect);
}

@Override
public void onMessage(IMessage message) {
System.out.println(message);
if (mSessionID == 0) {
if (message instanceof Welcome) {
Welcome msg = (Welcome) message;
mSessionID = msg.session;
SessionDetails details = new SessionDetails(msg.realm, msg.session);
System.out.println("CONNECTEDDDDDDDDDDDDD");

mOnJoinListeners.forEach(onJoinListener -> onJoinListener.onJoin(details));
}
} else {
// Session is already established.
// Now that we have an active session handle all incoming messages here.
System.out.println(message);
}
// process the incoming WAMP message
}

@Override
Expand All @@ -96,25 +101,37 @@ public boolean isConnected() {

@Override
public CompletableFuture<Subscription> subscribe(String topic, IEventHandler handler, SubscribeOptions options) {
return null;
CompletableFuture<Subscription> future = new CompletableFuture<>();
long requestID = getRandomNumber();
mTransport.send(new Subscribe(requestID, topic, null, false));
return future;
}

@Override
public CompletableFuture<Publication> publish(String topic, List<Object> args, Map<String, Object> kwargs,
PublishOptions options) {
return null;
CompletableFuture<Publication> future = new CompletableFuture<>();
long requestID = getRandomNumber();
mTransport.send(new Publish(requestID, topic, args, kwargs, false, true));
return future;
}

@Override
public CompletableFuture<Registration> register(String procedure, IInvocationHandler endpoint,
RegisterOptions options) {
return null;
CompletableFuture<Registration> future = new CompletableFuture<>();
long requestID = getRandomNumber();
mTransport.send(new Register(requestID, procedure, null, null));
return future;
}

@Override
public CompletableFuture<CallResult> call(String procedure, List<Object> args, Map<String, Object> kwargs,
CallOptions options) {
return null;
CompletableFuture<CallResult> future = new CompletableFuture<>();
long requestID = getRandomNumber();
mTransport.send(new Call(requestID, procedure, args, kwargs));
return future;
}

@Override
Expand All @@ -123,17 +140,20 @@ public CompletableFuture<SessionDetails> join(String realm, List<String> authMet
mGoodbyeSent = false;
Map<String, Map> roles = new HashMap<>();
roles.put("publisher", new HashMap<>());
roles.put("subscriber", new HashMap<>());
roles.put("caller", new HashMap<>());
roles.put("callee", new HashMap<>());
mTransport.send(new Hello(realm, roles));
return null;
}

@Override
public void leave(String reason, String message) {

}

// FIXME: Remove this method. Only there for testing purposes.
public void attach(ITransport transport) {
mTransport = transport;
private long getRandomNumber() {
return ThreadLocalRandom.current().nextLong(0,9007199254740992L);
}

public OnJoinListener addOnJoinListener(OnJoinListener listener) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.crossbar.autobahn.wamp.interfaces;

import io.crossbar.autobahn.wamp.Session;
import io.crossbar.autobahn.wamp.types.Challenge;

public interface IAuthenticator {
void onChallenge(Session session, Challenge challenge);
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package io.crossbar.autobahn.wamp.interfaces;

import java.util.List;

// FIXME: data types to be discussed/changed.
public interface ITransport {

// this is the only method needed by Session ..
void send(IMessage message);

// .. not sure about the following, we'll see
void connect(String url, List<String> subProtocols, ITransportHandler transportHandler);
void connect(ITransportHandler transportHandler);

boolean isOpen();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static Invocation parse(List<Object> wmsg) {
throw new ProtocolError(String.format("invalid message length %s for INVOCATION", wmsg.size()));
}

long request = (long) wmsg.get(1);
int request = (int) wmsg.get(1);
long registration = (long) wmsg.get(2);
Map<String, Object> details = (Map<String, Object>) wmsg.get(3);
List<Object> args = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,24 @@ public class MessageMap {
static {
MESSAGE_TYPE_MAP.put(Hello.MESSAGE_TYPE, Hello.class);
MESSAGE_TYPE_MAP.put(Welcome.MESSAGE_TYPE, Welcome.class);
MESSAGE_TYPE_MAP.put(Abort.MESSAGE_TYPE, Abort.class);
MESSAGE_TYPE_MAP.put(Goodbye.MESSAGE_TYPE, Goodbye.class);
MESSAGE_TYPE_MAP.put(Error.MESSAGE_TYPE, Error.class);
MESSAGE_TYPE_MAP.put(Publish.MESSAGE_TYPE, Publish.class);
MESSAGE_TYPE_MAP.put(Published.MESSAGE_TYPE, Published.class);
MESSAGE_TYPE_MAP.put(Subscribe.MESSAGE_TYPE, Subscribe.class);
MESSAGE_TYPE_MAP.put(Subscribed.MESSAGE_TYPE, Subscribed.class);
MESSAGE_TYPE_MAP.put(Unsubscribe.MESSAGE_TYPE, Unsubscribe.class);
MESSAGE_TYPE_MAP.put(Unsubscribed.MESSAGE_TYPE, Unsubscribed.class);
MESSAGE_TYPE_MAP.put(Event.MESSAGE_TYPE, Event.class);
MESSAGE_TYPE_MAP.put(Call.MESSAGE_TYPE, Call.class);
MESSAGE_TYPE_MAP.put(Result.MESSAGE_TYPE, Result.class);
MESSAGE_TYPE_MAP.put(Register.MESSAGE_TYPE, Register.class);
MESSAGE_TYPE_MAP.put(Registered.MESSAGE_TYPE, Registered.class);
MESSAGE_TYPE_MAP.put(Unregister.MESSAGE_TYPE, Unregister.class);
MESSAGE_TYPE_MAP.put(Unregistered.MESSAGE_TYPE, Unregistered.class);
MESSAGE_TYPE_MAP.put(Invocation.MESSAGE_TYPE, Invocation.class);
MESSAGE_TYPE_MAP.put(Yield.MESSAGE_TYPE, Yield.class);
MESSAGE_TYPE_MAP.put(Interrupt.MESSAGE_TYPE, Interrupt.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.crossbar.autobahn.wamp.types;

import java.util.Map;

public class Challenge {
public final String method;
public final Map<String, Object> extra;

public Challenge(String method, Map<String, Object> extra) {
this.method = method;
this.extra = extra;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.crossbar.autobahn.wamp.types;

public class ExitInfo {
}
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ buildscript {
// Netty build does not need android dependencies.
if (project.properties.get('buildPlatform', 'android') == 'android') {
dependencies {
classpath 'com.android.tools.build:gradle:3.0.0-alpha3'
classpath 'com.android.tools.build:gradle:3.0.0-alpha4'
}
}
}
Expand Down
Loading

0 comments on commit c735b15

Please sign in to comment.