Skip to content

Commit

Permalink
Extend WAMP transports to be configurable (crossbario#385)
Browse files Browse the repository at this point in the history
* Extend WAMP transports to be configurable

* Update android dependencies
  • Loading branch information
om26er authored and oberstet committed Mar 29, 2018
1 parent 7cfc290 commit 12fdc31
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 73 deletions.
6 changes: 3 additions & 3 deletions autobahn/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ bintray {

if (project.IS_ANDROID) {
android {
compileSdkVersion 26
buildToolsVersion '26.0.2'
compileSdkVersion 27
buildToolsVersion '27.0.3'
defaultConfig {
minSdkVersion 24
targetSdkVersion 26
targetSdkVersion 27
}
buildTypes {
release {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@

package io.crossbar.autobahn.wamp.interfaces;

import io.crossbar.autobahn.wamp.types.TransportOptions;

public interface ITransport {

void send(byte[] payload, boolean isBinary);

void connect(ITransportHandler transportHandler) throws Exception;

void connect(ITransportHandler transportHandler, TransportOptions options) throws Exception;

boolean isOpen();

void close() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.crossbar.autobahn.wamp.serializers.CBORSerializer;
import io.crossbar.autobahn.wamp.serializers.JSONSerializer;
import io.crossbar.autobahn.wamp.serializers.MessagePackSerializer;
import io.crossbar.autobahn.wamp.types.TransportOptions;
import io.crossbar.autobahn.websocket.WebSocketConnection;
import io.crossbar.autobahn.websocket.WebSocketConnectionHandler;
import io.crossbar.autobahn.websocket.types.ConnectionResponse;
import io.crossbar.autobahn.websocket.types.WebSocketOptions;

public class AndroidWebSocket implements ITransport {

Expand Down Expand Up @@ -60,6 +62,18 @@ public void send(byte[] payload, boolean isBinary) {

@Override
public void connect(ITransportHandler transportHandler) throws Exception {
connect(transportHandler, new TransportOptions());
}

@Override
public void connect(ITransportHandler transportHandler, TransportOptions options)
throws Exception {

WebSocketOptions webSocketOptions = new WebSocketOptions();
webSocketOptions.setAutoPingInterval(options.getAutoPingInterval());
webSocketOptions.setAutoPingTimeout(options.getAutoPingTimeout());
webSocketOptions.setMaxFramePayloadSize(options.getMaxFramePayloadSize());

mConnection.connect(mUri, getSerializers(), new WebSocketConnectionHandler() {

@Override
Expand Down Expand Up @@ -103,7 +117,7 @@ public void onMessage(byte[] payload, boolean isBinary) {
e.printStackTrace();
}
}
});
}, webSocketOptions, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import java.net.URI;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLException;
Expand All @@ -25,6 +24,7 @@
import io.crossbar.autobahn.wamp.serializers.CBORSerializer;
import io.crossbar.autobahn.wamp.serializers.JSONSerializer;
import io.crossbar.autobahn.wamp.serializers.MessagePackSerializer;
import io.crossbar.autobahn.wamp.types.TransportOptions;
import io.crossbar.autobahn.wamp.types.WebSocketOptions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -63,32 +63,41 @@ public class NettyWebSocket implements ITransport {
private NettyWebSocketClientHandler mHandler;
private final String mUri;

private Executor mExecutor;
private WebSocketOptions mOptions;
private List<String> mSerializers;
private String mSerializers;

public NettyWebSocket(String uri) {
mUri = uri;
this(uri, (WebSocketOptions) null);
}

public NettyWebSocket(String uri, List<String> serializers) {
mUri = uri;
mSerializers = serializers;
this(uri, serializers, null);
}

@Deprecated
public NettyWebSocket(String uri, WebSocketOptions options) {
this(uri);
mOptions = options;
this(uri, null, options);
}

@Deprecated
public NettyWebSocket(String uri, List<String> serializers, WebSocketOptions options) {
mUri = uri;
mSerializers = serializers;
mOptions = options;
}

private WebSocketOptions getOptions() {
return mOptions == null ? new WebSocketOptions() : mOptions;
if (serializers == null) {
mSerializers = SERIALIZERS_DEFAULT;
} else {
StringBuilder result = new StringBuilder();
for (String serializer: serializers) {
result.append(serializer).append(",");
}
mSerializers = result.toString();
}

if (options == null) {
mOptions = new WebSocketOptions();
} else {
mOptions = options;
}
}

private int validateURIAndGetPort(URI uri) {
Expand All @@ -112,19 +121,26 @@ private SslContext getSSLContext(String scheme) throws SSLException {
InsecureTrustManagerFactory.INSTANCE).build() : null;
}

private String getSerializers() {
if (mSerializers != null) {
StringBuilder result = new StringBuilder();
for (String serializer: mSerializers) {
result.append(serializer).append(",");
}
return result.toString();
}
return SERIALIZERS_DEFAULT;
@Override
public void connect(ITransportHandler transportHandler) throws Exception {
connect(transportHandler, new TransportOptions());
}

@Override
public void connect(ITransportHandler transportHandler) throws Exception {
public void connect(ITransportHandler transportHandler, TransportOptions options)
throws Exception {

if (options == null) {
if (mOptions == null) {
options = new TransportOptions();
} else {
options = new TransportOptions();
options.setAutoPingInterval(mOptions.getAutoPingInterval());
options.setAutoPingTimeout(mOptions.getAutoPingTimeout());
options.setMaxFramePayloadSize(mOptions.getMaxFramePayloadSize());
}
}

URI uri;
uri = new URI(mUri);
int port = validateURIAndGetPort(uri);
Expand All @@ -134,14 +150,16 @@ public void connect(ITransportHandler transportHandler) throws Exception {
final SslContext sslContext = getSSLContext(scheme);

WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, getSerializers(), true,
new DefaultHttpHeaders(), getOptions().getMaxFramePayloadSize());
mHandler = new NettyWebSocketClientHandler(handshaker,this, transportHandler);
uri, WebSocketVersion.V13, mSerializers, true,
new DefaultHttpHeaders(), options.getMaxFramePayloadSize());
mHandler = new NettyWebSocketClientHandler(handshaker, this, transportHandler);

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);

TransportOptions opt = options;
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -153,7 +171,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
new HttpClientCodec(),
new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
new IdleStateHandler(15, 10, 20, TimeUnit.SECONDS),
new IdleStateHandler(
opt.getAutoPingInterval() + opt.getAutoPingTimeout(),
opt.getAutoPingInterval(), 0, TimeUnit.SECONDS),
mHandler);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
///////////////////////////////////////////////////////////////////////////////
//
// AutobahnJava - http://crossbar.io/autobahn
//
// Copyright (c) Crossbar.io Technologies GmbH and contributors
//
// Licensed under the MIT License.
// http://www.opensource.org/licenses/mit-license.php
//
///////////////////////////////////////////////////////////////////////////////

package io.crossbar.autobahn.wamp.types;

public class TransportOptions {
private int mMaxFramePayloadSize;
private int mAutoPingInterval;
private int mAutoPingTimeout;

public TransportOptions() {
mMaxFramePayloadSize = 128 * 1024;
mAutoPingInterval = 10;
mAutoPingTimeout = 5;
}

/**
* Set maximum frame payload size that will be accepted
* when receiving.
* <p>
* DEFAULT: 4MB
*
* @param size Maximum size in octets for frame payload.
*/
public void setMaxFramePayloadSize(int size) {
if (size > 0) {
mMaxFramePayloadSize = size;
}
}

/**
* Get maximum frame payload size that will be accepted
* when receiving.
*
* @return Maximum size in octets for frame payload.
*/
public int getMaxFramePayloadSize() {
return mMaxFramePayloadSize;
}

public void setAutoPingInterval(int seconds) {
mAutoPingInterval = seconds;
}

public int getAutoPingInterval() {
return mAutoPingInterval;
}

public void setAutoPingTimeout(int seconds) {
mAutoPingTimeout = seconds;
}

public int getAutoPingTimeout() {
return mAutoPingTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,7 @@

package io.crossbar.autobahn.wamp.types;

public class WebSocketOptions {
@Deprecated
public class WebSocketOptions extends TransportOptions {

private int mMaxFramePayloadSize;

public WebSocketOptions() {
mMaxFramePayloadSize = 128 * 1024;
}

/**
* Set maximum frame payload size that will be accepted
* when receiving.
* <p>
* DEFAULT: 4MB
*
* @param size Maximum size in octets for frame payload.
*/
public void setMaxFramePayloadSize(int size) {
if (size > 0) {
mMaxFramePayloadSize = size;
}
}

/**
* Get maximum frame payload size that will be accepted
* when receiving.
*
* @return Maximum size in octets for frame payload.
*/
public int getMaxFramePayloadSize() {
return mMaxFramePayloadSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,20 @@ public class WebSocketConnection implements IWebSocket {

private ScheduledExecutorService mExecutor;
// FIXME: make this configurable.
private final long mIdleTimeout = 10;
private final int mAutoPingInterval = 10;
private final int mAutoPingTimeout = 5;

private final Runnable mAutoPinger = new Runnable() {
@Override
public void run() {
if (mReader != null && mReader.getTimeSinceLastRead() >= mIdleTimeout - 1) {
if (mReader != null && mReader.getTimeSinceLastRead() >= mAutoPingInterval - 1) {
sendPing();
mExecutor.schedule(() -> {
if (mReader.getTimeSinceLastRead() < mIdleTimeout) {
if (mReader.getTimeSinceLastRead() < mAutoPingInterval) {
return;
}
forward(new ConnectionLost("AutoPing timed out."));
}, 2, TimeUnit.SECONDS);
}, mAutoPingTimeout, TimeUnit.SECONDS);
}
}
};
Expand Down Expand Up @@ -605,7 +606,7 @@ public void handleMessage(Message msg) {

if (serverHandshake.mSuccess) {
if (mWsHandler != null) {
mExecutor.scheduleAtFixedRate(mAutoPinger, mIdleTimeout, mIdleTimeout, TimeUnit.SECONDS);
mExecutor.scheduleAtFixedRate(mAutoPinger, mAutoPingInterval, mAutoPingInterval, TimeUnit.SECONDS);
String protocol = getOrDefault(serverHandshake.headers,
"Sec-WebSocket-Protocol", null);
mWsHandler.setConnection(WebSocketConnection.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class WebSocketOptions {
private boolean mMaskClientFrames;
private int mReconnectInterval;
private String[] mTlsProtocols;

private int mAutoPingInterval;
private int mAutoPingTimeout;

/**
* Construct default options.
Expand All @@ -46,6 +47,8 @@ public WebSocketOptions() {
mMaskClientFrames = true;
mReconnectInterval = 0; // no reconnection by default
mTlsProtocols = null;
mAutoPingInterval = 10;
mAutoPingTimeout = 5;
}

/**
Expand All @@ -65,6 +68,8 @@ public WebSocketOptions(WebSocketOptions other) {
mMaskClientFrames = other.mMaskClientFrames;
mReconnectInterval = other.mReconnectInterval;
mTlsProtocols = other.mTlsProtocols;
mAutoPingInterval = other.mAutoPingInterval;
mAutoPingTimeout = other.mAutoPingTimeout;
}

/**
Expand Down Expand Up @@ -281,4 +286,20 @@ public String[] getTLSEnabledProtocols() {
public void setTLSEnabledProtocols(String[] protocols) {
this.mTlsProtocols = protocols;
}

public void setAutoPingInterval(int seconds) {
mAutoPingInterval = seconds;
}

public int getAutoPingInterval() {
return mAutoPingInterval;
}

public void setAutoPingTimeout(int seconds) {
mAutoPingTimeout = seconds;
}

public int getAutoPingTimeout() {
return mAutoPingTimeout;
}
}
Loading

0 comments on commit 12fdc31

Please sign in to comment.