Skip to content

Commit

Permalink
SAMZA-1077: SamzaContainer should catch all Throwables instead of onl…
Browse files Browse the repository at this point in the history
…y exceptions

Author: vjagadish1989 <[email protected]>

Reviewers: Jake Maes <[email protected]>

Closes apache#30 from vjagadish1989/samza-1077
  • Loading branch information
jagadish-northguard committed Jan 30, 2017
1 parent 6dc33a8 commit c6c10d3
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ class SamzaContainer(
addShutdownHook
runLoop.run
} catch {
case e: Exception =>
error("Caught exception in process loop.", e)
case e: Throwable =>
error("Caught exception/error in process loop.", e)
throw e
} finally {
info("Shutting down.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,65 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
assertTrue(caughtException)
}

@Test
def testErrorInTaskInitShutsDownTask {
val task = new StreamTask with InitableTask with ClosableTask {
var wasShutdown = false

def init(config: Config, context: TaskContext) {
throw new NoSuchMethodError("Trigger a shutdown, please.")
}

def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
}

def close {
wasShutdown = true
}
}
val config = new MapConfig
val taskName = new TaskName("taskName")
val consumerMultiplexer = new SystemConsumers(
new RoundRobinChooser,
Map[String, SystemConsumer]())
val producerMultiplexer = new SystemProducers(
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
val taskInstance: TaskInstance[StreamTask] = new TaskInstance[StreamTask](
task,
taskName,
config,
new TaskInstanceMetrics,
null,
consumerMultiplexer,
collector,
containerContext
)
val runLoop = new RunLoop(
taskInstances = Map(taskName -> taskInstance),
consumerMultiplexer = consumerMultiplexer,
metrics = new SamzaContainerMetrics,
maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
val container = new SamzaContainer(
containerContext = containerContext,
taskInstances = Map(taskName -> taskInstance),
runLoop = runLoop,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
metrics = new SamzaContainerMetrics,
jmxServer = null
)
try {
container.run
fail("Expected error to be thrown in run method.")
} catch {
case e: Throwable => // Expected
}
assertTrue(task.wasShutdown)
}

@Test
def testStartStoresIncrementsCounter {
val task = new StreamTask {
Expand Down

0 comments on commit c6c10d3

Please sign in to comment.