diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index e7209a17c2674..130a35bdaede2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.spy; @@ -41,6 +42,9 @@ import java.lang.reflect.Modifier; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -57,9 +61,11 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -106,6 +112,140 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test + public void testPublishTimestampBatchDisabled() throws Exception { + + log.info("-- Starting {} test --", methodName); + + AtomicLong ticker = new AtomicLong(0); + + Clock clock = new Clock() { + @Override + public ZoneId getZone() { + return ZoneId.systemDefault(); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(millis()); + } + + @Override + public long millis() { + return ticker.incrementAndGet(); + } + }; + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .clock(clock) + .build(); + + final String topic = "persistent://my-property/my-ns/test-publish-timestamp"; + + @Cleanup + Consumer consumer = newPulsarClient.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .subscribe(); + + @Cleanup + Producer producer = newPulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer.newMessage() + .value(("value-" + i).getBytes(UTF_8)) + .eventTime((i + 1) * 100L) + .sendAsync(); + } + producer.flush(); + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(); + log.info("Received message '{}'.", new String(msg.getValue(), UTF_8)); + assertEquals(1L + i, msg.getPublishTime()); + assertEquals(100L * (i + 1), msg.getEventTime()); + } + } + + @Test + public void testPublishTimestampBatchEnabled() throws Exception { + + log.info("-- Starting {} test --", methodName); + + AtomicLong ticker = new AtomicLong(0); + + Clock clock = new Clock() { + @Override + public ZoneId getZone() { + return ZoneId.systemDefault(); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(millis()); + } + + @Override + public long millis() { + return ticker.incrementAndGet(); + } + }; + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .clock(clock) + .build(); + + final String topic = "persistent://my-property/my-ns/test-publish-timestamp"; + + @Cleanup + Consumer consumer = newPulsarClient.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .subscribe(); + + final int numMessages = 5; + + @Cleanup + Producer producer = newPulsarClient.newProducer() + .topic(topic) + .enableBatching(true) + .batchingMaxMessages(10 * numMessages) + .create(); + + for (int i = 0; i < numMessages; i++) { + producer.newMessage() + .value(("value-" + i).getBytes(UTF_8)) + .eventTime((i + 1) * 100L) + .sendAsync(); + } + producer.flush(); + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(); + log.info("Received message '{}'.", new String(msg.getValue(), UTF_8)); + assertEquals(1L, msg.getPublishTime()); + assertEquals(100L * (i + 1), msg.getEventTime()); + } + } + @DataProvider(name = "batch") public Object[][] codecProvider() { return new Object[][] { { 0 }, { 1000 } }; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index eb02c3f25ec3e..9e6cc7c2de2fa 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import java.time.Clock; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -376,4 +377,20 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @return the client builder instance */ ClientBuilder maxBackoffInterval(long duration, TimeUnit unit); + + /** + * The clock used by the pulsar client. + * + *

The clock is currently used by producer for setting publish timestamps. + * {@link Clock#millis()} is called to retrieve current timestamp as the publish + * timestamp when producers produce messages. The default clock is a system default zone + * clock. So the publish timestamp is same as calling {@link System#currentTimeMillis()}. + * + *

Warning: the clock is used for TTL enforcement and timestamp based seeks. + * so be aware of the impacts if you are going to use a different clock. + * + * @param clock the clock used by the pulsar client to retrieve time information + * @return the client builder instance + */ + ClientBuilder clock(Clock clock); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 013e94d8819bd..c65b7c2c23d1d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.time.Clock; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -223,4 +224,10 @@ public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) { public ClientConfigurationData getClientConfigurationData() { return conf; } + + @Override + public ClientBuilder clock(Clock clock) { + conf.setClock(clock); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 48656f8f4360a..7d446a055d016 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -352,7 +352,7 @@ public void sendAsync(Message message, SendCallback callback) { sequenceId = msgMetadataBuilder.getSequenceId(); } if (!msgMetadataBuilder.hasPublishTime()) { - msgMetadataBuilder.setPublishTime(System.currentTimeMillis()); + msgMetadataBuilder.setPublishTime(client.getClientClock().millis()); checkArgument(!msgMetadataBuilder.hasProducerName()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 764d25c52010a..d345da54d7c29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.lang3.StringUtils.isBlank; import com.google.common.annotations.VisibleForTesting; @@ -33,7 +32,12 @@ import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.*; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -69,7 +73,6 @@ import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider; import org.apache.pulsar.client.util.ExecutorProvider; -import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -115,6 +118,8 @@ public SchemaInfoProvider load(String topicName) { } }); + private final Clock clientClock; + public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { this(conf, getEventLoopGroup(conf)); } @@ -131,6 +136,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr this.eventLoopGroup = eventLoopGroup; setAuth(conf); this.conf = conf; + this.clientClock = conf.getClock(); conf.getAuthentication().start(); this.cnxPool = cnxPool; externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener")); @@ -157,6 +163,11 @@ public ClientConfigurationData getConfiguration() { return conf; } + @VisibleForTesting + public Clock getClientClock() { + return clientClock; + } + @Override public ProducerBuilder newProducer() { return new ProducerBuilderImpl<>(this, Schema.BYTES); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 3ab86382c81f8..e944896f99650 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -19,8 +19,8 @@ package org.apache.pulsar.client.impl.conf; import com.fasterxml.jackson.annotation.JsonIgnore; +import java.time.Clock; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.Authentication; @@ -70,6 +70,9 @@ public class ClientConfigurationData implements Serializable, Cloneable { private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100); private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30); + @JsonIgnore + private Clock clock = Clock.systemDefaultZone(); + public Authentication getAuthentication() { if (authentication == null) { this.authentication = new AuthenticationDisabled();