diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index e821dbc4e9e..5f34f17fece 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -342,6 +342,8 @@ void onMQTTDisconnect() throws MQTTProtocolException { void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { checkConnected(); + LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); Topic[] topics = command.topics(); if (topics != null) { byte[] qos = new byte[topics.length]; @@ -415,6 +417,8 @@ public void onActiveMQCommand(Command command) throws Exception { consumerAcks.put(publish.messageId(), ack); } } + LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", + publish.messageId(), clientId, connectionInfo.getConnectionId()); getMQTTTransport().sendToMQTT(publish.encode()); if (ack != null && !sub.expectAck(publish)) { getMQTTTransport().sendToActiveMQ(ack); @@ -433,6 +437,8 @@ public void onActiveMQCommand(Command command) throws Exception { void onMQTTPublish(PUBLISH command) throws IOException, JMSException { checkConnected(); + LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); ActiveMQMessage message = convertMessage(command); message.setProducerId(producerId); message.onSend(); @@ -441,6 +447,8 @@ void onMQTTPublish(PUBLISH command) throws IOException, JMSException { void onMQTTPubAck(PUBACK command) { short messageId = command.messageId(); + LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", + messageId, clientId, connectionInfo.getConnectionId()); packetIdGenerator.ackPacketId(getClientId(), messageId); MessageAck ack; synchronized (consumerAcks) { @@ -489,6 +497,8 @@ ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { msg.setProducerId(producerId); MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); msg.setMessageId(id); + LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", + command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); msg.setTimestamp(System.currentTimeMillis()); msg.setPriority((byte) Message.DEFAULT_PRIORITY); msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); @@ -582,6 +592,8 @@ public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSEx result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); } } + LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", + result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); return result; } @@ -691,6 +703,8 @@ public void onResponse(MQTTProtocolConverter converter, Response response) throw } else { PUBACK ack = new PUBACK(); ack.messageId(command.messageId()); + LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); converter.getMQTTTransport().sendToMQTT(ack.encode()); } } @@ -707,6 +721,8 @@ public void onResponse(MQTTProtocolConverter converter, Response response) throw synchronized (publisherRecs) { publisherRecs.put(command.messageId(), ack); } + LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); converter.getMQTTTransport().sendToMQTT(ack.encode()); } }