Skip to content
This repository has been archived by the owner on Sep 23, 2019. It is now read-only.

Commit

Permalink
SAMZA-775: add consumer size-based fetch threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaxini authored and nickpan47 committed Nov 24, 2015
1 parent acd340e commit 72a558c
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 19 deletions.
25 changes: 25 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,31 @@ <h1>Samza Configuration Reference</h1>
</td>
</tr>

<tr>
<td class="property" id="systems-samza-fetch-threshold-bytes">systems.<span class="system">system-name</span>.<br>samza.fetch.threshold.bytes</td>
<td class="default">-1</td>
<td class="description">
When consuming streams from Kafka, a Samza container maintains an in-memory buffer
for incoming messages in order to increase throughput (the stream task can continue
processing buffered messages while new messages are fetched from Kafka). This
parameter determines the total size of messages we aim to buffer across all stream
partitions consumed by a container based on bytes. Defines how many bytes to use for the buffered
prefetch messages for job as a whole. The bytes for a single system/stream/partition are computed based on this.
This fetches the entire messages, hence this bytes limit is a soft one, and the actual usage can be the bytes
limit + size of max message in the partition for a given stream. If the value of this property is > 0
then this takes precedence over systems.<span class="system">system-name</span>.samza.fetch.threshold.<br>
For example, if fetchThresholdBytes is set to 100000 bytes, and there are 50 SystemStreamPartitions registered,
then the per-partition threshold is (100000 / 2) / 50 = 1000 bytes. As this is a soft limit, the actual usage
can be 1000 bytes + size of max message. As soon as a SystemStreamPartition's buffered messages bytes drops
below 1000, a fetch request will be executed to get more data for it.

Increasing this parameter will decrease the latency between when a queue is drained of messages and when new
messages are enqueued, but also leads to an increase in memory usage since more messages will be held in memory.

The default value is -1, which means this is not used.
</td>
</tr>

<tr>
<td class="property" id="task-checkpoint-system">task.checkpoint.system</td>
<td class="default"></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class IncomingMessageEnvelope {
private final String offset;
private final Object key;
private final Object message;
private final int size;

/**
* Constructs a new IncomingMessageEnvelope from specified components.
Expand All @@ -38,10 +39,24 @@ public class IncomingMessageEnvelope {
* @param message A deserialized message received from the partition offset.
*/
public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message) {
this(systemStreamPartition, offset, key, message, 0);
}

/**
* Constructs a new IncomingMessageEnvelope from specified components.
* @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
* from which the stream came, and the partition of the stream from which the message was received.
* @param offset The offset in the partition that the message was received from.
* @param key A deserialized key received from the partition offset.
* @param message A deserialized message received from the partition offset.
* @param size size of the message and key in bytes.
*/
public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) {
this.systemStreamPartition = systemStreamPartition;
this.offset = offset;
this.key = key;
this.message = message;
this.size = size;
}

public SystemStreamPartition getSystemStreamPartition() {
Expand All @@ -60,6 +75,10 @@ public Object getMessage() {
return message;
}

public int getSize() {
return size;
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;

import java.util.concurrent.atomic.AtomicLong;

/**
* <p>
* BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
Expand All @@ -63,17 +65,15 @@
public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final BlockingEnvelopeMapMetrics metrics;
private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
private final Clock clock;
protected final boolean fetchLimitByBytesEnabled;

public BlockingEnvelopeMap() {
this(new NoOpMetricsRegistry());
}

public BlockingEnvelopeMap(Clock clock) {
this(new NoOpMetricsRegistry(), clock);
}

public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
this(metricsRegistry, new Clock() {
public long currentTimeMillis() {
Expand All @@ -83,15 +83,18 @@ public long currentTimeMillis() {
}

public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
this(metricsRegistry, clock, null);
this(metricsRegistry, clock, null, false);
}

public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
this.clock = clock;
this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
// Created when size is disabled for code simplification, and as the overhead is negligible.
this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
}

/**
Expand All @@ -100,6 +103,8 @@ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String
public void register(SystemStreamPartition systemStreamPartition, String offset) {
metrics.initMetrics(systemStreamPartition);
bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
// Created when size is disabled for code simplification, and the overhead is negligible.
bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
}

protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
Expand Down Expand Up @@ -150,12 +155,24 @@ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<System

if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
if (fetchLimitByBytesEnabled) {
subtractSizeOnQDrain(systemStreamPartition, outgoingList);
}
}
}

return messagesToReturn;
}

private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> outgoingList) {
long outgoingListBytes = 0;
for (IncomingMessageEnvelope envelope : outgoingList) {
outgoingListBytes += envelope.getSize();
}
// subtract the size of the messages dequeued.
bufferedMessagesSize.get(systemStreamPartition).addAndGet(-1 * outgoingListBytes);
}

/**
* Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the
* queue for the specified {@link org.apache.samza.system.SystemStreamPartition}.
Expand All @@ -166,6 +183,9 @@ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<System
*/
protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
bufferedMessages.get(systemStreamPartition).put(envelope);
if (fetchLimitByBytesEnabled) {
bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
}
}

/**
Expand Down Expand Up @@ -198,6 +218,16 @@ public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
}
}

public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) {
AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);

if (sizeInBytes == null) {
throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered. or fetch");
} else {
return sizeInBytes.get();
}
}

protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
return noMoreMessage.put(systemStreamPartition, isAtHead);
Expand Down Expand Up @@ -232,6 +262,9 @@ public void initMetrics(SystemStreamPartition systemStreamPartition) {
this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));

metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
if (fetchLimitByBytesEnabled) {
metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
}
}

public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
Expand Down Expand Up @@ -271,4 +304,25 @@ public Integer getValue() {
return envelopes.size();
}
}

public class BufferSizeGauge extends Gauge<Long> {
private final SystemStreamPartition systemStreamPartition;

public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) {
super(name, 0L);

this.systemStreamPartition = systemStreamPartition;
}

@Override
public Long getValue() {
AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);

if (sizeInBytes == null) {
return 0L;
}

return sizeInBytes.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@
public class TestBlockingEnvelopeMap {
private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
private static final IncomingMessageEnvelope ENVELOPE = new IncomingMessageEnvelope(SSP, null, null, null);
private static final IncomingMessageEnvelope ENVELOPE_WITH_SIZE = new IncomingMessageEnvelope(SSP, null, null, null, 100);
private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>();
private static final Clock CLOCK = new Clock() {
public long currentTimeMillis() {
return System.currentTimeMillis();
}
};

static {
FETCH.add(SSP);
Expand Down Expand Up @@ -78,6 +84,35 @@ public void testShouldGetSomeMessages() throws InterruptedException {
envelopes = map.poll(FETCH, 0);
assertEquals(1, envelopes.size());
assertEquals(2, envelopes.get(SSP).size());

// Size info.
assertEquals(0, map.getMessagesSizeInQueue(SSP));
}

@Test
public void testNoSizeComputation() throws InterruptedException {
BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
map.register(SSP, "0");
map.put(SSP, ENVELOPE);
map.put(SSP, ENVELOPE);
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);

// Size info.
assertEquals(0, map.getMessagesSizeInQueue(SSP));
}

@Test
public void testSizeComputation() throws InterruptedException {
BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(true);
map.register(SSP, "0");
map.put(SSP, ENVELOPE_WITH_SIZE);
map.put(SSP, ENVELOPE_WITH_SIZE);

// Size info.
assertEquals(200, map.getMessagesSizeInQueue(SSP));

Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
assertEquals(0, map.getMessagesSizeInQueue(SSP));
}

@Test
Expand Down Expand Up @@ -177,12 +212,13 @@ public MockBlockingEnvelopeMap() {
this(null);
}

public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
injectedQueue = new MockQueue();
}

public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue) {
this(injectedQueue, new Clock() {
public long currentTimeMillis() {
return System.currentTimeMillis();
}
});
this(injectedQueue, CLOCK);
}

public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue, Clock clock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ object KafkaConfig {

val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400

/**
* Defines how many bytes to use for the buffered prefetch messages for job as a whole.
* The bytes for a single system/stream/partition are computed based on this.
* This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be
* the bytes limit + size of max message in the partition for a given stream.
* If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
*/
val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"

implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}

Expand All @@ -70,6 +79,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
// custom consumer config
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0


/**
* Returns a map of topic -> fetch.message.max.bytes value for all streams that
Expand Down
Loading

0 comments on commit 72a558c

Please sign in to comment.