Skip to content

Commit

Permalink
Make SchemaStorage accessible in Offloader (apache#6567)
Browse files Browse the repository at this point in the history
When offloading ledgers from bookies to secondary storage, we could write the ledgers in a columnar format. Columnar data can accelerate analytical workloads by letting them skip unnecessary columns or data blocks (also known as column pruning and filter push-down in analytical systems).

The only blocker on the Pulsar side is that offloaders cannot get the schema of the ledgers; this PR makes the schema storage accessible from offloaders.
  • Loading branch information
yjshen authored Mar 20, 2020
1 parent 7fef14b commit 3223477
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;

/**
* Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage.
Expand Down Expand Up @@ -55,4 +56,21 @@ T create(OffloadPolicies offloadPolicies,
OrderedScheduler scheduler)
throws IOException;

/**
 * Creates a ledger offloader from the given offload policies, user metadata, schema storage
 * and scheduler.
 *
 * <p>This default implementation does not use {@code schemaStorage}: it forwards the other
 * three arguments to the {@code create} overload that takes no schema storage. A factory
 * whose offloader needs schema lookup can override this method.
 *
 * @param offloadPolicies offload policies
 * @param userMetadata user metadata
 * @param schemaStorage storage the offloader can use for schema lookup
 * @param scheduler scheduler
 * @return the offloader instance
 * @throws IOException if the offloader cannot be created
 */
default T create(OffloadPolicies offloadPolicies, Map<String, String> userMetadata,
                 SchemaStorage schemaStorage, OrderedScheduler scheduler) throws IOException {
    // schemaStorage is not consulted here; delegate to the schema-less overload.
    final T offloader = create(offloadPolicies, userMetadata, scheduler);
    return offloader;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
Expand Down Expand Up @@ -104,6 +105,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
Expand Down Expand Up @@ -180,6 +182,7 @@ public class PulsarService implements AutoCloseable {
private String brokerServiceUrl;
private String brokerServiceUrlTls;
private final String brokerVersion;
private SchemaStorage schemaStorage = null;
private SchemaRegistryService schemaRegistryService = null;
private final Optional<WorkerService> functionWorkerService;
private ProtocolHandlers protocolHandlers = null;
Expand Down Expand Up @@ -403,7 +406,10 @@ public void start() throws PulsarServerException {

// needs load management service and before start broker service,
this.startNamespaceService();
schemaRegistryService = SchemaRegistryService.create(this);

schemaStorage = createAndStartSchemaStorage();
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers());

this.defaultOffloader = createManagedLedgerOffloader(
OffloadPolicies.create(this.getConfiguration().getProperties()));
Expand Down Expand Up @@ -817,6 +823,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
schemaStorage,
getOffloaderScheduler(offloadPolicies));
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
Expand All @@ -830,6 +837,20 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
}
}

/**
 * Instantiates the schema storage factory class named by
 * {@code config.getSchemaRegistryStorageClassName()}, asks it to create a
 * {@link SchemaStorage} for this service, and starts that storage.
 *
 * <p>The factory is resolved reflectively: it must expose an accessible no-argument
 * constructor and a public {@code create(PulsarService)} method.
 *
 * @return the started schema storage, or {@code null} if it could not be created or started
 */
private SchemaStorage createAndStartSchemaStorage() {
    try {
        final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
        // Class#newInstance() is deprecated; going through the constructor wraps any
        // constructor failure in InvocationTargetException instead of rethrowing it raw.
        Object factoryInstance = storageClass.getDeclaredConstructor().newInstance();
        Method createMethod = storageClass.getMethod("create", PulsarService.class);
        SchemaStorage storage = (SchemaStorage) createMethod.invoke(factoryInstance, this);
        storage.start();
        // Return the storage only after start() succeeded, so callers never receive a
        // storage that failed to start.
        return storage;
    } catch (Exception e) {
        // Keep the cause in the log; without it a misconfigured storage class is undiagnosable.
        LOG.warn("Unable to create schema registry storage", e);
        return null;
    }
}

public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;

@SuppressWarnings("unused")
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface SchemaRegistryService extends SchemaRegistry {
String CreateMethodName = "create";
Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
long NO_SCHEMA_VERSION = -1L;

Expand All @@ -44,26 +44,16 @@ static Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> checker
return checkers;
}

static SchemaRegistryService create(PulsarService pulsar) {
try {
ServiceConfiguration config = pulsar.getConfiguration();
final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
Object factoryInstance = storageClass.newInstance();
Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class);

SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar);

Map<SchemaType, SchemaCompatibilityCheck> checkers =
getCheckers(config.getSchemaRegistryCompatibilityCheckers());

checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));

schemaStorage.start();

return SchemaRegistryServiceWithSchemaDataValidator.of(
new SchemaRegistryServiceImpl(schemaStorage, checkers));
} catch (Exception e) {
log.warn("Unable to create schema registry storage, defaulting to empty storage", e);
/**
 * Builds a schema registry service on top of an already created schema storage.
 *
 * @param schemaStorage storage backing the registry; when {@code null}, a
 *        {@link DefaultSchemaRegistryService} is returned
 * @param schemaRegistryCompatibilityCheckers checker names handed to {@code getCheckers}
 * @return the registry service wrapped by {@link SchemaRegistryServiceWithSchemaDataValidator},
 *         or a {@link DefaultSchemaRegistryService} when the storage is missing or building
 *         the registry throws
 */
static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> schemaRegistryCompatibilityCheckers) {
    if (schemaStorage == null) {
        return new DefaultSchemaRegistryService();
    }
    try {
        final Map<SchemaType, SchemaCompatibilityCheck> checkersByType =
                getCheckers(schemaRegistryCompatibilityCheckers);
        // The key/value checker receives the same map it is registered in.
        checkersByType.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkersByType));
        final SchemaRegistryServiceImpl registry = new SchemaRegistryServiceImpl(schemaStorage, checkersByType);
        return SchemaRegistryServiceWithSchemaDataValidator.of(registry);
    } catch (Exception e) {
        log.warn("Unable to create schema registry storage, defaulting to empty storage", e);
        return new DefaultSchemaRegistryService();
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;

public interface SchemaStorageFactory {
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class MessageImpl<T> implements Message<T> {
private final int redeliveryCount;

// Constructor for out-going message
static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
public static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
@SuppressWarnings("unchecked")
MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
msg.msgMetadataBuilder = msgMetadataBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class MessageParser {
* Definition of an interface to process a raw Pulsar entry payload.
*/
public interface MessageProcessor {
void process(RawMessage message);
void process(RawMessage message) throws IOException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.api.raw;

import lombok.Getter;

@Getter
public class RawMessageIdImpl implements RawMessageId {

long ledgerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
package org.apache.pulsar.common.protocol.schema;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;

/**
* Schema storage.
*/
public interface SchemaStorage {

CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
package org.apache.pulsar.common.protocol.schema;

import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.Objects;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;

/**
* Stored schema with version.
*/
public class StoredSchema {
public final byte[] data;
public final SchemaVersion version;

StoredSchema(byte[] data, SchemaVersion version) {
public StoredSchema(byte[] data, SchemaVersion version) {
this.data = data;
this.version = version;
}
Expand All @@ -41,8 +43,8 @@ public boolean equals(Object o) {
return false;
}
StoredSchema that = (StoredSchema) o;
return Arrays.equals(data, that.data) &&
Objects.equals(version, that.version);
return Arrays.equals(data, that.data)
&& Objects.equals(version, that.version);
}

@Override
Expand Down

0 comments on commit 3223477

Please sign in to comment.