From 5a9dbab613d2262948f67f2f02c1c1d74608c4fb Mon Sep 17 00:00:00 2001 From: wpl <1269223860@qq.com> Date: Thu, 13 Jun 2019 10:26:12 +0800 Subject: [PATCH] fix issue 4347 [ClientAPI]ReaderBuilder.loadConf() not working (#4382) --- .../pulsar/client/impl/ReaderBuilderImpl.java | 2 ++ .../impl/conf/ReaderConfigurationData.java | 3 ++ .../pulsar/client/impl/BuildersTest.java | 28 +++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index d74dc83b8c0cd..9c28e0321b408 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -96,7 +96,9 @@ public CompletableFuture> createAsync() { @Override public ReaderBuilder loadConf(Map config) { + MessageId startMessageId = conf.getStartMessageId(); conf = ConfigurationDataUtils.loadData(config, conf, ReaderConfigurationData.class); + conf.setStartMessageId(startMessageId); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 362c8f96c5c42..6645c9cb231cd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.conf; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.io.Serializable; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -31,6 +32,8 @@ public class ReaderConfigurationData implements Serializable, Cloneable { private String topicName; + + @JsonIgnore private MessageId startMessageId; private int receiverQueueSize = 1000; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 4995706d6643b..0bb68c0ea44f3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -21,7 +21,12 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.testng.annotations.Test; public class BuildersTest { @@ -69,4 +74,27 @@ public void enableTlsTest() { assertEquals(builder.conf.isUseTls(), true); assertEquals(builder.conf.getServiceUrl(), "pulsar+ssl://service:6650"); } + + @Test + public void readerBuilderLoadConfTest() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + String topicName = "test_src"; + MessageId messageId = new MessageIdImpl(1, 2, 3); + Map config = new HashMap<>(); + config.put("topicName", topicName); + config.put("receiverQueueSize", 2000); + ReaderBuilderImpl builder = (ReaderBuilderImpl) client.newReader() + .startMessageId(messageId) + .loadConf(config); + + Class clazz = builder.getClass(); + Field conf = clazz.getDeclaredField("conf"); + conf.setAccessible(true); + Object obj = conf.get(builder); + assertTrue(obj instanceof ReaderConfigurationData); + if (obj instanceof ReaderConfigurationData) { + assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName); + assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId); + } + } }