Skip to content

Commit

Permalink
POJO enhancements (crossbario#233)
Browse files Browse the repository at this point in the history
* make Session.call() code reusable

* Publish a single object (or POJO)

* add publish all args convenience API

* API: publish no args and no options

* API: publish args only; no options

* Session.call API: Add convenience all args method

* Changes to subscription handler API

* Fix build

* More code reuse

* extend Session.subscribe() API.

* API: register a procedure that takes a single argument

* More convenience for Session.register()

* POJO enabled Session.register() is functional now

* Session.subscribe convenience methods functional
  • Loading branch information
om26er authored and oberstet committed Jul 5, 2017
1 parent 7c2a2cf commit d58ae62
Show file tree
Hide file tree
Showing 13 changed files with 260 additions and 110 deletions.
243 changes: 172 additions & 71 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@


@FunctionalInterface
public interface IEventHandler<R> {
R run(List<Object> args,
Map<String, Object> kwargs,
EventDetails details);
public interface IEventHandler {
void accept(List<Object> args,
Map<String, Object> kwargs,
EventDetails details);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

@FunctionalInterface
public interface IInvocationHandler {
CompletableFuture<InvocationResult> run(List<Object> args,
Map<String, Object> kwargs,
InvocationDetails details);
CompletableFuture<InvocationResult> apply(List<Object> args,
Map<String, Object> kwargs,
InvocationDetails details);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import io.crossbar.autobahn.wamp.Session;
import io.crossbar.autobahn.wamp.types.CallOptions;
import io.crossbar.autobahn.wamp.types.CallResult;
import io.crossbar.autobahn.wamp.types.CloseDetails;
import io.crossbar.autobahn.wamp.types.EventDetails;
import io.crossbar.autobahn.wamp.types.InvocationDetails;
import io.crossbar.autobahn.wamp.types.InvocationResult;
import io.crossbar.autobahn.wamp.types.Publication;
import io.crossbar.autobahn.wamp.types.PublishOptions;
import io.crossbar.autobahn.wamp.types.RegisterOptions;
Expand All @@ -34,12 +41,45 @@ public interface ISession {

CompletableFuture<Subscription> subscribe(String topic, IEventHandler handler, SubscribeOptions options);

<T> CompletableFuture<Subscription> subscribe(String topic, Consumer<T> handler, SubscribeOptions options);

<T> CompletableFuture<Subscription> subscribe(String topic, BiConsumer<T, EventDetails> handler,
SubscribeOptions options);

<T, U> CompletableFuture<Subscription> subscribe(String topic, TriConsumer<T, U, EventDetails> handler,
SubscribeOptions options);

CompletableFuture<Publication> publish(String topic,
List<Object> args,
Map<String, Object> kwargs,
PublishOptions options);

CompletableFuture<Registration> register(String procedure, IInvocationHandler endpoint, RegisterOptions options);
CompletableFuture<Publication> publish(String topic, Object object, PublishOptions options);

CompletableFuture<Publication> publish(String topic, PublishOptions options, Object... objects);

CompletableFuture<Publication> publish(String topic, Object... objects);

CompletableFuture<Publication> publish(String topic, PublishOptions options);

CompletableFuture<Publication> publish(String topic);

CompletableFuture<Registration> register(String procedure, IInvocationHandler endpoint,
RegisterOptions options);

<T> CompletableFuture<Registration> register(String procedure,
Function<T, CompletableFuture<InvocationResult>> endpoint,
RegisterOptions options);

<T> CompletableFuture<Registration> register(String procedure,
BiFunction<T, InvocationDetails,
CompletableFuture<InvocationResult>> endpoint,
RegisterOptions options);

<T, U> CompletableFuture<Registration> register(String procedure,
TriFunction<T, U, InvocationDetails,
CompletableFuture<InvocationResult>> endpoint,
RegisterOptions options);

CompletableFuture<CallResult> call(String procedure,
List<Object> args,
Expand All @@ -51,12 +91,12 @@ <T> CompletableFuture<T> call(String procedure,
Map<String, Object> kwargs,
TypeReference<T> resultType,
CallOptions options);
/*

<T> CompletableFuture<T> call(String procedure,
TypeReference<T> resultType,
CallOptions options,
Object... args);
*/

CompletableFuture<SessionDetails> join(String realm, List<String> authMethods);

void leave(String reason, String message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.crossbar.autobahn.wamp.interfaces;


@FunctionalInterface
public interface TriConsumer<T, U, V> {
void accept(T var1, U var2, V var3);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.crossbar.autobahn.wamp.interfaces;

@FunctionalInterface
public interface TriFunction<T, U, V, R> {
R apply(T var1, U var2, V var3);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@

import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.interfaces.IInvocationHandler;
import io.crossbar.autobahn.wamp.types.Registration;


public class RegisterRequest extends Request {
public final CompletableFuture<Registration> onReply;
public final String procedure;
public final IInvocationHandler endpoint;
public final Object endpoint;

public RegisterRequest(long request, CompletableFuture<Registration> onReply, String procedure,
IInvocationHandler endpoint) {
Object endpoint) {
super(request);
this.onReply = onReply;
this.procedure = procedure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@

import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.interfaces.IEventHandler;
import io.crossbar.autobahn.wamp.types.Subscription;


public class SubscribeRequest extends Request {
public final String topic;
public final CompletableFuture<Subscription> onReply;
public final IEventHandler<?> handler;
public final Object handler;

public SubscribeRequest(long request, String topic, CompletableFuture<Subscription> onReply,
IEventHandler<?> handler) {
Object handler) {
super(request);
this.topic = topic;
this.onReply = onReply;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@


public class InvocationResult {
public List<Object> results;
public Map<String, Object> kwresults;
public final List<Object> results;
public final Map<String, Object> kwresults;

/// Default constructor.
public InvocationResult() {
results = null;
kwresults = null;
}

/// Copy constructor.
Expand All @@ -34,16 +36,19 @@ public InvocationResult(InvocationResult other) {
public InvocationResult(Object result) {
this.results = new ArrayList<>();
this.results.add(result);
this.kwresults = null;
}

/// Constructor for positional-only results.
public InvocationResult(List<Object> results) {
this.results = results;
this.kwresults = null;
}

/// Constructor for keyword-based-only results.
public InvocationResult(Map<String, Object> kwresults) {
this.kwresults = kwresults;
this.results = null;
}

/// Constructor for results that have both positional and keyword-based results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.interfaces.IInvocationHandler;


public class Registration {
public final long registration;
public final String procedure;
public final IInvocationHandler endpoint;
public final Object endpoint;

public Registration(long registration, String procedure, IInvocationHandler endpoint) {
public Registration(long registration, String procedure, Object endpoint) {
this.registration = registration;
this.procedure = procedure;
this.endpoint = endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.interfaces.IEventHandler;


public class Subscription {
public final long subscription;
public final String topic;
public final IEventHandler<?> handler;
public final Object handler;

public Subscription(long subscription, String topic, IEventHandler<?> handler) {
public Subscription(long subscription, String topic, Object handler) {
this.subscription = subscription;
this.topic = topic;
this.handler = handler;
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ buildscript {
// Netty build does not need android dependencies.
if (project.properties.get('buildPlatform', 'android') == 'android') {
dependencies {
classpath 'com.android.tools.build:gradle:3.0.0-alpha4'
classpath 'com.android.tools.build:gradle:3.0.0-alpha5'
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package io.crossbar.autobahn.demogallery.netty;

import com.fasterxml.jackson.core.type.TypeReference;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -20,21 +22,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.core.type.TypeReference;

import io.crossbar.autobahn.wamp.Client;
import io.crossbar.autobahn.wamp.Session;

import io.crossbar.autobahn.wamp.transports.NettyTransport;

import io.crossbar.autobahn.wamp.auth.AnonymousAuth;

import io.crossbar.autobahn.wamp.interfaces.IAuthenticator;
import io.crossbar.autobahn.wamp.interfaces.ITransport;

import io.crossbar.autobahn.wamp.transports.NettyTransport;
import io.crossbar.autobahn.wamp.types.CallResult;
import io.crossbar.autobahn.wamp.types.ExitInfo;
import io.crossbar.autobahn.wamp.types.EventDetails;
import io.crossbar.autobahn.wamp.types.ExitInfo;
import io.crossbar.autobahn.wamp.types.InvocationDetails;
import io.crossbar.autobahn.wamp.types.InvocationResult;
import io.crossbar.autobahn.wamp.types.Publication;
Expand Down Expand Up @@ -318,9 +314,12 @@ private CompletableFuture<InvocationResult> add2(List<Object> args, Map<String,


// this handler will process incoming events for the topic we subscribe it to
private Void onCounter(List<Object> args, Map<String, Object> kwargs, EventDetails details) {
private void onCounter(List<Object> args, Map<String, Object> kwargs, EventDetails details) {
System.out.println("received counter: " + args.get(0));
return null;
}

private void onCounterSimple(String object, EventDetails details) {
System.out.println("received counter: " + object);
}


Expand Down

0 comments on commit d58ae62

Please sign in to comment.