From 4db4d532fb6d27f64243a652c715c1660b994102 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Tue, 4 Jun 2019 22:17:33 +0800 Subject: [PATCH] Add log info for key-shared subscribe mode (#4463) * Add log info for key-shared subscribe mode Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 9 +++++---- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 7 +++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 9447af19fa6c5..84729706eb088 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1056,10 +1056,11 @@ private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) headersAndPayload.markReaderIndex(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); headersAndPayload.resetReaderIndex(); - - log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", remoteAddress, - send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(), - headersAndPayload.readableBytes()); + if (log.isDebugEnabled()) { + log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}", + remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(), + headersAndPayload.readableBytes(), msgMetadata.getPartitionKey(), msgMetadata.getOrderingKey()); + } msgMetadata.recycle(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 154e87049b130..0cbe354ab549e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -100,6 +100,10 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { } int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits()); + if (log.isDebugEnabled()) { + log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}", + name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType); + } if (messagesForC > 0) { // remove positions first from replay list first : sendMessages recycles entries List subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC)); @@ -172,6 +176,9 @@ private byte[] peekStickyKey(ByteBuf metadataAndPayload) { MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.resetReaderIndex(); String key = metadata.getPartitionKey(); + if (log.isDebugEnabled()) { + log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey()); + } if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) { return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes(); }