diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index def4b2be47887..b50da21bfd69d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -84,7 +84,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("functionConfig") String functionConfigJson) { return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId()); } @PUT @@ -106,7 +106,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("functionConfig") String functionConfigJson) { return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId()); } @@ -124,7 +124,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, public Response deregisterFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); + return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId()); } @GET @@ -143,7 +143,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { return functions.getFunctionInfo( - tenant, namespace, functionName); + tenant, namespace, functionName, FunctionsImpl.FUNCTION); } @GET @@ -196,7 +196,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, public Response listFunctions(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { return functions.listFunctions( - tenant, namespace); + tenant, namespace, FunctionsImpl.FUNCTION); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java index 2fe398973d4d1..0f5a5c5d3b372 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java @@ -72,7 +72,7 @@ public Response registerSink(final @PathParam("tenant") String tenant, final @FormDataParam("sinkConfig") String sinkConfigJson) { return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId()); } @PUT @@ -93,7 +93,7 @@ public Response updateSink(final @PathParam("tenant") String tenant, final @FormDataParam("sinkConfig") String sinkConfigJson) { return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId()); } @@ -111,7 +111,7 @@ public Response updateSink(final @PathParam("tenant") String tenant, public Response deregisterSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { - return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId()); + return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId()); } @GET @@ -129,7 +129,7 @@ public Response deregisterSink(final @PathParam("tenant") String tenant, public Response getSinkInfo(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) throws IOException { - return functions.getFunctionInfo(tenant, namespace, sinkName); + return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK); } @GET @@ -180,7 +180,7 @@ public Response getSinkStatus(final @PathParam("tenant") String tenant, @Path("/{tenant}/{namespace}") public Response listSinks(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { - return functions.listFunctions(tenant, namespace); + return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java index 1a82ac2880b18..4bda48995c601 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java @@ -73,7 +73,7 @@ public Response registerSource(final @PathParam("tenant") String tenant, final @FormDataParam("sourceConfig") String sourceConfigJson) { return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId()); } @PUT @@ -94,7 +94,7 @@ public Response updateSource(final @PathParam("tenant") String tenant, final @FormDataParam("sourceConfig") String sourceConfigJson) { return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId()); } @@ -112,7 +112,7 @@ public Response updateSource(final @PathParam("tenant") String tenant, public Response deregisterSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { - return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId()); + return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId()); } @GET @@ -131,7 +131,7 @@ public Response getSourceInfo(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) throws IOException { return functions.getFunctionInfo( - tenant, namespace, sourceName); + tenant, namespace, sourceName, FunctionsImpl.SOURCE); } @GET @@ -183,7 +183,7 @@ public Response getSourceStatus(final @PathParam("tenant") String tenant, public Response listSources(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { return functions.listFunctions( - tenant, namespace); + tenant, namespace, FunctionsImpl.SOURCE); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index afa7ef1dd7bd4..acd6ed394618a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -308,7 +308,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception { // validate updated function prop = auto-ack=false and instnaceid for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) { String functionName = baseFunctionName + i; - assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck()); + assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).isAutoAck()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index babcdd6df4c3a..21133d678d8c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -407,36 +407,6 @@ public void testAuthorization(boolean validRoleName) throws Exception { } } - /** - * Test to verify: function-server loads jar using file-url and derives type-args classes if not provided - * @throws Exception - */ - @Test(timeOut = 20000) - public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception { - - final String namespacePortion = "io"; - final String replNamespace = tenant + "/" + namespacePortion; - final String sinkTopic = "persistent://" + replNamespace + "/output"; - final String functionName = "PulsarSink-test"; - final String subscriptionName = "test-sub"; - admin.namespaces().createNamespace(replNamespace); - Set clusters = Sets.newHashSet(Lists.newArrayList("use")); - admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); - - String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile(); - - FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, - "my.*", sinkTopic, subscriptionName); - - admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); - - FunctionDetails functionMetadata = admin.source().getSource(tenant, namespacePortion, functionName); - - assertEquals(functionMetadata.getSource().getTypeClassName(), String.class.getName()); - assertEquals(functionMetadata.getSink().getTypeClassName(), String.class.getName()); - - } - @Test(timeOut = 20000) public void testFunctionStopAndRestartApi() throws Exception { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 10c890c810842..a72260584e0c3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -25,7 +25,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.utils.FunctionConfig; @@ -77,7 +76,7 @@ public interface Functions { * @throws PulsarAdminException * Unexpected error */ - FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException; + FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** * Create a new function. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java index 3f8fe2f478906..afad6f371f102 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java @@ -22,13 +22,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.utils.SinkConfig; import java.util.List; -import java.util.Set; /** * Admin interface for Sink management. @@ -50,7 +48,7 @@ public interface Sink { * @throws PulsarAdminException * Unexpected error */ - List getSinks(String tenant, String namespace) throws PulsarAdminException; + List listSinks(String tenant, String namespace) throws PulsarAdminException; /** * Get the configuration for the specified sink. @@ -77,7 +75,7 @@ public interface Sink { * @throws PulsarAdminException * Unexpected error */ - Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException; + SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException; /** * Create a new sink. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java index 3c43cf203f51b..9d1a318f1109c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java @@ -22,13 +22,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.utils.SourceConfig; import java.util.List; -import java.util.Set; /** * Admin interface for Source management. @@ -50,7 +48,7 @@ public interface Source { * @throws PulsarAdminException * Unexpected error */ - List getSources(String tenant, String namespace) throws PulsarAdminException; + List listSources(String tenant, String namespace) throws PulsarAdminException; /** * Get the configuration for the specified source. @@ -77,7 +75,7 @@ public interface Source { * @throws PulsarAdminException * Unexpected error */ - Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException; + SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException; /** * Create a new source. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index b9ea1a52448af..77cc3d6816f2b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -46,7 +46,6 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.utils.FunctionConfig; @@ -80,16 +79,13 @@ public List getFunctions(String tenant, String namespace) throws PulsarA } @Override - public FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException { + public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException { try { Response response = request(functions.path(tenant).path(namespace).path(function)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { throw new ClientErrorException(response); } - String jsonResponse = response.readEntity(String.class); - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - mergeJson(jsonResponse, functionDetailsBuilder); - return functionDetailsBuilder.build(); + return response.readEntity(FunctionConfig.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java index 4e13693cc06a0..d117374131877 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java @@ -27,7 +27,6 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.utils.SinkConfig; @@ -44,8 +43,6 @@ import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; @Slf4j public class SinkImpl extends BaseResource implements Sink { @@ -58,7 +55,7 @@ public SinkImpl(WebTarget web, Authentication auth) { } @Override - public List getSinks(String tenant, String namespace) throws PulsarAdminException { + public List listSinks(String tenant, String namespace) throws PulsarAdminException { try { Response response = request(sink.path(tenant).path(namespace)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { @@ -72,16 +69,13 @@ public List getSinks(String tenant, String namespace) throws PulsarAdmin } @Override - public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { + public SinkConfig getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { try { Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { throw new ClientErrorException(response); } - String jsonResponse = response.readEntity(String.class); - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - mergeJson(jsonResponse, functionDetailsBuilder); - return functionDetailsBuilder.build(); + return response.readEntity(SinkConfig.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java index 65a2bfc55a06b..0c7a1df389c1f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java @@ -27,7 +27,6 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.utils.SourceConfig; @@ -44,8 +43,6 @@ import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; @Slf4j public class SourceImpl extends BaseResource implements Source { @@ -58,7 +55,7 @@ public SourceImpl(WebTarget web, Authentication auth) { } @Override - public List getSources(String tenant, String namespace) throws PulsarAdminException { + public List listSources(String tenant, String namespace) throws PulsarAdminException { try { Response response = request(source.path(tenant).path(namespace)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { @@ -72,16 +69,13 @@ public List getSources(String tenant, String namespace) throws PulsarAdm } @Override - public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException { + public SourceConfig getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException { try { Response response = request(source.path(tenant).path(namespace).path(sourceName)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { throw new ClientErrorException(response); } - String jsonResponse = response.readEntity(String.class); - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - mergeJson(jsonResponse, functionDetailsBuilder); - return functionDetailsBuilder.build(); + return response.readEntity(SourceConfig.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 2f23a63039742..7e44b440fd01d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -697,9 +697,9 @@ void runCmd() throws Exception { class GetFunction extends FunctionCommand { @Override void runCmd() throws Exception { - String json = Utils.printJson(admin.functions().getFunction(tenant, namespace, functionName)); + FunctionConfig functionConfig = admin.functions().getFunction(tenant, namespace, functionName); Gson gson = new GsonBuilder().setPrettyPrinting().create(); - System.out.println(gson.toJson(new JsonParser().parse(json))); + System.out.println(gson.toJson(functionConfig)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index e56a34384e12b..3b9159e98c329 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -30,16 +30,14 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import java.io.File; import java.io.IOException; import java.lang.reflect.Type; import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import lombok.Getter; @@ -67,6 +65,8 @@ public class CmdSinks extends CmdBase { private final CreateSink createSink; private final UpdateSink updateSink; private final DeleteSink deleteSink; + private final ListSinks listSinks; + private final GetSink getSink; private final LocalSinkRunner localSinkRunner; public CmdSinks(PulsarAdmin admin) { @@ -74,11 +74,15 @@ public CmdSinks(PulsarAdmin admin) { createSink = new CreateSink(); updateSink = new UpdateSink(); deleteSink = new DeleteSink(); + listSinks = new ListSinks(); + getSink = new GetSink(); localSinkRunner = new LocalSinkRunner(); jcommander.addCommand("create", createSink); jcommander.addCommand("update", updateSink); jcommander.addCommand("delete", deleteSink); + jcommander.addCommand("list", listSinks); + jcommander.addCommand("get", getSink); jcommander.addCommand("localrun", localSinkRunner); jcommander.addCommand("available-sinks", new ListBuiltInSinks()); } @@ -184,7 +188,7 @@ protected String validateSinkType(String sinkType) throws IOException { } @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run in a Pulsar cluster") - protected class CreateSink extends SinkCommand { + protected class CreateSink extends SinkDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { @@ -197,7 +201,7 @@ void runCmd() throws Exception { } @Parameters(commandDescription = "Update a Pulsar IO sink connector") - protected class UpdateSink extends SinkCommand { + protected class UpdateSink extends SinkDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { @@ -209,7 +213,7 @@ void runCmd() throws Exception { } } - abstract class SinkCommand extends BaseCommand { + abstract class SinkDetailsCommand extends BaseCommand { @Parameter(names = "--tenant", description = "The sink's tenant") protected String tenant; @Parameter(names = "--namespace", description = "The sink's namespace") @@ -506,25 +510,70 @@ protected String validateSinkType(String sinkType) throws IOException { } } - @Parameters(commandDescription = "Stops a Pulsar IO sink connector") - protected class DeleteSink extends BaseCommand { - - @Parameter(names = "--tenant", description = "The tenant of the sink") + /** + * Sink level command + */ + @Getter + abstract class SinkCommand extends BaseCommand { + @Parameter(names = "--tenant", description = "The sink's tenant") protected String tenant; - @Parameter(names = "--namespace", description = "The namespace of the sink") + @Parameter(names = "--namespace", description = "The sink's namespace") protected String namespace; - @Parameter(names = "--name", description = "The name of the sink") - protected String name; + @Parameter(names = "--name", description = "The sink's name") + protected String sinkName; @Override void processArguments() throws Exception { super.processArguments(); - if (null == name) { - throw new ParameterException( + if (tenant == null) { + tenant = PUBLIC_TENANT; + } + if (namespace == null) { + namespace = DEFAULT_NAMESPACE; + } + if (null == sinkName) { + throw new RuntimeException( "You must specify a name for the sink"); } + } + } + + @Parameters(commandDescription = "Stops a Pulsar IO sink connector") + protected class DeleteSink extends SinkCommand { + + @Override + void runCmd() throws Exception { + admin.sink().deleteSink(tenant, namespace, sinkName); + print("Deleted successfully"); + } + } + + @Parameters(commandDescription = "Gets the information about a Pulsar IO sink connector") + protected class GetSink extends SinkCommand { + + @Override + void runCmd() throws Exception { + SinkConfig sinkConfig = admin.sink().getSink(tenant, namespace, sinkName); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(sinkConfig)); + } + } + + /** + * List Sources command + */ + @Parameters(commandDescription = "List all running Pulsar IO sink connectors") + protected class ListSinks extends BaseCommand { + @Parameter(names = "--tenant", description = "The sink's tenant") + protected String tenant; + + @Parameter(names = "--namespace", description = "The sink's namespace") + protected String namespace; + + @Override + public void processArguments() { if (tenant == null) { tenant = PUBLIC_TENANT; } @@ -535,8 +584,9 @@ void processArguments() throws Exception { @Override void runCmd() throws Exception { - admin.sink().deleteSink(tenant, namespace, name); - print("Deleted successfully"); + List sinks = admin.sink().listSinks(tenant, namespace); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(sinks)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index f2d768199e5ac..f27b0a543735c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -28,6 +28,7 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import java.io.File; @@ -35,6 +36,7 @@ import java.lang.reflect.Type; import java.nio.file.Paths; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -66,6 +68,8 @@ public class CmdSources extends CmdBase { private final CreateSource createSource; private final DeleteSource deleteSource; + private final GetSource getSource; + private final ListSources listSources; private final UpdateSource updateSource; private final LocalSourceRunner localSourceRunner; @@ -74,11 +78,15 @@ public CmdSources(PulsarAdmin admin) { createSource = new CreateSource(); updateSource = new UpdateSource(); deleteSource = new DeleteSource(); + listSources = new ListSources(); + getSource = new GetSource(); localSourceRunner = new LocalSourceRunner(); jcommander.addCommand("create", createSource); jcommander.addCommand("update", updateSource); jcommander.addCommand("delete", deleteSource); + jcommander.addCommand("get", getSource); + jcommander.addCommand("list", listSources); jcommander.addCommand("localrun", localSourceRunner); jcommander.addCommand("available-sources", new ListBuiltInSources()); } @@ -184,7 +192,7 @@ protected String validateSourceType(String sourceType) throws IOException { } @Parameters(commandDescription = "Submit a Pulsar IO source connector to run in a Pulsar cluster") - protected class CreateSource extends SourceCommand { + protected class CreateSource extends SourceDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) { @@ -197,7 +205,7 @@ void runCmd() throws Exception { } @Parameters(commandDescription = "Update a Pulsar IO source connector") - protected class UpdateSource extends SourceCommand { + protected class UpdateSource extends SourceDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) { @@ -209,7 +217,7 @@ void runCmd() throws Exception { } } - abstract class SourceCommand extends BaseCommand { + abstract class SourceDetailsCommand extends BaseCommand { @Parameter(names = "--tenant", description = "The source's tenant") protected String tenant; @Parameter(names = "--namespace", description = "The source's namespace") @@ -460,40 +468,86 @@ protected String validateSourceType(String sourceType) throws IOException { } } - @Parameters(commandDescription = "Stops a Pulsar IO source connector") - protected class DeleteSource extends BaseCommand { - - @Parameter(names = "--tenant", description = "The tenant of a sink or source") + /** + * Function level command + */ + @Getter + abstract class SourceCommand extends BaseCommand { + @Parameter(names = "--tenant", description = "The source's tenant") protected String tenant; - @Parameter(names = "--namespace", description = "The namespace of a sink or source") + @Parameter(names = "--namespace", description = "The source's namespace") protected String namespace; - @Parameter(names = "--name", description = "The name of a sink or source") - protected String name; + @Parameter(names = "--name", description = "The source's name") + protected String sourceName; @Override void processArguments() throws Exception { super.processArguments(); - if (null == name) { - throw new ParameterException( - "You must specify a name for the source"); - } if (tenant == null) { tenant = PUBLIC_TENANT; } if (namespace == null) { namespace = DEFAULT_NAMESPACE; } + if (null == sourceName) { + throw new RuntimeException( + "You must specify a name for the source"); + } } + } + + @Parameters(commandDescription = "Stops a Pulsar IO source connector") + protected class DeleteSource extends SourceCommand { @Override void runCmd() throws Exception { - admin.source().deleteSource(tenant, namespace, name); + admin.source().deleteSource(tenant, namespace, sourceName); print("Delete source successfully"); } } + @Parameters(commandDescription = "Gets the information about a Pulsar IO source connector") + protected class GetSource extends SourceCommand { + + @Override + void runCmd() throws Exception { + SourceConfig sourceConfig = admin.source().getSource(tenant, namespace, sourceName); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(sourceConfig)); + } + } + + /** + * List Sources command + */ + @Parameters(commandDescription = "List all running Pulsar IO source connectors") + protected class ListSources extends BaseCommand { + @Parameter(names = "--tenant", description = "The sink's tenant") + protected String tenant; + + @Parameter(names = "--namespace", description = "The sink's namespace") + protected String namespace; + + @Override + public void processArguments() { + if (tenant == null) { + tenant = PUBLIC_TENANT; + } + if (namespace == null) { + namespace = DEFAULT_NAMESPACE; + } + } + + @Override + void runCmd() throws Exception { + List sources = admin.source().listSources(tenant, namespace); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(sources)); + } + } + @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster") public class ListBuiltInSources extends BaseCommand { @Override diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index 0c561453fdd15..b52bc17498f84 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -962,7 +962,7 @@ public void testMixCliAndConfigFile( public void testDeleteMissingTenant() throws Exception { deleteSink.tenant = null; deleteSink.namespace = NAMESPACE; - deleteSink.name = NAME; + deleteSink.sinkName = NAME; deleteSink.processArguments(); @@ -975,7 +975,7 @@ public void testDeleteMissingTenant() throws Exception { public void testDeleteMissingNamespace() throws Exception { deleteSink.tenant = TENANT; deleteSink.namespace = null; - deleteSink.name = NAME; + deleteSink.sinkName = NAME; deleteSink.processArguments(); @@ -984,11 +984,11 @@ public void testDeleteMissingNamespace() throws Exception { verify(sink).deleteSink(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME)); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink") + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink") public void testDeleteMissingName() throws Exception { deleteSink.tenant = TENANT; deleteSink.namespace = NAMESPACE; - deleteSink.name = null; + deleteSink.sinkName = null; deleteSink.processArguments(); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index fe799bc61f3e1..4a3b3cc2fa5f7 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -820,7 +820,7 @@ public void testMixCliAndConfigFile( public void testDeleteMissingTenant() throws Exception { deleteSource.tenant = null; deleteSource.namespace = NAMESPACE; - deleteSource.name = NAME; + deleteSource.sourceName = NAME; deleteSource.processArguments(); @@ -833,7 +833,7 @@ public void testDeleteMissingTenant() throws Exception { public void testDeleteMissingNamespace() throws Exception { deleteSource.tenant = TENANT; deleteSource.namespace = null; - deleteSource.name = NAME; + deleteSource.sourceName = NAME; deleteSource.processArguments(); @@ -842,11 +842,11 @@ public void testDeleteMissingNamespace() throws Exception { verify(source).deleteSource(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME)); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source") + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source") public void testDeleteMissingName() throws Exception { deleteSource.tenant = TENANT; deleteSource.namespace = NAMESPACE; - deleteSource.name = null; + deleteSource.sourceName = null; deleteSource.processArguments(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index cf182a8e4a490..11b623d35817f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -20,15 +20,18 @@ package org.apache.pulsar.functions.utils; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import java.lang.reflect.Type; import java.util.HashMap; import java.util.Map; import static org.apache.commons.lang.StringUtils.isNotBlank; import static org.apache.commons.lang.StringUtils.isNotEmpty; +import static org.apache.commons.lang3.StringUtils.isEmpty; public class FunctionConfigUtils { @@ -195,4 +198,83 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader } return functionDetailsBuilder.build(); } + + public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(functionDetails.getTenant()); + functionConfig.setNamespace(functionDetails.getNamespace()); + functionConfig.setName(functionDetails.getName()); + functionConfig.setParallelism(functionDetails.getParallelism()); + functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); + Map consumerConfigMap = new HashMap<>(); + for (Map.Entry input : functionDetails.getSource().getInputSpecsMap().entrySet()) { + ConsumerConfig consumerConfig = new ConsumerConfig(); + if (!isEmpty(input.getValue().getSerdeClassName())) { + consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName()); + } + if (!isEmpty(input.getValue().getSchemaType())) { + consumerConfig.setSchemaType(input.getValue().getSchemaType()); + } + consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern()); + consumerConfigMap.put(input.getKey(), consumerConfig); + } + functionConfig.setInputSpecs(consumerConfigMap); + if (!isEmpty(functionDetails.getSource().getSubscriptionName())) { + functionConfig.setSubName(functionDetails.getSource().getSubscriptionName()); + } + if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) { + functionConfig.setRetainOrdering(true); + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); + } else { + functionConfig.setRetainOrdering(false); + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + } + functionConfig.setAutoAck(functionDetails.getAutoAck()); + functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + if (!isEmpty(functionDetails.getSink().getTopic())) { + functionConfig.setOutput(functionDetails.getSink().getTopic()); + } + if (!isEmpty(functionDetails.getSink().getSerDeClassName())) { + functionConfig.setOutputSerdeClassName(functionDetails.getSink().getSerDeClassName()); + } + if (!isEmpty(functionDetails.getSink().getSchemaType())) { + functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType()); + } + if (!isEmpty(functionDetails.getLogTopic())) { + functionConfig.setLogTopic(functionDetails.getLogTopic()); + } + functionConfig.setRuntime(Utils.convertRuntime(functionDetails.getRuntime())); + functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); + if (functionDetails.hasRetryDetails()) { + functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries()); + if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) { + functionConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic()); + } + } + Map userConfig; + if (!isEmpty(functionDetails.getUserConfig())) { + Type type = new TypeToken>() {}.getType(); + userConfig = new Gson().fromJson(functionDetails.getUserConfig(), type); + } else { + userConfig = new HashMap<>(); + } + if (userConfig.containsKey(WindowConfig.WINDOW_CONFIG_KEY)) { + WindowConfig windowConfig = (WindowConfig) userConfig.get(WindowConfig.WINDOW_CONFIG_KEY); + userConfig.remove(WindowConfig.WINDOW_CONFIG_KEY); + functionConfig.setClassName(windowConfig.getActualWindowFunctionClassName()); + functionConfig.setWindowConfig(windowConfig); + } else { + functionConfig.setClassName(functionDetails.getClassName()); + } + functionConfig.setUserConfig(userConfig); + + if (functionDetails.hasResources()) { + Resources resources = new Resources(); + resources.setCpu(functionDetails.getResources().getCpu()); + resources.setRam(functionDetails.getResources().getRam()); + resources.setDisk(functionDetails.getResources().getDisk()); + } + + return functionConfig; + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 95803abb08494..545d34419d6ea 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.utils; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.api.utils.IdentityFunction; @@ -29,9 +30,13 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Type; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; @@ -175,4 +180,56 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas } return functionDetailsBuilder.build(); } + + public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant(functionDetails.getTenant()); + sinkConfig.setNamespace(functionDetails.getNamespace()); + sinkConfig.setName(functionDetails.getName()); + sinkConfig.setParallelism(functionDetails.getParallelism()); + sinkConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); + Map consumerConfigMap = new HashMap<>(); + for (Map.Entry input : functionDetails.getSource().getInputSpecsMap().entrySet()) { + ConsumerConfig consumerConfig = new ConsumerConfig(); + if (!isEmpty(input.getValue().getSerdeClassName())) { + consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName()); + } + if (!isEmpty(input.getValue().getSchemaType())) { + consumerConfig.setSchemaType(input.getValue().getSchemaType()); + } + consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern()); + consumerConfigMap.put(input.getKey(), consumerConfig); + } + sinkConfig.setInputSpecs(consumerConfigMap); + if (!isEmpty(functionDetails.getSource().getSubscriptionName())) { + sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName()); + } + if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) { + sinkConfig.setRetainOrdering(true); + sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); + } else { + sinkConfig.setRetainOrdering(false); + sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + } + sinkConfig.setAutoAck(functionDetails.getAutoAck()); + sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + if (!isEmpty(functionDetails.getSink().getClassName())) { + sinkConfig.setClassName(functionDetails.getSink().getClassName()); + } + if (!isEmpty(functionDetails.getSink().getBuiltin())) { + sinkConfig.setArchive("builtin://" + functionDetails.getSink().getBuiltin()); + } + if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) { + Type type = new TypeToken>() {}.getType(); + sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type)); + } + if (functionDetails.hasResources()) { + Resources resources = new Resources(); + resources.setCpu(functionDetails.getResources().getCpu()); + resources.setRam(functionDetails.getResources().getRam()); + resources.setDisk(functionDetails.getResources().getDisk()); + } + + return sinkConfig; + } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index a132c8a9ef682..34240628392b2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -20,6 +20,8 @@ package org.apache.pulsar.functions.utils; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; @@ -27,6 +29,8 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils; import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Map; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.getSourceType; @@ -41,10 +45,10 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); + boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN); if (!isBuiltin) { - if (sourceConfig.getArchive().startsWith(Utils.FILE)) { + if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.FILE)) { if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } @@ -127,4 +131,40 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader return functionDetailsBuilder.build(); } + + public static SourceConfig convertFromDetails(FunctionDetails functionDetails) { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant(functionDetails.getTenant()); + sourceConfig.setNamespace(functionDetails.getNamespace()); + sourceConfig.setName(functionDetails.getName()); + sourceConfig.setParallelism(functionDetails.getParallelism()); + sourceConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); + Function.SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getClassName())) { + sourceConfig.setClassName(sourceSpec.getClassName()); + } + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin()); + } + if (!StringUtils.isEmpty(sourceSpec.getConfigs())) { + Type type = new TypeToken>() {}.getType(); + sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type)); + } + Function.SinkSpec sinkSpec = functionDetails.getSink(); + sourceConfig.setTopicName(sinkSpec.getTopic()); + if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) { + sourceConfig.setSchemaType(sinkSpec.getSchemaType()); + } + if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) { + sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName()); + } + if (functionDetails.hasResources()) { + Resources resources = new Resources(); + resources.setCpu(functionDetails.getResources().getCpu()); + resources.setRam(functionDetails.getResources().getRam()); + resources.setDisk(functionDetails.getResources().getDisk()); + sourceConfig.setResources(resources); + } + return sourceConfig; + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index d35be614b1822..adeaee162ed37 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -151,6 +151,15 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) { throw new RuntimeException("Unrecognized runtime: " + runtime.name()); } + public static FunctionConfig.Runtime convertRuntime(Runtime runtime) { + for (FunctionConfig.Runtime type : FunctionConfig.Runtime.values()) { + if (type.name().equals(runtime.name())) { + return type; + } + } + throw new RuntimeException("Unrecognized runtime: " + runtime.name()); + } + public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees convertProcessingGuarantee( FunctionConfig.ProcessingGuarantees processingGuarantees) { for (org.apache.pulsar.functions.proto.Function.ProcessingGuarantees type : org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.values()) { @@ -161,6 +170,17 @@ public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees co throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name()); } + public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee( + org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) { + for (FunctionConfig.ProcessingGuarantees type : FunctionConfig.ProcessingGuarantees.values()) { + if (type.name().equals(processingGuarantees.name())) { + return type; + } + } + throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name()); + } + + public static Class getSourceType(String className, ClassLoader classloader) { Object userClass = Reflections.createInstance(className, classloader); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java new file mode 100644 index 0000000000000..1f67798b7c2c5 --- /dev/null +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils; + +import com.google.gson.Gson; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertEquals; + +/** + * Unit test of {@link Reflections}. + */ +public class FunctionConfigUtilsTest { + + @Test + public void testConvertBackFidelity() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant("test-tenant"); + functionConfig.setNamespace("test-namespace"); + functionConfig.setName("test-function"); + functionConfig.setClassName(IdentityFunction.class.getName()); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); + functionConfig.setInputSpecs(inputSpecs); + functionConfig.setOutput("test-output"); + functionConfig.setOutputSerdeClassName("test-serde"); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + functionConfig.setRetainOrdering(false); + functionConfig.setUserConfig(new HashMap<>()); + functionConfig.setAutoAck(true); + functionConfig.setTimeoutMs(2000l); + Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); + FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(convertedConfig) + ); + } +} diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java new file mode 100644 index 0000000000000..c5d1ea0cd3728 --- /dev/null +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils; + +import com.google.gson.Gson; +import org.apache.pulsar.functions.proto.Function; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertEquals; + +/** + * Unit test of {@link Reflections}. + */ +public class SinkConfigUtilsTest { + + @Test + public void testConvertBackFidelity() throws IOException { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant("test-tenant"); + sinkConfig.setNamespace("test-namespace"); + sinkConfig.setName("test-source"); + sinkConfig.setArchive("builtin://jdbc"); + sinkConfig.setSourceSubscriptionName("test-subscription"); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); + sinkConfig.setInputSpecs(inputSpecs); + sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sinkConfig.setConfigs(new HashMap<>()); + sinkConfig.setRetainOrdering(false); + sinkConfig.setAutoAck(true); + sinkConfig.setTimeoutMs(2000l); + Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, null); + SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(convertedConfig) + ); + } +} diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java new file mode 100644 index 0000000000000..ef4ce61e89d42 --- /dev/null +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils; + +import com.google.gson.Gson; +import org.apache.pulsar.functions.proto.Function; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; + +import static org.testng.Assert.assertEquals; + +/** + * Unit test of {@link Reflections}. + */ +public class SourceConfigUtilsTest { + + @Test + public void testConvertBackFidelity() throws IOException { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant("test-tenant"); + sourceConfig.setNamespace("test-namespace"); + sourceConfig.setName("test-source"); + sourceConfig.setArchive("builtin://jdbc"); + sourceConfig.setTopicName("test-output"); + sourceConfig.setSerdeClassName("test-serde"); + sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sourceConfig.setConfigs(new HashMap<>()); + Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, null); + SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(convertedConfig) + ); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 4faed11a38166..920063e1064ba 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -141,8 +141,8 @@ public synchronized List getAllFunctionMetaData() { * @param namespace the namespace * @return a list of function names */ - public synchronized Collection listFunctions(String tenant, String namespace) { - List ret = new LinkedList<>(); + public synchronized Collection listFunctions(String tenant, String namespace) { + List ret = new LinkedList<>(); if (!this.functionMetaDataMap.containsKey(tenant)) { return ret; @@ -152,7 +152,7 @@ public synchronized Collection listFunctions(String tenant, String names return ret; } for (FunctionMetaData functionMetaData : this.functionMetaDataMap.get(tenant).get(namespace).values()) { - ret.add(functionMetaData.getFunctionDetails().getName()); + ret.add(functionMetaData); } return ret; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 0b245cf391da6..44bb3bd0f4717 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -84,6 +84,7 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; @@ -97,9 +98,15 @@ import net.jodah.typetools.TypeResolver; +// TODO:-Currently The source/sink/functions all share this backend. In the future it might make sense +// to seperate them out in their own implementations as well. @Slf4j public class FunctionsImpl { + public static final String FUNCTION = "Function"; + public static final String SOURCE = "Source"; + public static final String SINK = "Sink"; + private final Supplier workerServiceSupplier; public FunctionsImpl(Supplier workerServiceSupplier) { @@ -126,11 +133,10 @@ private boolean isWorkerServiceAvailable() { return true; } - public Response registerFunction(final String tenant, final String namespace, final String functionName, + public Response registerFunction(final String tenant, final String namespace, final String componentName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson, - final String sourceConfigJson, final String sinkConfigJson, - final String clientRole) { + final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson, + final String componentType, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -138,23 +144,23 @@ public Response registerFunction(final String tenant, final String namespace, fi try { if (!isAuthorizedRole(tenant, clientRole)) { - log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace, - functionName, clientRole); + log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace, + componentName, clientRole, componentType); return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) .entity(new ErrorData("client is not authorize to perform operation")).build(); } } catch (PulsarAdminException e) { - log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e); return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function {}/{}/{} already exists", tenant, namespace, functionName); + if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s already exists", functionName))).build(); + .entity(new ErrorData(String.format("%s %s already exists", componentType, componentName))).build(); } FunctionDetails functionDetails; @@ -166,14 +172,14 @@ public Response registerFunction(final String tenant, final String namespace, fi // validate parameters try { if (isPkgUrlProvided) { - functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, - functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); + functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl, + functionDetailsJson, componentConfigJson, componentType); } else { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile, - fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); + functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, + fileDetail, functionDetailsJson, componentConfigJson, componentType); } } catch (Exception e) { - log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } @@ -181,9 +187,9 @@ public Response registerFunction(final String tenant, final String namespace, fi try { worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails); } catch (Exception e) { - log.error("Function {}/{}/{} cannot be admitted by the runtime factory", tenant, namespace, functionName); + log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build(); + .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build(); } // function state @@ -196,7 +202,7 @@ public Response registerFunction(final String tenant, final String namespace, fi packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); } else { packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName())); if (!isPkgUrlProvided) { packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); } @@ -207,10 +213,10 @@ public Response registerFunction(final String tenant, final String namespace, fi : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile); } - public Response updateFunction(final String tenant, final String namespace, final String functionName, + public Response updateFunction(final String tenant, final String namespace, final String componentName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson, - final String sourceConfigJson, final String sinkConfigJson, final String clientRole) { + final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson, + final String componentType, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -218,22 +224,22 @@ public Response updateFunction(final String tenant, final String namespace, fina try { if (!isAuthorizedRole(tenant, clientRole)) { - log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace, - functionName, clientRole); + log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace, + componentName, clientRole, componentType); return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) .entity(new ErrorData("client is not authorize to perform operation")).build(); } } catch (PulsarAdminException e) { - log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e); return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } FunctionDetails functionDetails; @@ -245,14 +251,14 @@ public Response updateFunction(final String tenant, final String namespace, fina // validate parameters try { if (isPkgUrlProvided) { - functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, - functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); + functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl, + functionDetailsJson, componentConfigJson, componentType); } else { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile, - fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); + functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, + fileDetail, functionDetailsJson, componentConfigJson, componentType); } } catch (Exception e) { - log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } @@ -260,9 +266,9 @@ public Response updateFunction(final String tenant, final String namespace, fina try { worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails); } catch (Exception e) { - log.error("Updated Function {}/{}/{} cannot be submitted to runtime factory", tenant, namespace, functionName); + log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build(); + .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build(); } // function state @@ -276,7 +282,7 @@ public Response updateFunction(final String tenant, final String namespace, fina packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); } else { packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName())); if (!isPkgUrlProvided) { packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); } @@ -287,8 +293,8 @@ public Response updateFunction(final String tenant, final String namespace, fina : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile); } - public Response deregisterFunction(final String tenant, final String namespace, final String functionName, - String clientRole) { + public Response deregisterFunction(final String tenant, final String namespace, final String componentName, + String componentType, String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -296,35 +302,41 @@ public Response deregisterFunction(final String tenant, final String namespace, try { if (!isAuthorizedRole(tenant, clientRole)) { - log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace, - functionName, clientRole); + log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace, + componentName, clientRole, componentType); return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) .entity(new ErrorData("client is not authorize to perform operation")).build(); } } catch (PulsarAdminException e) { - log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); + log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e); return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } // validate parameters try { - validateDeregisterRequestParams(tenant, namespace, functionName); + validateDeregisterRequestParams(tenant, namespace, componentName, componentType); } catch (IllegalArgumentException e) { - log.error("Invalid deregister function request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function to deregister does not exist @ /{}/{}/{}", tenant, namespace, functionName); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } CompletableFuture completableFuture = functionMetaDataManager.deregisterFunction(tenant, - namespace, functionName); + namespace, componentName); RequestResult requestResult = null; try { @@ -334,12 +346,12 @@ public Response deregisterFunction(final String tenant, final String namespace, .entity(new ErrorData(requestResult.getMessage())).build(); } } catch (ExecutionException e) { - log.error("Execution Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName, + log.error("Execution Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.serverError().type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getCause().getMessage())).build(); } catch (InterruptedException e) { - log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName, + log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.REQUEST_TIMEOUT).type(MediaType.APPLICATION_JSON).build(); } @@ -347,7 +359,8 @@ public Response deregisterFunction(final String tenant, final String namespace, return Response.status(Status.OK).entity(requestResult.toJson()).build(); } - public Response getFunctionInfo(final String tenant, final String namespace, final String functionName) + public Response getFunctionInfo(final String tenant, final String namespace, final String componentName, + final String componentType) throws IOException { if (!isWorkerServiceAvailable()) { @@ -356,25 +369,38 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin // validate parameters try { - validateGetFunctionRequestParams(tenant, namespace, functionName); + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); } catch (IllegalArgumentException e) { - log.error("Invalid getFunction request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function in getFunction does not exist @ /{}/{}/{}", tenant, namespace, functionName); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); + } + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } - FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, - functionName); - String functionDetailsJson = org.apache.pulsar.functions.utils.Utils - .printJson(functionMetaData.getFunctionDetails()); - return Response.status(Status.OK).entity(functionDetailsJson).build(); + String retval; + if (componentType.equals(FUNCTION)) { + FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); + retval = new Gson().toJson(config); + } else if (componentType.equals(SOURCE)) { + SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); + retval = new Gson().toJson(config); + } else { + SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); + retval = new Gson().toJson(config); + } + return Response.status(Status.OK).entity(retval).build(); } public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName, @@ -485,7 +511,7 @@ public Response stopFunctionInstances(final String tenant, final String namespac // validate parameters try { - validateGetFunctionRequestParams(tenant, namespace, functionName); + validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION); } catch (IllegalArgumentException e) { log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) @@ -519,7 +545,7 @@ public Response getFunctionStatus(final String tenant, final String namespace, f // validate parameters try { - validateGetFunctionRequestParams(tenant, namespace, functionName); + validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION); } catch (IllegalArgumentException e) { log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) @@ -552,7 +578,7 @@ public Response getFunctionStatus(final String tenant, final String namespace, f return Response.status(Status.OK).entity(jsonResponse).build(); } - public Response listFunctions(final String tenant, final String namespace) { + public Response listFunctions(final String tenant, final String namespace, String componentType) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -562,16 +588,22 @@ public Response listFunctions(final String tenant, final String namespace) { try { validateListFunctionRequestParams(tenant, namespace); } catch (IllegalArgumentException e) { - log.error("Invalid listFunctions request @ /{}/{}", tenant, namespace, e); + log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - Collection functionStateList = functionMetaDataManager.listFunctions(tenant, namespace); + Collection functionStateList = functionMetaDataManager.listFunctions(tenant, namespace); + List retval = new LinkedList<>(); + for (FunctionMetaData functionMetaData : functionStateList) { + if (calculateSubjectType(functionMetaData).equals(componentType)) { + retval.add(functionMetaData.getFunctionDetails().getName()); + } + } - return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build(); + return Response.status(Status.OK).entity(new Gson().toJson(retval.toArray())).build(); } private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) { @@ -840,13 +872,13 @@ private void validateListFunctionRequestParams(String tenant, String namespace) private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName, String instanceId) throws IllegalArgumentException { - validateGetFunctionRequestParams(tenant, namespace, functionName); + validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION); if (instanceId == null) { throw new IllegalArgumentException("Function Instance Id is not provided"); } } - private void validateGetFunctionRequestParams(String tenant, String namespace, String functionName) + private void validateGetFunctionRequestParams(String tenant, String namespace, String subject, String subjectType) throws IllegalArgumentException { if (tenant == null) { @@ -855,12 +887,12 @@ private void validateGetFunctionRequestParams(String tenant, String namespace, S if (namespace == null) { throw new IllegalArgumentException("Namespace is not provided"); } - if (functionName == null) { - throw new IllegalArgumentException("Function Name is not provided"); + if (subject == null) { + throw new IllegalArgumentException(subjectType + " Name is not provided"); } } - private void validateDeregisterRequestParams(String tenant, String namespace, String functionName) + private void validateDeregisterRequestParams(String tenant, String namespace, String subject, String subjectType) throws IllegalArgumentException { if (tenant == null) { @@ -869,30 +901,30 @@ private void validateDeregisterRequestParams(String tenant, String namespace, St if (namespace == null) { throw new IllegalArgumentException("Namespace is not provided"); } - if (functionName == null) { - throw new IllegalArgumentException("Function Name is not provided"); + if (subject == null) { + throw new IllegalArgumentException(subjectType + " Name is not provided"); } } - private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName, - String functionPkgUrl, String functionDetailsJson, String functionConfigJson, - String sourceConfigJson, String sinkConfigJson) + private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName, + String functionPkgUrl, String functionDetailsJson, String componentConfigJson, + String componentType) throws IllegalArgumentException, IOException, URISyntaxException { if (!isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } - FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null); + FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, + functionDetailsJson, componentConfigJson, componentType, functionPkgUrl, null); return functionDetails; } - private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, + private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson, - String functionConfigJson, String sourceConfigJson, String sinkConfigJson) + String componentConfigJson, String componentType) throws IllegalArgumentException, IOException, URISyntaxException { - FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile); + FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, + functionDetailsJson, componentConfigJson, componentType,null, uploadedInputStreamAsFile); if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) { throw new IllegalArgumentException("Function Package is not provided"); } @@ -964,40 +996,21 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) { return null; } - private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, - String functionDetailsJson, String functionConfigJson, String sourceConfigJson, - String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException { + private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, + String functionDetailsJson, String componentConfigJson, String componentType, + String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } if (namespace == null) { throw new IllegalArgumentException("Namespace is not provided"); } - if (functionName == null) { - throw new IllegalArgumentException("Function Name is not provided"); + if (componentName == null) { + throw new IllegalArgumentException(String.format("%s Name is not provided", componentType)); } - int numDefinitions = 0; - if (!StringUtils.isEmpty(functionDetailsJson)) { - numDefinitions++; - } - if (!StringUtils.isEmpty(functionConfigJson)) { - numDefinitions++; - } - if (!StringUtils.isEmpty(sourceConfigJson)) { - numDefinitions++; - } - if (!StringUtils.isEmpty(sinkConfigJson)) { - numDefinitions++; - } - if (numDefinitions == 0) { - throw new IllegalArgumentException("Function Info is not provided"); - } - if (numDefinitions > 1) { - throw new IllegalArgumentException("Conflicting Info provided"); - } - if (!StringUtils.isEmpty(functionConfigJson)) { - FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class); + if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) { + FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); ClassLoader clsLoader = null; if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); @@ -1008,14 +1021,14 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader); return FunctionConfigUtils.convert(functionConfig, clsLoader); } - if (!StringUtils.isEmpty(sourceConfigJson)) { - SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class); + if (componentType.equals(SOURCE)) { + SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true); ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader); return SourceConfigUtils.convert(sourceConfig, clsLoader); } - if (!StringUtils.isEmpty(sinkConfigJson)) { - SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class); + if (componentType.equals(SINK)) { + SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false); ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader); return SinkConfigUtils.convert(sinkConfig, clsLoader); @@ -1260,4 +1273,23 @@ public boolean isSuperUser(String clientRole) { return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); } + public String calculateSubjectType(FunctionMetaData functionMetaData) { + SourceSpec sourceSpec = functionMetaData.getFunctionDetails().getSource(); + SinkSpec sinkSpec = functionMetaData.getFunctionDetails().getSink(); + if (sourceSpec.getInputSpecsCount() == 0) { + return SOURCE; + } + // Now its between sink and function + + if (!isEmpty(sinkSpec.getBuiltin())) { + // if its built in, its a sink + return SINK; + } + + if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) { + return FUNCTION; + } + return SINK; + } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index d6e1439cbf204..405f88f84cbc0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -60,7 +61,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("functionConfig") String functionConfigJson) { return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId()); } @@ -77,7 +78,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("functionConfig") String functionConfigJson) { return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId()); } @@ -86,7 +87,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, @Path("/{tenant}/{namespace}/{functionName}") public Response deregisterFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); + return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId()); } @GET @@ -96,7 +97,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant, final @PathParam("functionName") String functionName) throws IOException { return functions.getFunctionInfo( - tenant, namespace, functionName); + tenant, namespace, functionName, FunctionsImpl.FUNCTION); } @GET @@ -123,7 +124,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, public Response listFunctions(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { return functions.listFunctions( - tenant, namespace); + tenant, namespace, FunctionsImpl.FUNCTION); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java index 151c2c1ccf422..488f47d335e71 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java @@ -25,6 +25,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -52,7 +53,7 @@ public Response registerSink(final @PathParam("tenant") String tenant, final @FormDataParam("sinkConfig") String sinkConfigJson) { return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId()); } @@ -68,7 +69,7 @@ public Response updateSink(final @PathParam("tenant") String tenant, final @FormDataParam("sinkConfig") String sinkConfigJson) { return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId()); } @@ -77,7 +78,7 @@ public Response updateSink(final @PathParam("tenant") String tenant, @Path("/{tenant}/{namespace}/{sinkName}") public Response deregisterSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { - return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId()); + return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId()); } @GET @@ -86,7 +87,7 @@ public Response getSinkInfo(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) throws IOException { - return functions.getFunctionInfo(tenant, namespace, sinkName); + return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK); } @GET @@ -111,7 +112,7 @@ public Response getSinkStatus(final @PathParam("tenant") String tenant, @Path("/{tenant}/{namespace}") public Response listSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { - return functions.listFunctions(tenant, namespace); + return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java index 44fac19d70593..3b1222ec66019 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java @@ -25,6 +25,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -52,7 +53,7 @@ public Response registerSource(final @PathParam("tenant") String tenant, final @FormDataParam("sourceConfig") String sourceConfigJson) { return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId()); } @@ -68,7 +69,7 @@ public Response updateSource(final @PathParam("tenant") String tenant, final @FormDataParam("sourceConfig") String sourceConfigJson) { return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId()); } @@ -77,7 +78,7 @@ public Response updateSource(final @PathParam("tenant") String tenant, @Path("/{tenant}/{namespace}/{sourceName}") public Response deregisterSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { - return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId()); + return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId()); } @GET @@ -86,7 +87,7 @@ public Response getSourceInfo(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) throws IOException { - return functions.getFunctionInfo(tenant, namespace, sourceName); + return functions.getFunctionInfo(tenant, namespace, sourceName, FunctionsImpl.SOURCE); } @GET @@ -111,7 +112,7 @@ public Response getSourceStatus(final @PathParam("tenant") String tenant, @Path("/{tenant}/{namespace}") public Response listSources(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { - return functions.listFunctions(tenant, namespace); + return functions.listFunctions(tenant, namespace, FunctionsImpl.SOURCE); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java index ac99f9a6a4bd0..7fc7c7f8a20a6 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java @@ -66,15 +66,16 @@ public void testListFunctions() throws PulsarClientException { mockPulsarClient())); Map functionMetaDataMap1 = new HashMap<>(); - functionMetaDataMap1.put("func-1", Function.FunctionMetaData.newBuilder().setFunctionDetails( - Function.FunctionDetails.newBuilder().setName("func-1")).build()); - functionMetaDataMap1.put("func-2", - Function.FunctionMetaData.newBuilder().setFunctionDetails( - Function.FunctionDetails.newBuilder().setName("func-2")).build()); + Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder().setName("func-1")).build(); + functionMetaDataMap1.put("func-1", f1); + Function.FunctionMetaData f2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder().setName("func-2")).build(); + functionMetaDataMap1.put("func-2", f2); + Function.FunctionMetaData f3 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder().setName("func-3")).build(); Map functionMetaDataInfoMap2 = new HashMap<>(); - functionMetaDataInfoMap2.put("func-3", - Function.FunctionMetaData.newBuilder().setFunctionDetails( - Function.FunctionDetails.newBuilder().setName("func-3")).build()); + functionMetaDataInfoMap2.put("func-3", f3); functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>()); @@ -86,13 +87,13 @@ public void testListFunctions() throws PulsarClientException { Assert.assertEquals(2, functionMetaDataManager.listFunctions( "tenant-1", "namespace-1").size()); Assert.assertTrue(functionMetaDataManager.listFunctions( - "tenant-1", "namespace-1").contains("func-1")); + "tenant-1", "namespace-1").contains(f1)); Assert.assertTrue(functionMetaDataManager.listFunctions( - "tenant-1", "namespace-1").contains("func-2")); + "tenant-1", "namespace-1").contains(f2)); Assert.assertEquals(1, functionMetaDataManager.listFunctions( "tenant-1", "namespace-2").size()); Assert.assertTrue(functionMetaDataManager.listFunctions( - "tenant-1", "namespace-2").contains("func-3")); + "tenant-1", "namespace-2").contains(f3)); } @Test diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index d514faba692d1..460d0509ff604 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -21,10 +21,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.powermock.api.mockito.PowerMockito.doNothing; +import static org.powermock.api.mockito.PowerMockito.doReturn; import static org.powermock.api.mockito.PowerMockito.doThrow; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; @@ -62,6 +61,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.functions.utils.FunctionConfig; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; @@ -145,6 +145,7 @@ public void setup() { when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); this.resource = spy(new FunctionsImpl(() -> mockedWorkerService)); + doReturn("Function").when(this.resource).calculateSubjectType(any()); } // @@ -311,8 +312,7 @@ private void testRegisterFunctionMissingArguments( null, null, new Gson().toJson(functionConfig), - null, - null, + FunctionsImpl.FUNCTION, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -339,8 +339,7 @@ private Response registerDefaultFunction() { null, null, new Gson().toJson(functionConfig), - null, - null, + FunctionsImpl.FUNCTION, null); } @@ -600,8 +599,7 @@ private void testUpdateFunctionMissingArguments( null, null, new Gson().toJson(functionConfig), - null, - null, + FunctionsImpl.FUNCTION, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -629,8 +627,7 @@ private Response updateDefaultFunction() throws IOException { null, null, new Gson().toJson(functionConfig), - null, - null, + FunctionsImpl.FUNCTION, null); } @@ -714,8 +711,7 @@ public void testUpdateFunctionWithUrl() throws IOException { filePackageUrl, null, new Gson().toJson(functionConfig), - null, - null, + FunctionsImpl.FUNCTION, null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -804,7 +800,8 @@ private void testDeregisterFunctionMissingArguments( tenant, namespace, function, - null); + FunctionsImpl.FUNCTION, + null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); @@ -815,7 +812,8 @@ private Response deregisterDefaultFunction() { tenant, namespace, function, - null); + FunctionsImpl.FUNCTION, + null); } @Test @@ -910,7 +908,8 @@ private void testGetFunctionMissingArguments( Response response = resource.getFunctionInfo( tenant, namespace, - function); + function, + FunctionsImpl.FUNCTION); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); @@ -920,7 +919,8 @@ private Response getDefaultFunctionInfo() throws IOException { return resource.getFunctionInfo( tenant, namespace, - function); + function, + FunctionsImpl.FUNCTION); } @Test @@ -960,8 +960,8 @@ public void testGetFunctionSuccess() throws Exception { Response response = getDefaultFunctionInfo(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals( - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), - response.getEntity()); + new Gson().toJson(FunctionConfigUtils.convertFromDetails(functionDetails)), + response.getEntity()); } // @@ -991,7 +991,8 @@ private void testListFunctionsMissingArguments( ) { Response response = resource.listFunctions( tenant, - namespace); + namespace, + FunctionsImpl.FUNCTION); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); @@ -1000,13 +1001,46 @@ private void testListFunctionsMissingArguments( private Response listDefaultFunctions() { return resource.listFunctions( tenant, - namespace); + namespace, + FunctionsImpl.FUNCTION); } @Test public void testListFunctionsSuccess() throws Exception { List functions = Lists.newArrayList("test-1", "test-2"); - when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions); + List metaDataList = new LinkedList<>(); + FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-1").build() + ).build(); + FunctionMetaData functionMetaData2 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-2").build() + ).build(); + metaDataList.add(functionMetaData1); + metaDataList.add(functionMetaData2); + when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList); + + Response response = listDefaultFunctions(); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + assertEquals(new Gson().toJson(functions), response.getEntity()); + } + + @Test + public void testOnlyGetSources() throws Exception { + List functions = Lists.newArrayList("test-2"); + List functionMetaDataList = new LinkedList<>(); + FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-1").build()).build(); + functionMetaDataList.add(f1); + FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-2").build()).build(); + functionMetaDataList.add(f2); + FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-3").build()).build(); + functionMetaDataList.add(f3); + when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); + doReturn("Source").when(this.resource).calculateSubjectType(f1); + doReturn("Function").when(this.resource).calculateSubjectType(f2); + doReturn("Sink").when(this.resource).calculateSubjectType(f3); Response response = listDefaultFunctions(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -1068,7 +1102,7 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException { functionConfig.setOutput(outputTopic); functionConfig.setOutputSerdeClassName(outputSerdeClassName); Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, - null, new Gson().toJson(functionConfig), null, null, null); + null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 4e52c954423f0..315f56d2f0f50 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker.rest.api.v2; +import com.google.common.collect.Lists; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; @@ -26,6 +27,8 @@ import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.runtime.RuntimeFactory; @@ -38,6 +41,7 @@ import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.testng.Assert; @@ -51,6 +55,8 @@ import java.io.IOException; import java.io.InputStream; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -143,6 +149,7 @@ public void setup() throws Exception { doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean()); mockStatic(SinkConfigUtils.class); when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build()); + Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any()); } // @@ -188,7 +195,7 @@ public void testRegisterSinkMissingFunctionName() throws IOException { topicsToSerDeClassName, className, parallelism, - "Function Name is not provided"); + "Sink Name is not provided"); } @Test @@ -257,9 +264,8 @@ private void testRegisterSinkMissingArguments( details, null, null, - null, - null, new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -282,9 +288,8 @@ private Response registerDefaultSink() { mockedFormData, null, null, - null, - null, new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, null); } @@ -296,7 +301,7 @@ public void testRegisterExistedSink() throws IOException { Response response = registerDefaultSink(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Sink " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -421,7 +426,7 @@ public void testUpdateSinkMissingFunctionName() throws IOException { topicsToSerDeClassName, className, parallelism, - "Function Name is not provided"); + "Sink Name is not provided"); } @Test @@ -492,9 +497,8 @@ private void testUpdateSinkMissingArguments( details, null, null, - null, - null, new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -518,9 +522,8 @@ private Response updateDefaultSink() throws IOException { mockedFormData, null, null, - null, - null, new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, null); } @@ -530,7 +533,7 @@ public void testUpdateNotExistedSink() throws IOException { Response response = updateDefaultSink(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -600,9 +603,8 @@ public void testUpdateSinkWithUrl() throws IOException { null, filePackageUrl, null, - null, - null, new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -678,7 +680,7 @@ public void testDeregisterSinkMissingFunctionName() throws Exception { tenant, namespace, null, - "Function Name"); + "Sink Name"); } private void testDeregisterSinkMissingArguments( @@ -691,6 +693,7 @@ private void testDeregisterSinkMissingArguments( tenant, namespace, sink, + FunctionsImpl.SINK, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -702,6 +705,7 @@ private Response deregisterDefaultSink() { tenant, namespace, sink, + FunctionsImpl.SINK, null); } @@ -711,7 +715,7 @@ public void testDeregisterNotExistedSink() { Response response = deregisterDefaultSink(); assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -757,109 +761,116 @@ public void testDeregisterSinkInterrupted() throws Exception { assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason); } - // Source Info doesn't exist. Maybe one day they might be added // - // Get Function Info + // Get Sink Info // - /* @Test - public void testGetFunctionMissingTenant() throws Exception { - testGetFunctionMissingArguments( + public void testGetSinkMissingTenant() throws Exception { + testGetSinkMissingArguments( null, namespace, - source, + sink, "Tenant"); } @Test - public void testGetFunctionMissingNamespace() throws Exception { - testGetFunctionMissingArguments( + public void testGetSinkMissingNamespace() throws Exception { + testGetSinkMissingArguments( tenant, null, - source, + sink, "Namespace"); } @Test - public void testGetFunctionMissingFunctionName() throws Exception { - testGetFunctionMissingArguments( + public void testGetSinkMissingFunctionName() throws Exception { + testGetSinkMissingArguments( tenant, namespace, null, - "Function Name"); + "Sink Name"); } - private void testGetFunctionMissingArguments( + private void testGetSinkMissingArguments( String tenant, String namespace, - String function, + String sink, String missingFieldName ) throws IOException { Response response = resource.getFunctionInfo( tenant, namespace, - function); + sink, + FunctionsImpl.SINK); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } - private Response getDefaultFunctionInfo() throws IOException { + private Response getDefaultSinkInfo() throws IOException { return resource.getFunctionInfo( tenant, namespace, - source); + sink, + FunctionsImpl.SINK); } @Test - public void testGetNotExistedFunction() throws IOException { - when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); + public void testGetNotExistedSink() throws IOException { + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false); - Response response = getDefaultFunctionInfo(); + Response response = getDefaultSinkInfo(); assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test - public void testGetFunctionSuccess() throws Exception { - when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + public void testGetSinkSuccess() throws Exception { + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); - SinkSpec sinkSpec = SinkSpec.newBuilder() - .setTopic(outputTopic) - .setSerDeClassName(outputSerdeClassName).build(); + Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder() + .setSubscriptionType(Function.SubscriptionType.SHARED) + .setSubscriptionName(subscriptionName) + .putInputSpecs("input", Function.ConsumerSpec.newBuilder() + .setSerdeClassName(TopicSchema.DEFAULT_SERDE) + .setIsRegexPattern(false) + .build()).build(); + Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder() + .setBuiltin("jdbc") + .build(); FunctionDetails functionDetails = FunctionDetails.newBuilder() - .setClassName(className) + .setClassName(IdentityFunction.class.getName()) .setSink(sinkSpec) - .setName(source) + .setName(sink) .setNamespace(namespace) - .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE) + .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE) .setTenant(tenant) .setParallelism(parallelism) - .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) - .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); + .setRuntime(FunctionDetails.Runtime.JAVA) + .setSource(sourceSpec).build(); FunctionMetaData metaData = FunctionMetaData.newBuilder() .setCreateTime(System.currentTimeMillis()) .setFunctionDetails(functionDetails) - .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package")) + .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package")) .setVersion(1234) .build(); - when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData); - Response response = getDefaultFunctionInfo(); + Response response = getDefaultSinkInfo(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals( - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + new Gson().toJson(SinkConfigUtils.convertFromDetails(functionDetails)), response.getEntity()); } // - // List Functions + // List Sinks // @Test - public void testListFunctionsMissingTenant() throws Exception { - testListFunctionsMissingArguments( + public void testListSinksMissingTenant() throws Exception { + testListSinksMissingArguments( null, namespace, "Tenant"); @@ -867,39 +878,70 @@ public void testListFunctionsMissingTenant() throws Exception { @Test public void testListFunctionsMissingNamespace() throws Exception { - testListFunctionsMissingArguments( + testListSinksMissingArguments( tenant, null, "Namespace"); } - private void testListFunctionsMissingArguments( + private void testListSinksMissingArguments( String tenant, String namespace, String missingFieldName ) { Response response = resource.listFunctions( tenant, - namespace); + namespace, + FunctionsImpl.SINK); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } - private Response listDefaultFunctions() { + private Response listDefaultSinks() { return resource.listFunctions( tenant, - namespace); + namespace, + FunctionsImpl.SINK); } @Test - public void testListFunctionsSuccess() throws Exception { + public void testListSinksSuccess() throws Exception { List functions = Lists.newArrayList("test-1", "test-2"); - when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions); + List functionMetaDataList = new LinkedList<>(); + functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-1").build() + ).build()); + functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-2").build() + ).build()); + when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); + + Response response = listDefaultSinks(); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + assertEquals(new Gson().toJson(functions), response.getEntity()); + } - Response response = listDefaultFunctions(); + @Test + public void testOnlyGetSinks() throws Exception { + List functions = Lists.newArrayList("test-3"); + List functionMetaDataList = new LinkedList<>(); + FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-1").build()).build(); + functionMetaDataList.add(f1); + FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-2").build()).build(); + functionMetaDataList.add(f2); + FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-3").build()).build(); + functionMetaDataList.add(f3); + when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); + doReturn("Source").when(this.resource).calculateSubjectType(f1); + doReturn("Function").when(this.resource).calculateSubjectType(f2); + doReturn("Sink").when(this.resource).calculateSubjectType(f3); + + Response response = listDefaultSinks(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(new Gson().toJson(functions), response.getEntity()); } - */ } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index 1ca869ea8bbac..eee684f4d6ca5 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker.rest.api.v2; +import com.google.common.collect.Lists; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; @@ -26,6 +27,7 @@ import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function.*; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.source.TopicSchema; @@ -37,6 +39,7 @@ import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.SourceContext; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.testng.Assert; @@ -48,6 +51,8 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.*; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -133,6 +138,7 @@ public void setup() throws Exception { doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean()); mockStatic(SourceConfigUtils.class); when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build()); + Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any()); } // @@ -181,7 +187,7 @@ public void testRegisterSourceMissingFunctionName() throws IOException { outputSerdeClassName, className, parallelism, - "Function Name is not provided"); + "Source Name is not provided"); } @Test @@ -256,9 +262,8 @@ private void testRegisterSourceMissingArguments( details, null, null, - null, new Gson().toJson(sourceConfig), - null, + FunctionsImpl.SOURCE, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -282,9 +287,8 @@ private Response registerDefaultSource() { mockedFormData, null, null, - null, new Gson().toJson(sourceConfig), - null, + FunctionsImpl.SOURCE, null); } @@ -296,7 +300,7 @@ public void testRegisterExistedSource() throws IOException { Response response = registerDefaultSource(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Source " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -424,7 +428,7 @@ public void testUpdateSourceMissingFunctionName() throws IOException { outputSerdeClassName, className, parallelism, - "Function Name is not provided"); + "Source Name is not provided"); } @Test @@ -501,9 +505,8 @@ private void testUpdateSourceMissingArguments( details, null, null, - null, new Gson().toJson(sourceConfig), - null, + FunctionsImpl.SOURCE, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -528,9 +531,8 @@ private Response updateDefaultSource() throws IOException { mockedFormData, null, null, - null, new Gson().toJson(sourceConfig), - null, + FunctionsImpl.SOURCE, null); } @@ -540,7 +542,7 @@ public void testUpdateNotExistedSource() throws IOException { Response response = updateDefaultSource(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -611,9 +613,8 @@ public void testUpdateSourceWithUrl() throws IOException { null, filePackageUrl, null, - null, new Gson().toJson(sourceConfig), - null, + FunctionsImpl.SOURCE, null); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -689,7 +690,7 @@ public void testDeregisterSourceMissingFunctionName() throws Exception { tenant, namespace, null, - "Function Name"); + "Source Name"); } private void testDeregisterSourceMissingArguments( @@ -702,6 +703,7 @@ private void testDeregisterSourceMissingArguments( tenant, namespace, function, + FunctionsImpl.SOURCE, null); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -713,6 +715,7 @@ private Response deregisterDefaultSource() { tenant, namespace, source, + FunctionsImpl.SOURCE, null); } @@ -722,7 +725,7 @@ public void testDeregisterNotExistedSource() { Response response = deregisterDefaultSource(); assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -768,15 +771,13 @@ public void testDeregisterSourceInterrupted() throws Exception { assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason); } - // Source Info doesn't exist. Maybe one day they might be added // - // Get Function Info + // Get Source Info // - /* @Test - public void testGetFunctionMissingTenant() throws Exception { - testGetFunctionMissingArguments( + public void testGetSourceMissingTenant() throws Exception { + testGetSourceMissingArguments( null, namespace, source, @@ -784,8 +785,8 @@ public void testGetFunctionMissingTenant() throws Exception { } @Test - public void testGetFunctionMissingNamespace() throws Exception { - testGetFunctionMissingArguments( + public void testGetSourceMissingNamespace() throws Exception { + testGetSourceMissingArguments( tenant, null, source, @@ -793,62 +794,66 @@ public void testGetFunctionMissingNamespace() throws Exception { } @Test - public void testGetFunctionMissingFunctionName() throws Exception { - testGetFunctionMissingArguments( + public void testGetSourceMissingFunctionName() throws Exception { + testGetSourceMissingArguments( tenant, namespace, null, - "Function Name"); + "Source Name"); } - private void testGetFunctionMissingArguments( + private void testGetSourceMissingArguments( String tenant, String namespace, - String function, + String source, String missingFieldName ) throws IOException { Response response = resource.getFunctionInfo( tenant, namespace, - function); + source, + FunctionsImpl.SOURCE); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } - private Response getDefaultFunctionInfo() throws IOException { + private Response getDefaultSourceInfo() throws IOException { return resource.getFunctionInfo( tenant, namespace, - source); + source, + FunctionsImpl.SOURCE); } @Test - public void testGetNotExistedFunction() throws IOException { + public void testGetNotExistedSource() throws IOException { when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); - Response response = getDefaultFunctionInfo(); + Response response = getDefaultSourceInfo(); assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); + assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test - public void testGetFunctionSuccess() throws Exception { + public void testGetSourceSuccess() throws Exception { when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + SourceSpec sourceSpec = SourceSpec.newBuilder().setBuiltin("jdbc").build(); SinkSpec sinkSpec = SinkSpec.newBuilder() .setTopic(outputTopic) .setSerDeClassName(outputSerdeClassName).build(); FunctionDetails functionDetails = FunctionDetails.newBuilder() - .setClassName(className) + .setClassName(IdentityFunction.class.getName()) .setSink(sinkSpec) .setName(source) .setNamespace(namespace) - .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE) + .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE) + .setRuntime(FunctionDetails.Runtime.JAVA) + .setAutoAck(true) .setTenant(tenant) .setParallelism(parallelism) - .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) - .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); + .setSource(sourceSpec).build(); FunctionMetaData metaData = FunctionMetaData.newBuilder() .setCreateTime(System.currentTimeMillis()) .setFunctionDetails(functionDetails) @@ -857,60 +862,90 @@ public void testGetFunctionSuccess() throws Exception { .build(); when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData); - Response response = getDefaultFunctionInfo(); + Response response = getDefaultSourceInfo(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals( - org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), + new Gson().toJson(SourceConfigUtils.convertFromDetails(functionDetails)), response.getEntity()); } // - // List Functions + // List Sources // @Test - public void testListFunctionsMissingTenant() throws Exception { - testListFunctionsMissingArguments( + public void testListSourcesMissingTenant() throws Exception { + testListSourcesMissingArguments( null, namespace, "Tenant"); } @Test - public void testListFunctionsMissingNamespace() throws Exception { - testListFunctionsMissingArguments( + public void testListSourcesMissingNamespace() throws Exception { + testListSourcesMissingArguments( tenant, null, "Namespace"); } - private void testListFunctionsMissingArguments( + private void testListSourcesMissingArguments( String tenant, String namespace, String missingFieldName ) { Response response = resource.listFunctions( tenant, - namespace); + namespace, + FunctionsImpl.SOURCE); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } - private Response listDefaultFunctions() { + private Response listDefaultSources() { return resource.listFunctions( tenant, - namespace); + namespace,FunctionsImpl.SOURCE); } @Test - public void testListFunctionsSuccess() throws Exception { + public void testListSourcesSuccess() throws Exception { List functions = Lists.newArrayList("test-1", "test-2"); - when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions); + List functionMetaDataList = new LinkedList<>(); + functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-1").build() + ).build()); + functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-2").build() + ).build()); + when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); + + Response response = listDefaultSources(); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + assertEquals(new Gson().toJson(functions), response.getEntity()); + } - Response response = listDefaultFunctions(); + @Test + public void testOnlyGetSources() throws Exception { + List functions = Lists.newArrayList("test-1"); + List functionMetaDataList = new LinkedList<>(); + FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-1").build()).build(); + functionMetaDataList.add(f1); + FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-2").build()).build(); + functionMetaDataList.add(f2); + FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails( + FunctionDetails.newBuilder().setName("test-3").build()).build(); + functionMetaDataList.add(f3); + when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); + doReturn("Source").when(this.resource).calculateSubjectType(f1); + doReturn("Function").when(this.resource).calculateSubjectType(f2); + doReturn("Sink").when(this.resource).calculateSubjectType(f3); + + Response response = listDefaultSources(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(new Gson().toJson(functions), response.getEntity()); } - */ } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index b8452f1d9786c..6659fcf1fdca2 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -214,7 +214,7 @@ protected void getSinkInfoSuccess(SinkTester tester, boolean builtin) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "sink", "get", "--tenant", tenant, "--namespace", namespace, @@ -224,7 +224,7 @@ protected void getSinkInfoSuccess(SinkTester tester, log.info("Get sink info : {}", result.getStdout()); if (builtin) { assertTrue( - result.getStdout().contains("\"builtin\": \"" + tester.getSinkType().name().toLowerCase() + "\""), + result.getStdout().contains("\"archive\": \"builtin://" + tester.getSinkType().name().toLowerCase() + "\""), result.getStdout() ); } else { @@ -366,7 +366,7 @@ protected void deleteSink(String tenant, String namespace, String sinkName) thro protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "sink", "get", "--tenant", tenant, "--namespace", namespace, @@ -376,7 +376,7 @@ protected void getSinkInfoNotFound(String tenant, String namespace, String sinkN pulsarCluster.getAnyWorker().execCmd(commands); fail("Command should have exited with non-zero"); } catch (ContainerExecException e) { - assertTrue(e.getResult().getStderr().contains("Reason: Function " + sinkName + " doesn't exist")); + assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist")); } } @@ -465,7 +465,7 @@ protected void getSourceInfoSuccess(SourceTester tester, String sourceName) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "source", "get", "--tenant", tenant, "--namespace", namespace, @@ -474,7 +474,7 @@ protected void getSourceInfoSuccess(SourceTester tester, ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands); log.info("Get source info : {}", result.getStdout()); assertTrue( - result.getStdout().contains("\"builtin\": \"" + tester.getSourceType() + "\""), + result.getStdout().contains("\"archive\": \"builtin://" + tester.getSourceType() + "\""), result.getStdout() ); } @@ -564,7 +564,7 @@ protected void deleteSource(String tenant, String namespace, String sourceName) protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "source", "get", "--tenant", tenant, "--namespace", namespace, @@ -574,7 +574,7 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou pulsarCluster.getAnyWorker().execCmd(commands); fail("Command should have exited with non-zero"); } catch (ContainerExecException e) { - assertTrue(e.getResult().getStderr().contains("Reason: Function " + sourceName + " doesn't exist")); + assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); } }