Skip to content

Commit

Permalink
Add log info for key-shared subscribe mode (apache#4463)
Browse files Browse the repository at this point in the history
* Add log info for key-shared subscribe mode

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored and merlimat committed Jun 4, 2019
1 parent d0dffd6 commit 4db4d53
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1056,10 +1056,11 @@ private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload)
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();

log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", remoteAddress,
send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(),
headersAndPayload.readableBytes());
if (log.isDebugEnabled()) {
log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}",
remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(),
headersAndPayload.readableBytes(), msgMetadata.getPartitionKey(), msgMetadata.getOrderingKey());
}
msgMetadata.recycle();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}

int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
}
if (messagesForC > 0) {
// remove positions first from replay list first : sendMessages recycles entries
List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
Expand Down Expand Up @@ -172,6 +176,9 @@ private byte[] peekStickyKey(ByteBuf metadataAndPayload) {
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
String key = metadata.getPartitionKey();
if (log.isDebugEnabled()) {
log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
}
if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
}
Expand Down

0 comments on commit 4db4d53

Please sign in to comment.