diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java index ac41df942853f..679b49a740b8f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java @@ -192,13 +192,16 @@ private Optional programmedFailure(OperationType op, Str if (ex != null) { return Optional.of(ex); } - Optional failure = failures.stream() - .filter(f -> f.predicate.test(op, path)) - .findFirst(); - if (failure.isPresent()) { - failures.remove(failure.get()); + while (true) { + Optional failure = failures.stream().filter(f -> f.predicate.test(op, path)).findFirst(); + if (failure.isPresent()) { + if (failures.remove(failure.get())) { + return failure.map(Failure::getException); + } + // failure is taken by other threads. Retry. + } else { + return Optional.empty(); + } } - - return failure.map(Failure::getException); } }