Skip to content

Commit

Permalink
Fix peek message metadata broker while enable broker entry metadata. (a…
Browse files Browse the repository at this point in the history
…pache#9255)

### Motivation

Fix peek message metadata broker while enable broker entry metadata.

When enabled the broker entry metadata, following error occurs:

```
22:09:57.802 [broker-topic-workers-OrderedScheduler-4-0:org.apache.pulsar.common.protocol.Commands@1658] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://public/default/__consumer_offsets-partition-0, name=reader-31a9742e6c}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:415) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1653) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:82) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:232) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:178) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.12.1.jar:4.12.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_261]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_261]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
```

The root cause is peeking message metadata does not skip the broker entry metadata.
  • Loading branch information
codelipenghui authored Jan 25, 2021
1 parent 93748d7 commit 60161ed
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,8 @@ public void resetCloseFuture() {
// noop
}

public static final String NONE_KEY = "NONE_KEY";

protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
metadataAndPayload.markReaderIndex();
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
byte[] key = NONE_KEY.getBytes();
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
return metadata.getPartitionKey().getBytes();
}
return key;
return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
}

protected void addMessageToReplay(long ledgerId, long entryId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.assertj.core.util.Sets;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Test for the broker entry metadata.
*/
public class BrokerEntryMetadataE2ETest extends BrokerTestBase {

@DataProvider(name = "subscriptionTypes")
public static Object[] subscriptionTypes() {
return new Object[] {
SubscriptionType.Exclusive,
SubscriptionType.Failover,
SubscriptionType.Shared,
SubscriptionType.Key_Shared
};
}

@BeforeClass
protected void setup() throws Exception {
conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
));
baseSetup();
}

@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
}

@Test(dataProvider = "subscriptionTypes")
public void testProduceAndConsume(SubscriptionType subType) throws Exception {
final String topic = newTopicName();
final int messages = 10;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(subType)
.subscriptionName("my-sub")
.subscribe();

for (int i = 0; i < messages; i++) {
producer.send(String.valueOf(i).getBytes());
}

int receives = 0;
for (int i = 0; i < messages; i++) {
Message<byte[]> received = consumer.receive();
++ receives;
Assert.assertEquals(i, Integer.valueOf(new String(received.getValue())).intValue());
}

Assert.assertEquals(messages, receives);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
Expand Down Expand Up @@ -306,7 +307,7 @@ public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelec
.value(i)
.send();
}
int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
int slot = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
if (slot <= 20000) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import io.netty.util.concurrent.FastThreadLocal;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1654,6 +1654,7 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
try {
// save the reader index and restore after parsing
int readerIdx = metadataAndPayload.readerIndex();
skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);

Expand All @@ -1664,6 +1665,24 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
}
}

private static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
try {
int readerIdx = metadataAndPayload.readerIndex();
skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
}
} catch (Throwable t) {
log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t);
}
return Commands.NONE_KEY;
}

public static int getCurrentProtocolVersion() {
// Return the last ProtocolVersion enum value
return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getValue();
Expand Down
3 changes: 3 additions & 0 deletions pulsar-common/src/main/resources/findbugsExclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
<Match>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
<Bug pattern="MS_EXPOSE_REP"/>
</Match>
<Match>
<Bug pattern="UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>
Expand Down

0 comments on commit 60161ed

Please sign in to comment.