Skip to content

Commit

Permalink
Integrate function cli into pulsar-admin cli (apache#1331)
Browse files Browse the repository at this point in the history
* Move pulsar functions dependency version to root pom and remove duplicated license headers

This addresses some comments in pulsar functions PR apache#1314

* shade worker

* Fix broken master

* Upgrade the bookkeeper storage client dependency to the official bookkeeper version

This removes the temp dependency in `pulsar-functions-instance`

* set `protobuf2.version` in pulsar-common

* provide a shaded worker

* include worker dependency at broker

* Embeded function worker at broker

* rename 'function worker' to 'functions worker'

* add "--no-functions-worker" for pulsar-client-cpp tests

* Integrate function cli into pulsar-admin cli

- rename `pulsar-client-tools-shaded` to `pulsar-client-admin-shaded-for-functions`, because this module is used by functions only to avoid protobuf conflicts
- move protobuf3 references to Utils, so it won't be referenced out side of pulsar-functions
- integrate function cli into pulsar-admin cli

* Fix license header issues

* Fixed ZK cache test exectutor configuration.

Fixes apache#1338
  • Loading branch information
sijie authored and merlimat committed Mar 6, 2018
1 parent a6e65eb commit 6230ab4
Show file tree
Hide file tree
Showing 29 changed files with 258 additions and 351 deletions.
53 changes: 47 additions & 6 deletions bin/pulsar-admin
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ PULSAR_HOME=`cd $BINDIR/..;pwd`
DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml

# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}

if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ]
then
. "$PULSAR_HOME/conf/pulsar_tools_env.sh"
Expand All @@ -41,7 +48,7 @@ else
fi

# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
PULSAR_JAR=$RELEASE_JAR
fi
Expand All @@ -61,20 +68,20 @@ add_maven_deps_to_classpath() {
if [ "$MAVEN_HOME" != "" ]; then
MVN=${MAVEN_HOME}/bin/mvn
fi

# Need to generate classpath from maven pom. This is costly so generate it
# and cache it. Save the file into our target dir so a mvn clean will get
# clean it up and force us create a new one.
f="${PULSAR_HOME}/all/target/classpath.txt"
if [ ! -f "${f}" ]
then
${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}

if [ -d "$PULSAR_HOME/lib" ]; then
PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
else
add_maven_deps_to_classpath
fi
Expand All @@ -97,11 +104,45 @@ OPTS="$OPTS $PULSAR_EXTRA_OPTS"

# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"Console"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}

#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"

# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
# didn't find a released jar, then search the built jar
BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime/target/java-instance.jar"
if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
else
echo "\nCouldn't find pulsar java instance jar.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
fi

# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
# didn't find a released python instance, then search the built python instance
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
else
echo "\nCouldn't find pulsar python instance.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
fi

# functions
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"

#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-client-shaded</module>
<module>pulsar-client-admin</module>
<module>pulsar-client-admin-shaded</module>
<module>pulsar-client-admin-shaded-for-functions</module>
<module>pulsar-client-tools</module>
<module>pulsar-client-tools-shaded</module>
<module>pulsar-client-tools-test</module>
<module>pulsar-websocket</module>
<module>pulsar-proxy</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
<relativePath>..</relativePath>
</parent>

<artifactId>pulsar-client-tools-shaded</artifactId>
<name>Pulsar Client Admin Shaded</name>
<artifactId>pulsar-client-admin-shaded-for-functions</artifactId>
<name>Pulsar Client Admin Shaded (for functions)</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-tools</artifactId>
<artifactId>pulsar-client-admin-original</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
Expand Down Expand Up @@ -63,7 +63,6 @@
<include>org.apache.bookkeeper:circe-checksum</include>
<include>org.apache.pulsar:pulsar-client-original</include>
<include>org.apache.pulsar:pulsar-client-admin-original</include>
<include>org.apache.pulsar:pulsar-client-tools</include>
<!-- client dependencies as below -->
<include>org.apache.commons:commons-lang3</include>
<include>commons-codec:commons-codec</include>
Expand Down Expand Up @@ -119,12 +118,6 @@
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-client-tools</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>commons-logging:commons-logging</artifact>
<includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.PulsarFunction;
Expand Down Expand Up @@ -82,7 +81,7 @@ public IObjectFactory getObjectFactory() {

private static final String TEST_NAME = "test_name";

private PulsarFunctionsAdmin admin;
private PulsarAdminWithFunctions admin;
private Functions functions;
private CmdFunctions cmd;

Expand All @@ -101,7 +100,7 @@ private String generateCustomSerdeInputs(String topic, String serde) {

@BeforeMethod
public void setup() throws Exception {
this.admin = mock(PulsarFunctionsAdmin.class);
this.admin = mock(PulsarAdminWithFunctions.class);
this.functions = mock(Functions.class);
when(admin.functions()).thenReturn(functions);
when(admin.getServiceUrl()).thenReturn(URI.create("http://localhost:1234").toURL());
Expand Down
85 changes: 85 additions & 0 deletions pulsar-client-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,91 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- functions related dependencies (begin) -->

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-worker-shaded</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-lite</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf.nano</groupId>
<artifactId>protobuf-javanano</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf-lite</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf-nano</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-utils</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-metrics</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-instance</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-worker</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
</dependency>

<!-- functions related dependencies (end) -->
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
Expand All @@ -45,7 +44,7 @@
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
Expand All @@ -64,12 +63,13 @@
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.apache.pulsar.functions.utils.Utils;

@Slf4j
@Parameters(commandDescription = "Operations about functions")
public class CmdFunctions extends CmdBase {

private final PulsarFunctionsAdmin fnAdmin;
private final PulsarAdminWithFunctions fnAdmin;
private final LocalRunner localRunner;
private final CreateFunction creater;
private final DeleteFunction deleter;
Expand Down Expand Up @@ -451,7 +451,7 @@ void runCmd() throws Exception {
throw new RuntimeException("Missing arguments");
}

String serviceUrl = ((PulsarFunctionsAdmin) admin).getClientConf().getServiceUrl();
String serviceUrl = ((PulsarAdminWithFunctions) admin).getClientConf().getServiceUrl();
if (brokerServiceUrl != null) {
serviceUrl = brokerServiceUrl;
}
Expand Down Expand Up @@ -511,7 +511,7 @@ void runCmd() throws Exception {
class GetFunction extends FunctionCommand {
@Override
void runCmd() throws Exception {
String json = JsonFormat.printer().print(fnAdmin.functions().getFunction(tenant, namespace, functionName));
String json = Utils.printJson(fnAdmin.functions().getFunction(tenant, namespace, functionName));
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
Expand All @@ -521,7 +521,7 @@ void runCmd() throws Exception {
class GetFunctionStatus extends FunctionCommand {
@Override
void runCmd() throws Exception {
String json = JsonFormat.printer().print(fnAdmin.functions().getFunctionStatus(tenant, namespace, functionName));
String json = Utils.printJson(fnAdmin.functions().getFunctionStatus(tenant, namespace, functionName));
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
Expand Down Expand Up @@ -616,7 +616,7 @@ void runCmd() throws Exception {

public CmdFunctions(PulsarAdmin admin) {
super("functions", admin);
this.fnAdmin = (PulsarFunctionsAdmin) admin;
this.fnAdmin = (PulsarAdminWithFunctions) admin;
localRunner = new LocalRunner();
creater = new CreateFunction();
deleter = new DeleteFunction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

Expand Down Expand Up @@ -81,6 +82,7 @@ public class PulsarAdminTool {
commandMap.put("persistent", CmdPersistentTopics.class);
commandMap.put("non-persistent", CmdNonPersistentTopics.class);
commandMap.put("resource-quotas", CmdResourceQuotas.class);
commandMap.put("functions", CmdFunctions.class);
}

private void setupCommands(BiFunction<URL, ClientConfigurationData, ? extends PulsarAdmin> adminFactory) {
Expand All @@ -103,7 +105,7 @@ private void setupCommands(BiFunction<URL, ClientConfigurationData, ? extends Pu
boolean run(String[] args) {
return run(args, (url, config) -> {
try {
return new PulsarAdmin(url, config);
return new PulsarAdminWithFunctions(url, config);
} catch (Exception ex) {
System.err.println(ex.getClass() + ": " + ex.getMessage());
System.exit(1);
Expand Down Expand Up @@ -173,7 +175,36 @@ public static void main(String[] args) throws Exception {

PulsarAdminTool tool = new PulsarAdminTool(properties);

if (tool.run(Arrays.copyOfRange(args, 1, args.length))) {
int cmdPos;
for (cmdPos = 1; cmdPos < args.length; cmdPos++) {
if (tool.commandMap.containsKey(args[cmdPos])) {
break;
}
}

++cmdPos;
boolean isLocalRun = false;
if (cmdPos < args.length) {
isLocalRun = "localrun" == args[cmdPos].toLowerCase();
}

BiFunction<URL, ClientConfigurationData, ? extends PulsarAdmin> adminFactory;
if (isLocalRun) {
// bypass constructing admin client
adminFactory = (url, config) -> null;
} else {
adminFactory = (url, config) -> {
try {
return new PulsarAdminWithFunctions(url, config);
} catch (Exception ex) {
System.err.println(ex.getClass() + ": " + ex.getMessage());
System.exit(1);
return null;
}
};
}

if (tool.run(Arrays.copyOfRange(args, 1, args.length), adminFactory)) {
System.exit(0);
} else {
System.exit(1);
Expand Down
Loading

0 comments on commit 6230ab4

Please sign in to comment.