From aa40ad6ddeb4208d439df2c9080fc73b1e6ffa3b Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Sun, 26 May 2019 21:48:16 -0700 Subject: [PATCH] renaming sinks and sources api to be consistent with the rest of Pulsar (#4363) * Rename sources and sinks CLI to be consistent with rest of Pulsar * renaming sinks and sources api to be consistent with the rest of Pulsar * use new interfaces in cmd --- pulsar-broker/pom.xml | 4 +- .../impl/{SinkBase.java => SinksBase.java} | 10 +- .../{SourceBase.java => SourcesBase.java} | 12 +- .../apache/pulsar/broker/admin/v3/Sink.java | 8 +- .../apache/pulsar/broker/admin/v3/Sinks.java | 34 ++ .../apache/pulsar/broker/admin/v3/Source.java | 8 +- .../pulsar/broker/admin/v3/Sources.java | 34 ++ .../pulsar/client/admin/PulsarAdmin.java | 38 +- .../org/apache/pulsar/client/admin/Sink.java | 329 +--------------- .../org/apache/pulsar/client/admin/Sinks.java | 350 ++++++++++++++++++ .../apache/pulsar/client/admin/Source.java | 330 +---------------- .../apache/pulsar/client/admin/Sources.java | 350 ++++++++++++++++++ .../{SinkImpl.java => SinksImpl.java} | 5 +- .../{SourceImpl.java => SourcesImpl.java} | 5 +- .../org/apache/pulsar/admin/cli/CmdSinks.java | 36 +- .../apache/pulsar/admin/cli/CmdSources.java | 36 +- .../pulsar/admin/cli/PulsarAdminTool.java | 29 +- .../apache/pulsar/admin/cli/TestCmdSinks.java | 8 +- .../pulsar/admin/cli/TestCmdSources.java | 8 +- .../functions/worker/rest/Resources.java | 12 +- .../api/{SinkImpl.java => SinksImpl.java} | 4 +- .../api/{SourceImpl.java => SourcesImpl.java} | 4 +- ...ource.java => FunctionsApiV2Resource.java} | 4 +- ...ource.java => FunctionsApiV3Resource.java} | 4 +- .../worker/rest/api/v3/SinkApiV3Resource.java | 224 +---------- .../rest/api/v3/SinksApiV3Resource.java | 241 ++++++++++++ .../rest/api/v3/SourceApiV3Resource.java | 228 +----------- .../rest/api/v3/SourcesApiV3Resource.java | 257 +++++++++++++ .../api/v2/FunctionApiV2ResourceTest.java | 2 +- .../api/v3/FunctionApiV3ResourceTest.java | 5 +- .../rest/api/v3/SinkApiV3ResourceTest.java | 8 +- .../rest/api/v3/SourceApiV3ResourceTest.java | 8 +- 32 files changed, 1448 insertions(+), 1187 deletions(-) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/{SinkBase.java => SinksBase.java} (98%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/{SourceBase.java => SourcesBase.java} (97%) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java create mode 100644 pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java create mode 100644 pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java rename pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/{SinkImpl.java => SinksImpl.java} (98%) rename pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/{SourceImpl.java => SourcesImpl.java} (98%) rename pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/{SinkImpl.java => SinksImpl.java} (99%) rename pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/{SourceImpl.java => SourcesImpl.java} (99%) rename pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/{FunctionApiV2Resource.java => FunctionsApiV2Resource.java} (99%) rename pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/{FunctionApiV3Resource.java => FunctionsApiV3Resource.java} (99%) create mode 100644 pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java create mode 100644 pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index ac63f007437a7..c9f7e364ea86b 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -404,7 +404,7 @@ false - org.apache.pulsar.broker.admin.v3.Source + org.apache.pulsar.broker.admin.v3.Sources http,https /admin/v3 @@ -421,7 +421,7 @@ false - org.apache.pulsar.broker.admin.v3.Sink + org.apache.pulsar.broker.admin.v3.Sinks http,https /admin/v3 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index f15003cf15401..50839d825b901 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -28,7 +28,7 @@ import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.functions.worker.WorkerService; -import org.apache.pulsar.functions.worker.rest.api.SinkImpl; +import org.apache.pulsar.functions.worker.rest.api.SinksImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -47,12 +47,12 @@ import java.util.List; import java.util.function.Supplier; -public class SinkBase extends AdminResource implements Supplier { +public class SinksBase extends AdminResource implements Supplier { - private final SinkImpl sink; + private final SinksImpl sink; - public SinkBase() { - this.sink = new SinkImpl(this); + public SinksBase() { + this.sink = new SinksImpl(this); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java index f55337e582612..e65eda7084320 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java @@ -28,7 +28,7 @@ import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.functions.worker.WorkerService; -import org.apache.pulsar.functions.worker.rest.api.SourceImpl; +import org.apache.pulsar.functions.worker.rest.api.SourcesImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -47,12 +47,12 @@ import java.util.List; import java.util.function.Supplier; -public class SourceBase extends AdminResource implements Supplier { +public class SourcesBase extends AdminResource implements Supplier { - private final SourceImpl source; + private final SourcesImpl source; - public SourceBase() { - this.source = new SourceImpl(this); + public SourcesBase() { + this.source = new SourcesImpl(this); } @Override @@ -188,6 +188,7 @@ public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant, @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) + @Consumes(MediaType.APPLICATION_JSON) @Path("/{tenant}/{namespace}") public List listSources(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { @@ -291,6 +292,7 @@ public void startSource(final @PathParam("tenant") String tenant, @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 408, message = "Request timeout") }) + @Produces(MediaType.APPLICATION_JSON) @Path("/builtinsources") public List getSourceList() { List connectorDefinitions = source.getListOfConnectors(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java index 9191d8c0a9400..667e88e391271 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.admin.v3; import io.swagger.annotations.Api; -import org.apache.pulsar.broker.admin.impl.SinkBase; +import org.apache.pulsar.broker.admin.impl.SinksBase; import javax.ws.rs.Consumes; import javax.ws.rs.Path; @@ -30,5 +30,9 @@ @Api(value = "/sink", description = "Sink admin apis", tags = "sink") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class Sink extends SinkBase { +@Deprecated +/** + * @deprecated in favor of {@link Sinks} + */ +public class Sink extends SinksBase { } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java new file mode 100644 index 0000000000000..a693ea204e1f0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.v3; + +import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.SinksBase; + +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/sinks") +@Api(value = "/sinks", description = "Sinks admin apis", tags = "sinks") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class Sinks extends SinksBase { +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java index b8acd193ba290..f68d2287ee8ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.admin.v3; import io.swagger.annotations.Api; -import org.apache.pulsar.broker.admin.impl.SourceBase; +import org.apache.pulsar.broker.admin.impl.SourcesBase; import javax.ws.rs.Consumes; import javax.ws.rs.Path; @@ -30,5 +30,9 @@ @Api(value = "/source", description = "Source admin apis", tags = "source") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class Source extends SourceBase { +@Deprecated +/** + * @deprecated in favor of {@link Sources} + */ +public class Source extends SourcesBase { } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java new file mode 100644 index 0000000000000..a36ab08d7ecaa --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.v3; + +import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.SourcesBase; + +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/sources") +@Api(value = "/sources", description = "Sources admin apis", tags = "sources") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class Sources extends SourcesBase { +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index 9ab3d6df84a20..2f182f78468a9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -40,8 +40,8 @@ import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl; import org.apache.pulsar.client.admin.internal.SchemasImpl; -import org.apache.pulsar.client.admin.internal.SinkImpl; -import org.apache.pulsar.client.admin.internal.SourceImpl; +import org.apache.pulsar.client.admin.internal.SinksImpl; +import org.apache.pulsar.client.admin.internal.SourcesImpl; import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.admin.internal.WorkerImpl; @@ -87,8 +87,8 @@ public class PulsarAdmin implements Closeable { private final String serviceUrl; private final Lookup lookups; private final Functions functions; - private final Source source; - private final Sink sink; + private final Sources sources; + private final Sinks sinks; private final Worker worker; private final Schemas schemas; protected final WebTarget root; @@ -193,8 +193,8 @@ public PulsarAdmin(String serviceUrl, this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.lookups = new LookupImpl(root, auth, useTls); this.functions = new FunctionsImpl(root, auth, httpAsyncClient); - this.source = new SourceImpl(root, auth, httpAsyncClient); - this.sink = new SinkImpl(root, auth, httpAsyncClient); + this.sources = new SourcesImpl(root, auth, httpAsyncClient); + this.sinks = new SinksImpl(root, auth, httpAsyncClient); this.worker = new WorkerImpl(root, auth); this.schemas = new SchemasImpl(root, auth); this.bookies = new BookiesImpl(root, auth); @@ -340,23 +340,35 @@ public Functions functions() { } /** - * - * @return the source management object + * @return the sources management object + * @deprecated in favor of {@link #sources()} */ + @Deprecated public Source source() { - return source; + return (Source) sources; + } + + public Sources sources() { + return sources; } /** - * - * @return the sink management object + * @return the sinks management object + * @deprecated in favor of {@link #sinks} */ + @Deprecated public Sink sink() { - return sink; + return (Sink) sinks; + } + + /** + * @return the sinks management object + */ + public Sinks sinks() { + return sinks; } /** - * * @return the Worker stats */ public Worker worker() { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java index 8db53aaf72fad..c270a758e0f7a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java @@ -18,333 +18,10 @@ */ package org.apache.pulsar.client.admin; -import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; -import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; -import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; -import org.apache.pulsar.common.functions.UpdateOptions; -import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.common.policies.data.SinkStatus; -import org.apache.pulsar.common.io.SinkConfig; - -import java.util.List; - /** - * Admin interface for Sink management. + * @deprecated in favor of {@link Sinks} */ -public interface Sink { - /** - * Get the list of sinks. - *

- * Get the list of all the Pulsar Sinks. - *

- * Response Example: - * - *

-     * ["f1", "f2", "f3"]
-     * 
- * - * @throws NotAuthorizedException - * Don't have admin permission - * @throws PulsarAdminException - * Unexpected error - */ - List listSinks(String tenant, String namespace) throws PulsarAdminException; - - /** - * Get the configuration for the specified sink. - *

- * Response Example: - * - *

-     * { serviceUrl : "http://my-broker.example.com:8080/" }
-     * 
- * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @return the sink configuration - * - * @throws NotAuthorizedException - * You don't have admin permission to get the configuration of the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException; - - /** - * Create a new sink. - * - * @param sinkConfig - * the sink configuration object - * - * @throws PulsarAdminException - * Unexpected error - */ - void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException; - - /** - *
-     * Create a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * 
- * - * @param sinkConfig - * the sink configuration object - * @param pkgUrl - * url from which pkg can be downloaded - * @throws PulsarAdminException - */ - void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException; - - /** - * Update the configuration for a sink. - *

- * - * @param sinkConfig - * the sink configuration object - * - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException; - - /** - * Update the configuration for a sink. - *

- * - * @param sinkConfig - * the sink configuration object - * @param updateOptions - * options for the update operations - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException; - - /** - * Update the configuration for a sink. - *

-     * Update a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * 
- * - * @param sinkConfig - * the sink configuration object - * @param pkgUrl - * url from which pkg can be downloaded - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException; - - /** - * Update the configuration for a sink. - *
-     * Update a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * 
- * - * @param sinkConfig - * the sink configuration object - * @param pkgUrl - * url from which pkg can be downloaded - * @param updateOptions - * options for the update operations - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException; - - /** - * Delete an existing sink - *

- * Delete a sink - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @throws NotAuthorizedException - * You don't have admin permission - * @throws NotFoundException - * Cluster does not exist - * @throws PreconditionFailedException - * Cluster is not empty - * @throws PulsarAdminException - * Unexpected error - */ - void deleteSink(String tenant, String namespace, String sink) throws PulsarAdminException; - - /** - * Gets the current status of a sink. - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @throws PulsarAdminException - * Unexpected error - */ - SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException; - - /** - * Gets the current status of a sink instance. - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * @param id - * Sink instance-id - * @return - * @throws PulsarAdminException - */ - SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) - throws PulsarAdminException; - - /** - * Restart sink instance - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @param instanceId - * Sink instanceId - * - * @throws PulsarAdminException - * Unexpected error - */ - void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; - - /** - * Restart all sink instances - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @throws PulsarAdminException - * Unexpected error - */ - void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException; - - - /** - * Stop sink instance - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @param instanceId - * Sink instanceId - * - * @throws PulsarAdminException - * Unexpected error - */ - void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; - - /** - * Stop all sink instances - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @throws PulsarAdminException - * Unexpected error - */ - void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException; - - /** - * Start sink instance - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @param instanceId - * Sink instanceId - * - * @throws PulsarAdminException - * Unexpected error - */ - void startSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; - - /** - * Start all sink instances - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param sink - * Sink name - * - * @throws PulsarAdminException - * Unexpected error - */ - void startSink(String tenant, String namespace, String sink) throws PulsarAdminException; - +@Deprecated +public interface Sink extends Sinks { - /** - * Fetches a list of supported Pulsar IO sinks currently running in cluster mode - * - * @throws PulsarAdminException - * Unexpected error - * - */ - List getBuiltInSinks() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java new file mode 100644 index 0000000000000..afa44e0c43944 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; +import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.policies.data.SinkStatus; +import org.apache.pulsar.common.io.SinkConfig; + +import java.util.List; + +/** + * Admin interface for Sink management. + */ +public interface Sinks { + /** + * Get the list of sinks. + *

+ * Get the list of all the Pulsar Sinks. + *

+ * Response Example: + * + *

+     * ["f1", "f2", "f3"]
+     * 
+ * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws PulsarAdminException + * Unexpected error + */ + List listSinks(String tenant, String namespace) throws PulsarAdminException; + + /** + * Get the configuration for the specified sink. + *

+ * Response Example: + * + *

+     * { serviceUrl : "http://my-broker.example.com:8080/" }
+     * 
+ * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @return the sink configuration + * + * @throws NotAuthorizedException + * You don't have admin permission to get the configuration of the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Create a new sink. + * + * @param sinkConfig + * the sink configuration object + * + * @throws PulsarAdminException + * Unexpected error + */ + void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException; + + /** + *
+     * Create a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param sinkConfig + * the sink configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws PulsarAdminException + */ + void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Update the configuration for a sink. + *

+ * + * @param sinkConfig + * the sink configuration object + * + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException; + + /** + * Update the configuration for a sink. + *

+ * + * @param sinkConfig + * the sink configuration object + * @param updateOptions + * options for the update operations + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException; + + /** + * Update the configuration for a sink. + *

+     * Update a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param sinkConfig + * the sink configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Update the configuration for a sink. + *
+     * Update a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param sinkConfig + * the sink configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @param updateOptions + * options for the update operations + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException; + + /** + * Delete an existing sink + *

+ * Delete a sink + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws NotAuthorizedException + * You don't have admin permission + * @throws NotFoundException + * Cluster does not exist + * @throws PreconditionFailedException + * Cluster is not empty + * @throws PulsarAdminException + * Unexpected error + */ + void deleteSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Gets the current status of a sink. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Gets the current status of a sink instance. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * @param id + * Sink instance-id + * @return + * @throws PulsarAdminException + */ + SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) + throws PulsarAdminException; + + /** + * Restart sink instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @param instanceId + * Sink instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; + + /** + * Restart all sink instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + + /** + * Stop sink instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @param instanceId + * Sink instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; + + /** + * Stop all sink instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Start sink instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @param instanceId + * Sink instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; + + /** + * Start all sink instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + + /** + * Fetches a list of supported Pulsar IO sinks currently running in cluster mode + * + * @throws PulsarAdminException + * Unexpected error + * + */ + List getBuiltInSinks() throws PulsarAdminException; +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java index 058364ec3fc33..aba325113f22d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java @@ -18,333 +18,9 @@ */ package org.apache.pulsar.client.admin; -import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; -import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; -import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; -import org.apache.pulsar.common.functions.UpdateOptions; -import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.common.io.SourceConfig; -import org.apache.pulsar.common.policies.data.SourceStatus; - -import java.util.List; - /** - * Admin interface for Source management. + * @deprecated in favor of {@link Sources} */ -public interface Source { - /** - * Get the list of sources. - *

- * Get the list of all the Pulsar Sources. - *

- * Response Example: - * - *

-     * ["f1", "f2", "f3"]
-     * 
- * - * @throws NotAuthorizedException - * Don't have admin permission - * @throws PulsarAdminException - * Unexpected error - */ - List listSources(String tenant, String namespace) throws PulsarAdminException; - - /** - * Get the configuration for the specified source. - *

- * Response Example: - * - *

-     * { serviceUrl : "http://my-broker.example.com:8080/" }
-     * 
- * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @return the source configuration - * - * @throws NotAuthorizedException - * You don't have admin permission to get the configuration of the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException; - - /** - * Create a new source. - * - * @param sourceConfig - * the source configuration object - * - * @throws PulsarAdminException - * Unexpected error - */ - void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException; - - /** - *
-     * Create a new source by providing url from which fun-pkg can be downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * 
- * - * @param sourceConfig - * the source configuration object - * @param pkgUrl - * url from which pkg can be downloaded - * @throws PulsarAdminException - */ - void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException; - - /** - * Update the configuration for a source. - *

- * - * @param sourceConfig - * the source configuration object - * - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException; - - /** - * Update the configuration for a source. - *

- * - * @param sourceConfig - * the source configuration object - * @param updateOptions - * options for the update operations - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSource(SourceConfig sourceConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException; - - /** - * Update the configuration for a source. - *

-     * Update a source by providing url from which fun-pkg can be downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * 
- * - * @param sourceConfig - * the source configuration object - * @param pkgUrl - * url from which pkg can be downloaded - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException; - - /** - * Update the configuration for a source. - *
-     * Update a source by providing url from which fun-pkg can be downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * 
- * - * @param sourceConfig - * the source configuration object - * @param pkgUrl - * url from which pkg can be downloaded - * @param updateOptions - * options for the update operations - * @throws NotAuthorizedException - * You don't have admin permission to create the cluster - * @throws NotFoundException - * Cluster doesn't exist - * @throws PulsarAdminException - * Unexpected error - */ - void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException; - - /** - * Delete an existing source - *

- * Delete a source - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @throws NotAuthorizedException - * You don't have admin permission - * @throws NotFoundException - * Cluster does not exist - * @throws PreconditionFailedException - * Cluster is not empty - * @throws PulsarAdminException - * Unexpected error - */ - void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException; - - /** - * Gets the current status of a source. - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @throws PulsarAdminException - * Unexpected error - */ - SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException; - - /** - * Gets the current status of a source instance. - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * @param id - * Source instance-id - * @return - * @throws PulsarAdminException - */ - SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) - throws PulsarAdminException; - - /** - * Restart source instance - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @param instanceId - * Source instanceId - * - * @throws PulsarAdminException - * Unexpected error - */ - void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; - - /** - * Restart all source instances - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @throws PulsarAdminException - * Unexpected error - */ - void restartSource(String tenant, String namespace, String source) throws PulsarAdminException; - - - /** - * Stop source instance - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @param instanceId - * Source instanceId - * - * @throws PulsarAdminException - * Unexpected error - */ - void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; - - /** - * Stop all source instances - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @throws PulsarAdminException - * Unexpected error - */ - void stopSource(String tenant, String namespace, String source) throws PulsarAdminException; - - /** - * Start source instance - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @param instanceId - * Source instanceId - * - * @throws PulsarAdminException - * Unexpected error - */ - void startSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; - - /** - * Start all source instances - * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param source - * Source name - * - * @throws PulsarAdminException - * Unexpected error - */ - void startSource(String tenant, String namespace, String source) throws PulsarAdminException; - - - /** - * Fetches a list of supported Pulsar IO sources currently running in cluster mode - * - * @throws PulsarAdminException - * Unexpected error - * - */ - List getBuiltInSources() throws PulsarAdminException; +@Deprecated +public interface Source extends Sources { } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java new file mode 100644 index 0000000000000..ef108e2b14a97 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; +import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.policies.data.SourceStatus; + +import java.util.List; + +/** + * Admin interface for Source management. + */ +public interface Sources { + /** + * Get the list of sources. + *

+ * Get the list of all the Pulsar Sources. + *

+ * Response Example: + * + *

+     * ["f1", "f2", "f3"]
+     * 
+ * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws PulsarAdminException + * Unexpected error + */ + List listSources(String tenant, String namespace) throws PulsarAdminException; + + /** + * Get the configuration for the specified source. + *

+ * Response Example: + * + *

+     * { serviceUrl : "http://my-broker.example.com:8080/" }
+     * 
+ * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @return the source configuration + * + * @throws NotAuthorizedException + * You don't have admin permission to get the configuration of the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Create a new source. + * + * @param sourceConfig + * the source configuration object + * + * @throws PulsarAdminException + * Unexpected error + */ + void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException; + + /** + *
+     * Create a new source by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param sourceConfig + * the source configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws PulsarAdminException + */ + void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Update the configuration for a source. + *

+ * + * @param sourceConfig + * the source configuration object + * + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException; + + /** + * Update the configuration for a source. + *

+ * + * @param sourceConfig + * the source configuration object + * @param updateOptions + * options for the update operations + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSource(SourceConfig sourceConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException; + + /** + * Update the configuration for a source. + *

+     * Update a source by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param sourceConfig + * the source configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Update the configuration for a source. + *
+     * Update a source by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param sourceConfig + * the source configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @param updateOptions + * options for the update operations + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException; + + /** + * Delete an existing source + *

+ * Delete a source + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws NotAuthorizedException + * You don't have admin permission + * @throws NotFoundException + * Cluster does not exist + * @throws PreconditionFailedException + * Cluster is not empty + * @throws PulsarAdminException + * Unexpected error + */ + void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Gets the current status of a source. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Gets the current status of a source instance. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * @param id + * Source instance-id + * @return + * @throws PulsarAdminException + */ + SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) + throws PulsarAdminException; + + /** + * Restart source instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @param instanceId + * Source instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; + + /** + * Restart all source instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSource(String tenant, String namespace, String source) throws PulsarAdminException; + + + /** + * Stop source instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @param instanceId + * Source instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; + + /** + * Stop all source instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSource(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Start source instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @param instanceId + * Source instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; + + /** + * Start all source instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSource(String tenant, String namespace, String source) throws PulsarAdminException; + + + /** + * Fetches a list of supported Pulsar IO sources currently running in cluster mode + * + * @throws PulsarAdminException + * Unexpected error + * + */ + List getBuiltInSources() throws PulsarAdminException; +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java similarity index 98% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java index 3da82ce9c6fa2..77a25cca926cf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Sink; +import org.apache.pulsar.client.admin.Sinks; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -48,12 +49,12 @@ import static org.asynchttpclient.Dsl.put; @Slf4j -public class SinkImpl extends ComponentResource implements Sink { +public class SinksImpl extends ComponentResource implements Sinks, Sink { private final WebTarget sink; private final AsyncHttpClient asyncHttpClient; - public SinkImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) { + public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) { super(auth); this.sink = web.path("/admin/v3/sink"); this.asyncHttpClient = asyncHttpClient; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java similarity index 98% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java index cd7b8382db569..d8f57a31c2268 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Source; +import org.apache.pulsar.client.admin.Sources; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -48,12 +49,12 @@ import static org.asynchttpclient.Dsl.put; @Slf4j -public class SourceImpl extends ComponentResource implements Source { +public class SourcesImpl extends ComponentResource implements Sources, Source { private final WebTarget source; private final AsyncHttpClient asyncHttpClient; - public SourceImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) { + public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) { super(auth); this.source = web.path("/admin/v3/source"); this.asyncHttpClient = asyncHttpClient; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 0f2cecbe2bb4a..b6029a998b397 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -70,7 +70,7 @@ public class CmdSinks extends CmdBase { private final LocalSinkRunner localSinkRunner; public CmdSinks(PulsarAdmin admin) { - super("sink", admin); + super("sinks", admin); createSink = new CreateSink(); updateSink = new UpdateSink(); deleteSink = new DeleteSink(); @@ -202,9 +202,9 @@ protected class CreateSink extends SinkDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { - admin.sink().createSinkWithUrl(sinkConfig, sinkConfig.getArchive()); + admin.sinks().createSinkWithUrl(sinkConfig, sinkConfig.getArchive()); } else { - admin.sink().createSink(sinkConfig, sinkConfig.getArchive()); + admin.sinks().createSink(sinkConfig, sinkConfig.getArchive()); } print("Created successfully"); } @@ -221,9 +221,9 @@ void runCmd() throws Exception { UpdateOptions updateOptions = new UpdateOptions(); updateOptions.setUpdateAuthData(updateAuthData); if (Utils.isFunctionPackageUrlSupported(archive)) { - admin.sink().updateSinkWithUrl(sinkConfig, sinkConfig.getArchive(), updateOptions); + admin.sinks().updateSinkWithUrl(sinkConfig, sinkConfig.getArchive(), updateOptions); } else { - admin.sink().updateSink(sinkConfig, sinkConfig.getArchive(), updateOptions); + admin.sinks().updateSink(sinkConfig, sinkConfig.getArchive(), updateOptions); } print("Updated successfully"); } @@ -465,7 +465,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { protected String validateSinkType(String sinkType) throws IOException { Set availableSinks; try { - availableSinks = admin.sink().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet()); + availableSinks = admin.sinks().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet()); } catch (PulsarAdminException e) { throw new IOException(e); } @@ -515,7 +515,7 @@ protected class DeleteSink extends SinkCommand { @Override void runCmd() throws Exception { - admin.sink().deleteSink(tenant, namespace, sinkName); + admin.sinks().deleteSink(tenant, namespace, sinkName); print("Deleted successfully"); } } @@ -525,7 +525,7 @@ protected class GetSink extends SinkCommand { @Override void runCmd() throws Exception { - SinkConfig sinkConfig = admin.sink().getSink(tenant, namespace, sinkName); + SinkConfig sinkConfig = admin.sinks().getSink(tenant, namespace, sinkName); Gson gson = new GsonBuilder().setPrettyPrinting().create(); System.out.println(gson.toJson(sinkConfig)); } @@ -554,7 +554,7 @@ public void processArguments() { @Override void runCmd() throws Exception { - List sinks = admin.sink().listSinks(tenant, namespace); + List sinks = admin.sinks().listSinks(tenant, namespace); Gson gson = new GsonBuilder().setPrettyPrinting().create(); System.out.println(gson.toJson(sinks)); } @@ -569,9 +569,9 @@ class GetSinkStatus extends SinkCommand { @Override void runCmd() throws Exception { if (isBlank(instanceId)) { - print(admin.sink().getSinkStatus(tenant, namespace, sinkName)); + print(admin.sinks().getSinkStatus(tenant, namespace, sinkName)); } else { - print(admin.sink().getSinkStatus(tenant, namespace, sinkName, Integer.parseInt(instanceId))); + print(admin.sinks().getSinkStatus(tenant, namespace, sinkName, Integer.parseInt(instanceId))); } } } @@ -586,12 +586,12 @@ class RestartSink extends SinkCommand { void runCmd() throws Exception { if (isNotBlank(instanceId)) { try { - admin.sink().restartSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); + admin.sinks().restartSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); } catch (NumberFormatException e) { System.err.println("instance-id must be a number"); } } else { - admin.sink().restartSink(tenant, namespace, sinkName); + admin.sinks().restartSink(tenant, namespace, sinkName); } System.out.println("Restarted successfully"); } @@ -607,12 +607,12 @@ class StopSink extends SinkCommand { void runCmd() throws Exception { if (isNotBlank(instanceId)) { try { - admin.sink().stopSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); + admin.sinks().stopSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); } catch (NumberFormatException e) { System.err.println("instance-id must be a number"); } } else { - admin.sink().stopSink(tenant, namespace, sinkName); + admin.sinks().stopSink(tenant, namespace, sinkName); } System.out.println("Stopped successfully"); } @@ -628,12 +628,12 @@ class StartSink extends SinkCommand { void runCmd() throws Exception { if (isNotBlank(instanceId)) { try { - admin.sink().startSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); + admin.sinks().startSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); } catch (NumberFormatException e) { System.err.println("instance-id must be a number"); } } else { - admin.sink().startSink(tenant, namespace, sinkName); + admin.sinks().startSink(tenant, namespace, sinkName); } System.out.println("Started successfully"); } @@ -643,7 +643,7 @@ void runCmd() throws Exception { public class ListBuiltInSinks extends BaseCommand { @Override void runCmd() throws Exception { - admin.sink().getBuiltInSinks().stream().filter(x -> isNotBlank(x.getSinkClass())) + admin.sinks().getBuiltInSinks().stream().filter(x -> isNotBlank(x.getSinkClass())) .forEach(connector -> { System.out.println(connector.getName()); System.out.println(WordUtils.wrap(connector.getDescription(), 80)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 782b0f6500d44..535bb332f39d3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -73,7 +73,7 @@ public class CmdSources extends CmdBase { private final LocalSourceRunner localSourceRunner; public CmdSources(PulsarAdmin admin) { - super("source", admin); + super("sources", admin); createSource = new CreateSource(); updateSource = new UpdateSource(); deleteSource = new DeleteSource(); @@ -206,9 +206,9 @@ protected class CreateSource extends SourceDetailsCommand { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) { - admin.source().createSourceWithUrl(sourceConfig, sourceConfig.getArchive()); + admin.sources().createSourceWithUrl(sourceConfig, sourceConfig.getArchive()); } else { - admin.source().createSource(sourceConfig, sourceConfig.getArchive()); + admin.sources().createSource(sourceConfig, sourceConfig.getArchive()); } print("Created successfully"); } @@ -225,9 +225,9 @@ void runCmd() throws Exception { UpdateOptions updateOptions = new UpdateOptions(); updateOptions.setUpdateAuthData(updateAuthData); if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) { - admin.source().updateSourceWithUrl(sourceConfig, sourceConfig.getArchive(), updateOptions); + admin.sources().updateSourceWithUrl(sourceConfig, sourceConfig.getArchive(), updateOptions); } else { - admin.source().updateSource(sourceConfig, sourceConfig.getArchive(), updateOptions); + admin.sources().updateSource(sourceConfig, sourceConfig.getArchive(), updateOptions); } print("Updated successfully"); } @@ -419,7 +419,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) { protected String validateSourceType(String sourceType) throws IOException { Set availableSources; try { - availableSources = admin.source().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet()); + availableSources = admin.sources().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet()); } catch (PulsarAdminException e) { throw new IOException(e); } @@ -469,7 +469,7 @@ protected class DeleteSource extends SourceCommand { @Override void runCmd() throws Exception { - admin.source().deleteSource(tenant, namespace, sourceName); + admin.sources().deleteSource(tenant, namespace, sourceName); print("Delete source successfully"); } } @@ -479,7 +479,7 @@ protected class GetSource extends SourceCommand { @Override void runCmd() throws Exception { - SourceConfig sourceConfig = admin.source().getSource(tenant, namespace, sourceName); + SourceConfig sourceConfig = admin.sources().getSource(tenant, namespace, sourceName); Gson gson = new GsonBuilder().setPrettyPrinting().create(); System.out.println(gson.toJson(sourceConfig)); } @@ -508,7 +508,7 @@ public void processArguments() { @Override void runCmd() throws Exception { - List sources = admin.source().listSources(tenant, namespace); + List sources = admin.sources().listSources(tenant, namespace); Gson gson = new GsonBuilder().setPrettyPrinting().create(); System.out.println(gson.toJson(sources)); } @@ -523,9 +523,9 @@ class GetSourceStatus extends SourceCommand { @Override void runCmd() throws Exception { if (isBlank(instanceId)) { - print(admin.source().getSourceStatus(tenant, namespace, sourceName)); + print(admin.sources().getSourceStatus(tenant, namespace, sourceName)); } else { - print(admin.source().getSourceStatus(tenant, namespace, sourceName, Integer.parseInt(instanceId))); + print(admin.sources().getSourceStatus(tenant, namespace, sourceName, Integer.parseInt(instanceId))); }; } } @@ -540,12 +540,12 @@ class RestartSource extends SourceCommand { void runCmd() throws Exception { if (isNotBlank(instanceId)) { try { - admin.source().restartSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); + admin.sources().restartSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); } catch (NumberFormatException e) { System.err.println("instance-id must be a number"); } } else { - admin.source().restartSource(tenant, namespace, sourceName); + admin.sources().restartSource(tenant, namespace, sourceName); } System.out.println("Restarted successfully"); } @@ -561,12 +561,12 @@ class StopSource extends SourceCommand { void runCmd() throws Exception { if (isNotBlank(instanceId)) { try { - admin.source().stopSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); + admin.sources().stopSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); } catch (NumberFormatException e) { System.err.println("instance-id must be a number"); } } else { - admin.source().stopSource(tenant, namespace, sourceName); + admin.sources().stopSource(tenant, namespace, sourceName); } System.out.println("Stopped successfully"); } @@ -582,12 +582,12 @@ class StartSource extends SourceCommand { void runCmd() throws Exception { if (isNotBlank(instanceId)) { try { - admin.source().startSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); + admin.sources().startSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); } catch (NumberFormatException e) { System.err.println("instance-id must be a number"); } } else { - admin.source().startSource(tenant, namespace, sourceName); + admin.sources().startSource(tenant, namespace, sourceName); } System.out.println("Started successfully"); } @@ -597,7 +597,7 @@ void runCmd() throws Exception { public class ListBuiltInSources extends BaseCommand { @Override void runCmd() throws Exception { - admin.source().getBuiltInSources().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass())) + admin.sources().getBuiltInSources().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass())) .forEach(connector -> { System.out.println(connector.getName()); System.out.println(WordUtils.wrap(connector.getDescription(), 80)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java index 705a38662d449..c3a7fd90bd0b5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java @@ -108,6 +108,11 @@ public class PulsarAdminTool { commandMap.put("resource-quotas", CmdResourceQuotas.class); commandMap.put("functions", CmdFunctions.class); commandMap.put("functions-worker", CmdFunctionWorker.class); + commandMap.put("sources", CmdSources.class); + commandMap.put("sinks", CmdSinks.class); + + // To remain backwards compatibility for "source" and "sink" commands + // TODO eventually remove this commandMap.put("source", CmdSources.class); commandMap.put("sink", CmdSinks.class); } @@ -119,11 +124,16 @@ private void setupCommands(Function a PulsarAdmin admin = adminFactory.apply(adminBuilder); for (Map.Entry> c : commandMap.entrySet()) { if (admin != null) { - // Other mode, all components are initialized. - jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin)); - } else if (c.getKey().equals("functions") || c.getKey().equals("source") || c.getKey().equals("sink")) { - // In mode localrun, only some components are initialized, such as source, sink and functions - jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin)); + // To remain backwards compatibility for "source" and "sink" commands + // TODO eventually remove this + if (c.getKey().equals("sources") || c.getKey().equals("source")) { + jcommander.addCommand("sources", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "source"); + } else if (c.getKey().equals("sinks") || c.getKey().equals("sink")) { + jcommander.addCommand("sinks", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "sink"); + } else { + // Other mode, all components are initialized. + jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin)); + } } } } catch (Exception e) { @@ -187,6 +197,15 @@ boolean run(String[] args, Function a } else { setupCommands(adminFactory); String cmd = args[cmdPos]; + + // To remain backwards compatibility for "source" and "sink" commands + // TODO eventually remove this + if (cmd.equals("source")) { + cmd = "sources"; + } else if (cmd.equals("sink")) { + cmd = "sinks"; + } + JCommander obj = jcommander.getCommands().get(cmd); CmdBase cmdObj = (CmdBase) obj.getObjects().get(0); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index 159edaffb323d..2a485869565a8 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -38,7 +38,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.admin.cli.utils.CmdUtils; -import org.apache.pulsar.client.admin.Sink; +import org.apache.pulsar.client.admin.Sinks; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -94,7 +94,7 @@ public IObjectFactory getObjectFactory() { private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}"; private PulsarAdmin pulsarAdmin; - private Sink sink; + private Sinks sink; private CmdSinks cmdSinks; private CmdSinks.CreateSink createSink; private CmdSinks.UpdateSink updateSink; @@ -105,8 +105,8 @@ public IObjectFactory getObjectFactory() { public void setup() throws Exception { pulsarAdmin = mock(PulsarAdmin.class); - sink = mock(Sink.class); - when(pulsarAdmin.sink()).thenReturn(sink); + sink = mock(Sinks.class); + when(pulsarAdmin.sinks()).thenReturn(sink); cmdSinks = spy(new CmdSinks(pulsarAdmin)); createSink = spy(cmdSinks.getCreateSink()); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index 19c6c5914e9c1..da6e99f872058 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.admin.cli.utils.CmdUtils; -import org.apache.pulsar.client.admin.Source; +import org.apache.pulsar.client.admin.Sources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -78,7 +78,7 @@ public IObjectFactory getObjectFactory() { private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}"; private PulsarAdmin pulsarAdmin; - private Source source; + private Sources source; private CmdSources CmdSources; private CmdSources.CreateSource createSource; private CmdSources.UpdateSource updateSource; @@ -89,8 +89,8 @@ public IObjectFactory getObjectFactory() { public void setup() throws Exception { pulsarAdmin = mock(PulsarAdmin.class); - source = mock(Source.class); - when(pulsarAdmin.source()).thenReturn(source); + source = mock(Sources.class); + when(pulsarAdmin.sources()).thenReturn(source); CmdSources = spy(new CmdSources(pulsarAdmin)); createSource = spy(CmdSources.getCreateSource()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index 4a53d3e762414..b2f33392aefcd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -23,12 +23,14 @@ import java.util.Set; import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource; -import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource; import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource; import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource; -import org.apache.pulsar.functions.worker.rest.api.v3.FunctionApiV3Resource; +import org.apache.pulsar.functions.worker.rest.api.v3.FunctionsApiV3Resource; import org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3Resource; +import org.apache.pulsar.functions.worker.rest.api.v3.SinksApiV3Resource; import org.apache.pulsar.functions.worker.rest.api.v3.SourceApiV3Resource; +import org.apache.pulsar.functions.worker.rest.api.v3.SourcesApiV3Resource; import org.glassfish.jersey.media.multipart.MultiPartFeature; public final class Resources { @@ -39,7 +41,7 @@ private Resources() { public static Set> getApiV2Resources() { return new HashSet<>( Arrays.asList( - FunctionApiV2Resource.class, + FunctionsApiV2Resource.class, WorkerApiV2Resource.class, WorkerStatsApiV2Resource.class, MultiPartFeature.class @@ -50,9 +52,11 @@ public static Set> getApiV3Resources() { return new HashSet<>( Arrays.asList( MultiPartFeature.class, + SourcesApiV3Resource.class, SourceApiV3Resource.class, + SinksApiV3Resource.class, SinkApiV3Resource.class, - FunctionApiV3Resource.class + FunctionsApiV3Resource.class )); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java similarity index 99% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index e7b9ff127ddcb..fa071603315b9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -44,7 +44,7 @@ import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; @Slf4j -public class SinkImpl extends ComponentImpl { +public class SinksImpl extends ComponentImpl { private class GetSinkStatus extends GetStatus { @@ -206,7 +206,7 @@ private ExceptionInformation getExceptionInformation(InstanceCommunication.Funct return exceptionInformation; } - public SinkImpl(Supplier workerServiceSupplier) { + public SinksImpl(Supplier workerServiceSupplier) { super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java similarity index 99% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index a2b59ef63bf36..d35724ed58f9e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -44,7 +44,7 @@ import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; @Slf4j -public class SourceImpl extends ComponentImpl { +public class SourcesImpl extends ComponentImpl { private class GetSourceStatus extends GetStatus { @Override @@ -208,7 +208,7 @@ public SourceStatus emptyStatus(final int parallelism) { } } - public SourceImpl(Supplier workerServiceSupplier) { + public SourcesImpl(Supplier workerServiceSupplier) { super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java similarity index 99% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java index c978cad76f978..64861878c2eeb 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java @@ -47,11 +47,11 @@ @Slf4j @Path("/functions") -public class FunctionApiV2Resource extends FunctionApiResource { +public class FunctionsApiV2Resource extends FunctionApiResource { protected final FunctionsImplV2 functions; - public FunctionApiV2Resource() { + public FunctionsApiV2Resource() { this.functions = new FunctionsImplV2(this); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java similarity index 99% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java index 7c8136d6460bc..38f4c4bfd03cc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java @@ -50,11 +50,11 @@ @Slf4j @Path("/functions") -public class FunctionApiV3Resource extends FunctionApiResource { +public class FunctionsApiV3Resource extends FunctionApiResource { protected final FunctionsImpl functions; - public FunctionApiV3Resource() { + public FunctionsApiV3Resource() { this.functions = new FunctionsImpl(this); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java index b12c3813ddaaa..3731b92e3403e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java @@ -18,220 +18,20 @@ */ package org.apache.pulsar.functions.worker.rest.api.v3; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.common.functions.UpdateOptions; -import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.common.io.SinkConfig; -import org.apache.pulsar.common.policies.data.SinkStatus; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; -import org.apache.pulsar.functions.worker.rest.api.SinkImpl; -import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import org.glassfish.jersey.media.multipart.FormDataParam; +import io.swagger.annotations.Api; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -@Slf4j +@Api(value = "/sink", description = "Sink admin apis", tags = "sink") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) @Path("/sink") -public class SinkApiV3Resource extends FunctionApiResource { - - protected final SinkImpl sink; - - public SinkApiV3Resource() { - this.sink = new SinkImpl(this); - } - - @POST - @Path("/{tenant}/{namespace}/{sinkName}") - @Consumes(MediaType.MULTIPART_FORM_DATA) - public void registerSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("sinkConfig") String sinkConfigJson) { - - sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData()); - } - - @PUT - @Path("/{tenant}/{namespace}/{sinkName}") - @Consumes(MediaType.MULTIPART_FORM_DATA) - public void updateSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("sinkConfig") String sinkConfigJson, - final @FormDataParam("updateOptions") UpdateOptions updateOptions) { - - sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions); - } - - @DELETE - @Path("/{tenant}/{namespace}/{sinkName}") - public void deregisterSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName) { - sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData()); - } - - @GET - @Path("/{tenant}/{namespace}/{sinkName}") - public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName) - throws IOException { - return sink.getSinkInfo(tenant, namespace, sinkName); - } - - @GET - @ApiOperation( - value = "Displays the status of a Pulsar Sink instance", - response = SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class - ) - @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The sink doesn't exist") - }) - @Produces(MediaType.APPLICATION_JSON) - @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status") - public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus( - final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName, - final @PathParam("instanceId") String instanceId) throws IOException { - return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @GET - @ApiOperation( - value = "Displays the status of a Pulsar Sink running in cluster mode", - response = SinkStatus.class - ) - @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The sink doesn't exist") - }) - @Produces(MediaType.APPLICATION_JSON) - @Path("/{tenant}/{namespace}/{sinkName}/status") - public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName) throws IOException { - return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @GET - @Path("/{tenant}/{namespace}") - public List listSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace) { - return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Restart sink instance", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart") - @Consumes(MediaType.APPLICATION_JSON) - public void restartSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName, - final @PathParam("instanceId") String instanceId) { - sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Restart all sink instances", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sinkName}/restart") - @Consumes(MediaType.APPLICATION_JSON) - public void restartSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName) { - sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Stop sink instance", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop") - @Consumes(MediaType.APPLICATION_JSON) - public void stopSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName, - final @PathParam("instanceId") String instanceId) { - sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Stop all sink instances", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sinkName}/stop") - @Consumes(MediaType.APPLICATION_JSON) - public void stopSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName) { - sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Start sink instance", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start") - @Consumes(MediaType.APPLICATION_JSON) - public void startSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName, - final @PathParam("instanceId") String instanceId) { - sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Start all sink instances", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sinkName}/start") - @Consumes(MediaType.APPLICATION_JSON) - public void startSink(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sinkName") String sinkName) { - sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); - } - - @GET - @Path("/builtinsinks") - public List getSinkList() { - List connectorDefinitions = sink.getListOfConnectors(); - List retVal = new ArrayList<>(); - for (ConnectorDefinition connectorDefinition : connectorDefinitions) { - if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) { - retVal.add(connectorDefinition); - } - } - return retVal; - } +@Deprecated +/** + * @deprecated in favor of {@link SinksApiV3Resource} + */ +public class SinkApiV3Resource extends SinksApiV3Resource { } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java new file mode 100644 index 0000000000000..e699544a403c7 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api.v3; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.policies.data.SinkStatus; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.apache.pulsar.functions.worker.rest.api.SinksImpl; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Api(value = "/sinks", description = "Sinks admin apis", tags = "sinks") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Path("/sinks") +public class SinksApiV3Resource extends FunctionApiResource { + + protected final SinksImpl sink; + + public SinksApiV3Resource() { + this.sink = new SinksImpl(this); + } + + @POST + @Path("/{tenant}/{namespace}/{sinkName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public void registerSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sinkConfig") String sinkConfigJson) { + + sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, + functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData()); + } + + @PUT + @Path("/{tenant}/{namespace}/{sinkName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public void updateSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sinkConfig") String sinkConfigJson, + final @FormDataParam("updateOptions") UpdateOptions updateOptions) { + + sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, + functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions); + } + + @DELETE + @Path("/{tenant}/{namespace}/{sinkName}") + public void deregisterSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData()); + } + + @GET + @Path("/{tenant}/{namespace}/{sinkName}") + public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) + throws IOException { + return sink.getSinkInfo(tenant, namespace, sinkName); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Sink instance", + response = SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The sink doesn't exist") + }) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status") + public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus( + final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) throws IOException { + return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Sink running in cluster mode", + response = SinkStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The sink doesn't exist") + }) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{tenant}/{namespace}/{sinkName}/status") + public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) throws IOException { + return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @GET + @Path("/{tenant}/{namespace}") + public List listSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Restart sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public void restartSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Restart all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public void restartSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Stop sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public void stopSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Stop all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public void stopSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Start sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Start all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); + } + + @GET + @Path("/builtinsinks") + public List getSinkList() { + List connectorDefinitions = sink.getListOfConnectors(); + List retVal = new ArrayList<>(); + for (ConnectorDefinition connectorDefinition : connectorDefinitions) { + if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) { + retVal.add(connectorDefinition); + } + } + return retVal; + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java index 96d208215ab8e..65d68f9d8a77f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java @@ -18,224 +18,20 @@ */ package org.apache.pulsar.functions.worker.rest.api.v3; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.common.functions.UpdateOptions; -import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.common.io.SourceConfig; -import org.apache.pulsar.common.policies.data.SourceStatus; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; -import org.apache.pulsar.functions.worker.rest.api.SourceImpl; -import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import org.glassfish.jersey.media.multipart.FormDataParam; +import io.swagger.annotations.Api; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -@Slf4j @Path("/source") -public class SourceApiV3Resource extends FunctionApiResource { - - protected final SourceImpl source; - - public SourceApiV3Resource() { - this.source = new SourceImpl(this); - } - - @POST - @Path("/{tenant}/{namespace}/{sourceName}") - @Consumes(MediaType.MULTIPART_FORM_DATA) - public void registerSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("sourceConfig") String sourceConfigJson) { - - source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData()); - - } - - @PUT - @Path("/{tenant}/{namespace}/{sourceName}") - @Consumes(MediaType.MULTIPART_FORM_DATA) - public void updateSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("sourceConfig") String sourceConfigJson, - final @FormDataParam("updateOptions") UpdateOptions updateOptions) { - - source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions); - } - - - @DELETE - @Path("/{tenant}/{namespace}/{sourceName}") - public void deregisterSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName) { - source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData()); - } - - @GET - @Path("/{tenant}/{namespace}/{sourceName}") - public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName) - throws IOException { - return source.getSourceInfo(tenant, namespace, sourceName); - } - - @GET - @ApiOperation( - value = "Displays the status of a Pulsar Source instance", - response = SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class - ) - @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The source doesn't exist") - }) - @Produces(MediaType.APPLICATION_JSON) - @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status") - public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus( - final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @PathParam("instanceId") String instanceId) throws IOException { - return source.getSourceInstanceStatus( - tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @GET - @ApiOperation( - value = "Displays the status of a Pulsar Source running in cluster mode", - response = SourceStatus.class - ) - @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The source doesn't exist") - }) - @Produces(MediaType.APPLICATION_JSON) - @Path("/{tenant}/{namespace}/{sourceName}/status") - public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName) throws IOException { - return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @GET - @Path("/{tenant}/{namespace}") - public List listSources(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace) { - return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Restart source instance", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart") - @Consumes(MediaType.APPLICATION_JSON) - public void restartSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @PathParam("instanceId") String instanceId) { - source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Restart all source instances", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sourceName}/restart") - @Consumes(MediaType.APPLICATION_JSON) - public void restartSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName) { - source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Stop source instance", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop") - @Consumes(MediaType.APPLICATION_JSON) - public void stopSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @PathParam("instanceId") String instanceId) { - source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Stop all source instances", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sourceName}/stop") - @Consumes(MediaType.APPLICATION_JSON) - public void stopSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName) { - source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Start source instance", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start") - @Consumes(MediaType.APPLICATION_JSON) - public void startSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @PathParam("instanceId") String instanceId) { - source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); - } - - @POST - @ApiOperation(value = "Start all source instances", response = Void.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") }) - @Path("/{tenant}/{namespace}/{sourceName}/start") - @Consumes(MediaType.APPLICATION_JSON) - public void startSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName) { - source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData()); - } - - @GET - @Path("/builtinsources") - public List getSourceList() { - List connectorDefinitions = source.getListOfConnectors(); - List retval = new ArrayList<>(); - for (ConnectorDefinition connectorDefinition : connectorDefinitions) { - if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) { - retval.add(connectorDefinition); - } - } - return retval; - } +@Api(value = "/source", description = "Source admin apis", tags = "source") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Deprecated +/** + * @deprecated in favor of {@link SourcesApiV3Resource} + */ +public class SourceApiV3Resource extends SourcesApiV3Resource { } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java new file mode 100644 index 0000000000000..de6df1a02755f --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api.v3; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.apache.pulsar.functions.worker.rest.api.SourcesImpl; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Api(value = "/sources", description = "Sources admin apis", tags = "sources") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Path("/sources") +public class SourcesApiV3Resource extends FunctionApiResource { + + protected final SourcesImpl source; + + public SourcesApiV3Resource() { + this.source = new SourcesImpl(this); + } + + @POST + @Path("/{tenant}/{namespace}/{sourceName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public void registerSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson) { + + source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, + functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData()); + + } + + @PUT + @Path("/{tenant}/{namespace}/{sourceName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public void updateSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson, + final @FormDataParam("updateOptions") UpdateOptions updateOptions) { + + source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, + functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions); + } + + + @DELETE + @Path("/{tenant}/{namespace}/{sourceName}") + public void deregisterSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData()); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/{tenant}/{namespace}/{sourceName}") + public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) + throws IOException { + return source.getSourceInfo(tenant, namespace, sourceName); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Source instance", + response = SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The source doesn't exist") + }) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status") + public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus( + final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) throws IOException { + return source.getSourceInstanceStatus( + tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Source running in cluster mode", + response = SourceStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The source doesn't exist") + }) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{tenant}/{namespace}/{sourceName}/status") + public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) throws IOException { + return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/{tenant}/{namespace}") + public List listSources(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Restart source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public void restartSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Restart all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public void restartSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Stop source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public void stopSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Stop all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public void stopSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Start source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData()); + } + + @POST + @ApiOperation(value = "Start all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData()); + } + + @GET + @ApiOperation( + value = "Fetches a list of supported Pulsar IO source connectors currently running in cluster mode", + response = List.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout") + }) + @Produces(MediaType.APPLICATION_JSON) + @Path("/builtinsources") + public List getSourceList() { + List connectorDefinitions = source.getListOfConnectors(); + List retval = new ArrayList<>(); + for (ConnectorDefinition connectorDefinition : connectorDefinitions) { + if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) { + retval.add(connectorDefinition); + } + } + return retval; + } +} diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index b05b0cd21320a..4405c2115db9e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -92,7 +92,7 @@ import static org.testng.Assert.assertEquals; /** - * Unit test of {@link FunctionApiV2Resource}. + * Unit test of {@link FunctionsApiV2Resource}. */ @PrepareForTest({WorkerUtils.class, InstanceUtils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" }) diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index c36490665fe52..434cfc295e317 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -52,7 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; -import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -76,7 +76,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -90,7 +89,7 @@ import static org.testng.Assert.assertEquals; /** - * Unit test of {@link FunctionApiV2Resource}. + * Unit test of {@link FunctionsApiV2Resource}. */ @PrepareForTest({WorkerUtils.class, InstanceUtils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" }) diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 2988e3a97f998..3b3548ae92780 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -49,7 +49,7 @@ import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; -import org.apache.pulsar.functions.worker.rest.api.SinkImpl; +import org.apache.pulsar.functions.worker.rest.api.SinksImpl; import org.apache.pulsar.io.cassandra.CassandraStringSink; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.api.mockito.PowerMockito; @@ -88,7 +88,7 @@ import static org.testng.Assert.assertEquals; /** - * Unit test of {@link SinkApiV3Resource}. + * Unit test of {@link SinksApiV3Resource}. */ @PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*", "java.io.*" }) @@ -125,7 +125,7 @@ public IObjectFactory getObjectFactory() { private FunctionRuntimeManager mockedFunctionRunTimeManager; private RuntimeFactory mockedRuntimeFactory; private Namespace mockedNamespace; - private SinkImpl resource; + private SinksImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; private FunctionMetaData mockedFunctionMetaData; @@ -175,7 +175,7 @@ public void setup() throws Exception { .setPulsarServiceUrl("pulsar://localhost:6650/"); when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); - this.resource = spy(new SinkImpl(() -> mockedWorkerService)); + this.resource = spy(new SinksImpl(() -> mockedWorkerService)); mockStatic(InstanceUtils.class); PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SINK); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 0381af7ab343c..cd55369821e83 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -52,7 +52,7 @@ import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; -import org.apache.pulsar.functions.worker.rest.api.SourceImpl; +import org.apache.pulsar.functions.worker.rest.api.SourcesImpl; import org.apache.pulsar.io.twitter.TwitterFireHose; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.api.mockito.PowerMockito; @@ -87,7 +87,7 @@ import static org.testng.Assert.assertEquals; /** - * Unit test of {@link SourceApiV3Resource}. + * Unit test of {@link SourcesApiV3Resource}. */ @PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @@ -121,7 +121,7 @@ public IObjectFactory getObjectFactory() { private FunctionRuntimeManager mockedFunctionRunTimeManager; private RuntimeFactory mockedRuntimeFactory; private Namespace mockedNamespace; - private SourceImpl resource; + private SourcesImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; private FunctionMetaData mockedFunctionMetaData; @@ -170,7 +170,7 @@ public void setup() throws Exception { .setPulsarServiceUrl("pulsar://localhost:6650/"); when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); - this.resource = spy(new SourceImpl(() -> mockedWorkerService)); + this.resource = spy(new SourcesImpl(() -> mockedWorkerService)); mockStatic(InstanceUtils.class); PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SOURCE); }