Skip to content

Commit

Permalink
[state] Initialize bookkeeper table service metadata on initializing …
Browse files Browse the repository at this point in the history
…pulsar cluster metadata (apache#2706)


*Motivation*

This is the first set of changes to better integration with state storage.

*Changes*

- PulsarClusterMetadataSetup should initialize the metadata for bookkeeper table service
- Move the common util functions to `StateUtils`
- Delete state table on deleting functions
  • Loading branch information
sijie authored Nov 3, 2018
1 parent 4686724 commit 4e9971d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import java.util.List;

import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
Expand Down Expand Up @@ -93,6 +96,11 @@ private static class Arguments {
"--configuration-store" }, description = "Configuration Store connection string", required = false)
private String configurationStore;

@Parameter(names = {
"--initial-num-stream-storage-containers"
}, description = "Num storage containers of BookKeeper stream storage")
private int numStreamStorageContainers = 16;

@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
}
Expand Down Expand Up @@ -136,7 +144,7 @@ public static void main(String[] args) throws Exception {
ZooKeeper configStoreZk = zkfactory.create(
arguments.configurationStore, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get();

// Format BookKeeper metadata
// Format BookKeeper ledger storage metadata
ServerConfiguration bkConf = new ServerConfiguration();
bkConf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
bkConf.setZkServers(arguments.zookeeper);
Expand All @@ -146,6 +154,13 @@ public static void main(String[] args) throws Exception {
throw new IOException("Failed to initialize BookKeeper metadata");
}

// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
ServiceURI bkMetadataServiceUri = ServiceURI.create(bkConf.getMetadataServiceUri());
ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
}

if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) {
try {
localZk.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
Expand All @@ -59,6 +61,7 @@
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.StateUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
Expand Down Expand Up @@ -270,31 +273,35 @@ private void setupStateTable() throws Exception {
return;
}

String tableNs = String.format(
"%s_%s",
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace()
).replace('-', '_');
String tableNs = StateUtils.getStateNamespace(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace()
);
String tableName = instanceConfig.getFunctionDetails().getName();

StorageClientSettings settings = StorageClientSettings.newBuilder()
.serviceUri(stateStorageServiceUrl)
.clientName("function-" + tableNs + "/" + tableName)
.build();

// TODO (sijie): provide a better way to provision the state table for functions
// we defer creation of the state table until a java instance is running here.
try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder()
.withSettings(settings)
.buildAdmin()) {
StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setInitialNumRanges(4)
.setMinNumRanges(4)
.setStorageType(StorageType.TABLE)
.build();
try {
result(storageAdminClient.getStream(tableNs, tableName));
} catch (NamespaceNotFoundException nnfe) {
result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.setDefaultStreamConf(streamConf)
.build()));
result(storageAdminClient.createStream(tableNs, tableName, DEFAULT_STREAM_CONF));
result(storageAdminClient.createStream(tableNs, tableName, streamConf));
} catch (StreamNotFoundException snfe) {
result(storageAdminClient.createStream(tableNs, tableName, DEFAULT_STREAM_CONF));
result(storageAdminClient.createStream(tableNs, tableName, streamConf));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

/**
* Utils for state store.
*/
public final class StateUtils {

private StateUtils() {}

/**
* Convert pulsar tenant and namespace to state storage namespace.
*
* @param tenant pulsar tenant
* @param namespace pulsar namespace
* @return state storage namespace
*/
public static String getStateNamespace(String tenant, String namespace) {
return String.format("%s_%s", tenant, namespace)
.replace("-", "_");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
Expand All @@ -44,7 +47,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.stats.JvmMetrics;

/**
* A service component contains everything to run a worker except rest server.
Expand All @@ -59,7 +61,10 @@ public class WorkerService {
private FunctionRuntimeManager functionRuntimeManager;
private FunctionMetaDataManager functionMetaDataManager;
private ClusterServiceCoordinator clusterServiceCoordinator;
// dlog namespace for storing function jars in bookkeeper
private Namespace dlogNamespace;
// storage client for accessing state storage for functions
private StorageAdminClient stateStoreAdminClient;
private MembershipManager membershipManager;
private SchedulerManager schedulerManager;
private boolean isInitialized = false;
Expand Down Expand Up @@ -117,6 +122,16 @@ public void start(URI dlogUri) throws InterruptedException {
throw new RuntimeException(e);
}

// create the state storage client for accessing function state
if (workerConfig.getStateStorageServiceUrl() != null) {
StorageClientSettings clientSettings = StorageClientSettings.newBuilder()
.serviceUri(workerConfig.getStateStorageServiceUrl())
.build();
this.stateStoreAdminClient = StorageClientBuilder.newBuilder()
.withSettings(clientSettings)
.buildAdmin();
}

// initialize the function metadata manager
try {

Expand Down Expand Up @@ -242,6 +257,14 @@ public void stop() {
if (null != this.functionAdmin) {
this.functionAdmin.close();
}

if (null != this.stateStoreAdminClient) {
this.stateStoreAdminClient.close();
}

if (null != this.dlogNamespace) {
this.dlogNamespace.close();
}

if(this.executor != null) {
this.executor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -54,6 +61,9 @@
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.io.IOUtils;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -84,7 +94,10 @@
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.*;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.StateUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.Utils;
Expand Down Expand Up @@ -132,9 +145,9 @@ private boolean isWorkerServiceAvailable() {
}

public Response registerFunction(final String tenant, final String namespace, final String componentName,
final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
final String componentType, final String clientRole) {
final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
final String componentType, final String clientRole) {

if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
Expand Down Expand Up @@ -360,6 +373,24 @@ public Response deregisterFunction(final String tenant, final String namespace,
.entity(new ErrorData(e.getMessage())).build();
}

// delete state table
if (null != worker().getStateStoreAdminClient()) {
final String tableNs = StateUtils.getStateNamespace(tenant, namespace);
final String tableName = componentName;
try {
FutureUtils.result(worker().getStateStoreAdminClient().deleteStream(tableNs, tableName));
} catch (NamespaceNotFoundException | StreamNotFoundException e) {
// ignored if the state table doesn't exist
} catch (Exception e) {
log.error("{}/{}/{} Failed to delete state table", e);
return Response
.status(Status.INTERNAL_SERVER_ERROR)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage()))
.build();
}
}

// validate parameters
try {
validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
Expand Down Expand Up @@ -818,6 +849,10 @@ public Response getFunctionState(final String tenant, final String namespace,
return getUnavailableResponse();
}

if (null == worker().getStateStoreAdminClient()) {
return getStateStoreUnvailableResponse();
}

// validate parameters
try {
validateGetFunctionStateParams(tenant, namespace, functionName, key);
Expand All @@ -828,10 +863,7 @@ public Response getFunctionState(final String tenant, final String namespace,
.entity(new ErrorData(e.getMessage())).build();
}

String tableNs = String.format(
"%s_%s",
tenant,
namespace).replace('-', '_');
String tableNs = StateUtils.getStateNamespace(tenant, namespace);
String tableName = functionName;

String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
Expand Down Expand Up @@ -1274,6 +1306,14 @@ private Response getUnavailableResponse() {
.build();
}

private Response getStateStoreUnvailableResponse() {
return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(
"State storage client is not done initializing. "
+ "Please try again in a little while."))
.build();
}

public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName),
Utils.getUniquePackageName(Codec.encode(fileName)));
Expand Down

0 comments on commit 4e9971d

Please sign in to comment.