Skip to content

Commit

Permalink
Avoid contended synchronized block on topic load (apache#15883)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 2, 2022
1 parent a6a7516 commit 7d2fdea
Showing 1 changed file with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1278,33 +1278,38 @@ public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, Of
});
}

public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
throws PulsarServerException {
try {
if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());

Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());

LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
),
schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats);
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
synchronized (this) {
Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());

LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
PulsarVersion.getGitSha(),
LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(),
config.getClusterName()
),
schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats);
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
}
}
} else {
LOG.info("No ledger offloader configured, using NULL instance");
LOG.debug("No ledger offloader configured, using NULL instance");
return NullLedgerOffloader.INSTANCE;
}
} catch (Throwable t) {
Expand Down

0 comments on commit 7d2fdea

Please sign in to comment.