Skip to content

Commit

Permalink
WAMP: Fire onLeave() when session is lost (crossbario#390)
Browse files Browse the repository at this point in the history
* android: call onLeave listeners before onDisconnect

* netty: call onLeave listeners before onDisconnect
  • Loading branch information
om26er authored Apr 6, 2018
1 parent 3a46e7d commit cd3d446
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package io.crossbar.autobahn.utils;

public class Globals {
public static final boolean DEBUG = false;
public static final boolean DEBUG = true;
}
14 changes: 14 additions & 0 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ public void onMessage(byte[] payload, boolean isBinary) throws Exception {
}
}

@Override
public void onLeave(CloseDetails details) {
LOGGER.d("onLeave(), reason=" + details.reason);

List<CompletableFuture<?>> futures = new ArrayList<>();
for (OnLeaveListener listener: mOnLeaveListeners) {
futures.add(runAsync(() -> listener.onLeave(this, details), mExecutor));
}
CompletableFuture d = combineFutures(futures);
d.thenRunAsync(() -> {
LOGGER.d("Notified all Session.onLeave listeners.");
}, mExecutor);
}

private void onPreSessionMessage(IMessage message) throws Exception {
if (message instanceof Welcome) {
Welcome msg = (Welcome) message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
package io.crossbar.autobahn.wamp.interfaces;


import io.crossbar.autobahn.wamp.types.CloseDetails;

public interface ITransportHandler {

void onConnect(ITransport transport, ISerializer serializer) throws Exception;

void onMessage(byte[] payload, boolean isBinary) throws Exception;

void onLeave(CloseDetails details);

void onDisconnect(boolean wasClean);

boolean isConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,26 @@
package io.crossbar.autobahn.wamp.transports;

import java.util.List;
import java.util.logging.Logger;

import io.crossbar.autobahn.utils.ABLogger;
import io.crossbar.autobahn.utils.IABLogger;
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.crossbar.autobahn.wamp.serializers.CBORSerializer;
import io.crossbar.autobahn.wamp.serializers.JSONSerializer;
import io.crossbar.autobahn.wamp.serializers.MessagePackSerializer;
import io.crossbar.autobahn.wamp.types.CloseDetails;
import io.crossbar.autobahn.wamp.types.TransportOptions;
import io.crossbar.autobahn.websocket.WebSocketConnection;
import io.crossbar.autobahn.websocket.WebSocketConnectionHandler;
import io.crossbar.autobahn.websocket.interfaces.IWebSocketConnectionHandler;
import io.crossbar.autobahn.websocket.types.ConnectionResponse;
import io.crossbar.autobahn.websocket.types.WebSocketOptions;

public class AndroidWebSocket implements ITransport {

private static final Logger LOGGER = Logger.getLogger(AndroidWebSocket.class.getName());
public static final IABLogger LOGGER = ABLogger.getLogger(AndroidWebSocket.class.getName());
private static final String[] SERIALIZERS_DEFAULT = new String[] {
CBORSerializer.NAME, MessagePackSerializer.NAME, JSONSerializer.NAME};

Expand Down Expand Up @@ -78,7 +81,7 @@ public void connect(ITransportHandler transportHandler, TransportOptions options

@Override
public void onConnect(ConnectionResponse response) {
LOGGER.info(String.format("Negotiated serializer=%s", response.protocol));
LOGGER.d(String.format("Negotiated serializer=%s", response.protocol));
try {
mSerializer = initializeSerializer(response.protocol);
} catch (Exception e) {
Expand All @@ -97,6 +100,16 @@ public void onOpen() {

@Override
public void onClose(int code, String reason) {
switch (code) {
case IWebSocketConnectionHandler.CLOSE_CONNECTION_LOST:
transportHandler.onLeave(
new CloseDetails(CloseDetails.REASON_TRANSPORT_LOST, null));
break;
default:
transportHandler.onLeave(
new CloseDetails(CloseDetails.REASON_DEFAULT, null));
}
LOGGER.d(String.format("Disconnected, code=%s, reasons=%s", code, reason));
transportHandler.onDisconnect(code == 1000);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.crossbar.autobahn.wamp.serializers.CBORSerializer;
import io.crossbar.autobahn.wamp.serializers.JSONSerializer;
import io.crossbar.autobahn.wamp.serializers.MessagePackSerializer;
import io.crossbar.autobahn.wamp.types.CloseDetails;
import io.crossbar.autobahn.wamp.types.TransportOptions;
import io.crossbar.autobahn.wamp.types.WebSocketOptions;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -202,7 +203,7 @@ public boolean isOpen() {
public void close() throws Exception {
LOGGER.v("close()");
if (mHandler != null && mChannel != null) {
mHandler.close(mChannel, true);
mHandler.close(mChannel, true, new CloseDetails(CloseDetails.REASON_DEFAULT, null));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
package io.crossbar.autobahn.wamp.transports;

import io.crossbar.autobahn.utils.ABLogger;
import io.crossbar.autobahn.utils.Globals;
import io.crossbar.autobahn.utils.IABLogger;
import io.crossbar.autobahn.wamp.interfaces.ISerializer;
import io.crossbar.autobahn.wamp.interfaces.ITransport;
import io.crossbar.autobahn.wamp.interfaces.ITransportHandler;
import io.crossbar.autobahn.wamp.serializers.CBORSerializer;
import io.crossbar.autobahn.wamp.serializers.JSONSerializer;
import io.crossbar.autobahn.wamp.serializers.MessagePackSerializer;
import io.crossbar.autobahn.wamp.types.CloseDetails;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -37,7 +39,6 @@


public class NettyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

private static final IABLogger LOGGER = ABLogger.getLogger(
NettyWebSocketClientHandler.class.getName());

Expand All @@ -47,6 +48,8 @@ public class NettyWebSocketClientHandler extends SimpleChannelInboundHandler<Obj
private ITransportHandler mTransportHandler;
private boolean mWasCleanClose;

private CloseDetails mCloseDetails;

NettyWebSocketClientHandler(WebSocketClientHandshaker handshaker, ITransport transport,
ITransportHandler transportHandler) {
mHandshaker = handshaker;
Expand All @@ -71,6 +74,7 @@ public void channelActive(ChannelHandlerContext ctx) {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
LOGGER.i("WebSocket Client disconnected!");
mTransportHandler.onLeave(mCloseDetails);
mTransportHandler.onDisconnect(mWasCleanClose);
}

Expand Down Expand Up @@ -124,7 +128,8 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
LOGGER.d(String.format(
"Received Close frame, code=%s, reason=%s",
closeWebSocketFrame.statusCode(), closeWebSocketFrame.reasonText()));
close(ctx, closeWebSocketFrame.statusCode() == 1000);
close(ctx, closeWebSocketFrame.statusCode() == 1000,
new CloseDetails(CloseDetails.REASON_DEFAULT, null));
}
}

Expand All @@ -133,7 +138,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
close(ctx, false);
close(ctx, false, new CloseDetails(CloseDetails.REASON_TRANSPORT_LOST, null));
} else if (event.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingWebSocketFrame());
}
Expand All @@ -142,21 +147,25 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
if (Globals.DEBUG) {
cause.printStackTrace();
}
if (!mHandshakeFuture.isDone()) {
mHandshakeFuture.setFailure(cause);
}
close(ctx, false);
close(ctx, false, new CloseDetails(CloseDetails.REASON_TRANSPORT_LOST, null));
}

private void close(ChannelHandlerContext context, boolean wasClean) {
private void close(ChannelHandlerContext context, boolean wasClean, CloseDetails details) {
context.close();
mWasCleanClose = wasClean;
mCloseDetails = details;
}

void close(Channel channel, boolean wasClean) {
void close(Channel channel, boolean wasClean, CloseDetails details) {
channel.close();
mWasCleanClose = wasClean;
mCloseDetails = details;
}

private ISerializer initializeSerializer(String negotiatedSerializer) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
package io.crossbar.autobahn.wamp.types;

public class CloseDetails {

public static final String REASON_DEFAULT = "wamp.close.normal";
public static final String REASON_TRANSPORT_LOST = "wamp.close.transport_lost";

public final String reason;
public final String message;

Expand Down

0 comments on commit cd3d446

Please sign in to comment.