Skip to content

Commit

Permalink
Namespace level offloader (apache#6183)
Browse files Browse the repository at this point in the history
### Motivation

Currently, the offload operation only have the cluster level configuration, can't set the offload configuration at the namespace level, it's inflexible. 

### Modifications

Add the namespace offload policies.
  • Loading branch information
gaoran10 authored Feb 10, 2020
1 parent bec768f commit fd03be5
Show file tree
Hide file tree
Showing 35 changed files with 969 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
* Interface for offloading ledgers to long-term storage
Expand Down Expand Up @@ -112,5 +113,17 @@ CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
*/
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata);

/**
* Get offload policies of this LedgerOffloader
*
* @return offload policies
*/
OffloadPolicies getOffloadPolicies();

/**
* Close the resources if necessary
*/
void close();
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
* Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage.
Expand All @@ -43,13 +44,13 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
/**
* Create a ledger offloader with the provided configuration, user-metadata and scheduler.
*
* @param properties service configuration
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(Properties properties,
T create(OffloadPolicies offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
* Null implementation that throws an error on any invokation.
Expand Down Expand Up @@ -60,4 +61,14 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
}

@Override
public OffloadPolicies getOffloadPolicies() {
return null;
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -145,6 +146,16 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
offloads.remove(uuid);
return CompletableFuture.completedFuture(null);
};

@Override
public OffloadPolicies getOffloadPolicies() {
return null;
}

@Override
public void close() {

}
}

static class MockOffloadReadHandle implements ReadHandle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;

import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1025,6 +1026,16 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
}
return promise;
};

@Override
public OffloadPolicies getOffloadPolicies() {
return null;
}

@Override
public void close() {

}
}

static class ErroringMockLedgerOffloader extends MockLedgerOffloader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -98,6 +100,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand All @@ -109,7 +112,6 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
Expand Down Expand Up @@ -162,7 +164,8 @@ public class PulsarService implements AutoCloseable {
private ScheduledExecutorService compactorExecutor;
private OrderedScheduler offloaderScheduler;
private Offloaders offloaderManager = new Offloaders();
private LedgerOffloader offloader;
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
Expand Down Expand Up @@ -398,7 +401,8 @@ public void start() throws PulsarServerException {
// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));

this.offloader = createManagedLedgerOffloader(this.getConfiguration());
this.defaultOffloader = createManagedLedgerOffloader(
OffloadPolicies.create(this.getConfiguration().getProperties()));

brokerService.start();

Expand Down Expand Up @@ -764,28 +768,54 @@ public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
return managedLedgerClientFactory;
}

public LedgerOffloader getManagedLedgerOffloader() {
return offloader;
/**
* First, get <code>LedgerOffloader</code> from local map cache, create new <code>LedgerOffloader</code> if not in cache or
* the <code>OffloadPolicies</code> changed, return the <code>LedgerOffloader</code> directly if exist in cache
* and the <code>OffloadPolicies</code> not changed.
*
* @param namespaceName NamespaceName
* @param offloadPolicies the OffloadPolicies
* @return LedgerOffloader
*/
public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, OffloadPolicies offloadPolicies) {
if (offloadPolicies == null) {
return getDefaultOffloader();
}
return ledgerOffloaderMap.compute(namespaceName, (ns, offloader) -> {
try {
if (offloader != null && Objects.equals(offloader.getOffloadPolicies(), offloadPolicies)) {
return offloader;
} else {
if (offloader != null) {
offloader.close();
}
return createManagedLedgerOffloader(offloadPolicies);
}
} catch (PulsarServerException e) {
LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e);
return new NullLedgerOffloader();
}
});
}

public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies offloadPolicies)
throws PulsarServerException {
try {
if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) {
checkNotNull(conf.getOffloadersDirectory(),
if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
conf.getManagedLedgerOffloadDriver());
this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory());
offloadPolicies.getManagedLedgerOffloadDriver());
this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory());
LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
conf.getManagedLedgerOffloadDriver());
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
conf.getProperties(),
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
getOffloaderScheduler(offloadPolicies));
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
}
Expand Down Expand Up @@ -862,10 +892,10 @@ public synchronized Compactor getCompactor() throws PulsarServerException {
return this.compactor;
}

protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) {
protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
.numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
.name("offloader").build();
}
return this.offloaderScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -2276,5 +2277,74 @@ private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
}
}

protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPolicies offloadPolicies) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
validateOffloadPolicies(offloadPolicies);

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.offload_policies = offloadPolicies;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
} else {
String errorMsg = String.format(
"[%s] Failed to update offload configuration for namespace %s",
clientAppId(), namespaceName);
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(policies.offload_policies));
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName,
e);
asyncResponse.resume(new RestException(e));
}
}

private void validateOffloadPolicies(OffloadPolicies offloadPolicies) {
if (offloadPolicies == null) {
log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
"The offloadPolicies must be specified for namespace offload.");
}
if (!offloadPolicies.driverSupported()) {
log.warn("[{}] Failed to update offload configuration for namespace {}: " +
"driver is not supported, support value: {}",
clientAppId(), namespaceName, OffloadPolicies.getSupportedDriverNames());
throw new RestException(Status.PRECONDITION_FAILED,
"The driver is not supported, support value: " + OffloadPolicies.getSupportedDriverNames());
}
if (!offloadPolicies.bucketValid()) {
log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
"The bucket must be specified for namespace offload.");
}
}

protected OffloadPolicies internalGetOffloadPolicies() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
return policies.offload_policies;
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -1142,5 +1143,37 @@ public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant,
internalSetSchemaValidationEnforced(schemaValidationEnforced);
}

@POST
@Path("/{tenant}/{namespace}/offloadPolicies")
@ApiOperation(value = " Set offload configuration on a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid") })
public void setOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
OffloadPolicies offload, @Suspended final AsyncResponse asyncResponse) {
try {
validateNamespaceName(tenant, namespace);
internalSetOffloadPolicies(asyncResponse, offload);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("/{tenant}/{namespace}/offloadPolicies")
@ApiOperation(value = "Get offload configuration on a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public OffloadPolicies getOffloadPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetOffloadPolicies();
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -982,7 +983,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());

managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
policies.ifPresent(p -> {
long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs();
if (p.offload_deletion_lag_ms != null) {
Expand Down
Loading

0 comments on commit fd03be5

Please sign in to comment.