Skip to content

Commit

Permalink
SAMZA-146: Throughput degrades unreasonably as the number of topic-pa…
Browse files Browse the repository at this point in the history
…rtitions increases.
  • Loading branch information
jghoman committed Feb 13, 2014
1 parent 30a6063 commit 5cdbd02
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.Queue
import org.apache.samza.serializers.SerdeManager
import grizzled.slf4j.Logging
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.util.DoublingBackOff

/**
* The SystemConsumers class coordinates between all SystemConsumers, the
Expand Down Expand Up @@ -135,6 +136,13 @@ class SystemConsumers(
*/
val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt

/**
* Make the maximum backoff proportional to the number of streams we're consuming.
* For a few streams, make the max back off 1, but for hundreds make it up to 1k,
* which experimentally has shown to be the most performant.
*/
var maxBackOff = 0

debug("Got stream consumers: %s" format consumers)
debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
debug("Got no new message timeout: %s" format noNewMessagesTimeout)
Expand All @@ -148,6 +156,10 @@ class SystemConsumers(
def start {
debug("Starting consumers.")

maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt

debug("Got maxBackOff: " + maxBackOff)

consumers
.keySet
.foreach(metrics.registerSystem)
Expand Down Expand Up @@ -176,7 +188,31 @@ class SystemConsumers(
chooser.register(systemStreamPartition, lastReadOffset)
}

def choose = {
/**
* Needs to be be lazy so that we are sure to get the value of maxBackOff assigned
* in start(), rather than its initial value.
*/
lazy val refresh = new DoublingBackOff(maxBackOff) {
def call(): Boolean = {
debug("Refreshing chooser with new messages.")

// Poll every system for new messages.
val receivedNewMessages = consumers.keys.map(poll(_)).contains(true)

// Update the chooser.
neededByChooser.foreach(systemStreamPartition =>
// If we have messages for a stream that the chooser needs, then update.
if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
updateFetchMap(systemStreamPartition)
neededByChooser -= systemStreamPartition
})

receivedNewMessages
}
}

def choose: IncomingMessageEnvelope = {
val envelopeFromChooser = chooser.choose

if (envelopeFromChooser == null) {
Expand All @@ -200,31 +236,16 @@ class SystemConsumers(
metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
}

refresh
refresh.maybeCall()
envelopeFromChooser
}

private def refresh {
debug("Refreshing chooser with new messages.")

// Poll every system for new messages.
consumers.keys.foreach(poll(_))

// Update the chooser.
neededByChooser.foreach(systemStreamPartition =>
// If we have messages for a stream that the chooser needs, then update.
if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
updateFetchMap(systemStreamPartition)
neededByChooser -= systemStreamPartition
})
}


/**
* Poll a system for new messages from SystemStreamPartitions that have
* dipped below the depletedQueueSizeThreshold threshold.
* dipped below the depletedQueueSizeThreshold threshold. Return true if
* any envelopes were found, false if none.
*/
private def poll(systemName: String) = {
private def poll(systemName: String): Boolean = {
debug("Polling system consumer: %s" format systemName)

metrics.systemPolls(systemName).inc
Expand Down Expand Up @@ -259,6 +280,8 @@ class SystemConsumers(

debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages))
})

!incomingEnvelopes.isEmpty
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util

/**
* Perform the provided action (via the call method) and, if it returns true,
* perform it again, the next time. If, however, call returns false, do not
* perform call the next time, instead wait 2*n calls before actually calling,
* with n increasing to the maximum specified in the constructor.
* @param maxBackOff Absolute maximum number of calls to call before actually performing call.
*/
abstract class DoublingBackOff(maxBackOff:Int = 64) {
var invocationsBeforeCall = 0
var currentBackOff = 0

/**
* Method to invoke and whose return value will determine the next time
* it is called again.
*/
def call():Boolean

/**
* Possibly execute the call method, based on the result of the previous run.
*/
def maybeCall():Unit = {
if(invocationsBeforeCall == 0) {
if (call()) {
// call succeeded so reset backoff
currentBackOff = 0
} else {
// Call failed, so start backing off
currentBackOff = scala.math.min(maxBackOff, nextBackOff(currentBackOff))
invocationsBeforeCall = currentBackOff
}
} else {
invocationsBeforeCall -= 1
}

}

// 2 * 0 == 0, making getting started a wee bit hard, so we need a little help with that first back off
private def nextBackOff(i:Int) = if(i == 0) 1 else 2 * i

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util

import org.junit.Test
import org.junit.Assert._

class TestDoublingBackOff {
@Test def zeroBackOffWorks() {
var counter = 0
val zeroBackOff = new DoublingBackOff(0) {
def call(): Boolean = {
counter += 1
true
}
}

for(i <- 0 to 1000) {
assertEquals(i, counter)
zeroBackOff.maybeCall()
}
}

@Test def backOffWorks() {
val toReturn = List(true, false, true, true)
var counter = 0
val ebo = new DoublingBackOff() {
def call(): Boolean = {
counter += 1
toReturn(counter - 1)
}
}

ebo.maybeCall() // will get back true
assertEquals(1, counter)
ebo.maybeCall() // will get back false
assertEquals(2, counter)
ebo.maybeCall() // last false means we won't actually call, will hold off for one iteration
assertEquals(2, counter)
ebo.maybeCall() // and call on this one, which gives back true
assertEquals(3, counter)
ebo.maybeCall() // so we immediately call again and increment
assertEquals(4, counter)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void start() {
}

// Start thread.
Thread thread = new Thread(new MockSystemConsumerRunnable(threadSsps));
Thread thread = new Thread(new MockSystemConsumerRunnable(threadSsps), "MockSystemConsumer-" + i);
thread.setDaemon(true);
threads.add(thread);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class TestTask extends StreamTask with InitableTask {
}

def awaitMessage {
gotMessage.await(60, TimeUnit.SECONDS)
assertTrue("Timed out of waiting for message rather than received one.", gotMessage.await(60, TimeUnit.SECONDS))
assertEquals(0, gotMessage.getCount)
gotMessage = new CountDownLatch(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TestSamzaContainerPerformance extends Logging{
"task.inputs" -> (0 until streamCount).map(i => "mock.stream" + i).mkString(","),
"task.log.interval" -> logInterval.toString,
"task.max.messages" -> maxMessages.toString,
"systems.mock.samza.factory" -> "org.apache.samza.system.mock.MockSystemFactory",
"systems.mock.samza.factory" -> classOf[org.apache.samza.system.mock.MockSystemFactory].getCanonicalName,
"systems.mock.partitions.per.stream" -> partitionsPerStreamCount.toString,
"systems.mock.messages.per.batch" -> messagesPerBatch.toString,
"systems.mock.consumer.thread.count" -> consumerThreadCount.toString,
Expand Down

0 comments on commit 5cdbd02

Please sign in to comment.