Skip to content

Commit

Permalink
fix issue 4347 [ClientAPI]ReaderBuilder.loadConf() not working (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
ambition119 authored and codelipenghui committed Jun 13, 2019
1 parent ce7e44f commit 5a9dbab
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public CompletableFuture<Reader<T>> createAsync() {

@Override
public ReaderBuilder<T> loadConf(Map<String, Object> config) {
MessageId startMessageId = conf.getStartMessageId();
conf = ConfigurationDataUtils.loadData(config, conf, ReaderConfigurationData.class);
conf.setStartMessageId(startMessageId);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.conf;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
Expand All @@ -31,6 +32,8 @@
public class ReaderConfigurationData<T> implements Serializable, Cloneable {

private String topicName;

@JsonIgnore
private MessageId startMessageId;

private int receiverQueueSize = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.testng.annotations.Test;

public class BuildersTest {
Expand Down Expand Up @@ -69,4 +74,27 @@ public void enableTlsTest() {
assertEquals(builder.conf.isUseTls(), true);
assertEquals(builder.conf.getServiceUrl(), "pulsar+ssl://service:6650");
}

@Test
public void readerBuilderLoadConfTest() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
String topicName = "test_src";
MessageId messageId = new MessageIdImpl(1, 2, 3);
Map<String, Object> config = new HashMap<>();
config.put("topicName", topicName);
config.put("receiverQueueSize", 2000);
ReaderBuilderImpl<byte[]> builder = (ReaderBuilderImpl<byte[]>) client.newReader()
.startMessageId(messageId)
.loadConf(config);

Class<?> clazz = builder.getClass();
Field conf = clazz.getDeclaredField("conf");
conf.setAccessible(true);
Object obj = conf.get(builder);
assertTrue(obj instanceof ReaderConfigurationData);
if (obj instanceof ReaderConfigurationData) {
assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName);
assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId);
}
}
}

0 comments on commit 5a9dbab

Please sign in to comment.