Skip to content

Commit

Permalink
Fix issue in STOMP broker relay
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Jul 15, 2013
1 parent 5d20b75 commit e0d393e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public String getUser() {
}

public String getTargetDestination(String sessionId) {
return (this.targetDestination != null) ? this.targetDestination + "/" + sessionId : null;
return (this.targetDestination != null) ? this.targetDestination + sessionId : null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -232,7 +231,7 @@ public void stop() {
}
this.running = false;
try {
this.tcpClient.close().await(5000, TimeUnit.MILLISECONDS);
this.tcpClient.close().await();
}
catch (Throwable t) {
logger.error("Failed to close reactor TCP client", t);
Expand Down Expand Up @@ -321,12 +320,13 @@ protected void handleInternal(Message<?> message, SimpMessageType messageType, S
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
RelaySession session = this.relaySessions.remove(sessionId);
if (session != null) {
if (session == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
}
session.forward(message);
return;
}
session.forward(message);
}
else {
RelaySession session = this.relaySessions.get(sessionId);
Expand Down Expand Up @@ -404,16 +404,9 @@ private void readStompFrame(String stompFrame) {
}
return;
}
if (StompCommand.ERROR == headers.getStompCommand()) {
if (logger.isDebugEnabled()) {
logger.warn("STOMP ERROR: " + headers.getMessage() + ". Removing session id=" + this.sessionId);
}
relaySessions.remove(this.sessionId);
}

headers.setSessionId(this.sessionId);
message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build();

sendMessageToClient(message);
}

Expand Down Expand Up @@ -455,25 +448,13 @@ public void forward(Message<?> message) {
}

private boolean forwardInternal(Message<?> message, TcpConnection<String, String> connection) {

try {
if (logger.isTraceEnabled()) {
logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
}
byte[] bytes = stompMessageConverter.fromMessage(message);
connection.send(new String(bytes, Charset.forName("UTF-8")));
}
catch (Throwable ex) {
logger.error("Forward failed message id=" + message.getHeaders().getId(), ex);
try {
connection.close();
}
catch (Throwable t) {
// ignore
}
sendError(this.sessionId, "Failed to forward message " + message + ": " + ex.getMessage());
return false;
if (logger.isTraceEnabled()) {
logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
}
byte[] bytes = stompMessageConverter.fromMessage(message);
connection.send(new String(bytes, Charset.forName("UTF-8")));

// TODO: detect if send fails and send ERROR downstream (except on DISCONNECT)
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,11 @@ public void handleMessage(Message<?> message) {
WebSocketSession session = this.sessions.get(sessionId);
if (session == null) {
// TODO: failed message delivery mechanism
logger.error("Ignoring message, session not found: " + sessionId);
logger.error("Ignoring message, sessionId not found: " + message);
return;
}

if (headers.getSubscriptionId() == null) {
if (StompCommand.MESSAGE.equals(headers.getStompCommand()) && (headers.getSubscriptionId() == null)) {
// TODO: failed message delivery mechanism
logger.error("Ignoring message, no subscriptionId header: " + message);
return;
Expand Down

0 comments on commit e0d393e

Please sign in to comment.