Skip to content

Commit

Permalink
Ensuring store consumers are started exactly once (apache#1094)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmatharu-zz authored and prateekm committed Jun 28, 2019
1 parent bebad34 commit 8021074
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ private void restoreStores() {
// initialize each TaskStorageManager
this.taskRestoreManagers.values().forEach(taskStorageManager -> taskStorageManager.initialize());

// Start store consumers
this.storeConsumers.values().forEach(systemConsumer -> systemConsumer.start());
// Start each store consumer once
this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.start());

// Create a thread pool for parallel restores (and stopping of persistent stores)
ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
Expand All @@ -657,8 +657,8 @@ private void restoreStores() {

executorService.shutdown();

// Stop store consumers
this.storeConsumers.values().forEach(systemConsumer -> systemConsumer.stop());
// Stop each store consumer once
this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.stop());

// Now re-create persistent stores in read-write mode, leave non-persistent stores as-is
recreatePersistentTaskStoresInReadWriteMode(this.containerModel, jobContext, containerContext,
Expand Down

0 comments on commit 8021074

Please sign in to comment.