Skip to content

Commit

Permalink
More Pulsar Functions documentation (apache#1362)
Browse files Browse the repository at this point in the history
* add note on multiple input topics

* fix CSS for highlight blocks

* more java API docs

* fix scrolling issue

* add CLI documentation for pulsar-admin functions

* fix YAML issues

* add PF to features list on front page

* add note about SPEs to feature description

* remove unnecessary include in local config

* add new badges to templates

* add section on core programming modeL

* add section on the SDK

* begin adding section on java sdk

* comparison table for native vs SDK

* add decrementCounter method

* add link

* add counters section

* update example functions

* revert Context object to master

* add user config section for java

* fix misspelling in error output

* add new PF config to YAML descriptor

* add processing guarantees doc to the sidebar

* update download URL and add trigger command to CLI docs

* add logTopic flag to CLI docs

* use native python function in quickstart

* add intro to java section

* update example functions and finish draft of API doc

* fix URL in sidebar config

* finish user config section in API doc

* add example ContextFunction

* finish draft of processing guarantees doc

* add section on triggering

* comment out subscription types section for now

* remove unnecessary console.log statement

* remove <hr> in docs template

* add missing DefaultSerDe class import

* fix error output to match test expectation

* Add missing license header to new example Pulsar Function
  • Loading branch information
lucperkins authored and merlimat committed Mar 30, 2018
1 parent a2cf918 commit 5f1aed2
Show file tree
Hide file tree
Showing 35 changed files with 946 additions and 185 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pulsar-broker/src/test/resources/log4j2.yaml
# Mac
.DS_Store

# VisualStudioCode artifacts
.vscode/

# Maven
log/
target/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.apache.pulsar.functions.utils.Utils;

@Slf4j
@Parameters(commandDescription = "Operations about functions")
@Parameters(commandDescription = "Operations for managing Pulsar Functions")
public class CmdFunctions extends CmdBase {

private final PulsarAdminWithFunctions fnAdmin;
Expand Down Expand Up @@ -100,10 +100,10 @@ void processArguments() throws Exception {}
*/
@Getter
abstract class NamespaceCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "Tenant Name", required = true)
@Parameter(names = "--tenant", description = "Tenant name", required = true)
protected String tenant;

@Parameter(names = "--namespace", description = "Namespace Name", required = true)
@Parameter(names = "--namespace", description = "Namespace name", required = true)
protected String namespace;
}

Expand All @@ -112,7 +112,7 @@ abstract class NamespaceCommand extends BaseCommand {
*/
@Getter
abstract class FunctionCommand extends NamespaceCommand {
@Parameter(names = "--name", description = "Function Name", required = true)
@Parameter(names = "--name", description = "Function name", required = true)
protected String functionName;
}

Expand All @@ -121,43 +121,43 @@ abstract class FunctionCommand extends NamespaceCommand {
*/
@Getter
abstract class FunctionConfigCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "Tenant Name")
@Parameter(names = "--tenant", description = "Tenant name")
protected String tenant;
@Parameter(names = "--namespace", description = "Namespace Name")
@Parameter(names = "--namespace", description = "Namespace name")
protected String namespace;
@Parameter(names = "--name", description = "Function Name")
@Parameter(names = "--name", description = "Function name")
protected String functionName;
@Parameter(names = "--className", description = "Function Class Name", required = true)
@Parameter(names = "--className", description = "Function class name", required = true)
protected String className;
@Parameter(
names = "--jar",
description = "Path to Jar",
description = "Path to the Java JAR file",
listConverter = StringConverter.class)
protected String jarFile;
@Parameter(
names = "--py",
description = "Path to Python",
description = "Path to the main Python file",
listConverter = StringConverter.class)
protected String pyFile;
@Parameter(names = "--inputs", description = "Input Topic Name")
@Parameter(names = "--inputs", description = "Input topic name")
protected String inputs;
@Parameter(names = "--output", description = "Output Topic Name")
@Parameter(names = "--output", description = "Output topic name")
protected String output;
@Parameter(names = "--logTopic", description = "Log Topic")
@Parameter(names = "--logTopic", description = "Log topic")
protected String logTopic;
@Parameter(names = "--customSerdeInputs", description = "Map of input topic to serde classname")
@Parameter(names = "--customSerdeInputs", description = "Map of input topic to SerDe class name")
protected String customSerdeInputString;
@Parameter(names = "--outputSerdeClassName", description = "Output SerDe")
protected String outputSerdeClassName;
@Parameter(names = "--functionConfigFile", description = "Function Config")
@Parameter(names = "--functionConfigFile", description = "Path to a YAML config file for the function")
protected String fnConfigFile;
@Parameter(names = "--processingGuarantees", description = "Processing Guarantees")
@Parameter(names = "--processingGuarantees", description = "Processing guarantees applied to the function")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--subscriptionType", description = "The type of subscription")
@Parameter(names = "--subscriptionType", description = "The type of subscription used by the function as a consumer")
protected FunctionConfig.SubscriptionType subscriptionType;
@Parameter(names = "--userConfig", description = "User Config")
@Parameter(names = "--userConfig", description = "User-defined config key/values")
protected String userConfigString;
@Parameter(names = "--parallelism", description = "Function Parallelism")
@Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)")
protected String parallelism;

protected FunctionConfig functionConfig;
Expand Down Expand Up @@ -256,7 +256,7 @@ private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
functionConfigBuilder.getClassName(), jarFile));
} else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class)
&& !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), java.util.function.Function.class)) {
throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither Function nor java.util.function.Function",
throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
functionConfigBuilder.getClassName(), jarFile));
}

Expand Down Expand Up @@ -444,10 +444,10 @@ private String getUniqueInput(FunctionConfig.Builder builder) {
class LocalRunner extends FunctionConfigCommand {

// TODO: this should become bookkeeper url and it should be fetched from pulsar client.
@Parameter(names = "--stateStorageServiceUrl", description = "state storage service url")
@Parameter(names = "--stateStorageServiceUrl", description = "State storage service URL")
protected String stateStorageServiceUrl;

@Parameter(names = "--brokerServiceUrl", description = "The pulsar broker url")
@Parameter(names = "--brokerServiceUrl", description = "The Pulsar broker URL")
protected String brokerServiceUrl;

@Override
Expand Down Expand Up @@ -618,16 +618,16 @@ void runCmd() throws Exception {
}
}

@Parameters(commandDescription = "Trigger function")
@Parameters(commandDescription = "Triggers the specified Pulsar Function with a supplied value")
class TriggerFunction extends FunctionCommand {
@Parameter(names = "--triggerValue", description = "The value the function needs to be triggered with")
@Parameter(names = "--triggerValue", description = "The value with which you want to trigger the function")
protected String triggerValue;
@Parameter(names = "--triggerFile", description = "The fileName that contains data the function needs to be triggered with")
@Parameter(names = "--triggerFile", description = "The path to the file that contains the data with which you'd like to trigger the function")
protected String triggerFile;
@Override
void runCmd() throws Exception {
if (triggerFile == null && triggerValue == null) {
throw new RuntimeException("One of triggerValue/triggerFile has to be present");
throw new RuntimeException("Either a trigger value or a trigger filepath needs to be present");
}
String retval = fnAdmin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
System.out.println(retval);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

import java.util.stream.Collectors;

public class ContextFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
Logger LOG = context.getLogger();
String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
String functionName = context.getFunctionName();

String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n",
input,
inputTopics);

LOG.info(logMessage);

String metricName = String.format("function-%s-messages-received", functionName);
context.recordMetric(metricName, 1);

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class CounterFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
String[] parts = input.split("\\.");

for (String part : parts) {
context.incrCounter(part, 1);
}
Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {

@Override
public String process(String input, Context context) {
return input + "!";
return String.format("%s!", input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
public class JavaNativeExclmationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return input + "!";
return String.format("%s!", input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar.functions.api.examples;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

/**
* A function with logging example.
Expand All @@ -29,17 +31,17 @@ public class LoggingFunction implements Function<String, String> {

private static final AtomicIntegerFieldUpdater<LoggingFunction> COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(LoggingFunction.class, "counter");
private volatile int counter = 0;

@Override
public String process(String input, Context context) {
Logger LOG = context.getLogger();

int counterLocal = COUNTER_UPDATER.incrementAndGet(this);
if ((counterLocal & Integer.MAX_VALUE) % 100000 == 0) {
context.getLogger().info("Handled {} messages", counterLocal);
LOG.info("Handled {} messages", counterLocal);
}

return input + "!";
return String.format("%s!", input);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.pulsar.functions.api.Function;

public class UserMetricFunction implements Function<String, Void> {

@Override
public Void process(String input, Context context) {
context.recordMetric("MyMetricName", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(String.format("Function %s already exist", functionName))).build();
.entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
}

// function state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public void testRegisterExistedFunction() throws IOException {

Response response = registerDefaultFunction();
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData("Function " + function + " already exist").reason, ((ErrorData) response.getEntity()).reason);
assertEquals(new ErrorData("Function " + function + " already exists").reason, ((ErrorData) response.getEntity()).reason);
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion site/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
source 'https://rubygems.org'
ruby '2.3.1'

gem 'jekyll', '3.7.0'
gem 'jekyll', '3.7.3'
16 changes: 8 additions & 8 deletions site/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ GEM
eventmachine (>= 0.12.9)
http_parser.rb (~> 0.6.0)
eventmachine (1.2.5)
ffi (1.9.18)
ffi (1.9.23)
forwardable-extended (2.6.0)
http_parser.rb (0.6.0)
i18n (0.9.1)
i18n (0.9.5)
concurrent-ruby (~> 1.0)
jekyll (3.7.0)
jekyll (3.7.3)
addressable (~> 2.4)
colorator (~> 1.0)
em-websocket (~> 0.5)
Expand All @@ -27,7 +27,7 @@ GEM
pathutil (~> 0.9)
rouge (>= 1.7, < 4)
safe_yaml (~> 1.0)
jekyll-sass-converter (1.5.1)
jekyll-sass-converter (1.5.2)
sass (~> 3.4)
jekyll-watch (2.0.0)
listen (~> 3.0)
Expand All @@ -40,11 +40,11 @@ GEM
mercenary (0.3.6)
pathutil (0.16.1)
forwardable-extended (~> 2.6)
public_suffix (3.0.1)
rb-fsevent (0.10.2)
public_suffix (3.0.2)
rb-fsevent (0.10.3)
rb-inotify (0.9.10)
ffi (>= 0.5.0, < 2)
rouge (3.1.0)
rouge (3.1.1)
ruby_dep (1.5.0)
safe_yaml (1.0.4)
sass (3.5.5)
Expand All @@ -57,7 +57,7 @@ PLATFORMS
ruby

DEPENDENCIES
jekyll (= 3.7.0)
jekyll (= 3.7.3)

RUBY VERSION
ruby 2.3.1p112
Expand Down
2 changes: 0 additions & 2 deletions site/_config.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,3 @@

destination: generated
baseurl: ""
include:
- docs/example.md
1 change: 1 addition & 0 deletions site/_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pulsar_repo: https://github.com/apache/incubator-pulsar/tree/master
baseurl: /
destination: ../generated-site/content

preview_version: 2.0.0-streamlio-6
current_version: 1.22.0-incubating
archived_releases:
- 1.21.0-incubating
Expand Down
Loading

0 comments on commit 5f1aed2

Please sign in to comment.