Skip to content

Commit

Permalink
[tiered storage] Provide LedgerOffloaderFactory for creating offloade…
Browse files Browse the repository at this point in the history
…rs (apache#2392)

* [tiered storage] Provide LedgerOffloaderFactory for creating offloaders

 ### Motivation

In order to use NAR for packaging offloaders, we need a factory interface for creating offloaders.

 ### Changes

- Provide a ledger offloader factory interface for creating offloaders.
- Move implemention specific settings to implementation package to be respecting to offloader factory interface

* remove unneeded change
  • Loading branch information
sijie authored Aug 17, 2018
1 parent e1238ac commit 83d33d4
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.bookkeeper.client.api.ReadHandle;

/**
* Interface for offloading ledgers to longterm storage
* Interface for offloading ledgers to long-term storage
*/
@Beta
public interface LedgerOffloader {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.OrderedScheduler;

/**
* Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage.
*/
@LimitedPrivate
@Evolving
public interface LedgerOffloaderFactory<T extends LedgerOffloader> {

/**
* Check whether the provided driver <tt>driverName</tt> is supported.
*
* @param driverName offloader driver name
* @return true if the driver is supported, otherwise false.
*/
boolean isDriverSupported(String driverName);

/**
* Create a ledger offloader with the provided configuration, user-metadata and scheduler.
*
* @param properties service configuration
* @param userMetadata user metadata
* @param scheduler scheduler
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(Properties properties,
Map<String, String> userMetadata,
OrderedScheduler scheduler)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -480,48 +480,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean exposePublisherStats = true;

/**** --- Ledger Offloading --- ****/
/****
* NOTES: all implementation related settings should be put in implementation package.
* only common settings like driver name, io threads can be added here.
****/
// Driver to use to offload old data to long term storage
private String managedLedgerOffloadDriver = null;

// Maximum number of thread pool threads for ledger offloading
private int managedLedgerOffloadMaxThreads = 2;

// For Amazon S3 ledger offload, AWS region
private String s3ManagedLedgerOffloadRegion = null;

// For Amazon S3 ledger offload, Bucket to place offloaded ledger into
private String s3ManagedLedgerOffloadBucket = null;

// For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing)
private String s3ManagedLedgerOffloadServiceEndpoint = null;

// For Amazon S3 ledger offload, Max block size in bytes.
@FieldContext(minValue = 5242880) // 5MB
private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB

// For Amazon S3 ledger offload, Read buffer size in bytes.
@FieldContext(minValue = 1024)
private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

// For Google Cloud Storage ledger offload, region where offload bucket is located.
// reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
private String gcsManagedLedgerOffloadRegion = null;

// For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into
private String gcsManagedLedgerOffloadBucket = null;

// For Google Cloud Storage ledger offload, Max block size in bytes.
@FieldContext(minValue = 5242880) // 5MB
private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB

// For Google Cloud Storage ledger offload, Read buffer size in bytes.
@FieldContext(minValue = 1024)
private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

// For Google Cloud Storage, path to json file containing service account credentials.
// For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;

public String getZookeeperServers() {
return zookeeperServers;
}
Expand Down Expand Up @@ -1721,86 +1689,6 @@ public int getManagedLedgerOffloadMaxThreads() {
return this.managedLedgerOffloadMaxThreads;
}

public void setS3ManagedLedgerOffloadRegion(String region) {
this.s3ManagedLedgerOffloadRegion = region;
}

public String getS3ManagedLedgerOffloadRegion() {
return this.s3ManagedLedgerOffloadRegion;
}

public void setS3ManagedLedgerOffloadBucket(String bucket) {
this.s3ManagedLedgerOffloadBucket = bucket;
}

public String getS3ManagedLedgerOffloadBucket() {
return this.s3ManagedLedgerOffloadBucket;
}

public void setS3ManagedLedgerOffloadServiceEndpoint(String endpoint) {
this.s3ManagedLedgerOffloadServiceEndpoint = endpoint;
}

public String getS3ManagedLedgerOffloadServiceEndpoint() {
return this.s3ManagedLedgerOffloadServiceEndpoint;
}

public void setS3ManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) {
this.s3ManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
}

public int getS3ManagedLedgerOffloadMaxBlockSizeInBytes() {
return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
}

public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) {
this.s3ManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
}

public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
}

public void setGcsManagedLedgerOffloadRegion(String region) {
this.gcsManagedLedgerOffloadRegion = region;
}

public String getGcsManagedLedgerOffloadRegion() {
return this.gcsManagedLedgerOffloadRegion;
}

public void setGcsManagedLedgerOffloadBucket(String bucket) {
this.gcsManagedLedgerOffloadBucket = bucket;
}

public String getGcsManagedLedgerOffloadBucket() {
return this.gcsManagedLedgerOffloadBucket;
}

public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) {
this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
}

public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
}

public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) {
this.gcsManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
}

public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
}

public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String keyPath) {
this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
}

public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
}

public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) {
this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.broker.admin.AdminResource;
Expand All @@ -62,8 +64,6 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
Expand Down Expand Up @@ -664,11 +664,14 @@ public LedgerOffloader getManagedLedgerOffloader() {
public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
try {
// TODO: will make this configurable when switching to use NAR loader to load offloaders
LedgerOffloaderFactory offloaderFactory = JCloudLedgerOffloaderFactory.of();

if (conf.getManagedLedgerOffloadDriver() != null
&& BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) {
&& offloaderFactory.isDriverSupported(conf.getManagedLedgerOffloadDriver())) {
try {
return BlobStoreManagedLedgerOffloader.create(
getTieredStorageConf(conf),
return offloaderFactory.create(
conf.getProperties(),
ImmutableMap.of(
METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha()
Expand All @@ -685,26 +688,6 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur
}
}

private static TieredStorageConfigurationData getTieredStorageConf(ServiceConfiguration serverConf) {
TieredStorageConfigurationData tsConf = new TieredStorageConfigurationData();
// generic settings
tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());
tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
// s3 settings
tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
// gcs settings
tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
return tsConf;
}

public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;

/**
* A jcloud based offloader factory.
*/
public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader> {

public static JCloudLedgerOffloaderFactory of() {
return INSTANCE;
}

private static final JCloudLedgerOffloaderFactory INSTANCE = new JCloudLedgerOffloaderFactory();

@Override
public boolean isDriverSupported(String driverName) {
return BlobStoreManagedLedgerOffloader.driverSupported(driverName);
}

@Override
public BlobStoreManagedLedgerOffloader create(Properties properties,
Map<String, String> userMetadata,
OrderedScheduler scheduler) throws IOException {
TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties);
return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud;

import static org.apache.pulsar.common.util.FieldParser.value;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Properties;
import lombok.Data;

/**
* Configuration for tiered storage.
*/
@Data
public class TieredStorageConfigurationData implements Serializable, Cloneable{
public class TieredStorageConfigurationData implements Serializable, Cloneable {

/**** --- Ledger Offloading --- ****/
// Driver to use to offload old data to long term storage
Expand Down Expand Up @@ -66,4 +71,27 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable{
// For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;

/**
* Create a tiered storage configuration from the provided <tt>properties</tt>.
*
* @param properties the configuration properties
* @return tiered storage configuration
*/
public static TieredStorageConfigurationData create(Properties properties) {
TieredStorageConfigurationData data = new TieredStorageConfigurationData();
Field[] fields = TieredStorageConfigurationData.class.getDeclaredFields();
Arrays.stream(fields).forEach(f -> {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
f.set(data, value((String) properties.get(f.getName()), f));
} catch (Exception e) {
throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s",
f.getName(), properties.get(f.getName())), e);
}
}
});
return data;
}

}

0 comments on commit 83d33d4

Please sign in to comment.