Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQ-5481
Browse files Browse the repository at this point in the history
Add some additional logs to MQTT at the trace level.
  • Loading branch information
tabish121 committed Jan 6, 2015
1 parent 4bf5d0f commit 7ebc6ce
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ void onMQTTDisconnect() throws MQTTProtocolException {

void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}",
command.messageId(), clientId, connectionInfo.getConnectionId());
Topic[] topics = command.topics();
if (topics != null) {
byte[] qos = new byte[topics.length];
Expand Down Expand Up @@ -415,6 +417,8 @@ public void onActiveMQCommand(Command command) throws Exception {
consumerAcks.put(publish.messageId(), ack);
}
}
LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}",
publish.messageId(), clientId, connectionInfo.getConnectionId());
getMQTTTransport().sendToMQTT(publish.encode());
if (ack != null && !sub.expectAck(publish)) {
getMQTTTransport().sendToActiveMQ(ack);
Expand All @@ -433,6 +437,8 @@ public void onActiveMQCommand(Command command) throws Exception {

void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
command.messageId(), clientId, connectionInfo.getConnectionId());
ActiveMQMessage message = convertMessage(command);
message.setProducerId(producerId);
message.onSend();
Expand All @@ -441,6 +447,8 @@ void onMQTTPublish(PUBLISH command) throws IOException, JMSException {

void onMQTTPubAck(PUBACK command) {
short messageId = command.messageId();
LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}",
messageId, clientId, connectionInfo.getConnectionId());
packetIdGenerator.ackPacketId(getClientId(), messageId);
MessageAck ack;
synchronized (consumerAcks) {
Expand Down Expand Up @@ -489,6 +497,8 @@ ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
msg.setProducerId(producerId);
MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
msg.setMessageId(id);
LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
msg.setTimestamp(System.currentTimeMillis());
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
Expand Down Expand Up @@ -582,6 +592,8 @@ public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSEx
result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
}
}
LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId());
return result;
}

Expand Down Expand Up @@ -691,6 +703,8 @@ public void onResponse(MQTTProtocolConverter converter, Response response) throw
} else {
PUBACK ack = new PUBACK();
ack.messageId(command.messageId());
LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
command.messageId(), clientId, connectionInfo.getConnectionId());
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
Expand All @@ -707,6 +721,8 @@ public void onResponse(MQTTProtocolConverter converter, Response response) throw
synchronized (publisherRecs) {
publisherRecs.put(command.messageId(), ack);
}
LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
command.messageId(), clientId, connectionInfo.getConnectionId());
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
Expand Down

0 comments on commit 7ebc6ce

Please sign in to comment.