[Pulsar SQL] Fix OffloadPolicies JSON serialization error in Pulsar SQL (apache#9300)

### Motivation

1. The class `OffloadPolicies` could not be serialized to a JSON byte array in Pulsar SQL.
2. `PulsarSplitManager` could not generate splits for data stored in tiered storage.

The serialization error log:
```
2021-01-23T03:20:15.403Z	ERROR	remote-task-callback-9	io.airlift.concurrent.BoundedExecutor	Task failed
java.lang.IllegalArgumentException: io.prestosql.server.TaskUpdateRequest could not be converted to JSON
	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:214)
	at io.prestosql.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:513)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.prestosql.server.TaskUpdateRequest["sources"]->com.google.common.collect.SingletonImmutableList[0]->io.prestosql.execution.TaskSource["splits"]->com.google.common.collect.RegularImmutableSet[0]->io.prestosql.execution.ScheduledSplit["split"]->io.prestosql.metadata.Split["connectorSplit"]->org.apache.pulsar.sql.presto.PulsarSplit["offloadPolicies"])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1277)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:640)
	at io.prestosql.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:115)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:400)
	at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1509)
	at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1215)
	at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1109)
	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:211)
	... 5 more
```

### Modifications

1. Add the `@JsonProperty` annotation to the `OffloadPolicies` config fields so Jackson can discover them (a sketch of the failure and the fix follows this list).
2. Set the right `LedgerOffloader` on the `ManagedLedgerConfig` used by the split manager, so it can read data from tiered storage.
3. Add a new `apply-config-from-env-with-prefix.py` script that applies environment variables carrying a given prefix to config files.
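
As a sketch of the failure mode and the fix: Jackson's default `BeanSerializer` only discovers properties exposed through public getters or public fields. `OffloadPolicies` keeps its configuration in private fields, so the bean looked empty and `FAIL_ON_EMPTY_BEANS` (on by default) raised the `InvalidDefinitionException` shown above. Annotating each field makes it visible. The demo classes below are illustrative stand-ins, not the real `OffloadPolicies`:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonPropertyDemo {

    // Mirrors the bug: a private field with no getter. Jackson's BeanSerializer
    // finds no properties, and FAIL_ON_EMPTY_BEANS (enabled by default) throws.
    static class WithoutAnnotation {
        private String managedLedgerOffloadDriver = "aws-s3";
    }

    // Mirrors the fix: @JsonProperty(access = READ_WRITE) exposes the private
    // field to Jackson for both serialization and deserialization.
    static class WithAnnotation {
        @JsonProperty(access = JsonProperty.Access.READ_WRITE)
        private String managedLedgerOffloadDriver = "aws-s3";
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println(mapper.writeValueAsString(new WithAnnotation()));
        // -> {"managedLedgerOffloadDriver":"aws-s3"}
        mapper.writeValueAsString(new WithoutAnnotation());
        // -> InvalidDefinitionException: No serializer found for class ...
    }
}
```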

### Verifying this change

This change can be verified as follows:

  - *org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage*
gaoran10 authored Jan 28, 2021
1 parent 7c6f5e2 commit 7dcc2ae
Showing 17 changed files with 782 additions and 444 deletions.
1 change: 1 addition & 0 deletions docker/pulsar/Dockerfile

```diff
@@ -26,6 +26,7 @@ ADD ${PULSAR_TARBALL} /
 RUN mv /apache-pulsar-* /pulsar
 
 COPY scripts/apply-config-from-env.py /pulsar/bin
+COPY scripts/apply-config-from-env-with-prefix.py /pulsar/bin
 COPY scripts/gen-yml-from-env.py /pulsar/bin
 COPY scripts/generate-zookeeper-config.sh /pulsar/bin
 COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
```
100 changes: 100 additions & 0 deletions docker/pulsar/scripts/apply-config-from-env-with-prefix.py

```python
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

##
## Edit a properties config file and replace values based on
## the ENV variables
## export prefix_my-key=new-value
## ./apply-config-from-env-with-prefix prefix_ file.conf
##

import os, sys

if len(sys.argv) < 3:
    print('Usage: %s' % (sys.argv[0]))
    sys.exit(1)

# Always apply env config to env scripts as well
prefix = sys.argv[1]
conf_files = sys.argv[2:]

for conf_filename in conf_files:
    lines = []  # List of config file lines
    keys = {}   # Map a key to its line number in the file

    # Load conf file
    for line in open(conf_filename):
        lines.append(line)
        line = line.strip()
        if not line or line.startswith('#'):
            continue

        try:
            k,v = line.split('=', 1)
            keys[k] = len(lines) - 1
        except:
            print("[%s] skip Processing %s" % (conf_filename, line))

    # Update values from Env
    for k in sorted(os.environ.keys()):
        v = os.environ[k].strip()

        # Hide the value in logs if is password.
        if "password" in k:
            displayValue = "********"
        else:
            displayValue = v

        if k.startswith(prefix):
            k = k[len(prefix):]
            if k in keys:
                print('[%s] Applying config %s = %s' % (conf_filename, k, displayValue))
                idx = keys[k]
                lines[idx] = '%s=%s\n' % (k, v)

    # Add new keys from Env
    for k in sorted(os.environ.keys()):
        v = os.environ[k]
        if not k.startswith(prefix):
            continue

        # Hide the value in logs if is password.
        if "password" in k:
            displayValue = "********"
        else:
            displayValue = v

        k = k[len(prefix):]
        if k not in keys:
            print('[%s] Adding config %s = %s' % (conf_filename, k, displayValue))
            lines.append('%s=%s\n' % (k, v))
        else:
            print('[%s] Updating config %s = %s' % (conf_filename, k, displayValue))
            lines[keys[k]] = '%s=%s\n' % (k, v)

    # Store back the updated config in the same file
    f = open(conf_filename, 'w')
    for line in lines:
        f.write(line)
    f.close()
```

```diff
@@ -3435,6 +3435,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
                 && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
             return createManagedLedgerException(t.getCause());
         } else {
+            log.error("Unknown exception for ManagedLedgerException.", t);
             return new ManagedLedgerException("Unknown exception");
         }
     }
```

```diff
@@ -19,6 +19,8 @@
 package org.apache.pulsar.common.policies.data;
 
 import static org.apache.pulsar.common.util.FieldParser.value;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
@@ -126,67 +128,93 @@ public String getValue() {
 
     // common config
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadDriver = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
 
     // s3 config, set by service configuration or cli
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRegion = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadBucket = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadServiceEndpoint = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer s3ManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer s3ManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
     // s3 config, set by service configuration
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRole = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload";
 
     // gcs config, set by service configuration or cli
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String gcsManagedLedgerOffloadRegion = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String gcsManagedLedgerOffloadBucket = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
     // gcs config, set by service configuration
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
 
     // file system config, set by service configuration
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String fileSystemProfilePath = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String fileSystemURI = null;
 
     // --------- new offload configurations ---------
     // they are universal configurations and could be used to `aws-s3`, `google-cloud-storage` or `azureblob`.
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadBucket;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadRegion;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadServiceEndpoint;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadMaxBlockSizeInBytes;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadReadBufferSizeInBytes;
 
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
@@ -572,7 +600,7 @@ private static Object getCompatibleValue(Properties properties, Field field) {
         if (field.getName().equals("managedLedgerOffloadThresholdInBytes")) {
             object = properties.getProperty("managedLedgerOffloadThresholdInBytes",
                     properties.getProperty(OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE));
-        } else if (field.getName().equals("")) {
+        } else if (field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) {
            object = properties.getProperty("managedLedgerOffloadDeletionLagInMillis",
                    properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE));
        } else {
```
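
The last hunk above fixes a branch that could never match: `field.getName().equals("")` is always false for a real field, so the deletion-lag fallback to the legacy conf-file key was dead code. A minimal sketch of the intended new-key-first, legacy-key-fallback lookup; the concrete legacy name behind `DELETION_LAG_NAME_IN_CONF_FILE` (`managedLedgerOffloadDeletionLagMs`) is an assumption here, not taken from this commit:

```java
import java.util.Properties;

public class CompatLookupDemo {

    // Assumed legacy broker.conf key, standing in for DELETION_LAG_NAME_IN_CONF_FILE.
    static final String DELETION_LAG_NAME_IN_CONF_FILE = "managedLedgerOffloadDeletionLagMs";

    // Prefer the new property name; fall back to the legacy name, as the fixed branch does.
    static String deletionLag(Properties props) {
        return props.getProperty("managedLedgerOffloadDeletionLagInMillis",
                props.getProperty(DELETION_LAG_NAME_IN_CONF_FILE));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(DELETION_LAG_NAME_IN_CONF_FILE, "14400000");
        // Resolved through the legacy key, which the broken equals("") branch never reached.
        System.out.println(deletionLag(props)); // 14400000
    }
}
```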

```diff
@@ -128,6 +128,11 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
         try {
             OffloadPolicies offloadPolicies = this.pulsarAdmin.namespaces()
                     .getOffloadPolicies(topicName.getNamespace());
+            if (offloadPolicies != null) {
+                offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory());
+                offloadPolicies.setManagedLedgerOffloadMaxThreads(
+                        pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads());
+            }
             if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
                 splits = getSplitsNonPartitionedTopic(
                         numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies);
@@ -160,8 +165,10 @@ Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topic
 
         int splitRemainder = actualNumSplits % predicatedPartitions.size();
 
-        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
-                .getManagedLedgerFactory();
+        PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
+        ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory();
+        ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig(
+                topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig);
 
         List<PulsarSplit> splits = new LinkedList<>();
         for (int i = 0; i < predicatedPartitions.size(); i++) {
@@ -170,6 +177,7 @@ Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topic
                     getSplitsForTopic(
                             topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(),
                             managedLedgerFactory,
+                            managedLedgerConfig,
                             splitsForThisPartition,
                             tableHandle,
                             schemaInfo,
@@ -231,12 +239,15 @@ private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<C
     Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
             PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain,
             OffloadPolicies offloadPolicies) throws Exception {
-        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
-                .getManagedLedgerFactory();
+        PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
+        ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory();
+        ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig(
+                topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig);
 
         return getSplitsForTopic(
                 topicName.getPersistenceNamingEncoding(),
                 managedLedgerFactory,
+                managedLedgerConfig,
                 numSplits,
                 tableHandle,
                 schemaInfo,
@@ -248,6 +259,7 @@ Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName to
     @VisibleForTesting
     Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
             ManagedLedgerFactory managedLedgerFactory,
+            ManagedLedgerConfig managedLedgerConfig,
             int numSplits,
             PulsarTableHandle tableHandle,
             SchemaInfo schemaInfo, String tableName,
@@ -259,7 +271,7 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
         try {
             readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                     topicNamePersistenceEncoding,
-                    PositionImpl.earliest, new ManagedLedgerConfig());
+                    PositionImpl.earliest, managedLedgerConfig);
 
             long numEntries = readOnlyCursor.getNumberOfEntries();
             if (numEntries <= 0) {
@@ -270,6 +282,7 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
                             this.connectorId,
                             tupleDomain,
                             managedLedgerFactory,
+                            managedLedgerConfig,
                             topicNamePersistenceEncoding,
                             numEntries);
 
@@ -341,6 +354,7 @@ private PredicatePushdownInfo(PositionImpl startPosition, PositionImpl endPositi
         public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
                 TupleDomain<ColumnHandle> tupleDomain,
                 ManagedLedgerFactory managedLedgerFactory,
+                ManagedLedgerConfig managedLedgerConfig,
                 String topicNamePersistenceEncoding,
                 long totalNumEntries) throws
                 ManagedLedgerException, InterruptedException {
@@ -349,7 +363,7 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
             try {
                 readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                         topicNamePersistenceEncoding,
-                        PositionImpl.earliest, new ManagedLedgerConfig());
+                        PositionImpl.earliest, managedLedgerConfig);
 
                 if (tupleDomain.getDomains().isPresent()) {
                     Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME
```
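
The common thread in these hunks: every `openReadOnlyCursor` call now receives a `ManagedLedgerConfig` built by `PulsarConnectorCache.getManagedLedgerConfig(...)` from the namespace's offload policies, instead of a bare `new ManagedLedgerConfig()`, whose default offloader cannot read offloaded ledgers. A minimal sketch of that idea against the managed-ledger API; the helper below is illustrative, not the connector's actual code:

```java
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

public class OffloadAwareCursor {

    /**
     * Open a read-only cursor that can also read entries already moved to
     * tiered storage. With a default-constructed ManagedLedgerConfig (the old
     * behavior), no real LedgerOffloader is configured, so reads of offloaded
     * ledgers fail; wiring the namespace's offloader into the config fixes that.
     */
    static ReadOnlyCursor openCursor(ManagedLedgerFactory factory,
                                     String topicPersistenceName,
                                     LedgerOffloader offloader) throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setLedgerOffloader(offloader); // the essential wiring
        return factory.openReadOnlyCursor(topicPersistenceName, PositionImpl.earliest, config);
    }
}
```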