Skip to content

Commit

Permalink
modified to use shutdownhook instead of finalize()
Browse files Browse the repository at this point in the history
  • Loading branch information
MabelYC committed Nov 18, 2020
1 parent 276ed04 commit f20be45
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ public void init() {
systemConsumer.start();
systemProducer.register(SOURCE);
systemProducer.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("CoordinatorStreamStore Shut Down Hook thread is closing kafka clients");
this.systemProducer.stop();
this.systemConsumer.stop();
this.systemAdmin.stop();
}));
iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
readMessagesFromCoordinatorStream();
} else {
Expand Down Expand Up @@ -188,14 +194,6 @@ public void flush() {
}
}

@Override
protected void finalize() throws Throwable {
super.finalize();
systemAdmin.stop();
systemProducer.stop();
systemConsumer.stop();
}

/**
* <p>
* Fetches the metadata of the topic partition of coordinator stream. Registers the oldest offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
Preconditions.checkNotNull(systemAdmin)

systemAdmin.start()
Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerShutdownHook") {
override def run = systemAdmin.stop()
})

info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
s"partition count: ${checkpointSpec.getPartitionCount}")
Expand All @@ -112,6 +115,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
systemConsumer.register(checkpointSsp, oldestOffset)
systemConsumer.start()
Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerShutdownHook") {
override def run(): Unit = {
producerRef.get().stop()
systemConsumer.stop()
}
})
}

/**
Expand Down

0 comments on commit f20be45

Please sign in to comment.