Skip to content

Commit

Permalink
Add tiered storage support for Pulsar SQL (apache#4045)
Browse files Browse the repository at this point in the history
* Adding offloader support for sql

* cleaning up

* cleaning up imports

* cleaning up configs

* fix imports

* fix behavior when offloader not configured and fix license

* fix unit test
  • Loading branch information
jerrypeng authored and merlimat committed Apr 16, 2019
1 parent cb9dcce commit 76aacd3
Show file tree
Hide file tree
Showing 19 changed files with 418 additions and 97 deletions.
18 changes: 18 additions & 0 deletions conf/presto/catalog/pulsar.properties
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,21 @@ pulsar.target-num-splits=2
pulsar.max-split-message-queue-size=10000
# max entry queue size
pulsar.max-split-entry-queue-size = 1000


####### TIERED STORAGE OFFLOADER CONFIGS #######

## Driver to use to offload old data to long term storage
#pulsar.managed-ledger-offload-driver = aws-s3

## Directory in which to look for offloader packages
#pulsar.offloaders-directory = /pulsar/offloaders

## Maximum number of threads in the thread pool used for ledger offloading
#pulsar.managed-ledger-offload-max-threads = 2

## Properties and configurations related to specific offloader implementation
#pulsar.offloader-properties = \
# {"s3ManagedLedgerOffloadBucket": "offload-bucket", \
# "s3ManagedLedgerOffloadRegion": "us-west-2", \
# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
@Beta
public interface LedgerOffloader {

// TODO: improve the user metadata in subsequent changes
String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";

/**
* Get offload driver name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;

@Slf4j
public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCursor {
Expand Down Expand Up @@ -62,6 +63,10 @@ public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object
callback.closeComplete(ctx);
}

/**
 * Looks up the ledger metadata for the ledger that this cursor's read position points into.
 *
 * @return the value stored in the managed ledger's ledger-info map under the ledger id of the
 *         current read position (presumably {@code null} when the map has no such key --
 *         confirm against the map type returned by {@code getLedgersInfo()})
 */
public MLDataFormats.ManagedLedgerInfo.LedgerInfo getCurrentLedgerInfo() {
    final long currentLedgerId = this.readPosition.getLedgerId();
    return this.ledger.getLedgersInfo().get(currentLedgerId);
}

/**
 * Delegates to the underlying managed ledger to count the entries in a range of positions.
 *
 * @param range the range of positions to count entries for
 * @return the entry count reported by the managed ledger for {@code range}
 */
public long getNumberOfEntries(Range<PositionImpl> range) {
    final long entryCount = this.ledger.getNumberOfEntries(range);
    return entryCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public class OffloaderUtils {
* @throws IOException when fail to retrieve the pulsar offloader class
*/
static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet());
// Use the class loader that loaded LedgerOffloaderFactory as the parent of the NAR class loader.
// LedgerOffloaderFactory may have been loaded by a class loader other than the default one,
// as is the case for the Pulsar Presto plugin.
NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader());
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);

OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
Expand All @@ -66,8 +69,6 @@ static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String n
CompletableFuture<LedgerOffloaderFactory> loadFuture = new CompletableFuture<>();
Thread loadingThread = new Thread(() -> {
Thread.currentThread().setContextClassLoader(ncl);

log.info("Loading offloader factory {} using class loader {}", factoryClass, ncl);
try {
Object offloader = factoryClass.newInstance();
if (!(offloader instanceof LedgerOffloaderFactory)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,11 +692,6 @@ public LedgerOffloader getManagedLedgerOffloader() {
return offloader;
}

// TODO: improve the user metadata in subsequent changes
static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";


public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
try {
Expand All @@ -711,8 +706,8 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur
return offloaderFactory.create(
conf.getProperties(),
ImmutableMap.of(
METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,16 @@ public boolean accept(File pathname) {
/**
 * Unpacks a NAR archive and creates a class loader for it, using the class loader that loaded
 * {@code NarClassLoader} itself as the parent.
 *
 * <p>This is a convenience overload; all unpacking and error translation is done by
 * {@link #getFromArchive(File, Set, ClassLoader)} so the two entry points cannot drift apart.
 *
 * @param narPath
 *            the NAR archive to unpack
 * @param additionalJars
 *            additional jars handed through to the class loader being created
 * @return a class loader backed by the unpacked NAR
 * @throws IOException
 *             if the three-argument overload fails to unpack the archive or to create the class loader
 */
public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
    return getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader());
}

public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
try {
return new NarClassLoader(unpacked, additionalJars, parent);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
Expand All @@ -155,6 +164,7 @@ public static NarClassLoader getFromArchive(File narPath, Set<String> additional
*
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
*            parent class loader passed to the superclass constructor
* @throws IllegalArgumentException
* if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
* @throws ClassNotFoundException
Expand All @@ -163,9 +173,9 @@ public static NarClassLoader getFromArchive(File narPath, Set<String> additional
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars)
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
throws ClassNotFoundException, IOException {
super(new URL[0]);
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;

// process the classpath
Expand Down
73 changes: 70 additions & 3 deletions pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ The Apache Software License, Version 2.0
- commons-lang3-3.4.jar
* Netty
- netty-3.6.2.Final.jar
- netty-all-4.1.32.Final.jar
- netty-buffer-4.1.31.Final.jar
- netty-codec-4.1.31.Final.jar
- netty-codec-dns-4.1.33.Final.jar
- netty-codec-http-4.1.33.Final.jar
- netty-codec-socks-4.1.33.Final.jar
- netty-common-4.1.31.Final.jar
- netty-handler-4.1.31.Final.jar
- netty-handler-proxy-4.1.33.Final.jar
- netty-reactive-streams-2.0.0.jar
- netty-resolver-4.1.31.Final.jar
- netty-resolver-dns-4.1.33.Final.jar
- netty-tcnative-boringssl-static-2.0.20.Final.jar
- netty-transport-4.1.31.Final.jar
- netty-transport-native-epoll-4.1.31.Final.jar
- netty-transport-native-epoll-4.1.33.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.31.Final.jar
* Joda Time
- joda-time-2.9.9.jar
* Jetty
Expand All @@ -252,8 +269,6 @@ The Apache Software License, Version 2.0
- jetty-server-9.4.11.v20180605.jar
- jetty-servlet-9.4.11.v20180605.jar
- jetty-util-9.4.11.v20180605.jar
* Javassist
- javassist-3.22.0-CR2.jar
* Asynchronous Http Client
- async-http-client-1.6.5.jar
* Apache BVal
Expand Down Expand Up @@ -372,7 +387,6 @@ The Apache Software License, Version 2.0
- rocksdbjni-5.13.3.jar
* SnakeYAML
- snakeyaml-1.17.jar
- snakeyaml-1.23.jar
* Snappy Java
- snappy-java-1.1.1.3.jar
* Bean Validation API
Expand All @@ -392,10 +406,59 @@ The Apache Software License, Version 2.0
- lz4-java-1.5.0.jar
* JCTools
- jctools-core-2.1.2.jar
* Asynchronous Http Client
- async-http-client-2.7.0.jar
- async-http-client-netty-utils-2.7.0.jar
* Apache Bookkeeper
- bookkeeper-common-4.9.0.jar
- bookkeeper-common-allocator-4.9.0.jar
- bookkeeper-proto-4.9.0.jar
- bookkeeper-server-4.9.0.jar
- bookkeeper-stats-api-4.9.0.jar
- bookkeeper-tools-framework-4.9.0.jar
- circe-checksum-4.9.0.jar
- codahale-metrics-provider-4.9.0.jar
- cpu-affinity-4.9.0.jar
- http-server-4.9.0.jar
- prometheus-metrics-provider-4.9.0.jar
* Apache Commons
- commons-cli-1.2.jar
- commons-codec-1.10.jar
- commons-collections4-4.1.jar
- commons-configuration-1.10.jar
- commons-io-2.5.jar
- commons-lang-2.6.jar
- commons-logging-1.1.1.jar
* GSON
- gson-2.8.2.jar
* Jackson
- jackson-jaxrs-base-2.8.11.jar
- jackson-jaxrs-json-provider-2.8.11.jar
- jackson-module-jaxb-annotations-2.8.11.jar
- jackson-module-jsonSchema-2.8.11.jar
* Javassist
- javassist-3.21.0-GA.jar
* Jetty
- jetty-http-9.4.12.v20180830.jar
- jetty-io-9.4.12.v20180830.jar
- jetty-security-9.4.12.v20180830.jar
- jetty-server-9.4.12.v20180830.jar
- jetty-servlet-9.4.12.v20180830.jar
- jetty-util-9.4.12.v20180830.jar
* Java Native Access
- jna-4.2.0.jar
* Yahoo Datasketches
- memory-0.8.3.jar
- sketches-core-0.8.3.jar
* Apache Zookeeper
- zookeeper-3.4.13.jar
* Apache Yetus Audience Annotations
- audience-annotations-0.5.0.jar

Protocol Buffers License
* Protocol Buffers
- protobuf-shaded-2.1.0-incubating.jar
- protobuf-java-3.5.1.jar

BSD 3-clause "New" or "Revised" License
* RE2J TD -- re2j-td-1.4.jar
Expand Down Expand Up @@ -476,6 +539,8 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
- jgrapht-core-0.9.0.jar
* Logback Core Module
- logback-core-1.2.3.jar
* MIME Streaming Extension
- mimepull-1.9.6.jar

Public Domain (CC0) -- licenses/LICENSE-CC0.txt
* HdrHistogram
Expand All @@ -484,6 +549,8 @@ Public Domain (CC0) -- licenses/LICENSE-CC0.txt
- aopalliance-1.0.jar
* XZ For Java
- xz-1.5.jar
* Reactive Streams
- reactive-streams-1.0.2.jar

Bouncy Castle License
* Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
Expand Down
104 changes: 79 additions & 25 deletions pulsar-sql/presto-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@

<properties>
<dep.airlift.version>0.170</dep.airlift.version>
<dep.slice.version>0.35</dep.slice.version>
<dep.guice.version>4.2.0</dep.guice.version>
<dep.javax-validation.version>1.1.0.Final</dep.javax-validation.version>
<dep.javax-inject.version>1</dep.javax-inject.version>
<dep.guava.version>24.1-jre</dep.guava.version>
<jctools.version>2.1.2</jctools.version>
<dslJson.verson>1.8.4</dslJson.verson>
</properties>
Expand All @@ -56,24 +51,6 @@
<version>${dep.airlift.version}</version>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>${dep.guice.version}</version>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>${dep.javax-validation.version}</version>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>${dep.javax-inject.version}</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand All @@ -82,13 +59,13 @@

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<artifactId>pulsar-client-admin-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger</artifactId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down Expand Up @@ -127,4 +104,81 @@

</dependencies>

<!--
Build configuration: produces a shaded artifact for this module. Selected Pulsar modules and
their HTTP/REST stack are bundled into the jar and relocated under org.apache.pulsar.shade.
-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<!-- Run the shade goal as part of the package phase. -->
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>

<!-- Only the artifacts matched below are bundled into the shaded jar. -->
<artifactSet>
<includes>
<!-- Unshaded ("original") Pulsar modules -->
<include>org.apache.pulsar:pulsar-client-original</include>
<include>org.apache.pulsar:pulsar-client-admin-original</include>
<include>org.apache.pulsar:managed-ledger-original</include>

<!-- Jersey / JAX-RS / HK2 artifacts; relocated below -->
<include>org.glassfish.jersey*:*</include>
<include>javax.ws.rs:*</include>
<include>javax.annotation:*</include>
<include>org.glassfish.hk2*:*</include>

<!-- Apache HttpComponents and Jetty; relocated below -->
<include>org.apache.httpcomponents:*</include>
<include>org.eclipse.jetty:*</include>

</includes>
</artifactSet>
<filters>
<!-- Keep every file from pulsar-client-original in the shaded jar. -->
<filter>
<artifact>org.apache.pulsar:pulsar-client-original</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
<!-- Move the bundled third-party packages under the org.apache.pulsar.shade prefix. -->
<relocations>
<relocation>
<pattern>org.glassfish</pattern>
<shadedPattern>org.apache.pulsar.shade.org.glassfish</shadedPattern>
</relocation>
<relocation>
<pattern>javax.ws</pattern>
<shadedPattern>org.apache.pulsar.shade.javax.ws</shadedPattern>
</relocation>
<relocation>
<pattern>javax.annotation</pattern>
<shadedPattern>org.apache.pulsar.shade.javax.annotation</shadedPattern>
</relocation>
<relocation>
<pattern>jersey</pattern>
<shadedPattern>org.apache.pulsar.shade.jersey</shadedPattern>
</relocation>
<relocation>
<pattern>org.eclipse.jetty</pattern>
<shadedPattern>org.apache.pulsar.shade.org.eclipse.jetty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
</relocation>

</relocations>
<!--
Resource transformers. NOTE(review): per the Maven Shade Plugin documentation these handle
META-INF/services entries and plugin.xml descriptors respectively; confirm both are needed here.
-->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Loading

0 comments on commit 76aacd3

Please sign in to comment.