Skip to content

Commit

Permalink
[issue 8337][Worker] Move initialize dlog namespace metadata to bin/p…
Browse files Browse the repository at this point in the history
…ulsar (apache#8781)

Fixes apache#8337 

### Motivation

Currently, starting the function worker service initializes the distributed log namespace, and initializing the distributed log namespace requires connecting to ZooKeeper. A better approach is to provide a tool that initializes the distributed log namespace, similar to `./bin/pulsar initialize-cluster-metadata`.

### Modifications

- Add init distributed log namespace metadata to `./bin/pulsar initialize-cluster-metadata`
- Add flag to control whether the distributed log namespace is initialized in Function worker
- Add flag to `conf/functions_worker.yml`

### Doc

If you want to initialize distributed log metadata by `bin/pulsar`, you need to perform the following steps:
1. Use `./bin/pulsar initialize-cluster-metadata` to initialize the cluster
2. Set initializedDlogMetadata to `true` in `functions_worker.yml`

**Note**: All the changes are compatible.
  • Loading branch information
nodece authored Jan 12, 2021
1 parent 7b48dea commit aca4cd7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 16 deletions.
4 changes: 4 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,7 @@ functionsDirectory: ./functions

# Should connector config be validated during submission
validateConnectorConfig: false

# Whether the distributed log metadata has already been initialized ahead of time.
# If it is set to true, the function worker skips initializing it at startup, so you must ensure
# that it has been initialized by the "bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
Expand Down Expand Up @@ -141,6 +143,18 @@ private static void createZkNode(ZooKeeper zkc, String path,
}
}

/**
 * Initializes the distributed log namespace metadata during cluster metadata setup.
 *
 * <p>The configuration store connection string is supplied for both of the first two
 * {@code InternalConfigurationData} constructor arguments, the BookKeeper metadata service URI
 * is supplied as the fourth, and the remaining arguments are left {@code null}. The resulting
 * configuration is handed to {@code WorkerUtils.initializeDlogNamespace}.
 *
 * @param configurationStore   connection string of the configuration store
 * @param bkMetadataServiceUri BookKeeper metadata service URI, as a string
 * @throws IOException if {@code WorkerUtils.initializeDlogNamespace} fails
 */
private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
        throws IOException {
    WorkerUtils.initializeDlogNamespace(
            new InternalConfigurationData(configurationStore, configurationStore, null, bkMetadataServiceUri, null));
}

public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
Expand Down Expand Up @@ -195,15 +209,20 @@ public static void main(String[] args) throws Exception {
}
}


String uriStr = bkConf.getMetadataServiceUri();
if (arguments.existingBkMetadataServiceUri != null) {
uriStr = arguments.existingBkMetadataServiceUri;
} else if (arguments.bookieMetadataServiceUri != null) {
uriStr = arguments.bookieMetadataServiceUri;
}
ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);

// initial distributed log metadata
initialDlogNamespaceMetadata(arguments.configurationStore, uriStr);

// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
String uriStr = bkConf.getMetadataServiceUri();
if (arguments.existingBkMetadataServiceUri != null) {
uriStr = arguments.existingBkMetadataServiceUri;
} else if (arguments.bookieMetadataServiceUri != null) {
uriStr = arguments.bookieMetadataServiceUri;
}
ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,19 @@ public boolean getTlsEnabled() {
return tlsEnabled || workerPortTls != null;
}

@FieldContext(
    category = CATEGORY_WORKER,
    doc = "Whether the distributed log metadata has already been initialized ahead of time. "
        + "If true, the worker skips initializing it at startup"
)
private Boolean initializedDlogMetadata = false;

/**
 * Reports whether the distributed log metadata is declared as already initialized.
 *
 * <p>The backing field is a boxed {@code Boolean}; a {@code null} value is treated as
 * {@code false}, matching the field's default.
 *
 * @return {@code true} only when the flag is explicitly set to {@code true}
 */
public Boolean isInitializedDlogMetadata() {
    // Boolean.TRUE.equals(...) is null-safe, so no explicit null check is needed.
    return Boolean.TRUE.equals(this.initializedDlogMetadata);
}

/******** security settings for pulsar broker client **********/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,21 @@ private static URI initializeStandaloneWorkerService(PulsarClientCreator clientC
}

// initialize the dlog namespace
// TODO: move this as part of pulsar cluster initialization later
URI dlogURI;
try {
return WorkerUtils.initializeDlogNamespace(internalConf);
if (workerConfig.isInitializedDlogMetadata()) {
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing " +
"function packages", internalConf.getZookeeperServers(),
internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}

return dlogURI;
}

@Override
Expand Down Expand Up @@ -363,9 +370,14 @@ public void initInBroker(ServiceConfiguration brokerConfig,
URI dlogURI;
try {
// initializing dlog namespace for function worker
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
if (workerConfig.isInitializedDlogMetadata()){
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for " +
"storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig)
return conf;
}

public static URI newDlogNamespaceURI(InternalConfigurationData internalConf) {
String zookeeperServers = internalConf.getZookeeperServers();
/**
 * Builds the distributed log namespace URI used for function packages.
 *
 * @param zookeeperServers ZooKeeper connection string placed in the authority part of the URI
 * @return a URI of the form {@code distributedlog://<zookeeperServers>/pulsar/functions}
 */
public static URI newDlogNamespaceURI(String zookeeperServers) {
    final String namespaceUri = String.format("distributedlog://%s/pulsar/functions", zookeeperServers);
    return URI.create(namespaceUri);
}

Expand All @@ -176,7 +175,7 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(dlConfig);

URI dlogUri = newDlogNamespaceURI(internalConf);
URI dlogUri = newDlogNamespaceURI(internalConf.getZookeeperServers());
try {
dlMetadata.create(dlogUri);
} catch (ZKException e) {
Expand Down

0 comments on commit aca4cd7

Please sign in to comment.