Skip to content

Commit

Permalink
[client] Provide a clock for generating publish timestamp for produce…
Browse files Browse the repository at this point in the history
…rs (apache#4562)

*Motivation*

Currently producers uses `System.currentTimeMillis()` as publish timestamp by default.
However at some use cases, producers would like to a different way for generating publish timestamp.
E.g. in a database use case, a producer might be use HLC (Hybrid Logic Clock) as publish timestamp;
in integration tests, it might require the producer to use a deterministic way to generate publish timestamp.

*Changes*

This PR introduces a `clock` in building the client. This allows applications to override the system clock
with its own implementation.

*Verify the change*

Add unit test to test customized clock in both batch and non-batch cases.
  • Loading branch information
sijie authored Jun 25, 2019
1 parent eee5767 commit 7397b96
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
Expand All @@ -41,6 +42,9 @@
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -57,9 +61,11 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -106,6 +112,140 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testPublishTimestampBatchDisabled() throws Exception {

log.info("-- Starting {} test --", methodName);

AtomicLong ticker = new AtomicLong(0);

Clock clock = new Clock() {
@Override
public ZoneId getZone() {
return ZoneId.systemDefault();
}

@Override
public Clock withZone(ZoneId zone) {
return this;
}

@Override
public Instant instant() {
return Instant.ofEpochMilli(millis());
}

@Override
public long millis() {
return ticker.incrementAndGet();
}
};

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.clock(clock)
.build();

final String topic = "persistent://my-property/my-ns/test-publish-timestamp";

@Cleanup
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-sub")
.subscribe();

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

final int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
producer.newMessage()
.value(("value-" + i).getBytes(UTF_8))
.eventTime((i + 1) * 100L)
.sendAsync();
}
producer.flush();

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive();
log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
assertEquals(1L + i, msg.getPublishTime());
assertEquals(100L * (i + 1), msg.getEventTime());
}
}

@Test
public void testPublishTimestampBatchEnabled() throws Exception {

log.info("-- Starting {} test --", methodName);

AtomicLong ticker = new AtomicLong(0);

Clock clock = new Clock() {
@Override
public ZoneId getZone() {
return ZoneId.systemDefault();
}

@Override
public Clock withZone(ZoneId zone) {
return this;
}

@Override
public Instant instant() {
return Instant.ofEpochMilli(millis());
}

@Override
public long millis() {
return ticker.incrementAndGet();
}
};

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.clock(clock)
.build();

final String topic = "persistent://my-property/my-ns/test-publish-timestamp";

@Cleanup
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-sub")
.subscribe();

final int numMessages = 5;

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(10 * numMessages)
.create();

for (int i = 0; i < numMessages; i++) {
producer.newMessage()
.value(("value-" + i).getBytes(UTF_8))
.eventTime((i + 1) * 100L)
.sendAsync();
}
producer.flush();

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive();
log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
assertEquals(1L, msg.getPublishTime());
assertEquals(100L * (i + 1), msg.getEventTime());
}
}

@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -376,4 +377,20 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @return the client builder instance
*/
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);

/**
* The clock used by the pulsar client.
*
* <p>The clock is currently used by producer for setting publish timestamps.
* {@link Clock#millis()} is called to retrieve current timestamp as the publish
* timestamp when producers produce messages. The default clock is a system default zone
* clock. So the publish timestamp is same as calling {@link System#currentTimeMillis()}.
*
* <p>Warning: the clock is used for TTL enforcement and timestamp based seeks.
* so be aware of the impacts if you are going to use a different clock.
*
* @param clock the clock used by the pulsar client to retrieve time information
* @return the client builder instance
*/
ClientBuilder clock(Clock clock);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -223,4 +224,10 @@ public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
public ClientConfigurationData getClientConfigurationData() {
return conf;
}

@Override
public ClientBuilder clock(Clock clock) {
conf.setClock(clock);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void sendAsync(Message<T> message, SendCallback callback) {
sequenceId = msgMetadataBuilder.getSequenceId();
}
if (!msgMetadataBuilder.hasPublishTime()) {
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
msgMetadataBuilder.setPublishTime(client.getClientClock().millis());

checkArgument(!msgMetadataBuilder.hasProducerName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.StringUtils.isBlank;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -33,7 +32,12 @@
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.*;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -69,7 +73,6 @@
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -115,6 +118,8 @@ public SchemaInfoProvider load(String topicName) {
}
});

private final Clock clientClock;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, getEventLoopGroup(conf));
}
Expand All @@ -131,6 +136,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
this.eventLoopGroup = eventLoopGroup;
setAuth(conf);
this.conf = conf;
this.clientClock = conf.getClock();
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
Expand All @@ -157,6 +163,11 @@ public ClientConfigurationData getConfiguration() {
return conf;
}

@VisibleForTesting
public Clock getClientClock() {
return clientClock;
}

@Override
public ProducerBuilder<byte[]> newProducer() {
return new ProducerBuilderImpl<>(this, Schema.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.pulsar.client.impl.conf;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.time.Clock;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -70,6 +70,9 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);

@JsonIgnore
private Clock clock = Clock.systemDefaultZone();

public Authentication getAuthentication() {
if (authentication == null) {
this.authentication = new AuthenticationDisabled();
Expand Down

0 comments on commit 7397b96

Please sign in to comment.