Skip to content

Commit

Permalink
add precondition
Browse files Browse the repository at this point in the history
  • Loading branch information
MabelYC committed Jan 5, 2021
1 parent f0afc48 commit f013058
Showing 1 changed file with 2 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Preconditions
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.config.{Config, JobConfig, TaskConfig}
import org.apache.samza.container.TaskName
Expand Down Expand Up @@ -86,6 +87,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
*/
override def createResources(): Unit = {
val createResourcesSystemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName + "createResource")
Preconditions.checkNotNull(createResourcesSystemAdmin)
createResourcesSystemAdmin.start()
try {
info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
Expand Down

0 comments on commit f013058

Please sign in to comment.