Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
jghoman committed Jan 21, 2014
2 parents f63ce72 + 9af9131 commit fe8df63
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 14 deletions.
2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ ext {
metricsVersion = "2.2.0"
kafkaVersion = "0.8.1-SNAPSHOT"
commonsHttpClientVersion = "3.1"
leveldbVersion = "1.7"
leveldbVersion = "1.8"
yarnVersion = "2.2.0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ object SamzaContainer extends Logging {
val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
val config = JsonConfigSerializer.fromJson(configStr)
val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS)

val partitions = Util.createStreamPartitionsFromString(encodedStreamsAndPartitions)
if(partitions.isEmpty) {

if (partitions.isEmpty) {
throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.")
}

Expand Down Expand Up @@ -332,7 +332,7 @@ object SamzaContainer extends Logging {
// Wire up all task-level (unshared) objects.

val partitions = inputStreams.map(_.getPartition).toSet

val taskInstances = partitions.map(partition => {
debug("Setting up task instance: %s" format partition)

Expand Down Expand Up @@ -393,7 +393,7 @@ object SamzaContainer extends Logging {

val inputStreamsForThisPartition = inputStreams.filter(_.getPartition.equals(partition)).map(_.getSystemStream)
info("Assigning SystemStreams " + inputStreamsForThisPartition + " to " + partition)

val taskInstance = new TaskInstance(
task = task,
partition = partition,
Expand Down Expand Up @@ -439,16 +439,18 @@ class SamzaContainer(
jvm: JvmMetrics = null) extends Runnable with Logging {

def run {
info("Entering run loop.")
try {
info("Starting container.")

startMetrics
startCheckpoints
startStores
startTask
startProducers
startConsumers
startMetrics
startCheckpoints
startStores
startTask
startProducers
startConsumers

info("Entering run loop.")

try {
while (true) {
val coordinator = new ReadableCoordinator

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.container

import java.io.File
import org.apache.samza.config.Config
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.Partition
import org.apache.samza.config.MapConfig
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemConsumers
import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.system.SystemConsumer
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemProducer
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.task.StreamTask
import org.apache.samza.task.MessageCollector
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.TaskCoordinator
import org.apache.samza.task.InitableTask
import org.apache.samza.task.TaskContext
import org.apache.samza.task.ClosableTask

class TestSamzaContainer {
@Test
def testExceptionInTaskInitShutsDownTask {
val task = new StreamTask with InitableTask with ClosableTask {
var wasShutdown = false

def init(config: Config, context: TaskContext) {
throw new Exception("Trigger a shutdown, please.")
}

def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
}

def close {
wasShutdown = true
}
}
val config = new MapConfig
val partition = new Partition(0)
val containerName = "test-container"
val consumerMultiplexer = new SystemConsumers(
new RoundRobinChooser,
Map[String, SystemConsumer]())
val producerMultiplexer = new SystemProducers(
Map[String, SystemProducer](),
new SerdeManager)
val taskInstance: TaskInstance = new TaskInstance(
task,
partition,
config,
new TaskInstanceMetrics,
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers)
val container = new SamzaContainer(
Map(partition -> taskInstance),
config,
consumerMultiplexer,
producerMultiplexer,
new SamzaContainerMetrics)
try {
container.run
fail("Expected exception to be thrown in run method.")
} catch {
case e: Exception => // Expected
}
assertTrue(task.wasShutdown)
}
}

0 comments on commit fe8df63

Please sign in to comment.