Skip to content

Commit

Permalink
Enable FQFN specification for the Pulsar Functions CLI (apache#1504)
Browse files Browse the repository at this point in the history
* Add FQFN flag plus corresponding test

* add fqfn field to function config protobuf definition

* add section in PF overview on FQFNs

* add missing --jar flag to test

* remove unnecessary imports

* add missing license header

* add final missing import

* remove incomplete line

* make FQFN name check more stringent
  • Loading branch information
lucperkins authored and merlimat committed Apr 6, 2018
1 parent ec210cb commit 9d23892
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,31 @@ public void testCreateWithoutNamespace() throws Exception {
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
String inputTopicName = TEST_NAME + "-input-topic";
String outputTopicName = TEST_NAME + "-output-topic";
String tenant = "sample";
String namespace = "ns1";
String functionName = "func";
String fqfn = String.format("%s/%s/%s", tenant, namespace, functionName);

cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
"--output", outputTopicName,
"--fqfn", fqfn,
"--jar", "SomeJar.jar",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
assertEquals(tenant, creater.getFunctionConfig().getTenant());
assertEquals(namespace, creater.getFunctionConfig().getNamespace());
assertEquals(functionName, creater.getFunctionConfig().getName());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateWithoutFunctionName() throws Exception {
String inputTopicName = TEST_NAME + "-input-topic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ void processArguments() throws Exception {}
*/
@Getter
abstract class NamespaceCommand extends BaseCommand {
@Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function", required = false)
protected String fqfn;

@Parameter(names = "--tenant", description = "The function's tenant", required = true)
protected String tenant;

Expand All @@ -124,6 +127,8 @@ abstract class FunctionCommand extends NamespaceCommand {
*/
@Getter
abstract class FunctionConfigCommand extends BaseCommand {
@Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
protected String fqfn;
@Parameter(names = "--tenant", description = "The function's tenant")
protected String tenant;
@Parameter(names = "--namespace", description = "The function's namespace")
Expand Down Expand Up @@ -168,13 +173,29 @@ abstract class FunctionConfigCommand extends BaseCommand {

@Override
void processArguments() throws Exception {

FunctionConfig.Builder functionConfigBuilder;

// Initialize config builder either from a supplied YAML config file or from scratch
if (null != fnConfigFile) {
functionConfigBuilder = loadConfig(new File(fnConfigFile));
} else {
functionConfigBuilder = FunctionConfig.newBuilder();
}

if (null != fqfn) {
parseFullyQualifiedFunctionName(fqfn, functionConfigBuilder);
} else {
if (null != tenant) {
functionConfigBuilder.setTenant(tenant);
}
if (null != namespace) {
functionConfigBuilder.setNamespace(namespace);
}
if (null != functionName) {
functionConfigBuilder.setName(functionName);
}
}

if (null != inputs) {
String[] topicNames = inputs.split(",");
for (int i = 0; i < topicNames.length; ++i) {
Expand All @@ -192,15 +213,6 @@ void processArguments() throws Exception {
if (null != logTopic) {
functionConfigBuilder.setLogTopic(logTopic);
}
if (null != tenant) {
functionConfigBuilder.setTenant(tenant);
}
if (null != namespace) {
functionConfigBuilder.setNamespace(namespace);
}
if (null != functionName) {
functionConfigBuilder.setName(functionName);
}
if (null != className) {
functionConfigBuilder.setClassName(className);
}
Expand Down Expand Up @@ -373,6 +385,17 @@ private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
}
}

private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig.Builder functionConfigBuilder) {
String[] args = fqfn.split("/");
if (args.length != 3) {
throw new RuntimeException("Fully qualified function names must be of the form tenant/namespace/name");
} else {
functionConfigBuilder.setTenant(args[0]);
functionConfigBuilder.setNamespace(args[1]);
functionConfigBuilder.setName(args[2]);
}
}

private void doPythonSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
if (functionConfigBuilder.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
Expand Down Expand Up @@ -702,7 +725,7 @@ StateGetter getStateGetter() {
TriggerFunction getTriggerer() {
return triggerer;
}

private static FunctionConfig.Builder loadConfig(File file) throws IOException {
String json = FunctionConfigUtils.convertYamlToJson(file);
FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
Expand Down
3 changes: 3 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ message FunctionConfig {
bool autoAck = 13;
repeated string inputs = 14;
int32 parallelism = 15;
// Fully qualified function name
// (alternative to specifying tenant/namespace/name)
string fqfn = 16;
}

message PackageLocationMetaData {
Expand Down
23 changes: 23 additions & 0 deletions site/_includes/fqfn.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<section class="fqfn">
<span class="tenant">{{ include.tenant }}</span>/<span class="namespace">{{ include.namespace }}</span>/<span class="name">{{ include.name }}</span>
</section>
6 changes: 3 additions & 3 deletions site/_sass/_docs.scss
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@
}
}

.topic {
.topic, .fqfn {
color: $sx-light-gray;
background-color: $black;
font-size: $code-font-size;
font-family: $font-family-monospace;
padding: 10px 0 10px 20px;
border-radius: 0;

.property { color: $sx-olive; }
.cluster { color: $sx-red; }
.property, .tenant { color: $sx-olive; }
.cluster, .name { color: $sx-red; }
.namespace { color: $sx-7; }
.t { color: $sx-magenta; }

Expand Down
8 changes: 8 additions & 0 deletions site/docs/latest/functions/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ $ bin/pulsar-functions localrun \
--className org.apache.pulsar.functions.api.examples.ExclamationFunction
```

## Fully Qualified Function Name (FQFN) {#fqfn}

Each Pulsar Function has a **Fully Qualified Function Name** (FQFN) that consists of three elements: the function's {% popover tenant %}, {% popover namespace %}, and function name. FQFN's look like this:

{% include fqfn.html tenant="tenant" namespace="namespace" name="name" %}

FQFNs enable you to, for example, create multiple functions with the same name provided that they're in different namespaces.

## Configuration

Pulsar Functions can be configured in two ways:
Expand Down

0 comments on commit 9d23892

Please sign in to comment.