Skip to content

Commit

Permalink
[Managed-ledger]optimized constant naming convention (apache#14004)
Browse files Browse the repository at this point in the history
  • Loading branch information
liudezhi2098 authored Jan 29, 2022
1 parent da9e806 commit 00e7cc6
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class ManagedLedgerConfig {
private boolean unackedRangesOpenCacheSetEnabled = true;
private Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.instance_;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2276,7 +2276,7 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
}

private void trimConsumedLedgersInBackground() {
trimConsumedLedgersInBackground(Futures.nullPromise_);
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}

@Override
Expand All @@ -2295,7 +2295,7 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.instance_
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
Expand All @@ -2319,7 +2319,7 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
});

if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.instance_
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
!= null) {
Expand Down Expand Up @@ -2405,7 +2405,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.instance_
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
? config.getLedgerOffloader().getOffloadPolicies()
: null);
synchronized (this) {
Expand Down Expand Up @@ -2941,7 +2941,7 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
.thenCompose((ignore) -> {
return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toHours(1)).limit(10),
failOnConflict_,
FAIL_ON_CONFLICT,
() -> completeLedgerInfoForOffloaded(ledgerId, uuid),
scheduledExecutor, name)
.whenComplete((ignore2, exception) -> {
Expand Down Expand Up @@ -2991,7 +2991,7 @@ interface LedgerInfoTransformation {
LedgerInfo transform(LedgerInfo oldInfo) throws ManagedLedgerException;
}

static Predicate<Throwable> failOnConflict_ = (throwable) -> {
static final Predicate<Throwable> FAIL_ON_CONFLICT = (throwable) -> {
return !(throwable instanceof OffloadConflict) && Retries.NonFatalPredicate.test(throwable);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Null implementation that throws an error on any invokation.
*/
public class NullLedgerOffloader implements LedgerOffloader {
public static NullLedgerOffloader instance_ = new NullLedgerOffloader();
public static final NullLedgerOffloader INSTANCE = new NullLedgerOffloader();

@Override
public String getOffloadDriverName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public class Futures {

public static CompletableFuture<Void> nullPromise_ = CompletableFuture.completedFuture(null);
public static final CompletableFuture<Void> NULL_PROMISE = CompletableFuture.completedFuture(null);

/**
* Adapts a {@link CloseCallback} to a {@link CompletableFuture}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3159,7 +3159,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio
entries.forEach(Entry::release);
// Now we update the cursors that are still subscribing to ledgers that has been consumed completely
managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
managedLedger.internalTrimConsumedLedgers(Futures.nullPromise_);
managedLedger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
ManagedLedgerImpl finalManagedLedger = managedLedger;
Awaitility.await().untilAsserted(() -> {
// We only have one empty ledger at last [{entries=0}]
Expand Down Expand Up @@ -3255,7 +3255,7 @@ public void testInvalidateReadHandleWhenDeleteLedger() throws Exception {
assertEquals(ledger.ledgerCache.size(), 2);
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 1);
assertEquals(ledger.ledgerCache.size(), 0);
Expand Down Expand Up @@ -3287,7 +3287,7 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
assertEquals(ledger.ledgerCache.size(), 2);
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 0);
Expand All @@ -3299,7 +3299,7 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
cursor3.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 0);
Expand Down Expand Up @@ -3338,7 +3338,7 @@ public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exc
ledgerOffloader = mock(NullLedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);

ledger.internalTrimConsumedLedgers(Futures.nullPromise_);
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
}
} else {
LOG.info("No ledger offloader configured, using NULL instance");
return NullLedgerOffloader.instance_;
return NullLedgerOffloader.INSTANCE;
}
} catch (Throwable t) {
throw new PulsarServerException(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,7 @@ private void checkConsumedLedgers() {
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.trimConsumedLedgersInBackground(Futures.nullPromise_);
managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPo
}
} else {
log.info("No ledger offloader configured, using NULL instance");
return NullLedgerOffloader.instance_;
return NullLedgerOffloader.INSTANCE;
}
} catch (Throwable t) {
throw new RuntimeException(t);
Expand Down

0 comments on commit 00e7cc6

Please sign in to comment.