Skip to content

Commit

Permalink
[Pulsar SQL] Make Pulsar SQL get correct offload configurations (apac…
Browse files Browse the repository at this point in the history
…he#7701)

### Motivation

Currently, Pulsar SQL can't get the correct offload configurations.

### Modifications

Make Pulsar SQL get the complete offload configurations.

### Verifying this change

Add a new integration test.
  • Loading branch information
gaoran10 authored Aug 5, 2020
1 parent 5546c5f commit 9f687d3
Show file tree
Hide file tree
Showing 9 changed files with 427 additions and 6 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/ci-integration-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

# Flaky Test: https://github.com/apache/pulsar/issues/7750
# - name: build pulsar image
# if: steps.docs.outputs.changed_only == 'no'
# run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
#
# - name: build pulsar-all image
# if: steps.docs.outputs.changed_only == 'no'
# run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
#
- name: build artifacts and docker pulsar latest test image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
Expand Down
2 changes: 2 additions & 0 deletions conf/presto/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ query.client.timeout=5m
query.min-expire-age=30m

presto.version=testversion

distributed-joins-enabled=true

node-scheduler.include-coordinator=true
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.common.util.FieldParser.value;

import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Objects;
Expand All @@ -32,7 +33,9 @@
* Definition of the offload policies.
*/
@Data
public class OffloadPolicies {
public class OffloadPolicies implements Serializable {

private final static long serialVersionUID = 0L;

public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -81,9 +80,8 @@ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws

this.statsProvider.start(clientConfiguration);

OffloadPolicies offloadPolicies = new OffloadPolicies();
BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig);
this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
this.defaultOffloader = initManagedLedgerOffloader(
pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
}

public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.stats.NullStatsProvider;
Expand All @@ -31,6 +32,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.protocol.Commands;

/**
Expand Down Expand Up @@ -399,6 +401,16 @@ public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
return this.pulsarAdmin;
}

public OffloadPolicies getOffloadPolices() {
Properties offloadProperties = new Properties();
offloadProperties.putAll(getOffloaderProperties());
OffloadPolicies offloadPolicies = OffloadPolicies.create(offloadProperties);
offloadPolicies.setManagedLedgerOffloadDriver(getManagedLedgerOffloadDriver());
offloadPolicies.setManagedLedgerOffloadMaxThreads(getManagedLedgerOffloadMaxThreads());
offloadPolicies.setOffloadersDirectory(getOffloadersDirectory());
return offloadPolicies;
}

@Override
public void close() throws Exception {
this.pulsarAdmin.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.sql.presto;

import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -68,4 +69,35 @@ public void testDefaultManagedLedgerConfig() {
Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads());
}

@Test
public void testGetOffloadPolices() throws Exception {
PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();

final String managedLedgerOffloadDriver = "s3";
final String offloaderDirectory = "/pulsar/offloaders";
final int managedLedgerOffloadMaxThreads = 5;
final String bucket = "offload-bucket";
final String region = "us-west-2";
final String endpoint = "http://s3.amazonaws.com";
final String offloadProperties = "{"
+ "\"s3ManagedLedgerOffloadBucket\":\"" + bucket + "\","
+ "\"s3ManagedLedgerOffloadRegion\":\"" + region + "\","
+ "\"s3ManagedLedgerOffloadServiceEndpoint\":\"" + endpoint + "\""
+ "}";

connectorConfig.setManagedLedgerOffloadDriver(managedLedgerOffloadDriver);
connectorConfig.setOffloadersDirectory(offloaderDirectory);
connectorConfig.setManagedLedgerOffloadMaxThreads(managedLedgerOffloadMaxThreads);
connectorConfig.setOffloaderProperties(offloadProperties);

OffloadPolicies offloadPolicies = connectorConfig.getOffloadPolices();
Assert.assertNotNull(offloadPolicies);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), managedLedgerOffloadDriver);
Assert.assertEquals(offloadPolicies.getOffloadersDirectory(), offloaderDirectory);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadMaxThreads(), managedLedgerOffloadMaxThreads);
Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadBucket(), bucket);
Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRegion(), region);
Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadServiceEndpoint(), endpoint);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TestBasicPresto extends PulsarTestSuite {

@BeforeClass
public void setupPresto() throws Exception {
log.info("[setupPresto]");
pulsarCluster.startPrestoWorker();
}

Expand Down
Loading

0 comments on commit 9f687d3

Please sign in to comment.