Skip to content

Commit

Permalink
first bunch (crossbario#228)
Browse files Browse the repository at this point in the history
* first bunch

* move android websocket support to subpackage

* more polish, add event details etc
  • Loading branch information
oberstet authored Jun 30, 2017
1 parent 96f8c4b commit 4e7beef
Show file tree
Hide file tree
Showing 31 changed files with 169 additions and 60 deletions.
26 changes: 21 additions & 5 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@

import io.crossbar.autobahn.wamp.exceptions.ApplicationError;
import io.crossbar.autobahn.wamp.exceptions.ProtocolError;

import io.crossbar.autobahn.wamp.interfaces.IMessage;
import io.crossbar.autobahn.wamp.interfaces.ISession;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.IEventHandler;
import io.crossbar.autobahn.wamp.interfaces.IInvocationHandler;

import io.crossbar.autobahn.wamp.messages.Call;
import io.crossbar.autobahn.wamp.messages.Error;
import io.crossbar.autobahn.wamp.messages.Event;
Expand All @@ -47,15 +51,16 @@
import io.crossbar.autobahn.wamp.messages.Subscribed;
import io.crossbar.autobahn.wamp.messages.Welcome;
import io.crossbar.autobahn.wamp.messages.Yield;

import io.crossbar.autobahn.wamp.requests.CallRequest;
import io.crossbar.autobahn.wamp.requests.PublishRequest;
import io.crossbar.autobahn.wamp.requests.RegisterRequest;
import io.crossbar.autobahn.wamp.requests.SubscribeRequest;

import io.crossbar.autobahn.wamp.types.CallOptions;
import io.crossbar.autobahn.wamp.types.CallResult;
import io.crossbar.autobahn.wamp.types.CloseDetails;
import io.crossbar.autobahn.wamp.types.IEventHandler;
import io.crossbar.autobahn.wamp.types.IInvocationHandler;
import io.crossbar.autobahn.wamp.types.EventDetails;
import io.crossbar.autobahn.wamp.types.InvocationDetails;
import io.crossbar.autobahn.wamp.types.InvocationResult;
import io.crossbar.autobahn.wamp.types.Publication;
Expand Down Expand Up @@ -246,11 +251,22 @@ private void onMessage(IMessage message) {
Event msg = (Event) message;
List<Subscription> subscriptions = mSubscriptions.getOrDefault(msg.subscription, null);
if (subscriptions != null) {

List<CompletableFuture<?>> futures = new ArrayList<>();

subscriptions.forEach(
subscription -> futures.add(
CompletableFuture.runAsync(() -> subscription.handler.run(msg.args, msg.kwargs),
getExecutor())));
subscription -> {
EventDetails details = new EventDetails(
subscription, subscription.topic, -1, null, null, this);
futures.add(
CompletableFuture.runAsync(
() -> subscription.handler.run(msg.args, msg.kwargs, details)
, getExecutor()
)
);
}
);

// Not really doing anything with the combined futures.
combineFutures(futures);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.crossbar.autobahn.wamp.types.Challenge;
import io.crossbar.autobahn.wamp.types.ChallengeResponse;


public interface IAuthenticator {
CompletableFuture<ChallengeResponse> onChallenge(Session session, Challenge challenge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp.types;
package io.crossbar.autobahn.wamp.interfaces;

import java.util.List;
import java.util.Map;

import io.crossbar.autobahn.wamp.types.EventDetails;


@FunctionalInterface
public interface IEventHandler<R> {
R run(List<Object> args, Map<String, Object> kwargs);
R run(List<Object> args,
Map<String, Object> kwargs,
EventDetails details);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp.types;
package io.crossbar.autobahn.wamp.interfaces;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.types.InvocationDetails;
import io.crossbar.autobahn.wamp.types.InvocationResult;


@FunctionalInterface
public interface IInvocationHandler {
CompletableFuture<InvocationResult> run(List<Object> args, Map<String, Object> kwargs,
CompletableFuture<InvocationResult> run(List<Object> args,
Map<String, Object> kwargs,
InvocationDetails details);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.List;


public interface IMessage {
List<Object> marshal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import com.fasterxml.jackson.core.type.TypeReference;

import io.crossbar.autobahn.wamp.Session;

import io.crossbar.autobahn.wamp.interfaces.IEventHandler;
import io.crossbar.autobahn.wamp.interfaces.IInvocationHandler;

import io.crossbar.autobahn.wamp.types.CallOptions;
import io.crossbar.autobahn.wamp.types.CallResult;
import io.crossbar.autobahn.wamp.types.CloseDetails;
import io.crossbar.autobahn.wamp.types.IEventHandler;
import io.crossbar.autobahn.wamp.types.IInvocationHandler;
import io.crossbar.autobahn.wamp.types.Publication;
import io.crossbar.autobahn.wamp.types.PublishOptions;
import io.crossbar.autobahn.wamp.types.RegisterOptions;
Expand All @@ -31,6 +33,7 @@
import io.crossbar.autobahn.wamp.types.SubscribeOptions;
import io.crossbar.autobahn.wamp.types.Subscription;


public interface ISession {

CompletableFuture<Subscription> subscribe(String topic, IEventHandler handler, SubscribeOptions options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package io.crossbar.autobahn.wamp.interfaces;


public interface ITransportHandler {

void onConnect(ITransport transport, ISerializer serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.crossbar.autobahn.wamp.types.Publication;


public class PublishRequest extends Request {

public final CompletableFuture<Publication> onReply;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.types.IInvocationHandler;
import io.crossbar.autobahn.wamp.interfaces.IInvocationHandler;
import io.crossbar.autobahn.wamp.types.Registration;


public class RegisterRequest extends Request {
public final CompletableFuture<Registration> onReply;
public final String procedure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package io.crossbar.autobahn.wamp.requests;


public class Request {

public final long request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.types.IEventHandler;
import io.crossbar.autobahn.wamp.interfaces.IEventHandler;
import io.crossbar.autobahn.wamp.types.Subscription;


public class SubscribeRequest extends Request {
public final String topic;
public final CompletableFuture<Subscription> onReply;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp.types;
package io.crossbar.autobahn.wamp.serializers;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp;
package io.crossbar.autobahn.wamp.transports;

import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -45,7 +45,7 @@
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.crossbar.autobahn.wamp.types.CBORSerializer;
import io.crossbar.autobahn.wamp.serializers.CBORSerializer;


public class NettyTransport implements ITransport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp;
package io.crossbar.autobahn.wamp.transports;

import java.util.List;

import io.crossbar.autobahn.wamp.interfaces.IMessage;
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -30,8 +26,14 @@
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil;

import io.crossbar.autobahn.wamp.interfaces.IMessage;
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;

import static io.crossbar.autobahn.wamp.messages.MessageMap.MESSAGE_TYPE_MAP;


public class NettyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

private final WebSocketClientHandshaker mHandshaker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
///////////////////////////////////////////////////////////////////////////////
//
// AutobahnJava - http://crossbar.io/autobahn
//
// Copyright (c) Crossbar.io Technologies GmbH and contributors
//
// Licensed under the MIT License.
// http://www.opensource.org/licenses/mit-license.php
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.Session;


public class EventDetails {

// The subscription on which this event is delivered to.
public final Subscription subscription;

// The URI of the topic under the subscription.
public final String topic;

// The WAMP sessionid of the publisher.
public final long publisherSessionID;

// The WAMP authid of the publisher.
public final String publisherAuthID;

// The WAMP authrole of the publisher.
public final String publisherAuthRole;

// The WAMP session on which this event is delivered.
public final Session session;

public EventDetails(Subscription subscription, String topic, long publisherSessionID,
String publisherAuthID, String publisherAuthRole, Session session) {
this.subscription = subscription;
this.topic = topic;
this.publisherSessionID = publisherSessionID;
this.publisherAuthID = publisherAuthID;
this.publisherAuthRole = publisherAuthRole;
this.session = session;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class InvocationDetails {
// The WAMP authrole of the caller.
public final String callerAuthRole;

// The WAMP session on which this event is delivered.
public final Session session;

// FIXME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.interfaces.IInvocationHandler;


public class Registration {
public final long registration;
public final String procedure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.interfaces.IEventHandler;


public class Subscription {
public final long subscription;
public final String topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn;
package io.crossbar.autobahn.websocket;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn;
package io.crossbar.autobahn.websocket;


public interface WebSocket {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn;
package io.crossbar.autobahn.websocket;

import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.os.Message;
import android.util.Log;

import org.apache.http.message.BasicNameValuePair;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -29,6 +22,15 @@
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;

import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.os.Message;
import android.util.Log;

import org.apache.http.message.BasicNameValuePair;


public class WebSocketConnection implements WebSocket {

private static final boolean DEBUG = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn;
package io.crossbar.autobahn.websocket;


/**
* WebSockets event handler. Users will usually provide an instance of a class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn;
package io.crossbar.autobahn.websocket;


public class WebSocketException extends Exception {

Expand Down
Loading

0 comments on commit 4e7beef

Please sign in to comment.