Skip to content

Commit

Permalink
[fix][broker]Make LedgerOffloaderFactory can load the old nar. (apach…
Browse files Browse the repository at this point in the history
…e#19913)

After apache#13833, we change the signature of LedgerOffloaderFactory#create. But some users may use the old version LedgerOffloaderFactory in the offloader nar, when pulsar load the offloader nar, it will use the LedgerOffloaderFactory in the offloader nar, the LedgerOffloaderFactory#create has no param offloaderStats.

Modifications: Make LedgerOffloaderFactory can load the old nar.
  • Loading branch information
horizonzy authored Apr 10, 2023
1 parent 21c7c62 commit 02c838c
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,37 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
boolean isDriverSupported(String driverName);

/**
* Create a ledger offloader with the provided configuration, user-metadata and scheduler.
* Create a ledger offloader with the provided configuration, user-metadata, scheduler and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler)
throws IOException;


/**
* Create a ledger offloader with the provided configuration, user-metadata, scheduler and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @param offloaderStats offloaderStats
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler,
LedgerOffloaderStats offloaderStats)
throws IOException;


/**
* Create a ledger offloader with the provided configuration, user-metadata, schema storage and scheduler.
*
Expand All @@ -66,6 +83,26 @@ T create(OffloadPoliciesImpl offloadPolicies,
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
OrderedScheduler scheduler)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler);
}

/**
* Create a ledger offloader with the provided configuration, user-metadata, schema storage,
* scheduler and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param schemaStorage used for schema lookup in offloader
* @param scheduler scheduler
* @param offloaderStats offloaderStats
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import java.util.concurrent.TimeUnit;

class LedgerOffloaderStatsDisable implements LedgerOffloaderStats {
public class LedgerOffloaderStatsDisable implements LedgerOffloaderStats {

static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable();
public static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable();

private LedgerOffloaderStatsDisable() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;

Expand All @@ -32,6 +33,13 @@ public boolean isDriverSupported(String driverName) {
return FileSystemManagedLedgerOffloader.driverSupported(driverName);
}

@Override
public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata, OrderedScheduler scheduler)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler, LedgerOffloaderStatsDisable.INSTANCE);
}

@Override
public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
Expand All @@ -44,6 +45,12 @@ public boolean isDriverSupported(String driverName) {
return JCloudBlobStoreProvider.driverSupported(driverName);
}

@Override
public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map<String, String> userMetadata,
OrderedScheduler scheduler) throws IOException {
return create(offloadPolicies, userMetadata, scheduler, LedgerOffloaderStatsDisable.INSTANCE);
}

@Override
public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map<String, String> userMetadata,
OrderedScheduler scheduler,
Expand Down

0 comments on commit 02c838c

Please sign in to comment.