Skip to content

Commit

Permalink
Function Serverside validation Part 2 (apache#2752)
Browse files Browse the repository at this point in the history
* First cut of adding new endpoints for source/sink

* More better check

* Plugged in source/sink

* Made pulsar admin to use Source/Sink endpoints

* Switched cli to use functionConfig instead of function details

* Fix unittests

* Fixed unittest

* Logic correction

* Fix bugs after mergin

* More compile fixes

* Fixed path for sink

* Added source unittests

* Added sink tests

* Fixed unittest

* Have default value for maxRetries

* Fixed unittest

* Added example jar

* Fixed unittest

* Fixed unittest

* Fixed unittest

* Fixed unittest

* Updated license

* Correct the links for sources/sinks

* Strip the builtin://

* Removed unused imports

* Disabling presto integration test for now until we figure out the whole refactor

* Revert accidental yaml file change

* Dont expose to v1 endpoints
  • Loading branch information
srkukarni authored Oct 17, 2018
1 parent c848860 commit a7310ee
Show file tree
Hide file tree
Showing 42 changed files with 4,395 additions and 532 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("functionConfig") String functionConfigJson) {

return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
}

@PUT
Expand All @@ -106,7 +106,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("functionConfig") String functionConfigJson) {

return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SinkBase extends AdminResource implements Supplier<WorkerService> {

private final FunctionsImpl functions;

public SinkBase() {
this.functions = new FunctionsImpl(this);
}

@Override
public WorkerService get() {
return pulsar().getWorkerService();
}

@POST
@ApiOperation(value = "Creates a new Pulsar Sink in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 200, message = "Pulsar Function successfully created")
})
@Path("/{tenant}/{namespace}/{sinkName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response registerSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("sinkConfig") String sinkConfigJson) {

return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
}

@PUT
@ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
@ApiResponse(code = 200, message = "Pulsar Function successfully updated")
})
@Path("/{tenant}/{namespace}/{sinkName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response updateSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("sinkConfig") String sinkConfigJson) {

return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());

}


@DELETE
@ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function doesn't exist"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 200, message = "The function was successfully deleted")
})
@Path("/{tenant}/{namespace}/{sinkName}")
public Response deregisterSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
}

@GET
@ApiOperation(
value = "Fetches information about a Pulsar Sink currently running in cluster mode",
response = FunctionMetaData.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 404, message = "The function doesn't exist")
})
@Path("/{tenant}/{namespace}/{sinkName}")
public Response getSinkInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
return functions.getFunctionInfo(tenant, namespace, sinkName);
}

@GET
@ApiOperation(
value = "Displays the status of a Pulsar Sink instance",
response = FunctionStatus.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "The function doesn't exist")
})
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
tenant, namespace, sinkName, instanceId, uri.getRequestUri());
}

@GET
@ApiOperation(
value = "Displays the status of a Pulsar Sink running in cluster mode",
response = FunctionStatus.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{sinkName}/status")
public Response getSinkStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
}

@GET
@ApiOperation(
value = "Lists all Pulsar Sinks currently deployed in a given namespace",
response = String.class,
responseContainer = "Collection"
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}")
public Response listSinks(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(tenant, namespace);

}

@POST
@ApiOperation(value = "Restart sink instance", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
}

@POST
@ApiOperation(value = "Restart all sink instances", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{sinkName}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
return functions.restartFunctionInstances(tenant, namespace, sinkName);
}

@POST
@ApiOperation(value = "Stop sink instance", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
}

@POST
@ApiOperation(value = "Stop all sink instances", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{sinkName}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
return functions.stopFunctionInstances(tenant, namespace, sinkName);
}

@GET
@ApiOperation(
value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode",
response = List.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 408, message = "Request timeout")
})
@Path("/builtinsinks")
public List<ConnectorDefinition> getSinkList() {
List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
List<ConnectorDefinition> retval = new ArrayList<>();
for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
retval.add(connectorDefinition);
}
}
return retval;
}
}
Loading

0 comments on commit a7310ee

Please sign in to comment.