Skip to content

Commit

Permalink
Added stop/restart functionality in sources/sinks (apache#2810)
Browse files Browse the repository at this point in the history
* Added Get and List source/sink functionality

* Fixed compile

* Removed test that doesnt make sense any more

* Fixed build

* Fixed logic

* Return error response

* Return response on error

* Fix unittest

* Fixed unittest

* Fixed unittest

* Fixed unittest

* Added get/list sinks tests

* Added get/list tests

* Add more unittests

* Added more unittests

* Added TODO

* Took feedback

* Fix unittest

* Fix unittest

* Fix unittest

* Fixed integration tests

* Fixed integration test

* Added restart/stop functionality to the sources/sinks

* Added getstatus method to sources/sink

* Fix integration tests
  • Loading branch information
srkukarni authored Oct 19, 2018
1 parent 1386e6d commit f3a027b
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
tenant, namespace, functionName, instanceId, uri.getRequestUri());
tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}

@GET
Expand All @@ -179,7 +179,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(
tenant, namespace, functionName, uri.getRequestUri());
tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
}

@GET
Expand Down Expand Up @@ -256,7 +256,7 @@ public Response getFunctionState(final @PathParam("tenant") String tenant,
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}

@POST
Expand All @@ -268,7 +268,7 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
return functions.restartFunctionInstances(tenant, namespace, functionName);
return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}

@POST
Expand All @@ -281,7 +281,7 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}

@POST
Expand All @@ -293,7 +293,7 @@ public Response stopFunction(final @PathParam("tenant") String tenant,
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
return functions.stopFunctionInstances(tenant, namespace, functionName);
return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
tenant, namespace, sinkName, instanceId, uri.getRequestUri());
tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}

@GET
Expand All @@ -164,7 +164,7 @@ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
public Response getSinkStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
}

@GET
Expand Down Expand Up @@ -194,7 +194,7 @@ public Response listSinks(final @PathParam("tenant") String tenant,
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}

@POST
Expand All @@ -206,7 +206,7 @@ public Response restartSink(final @PathParam("tenant") String tenant,
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
return functions.restartFunctionInstances(tenant, namespace, sinkName);
return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
}

@POST
Expand All @@ -219,7 +219,7 @@ public Response restartSink(final @PathParam("tenant") String tenant,
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}

@POST
Expand All @@ -231,7 +231,7 @@ public Response stopSink(final @PathParam("tenant") String tenant,
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
return functions.stopFunctionInstances(tenant, namespace, sinkName);
return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
tenant, namespace, sourceName, instanceId, uri.getRequestUri());
tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}

@GET
Expand All @@ -166,7 +166,7 @@ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant
public Response getSourceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
}

@GET
Expand Down Expand Up @@ -197,7 +197,7 @@ public Response listSources(final @PathParam("tenant") String tenant,
public Response restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}

@POST
Expand All @@ -209,7 +209,7 @@ public Response restartSource(final @PathParam("tenant") String tenant,
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
return functions.restartFunctionInstances(tenant, namespace, sourceName);
return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}

@POST
Expand All @@ -222,7 +222,7 @@ public Response restartSource(final @PathParam("tenant") String tenant,
public Response stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}

@POST
Expand All @@ -234,7 +234,7 @@ public Response stopSource(final @PathParam("tenant") String tenant,
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
return functions.stopFunctionInstances(tenant, namespace, sourceName);
return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;

import java.io.File;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class CmdSinks extends CmdBase {
private final DeleteSink deleteSink;
private final ListSinks listSinks;
private final GetSink getSink;
private final GetSinkStatus getSinkStatus;
private final StopSink stopSink;
private final RestartSink restartSink;
private final LocalSinkRunner localSinkRunner;

public CmdSinks(PulsarAdmin admin) {
Expand All @@ -76,13 +80,19 @@ public CmdSinks(PulsarAdmin admin) {
deleteSink = new DeleteSink();
listSinks = new ListSinks();
getSink = new GetSink();
getSinkStatus = new GetSinkStatus();
stopSink = new StopSink();
restartSink = new RestartSink();
localSinkRunner = new LocalSinkRunner();

jcommander.addCommand("create", createSink);
jcommander.addCommand("update", updateSink);
jcommander.addCommand("delete", deleteSink);
jcommander.addCommand("list", listSinks);
jcommander.addCommand("get", getSink);
jcommander.addCommand("getstatus", getSinkStatus);
jcommander.addCommand("stop", stopSink);
jcommander.addCommand("restart", restartSink);
jcommander.addCommand("localrun", localSinkRunner);
jcommander.addCommand("available-sinks", new ListBuiltInSinks());
}
Expand Down Expand Up @@ -590,6 +600,65 @@ void runCmd() throws Exception {
}
}

@Parameters(commandDescription = "Check the current status of a Pulsar Sink")
class GetSinkStatus extends SinkCommand {

@Parameter(names = "--instance-id", description = "The sink instanceId (Get-status of all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
String json = Utils.printJson(
isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName)
: admin.sink().getSinkStatus(tenant, namespace, sinkName,
Integer.parseInt(instanceId)));
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
}

@Parameters(commandDescription = "Restart sink instance")
class RestartSink extends SinkCommand {

@Parameter(names = "--instance-id", description = "The sink instanceId (restart all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
if (isNotBlank(instanceId)) {
try {
admin.sink().restartSink(tenant, namespace, sinkName, Integer.parseInt(instanceId));
} catch (NumberFormatException e) {
System.err.println("instance-id must be a number");
}
} else {
admin.sink().restartSink(tenant, namespace, sinkName);
}
System.out.println("Restarted successfully");
}
}

@Parameters(commandDescription = "Temporary stops sink instance. (If worker restarts then it reassigns and starts sink again")
class StopSink extends SinkCommand {

@Parameter(names = "--instance-id", description = "The sink instanceId (stop all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
if (isNotBlank(instanceId)) {
try {
admin.sink().stopSink(tenant, namespace, sinkName, Integer.parseInt(instanceId));
} catch (NumberFormatException e) {
System.err.println("instance-id must be a number");
}
} else {
admin.sink().stopSink(tenant, namespace, sinkName);
}
System.out.println("Restarted successfully");
}
}

@Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster")
public class ListBuiltInSinks extends BaseCommand {
@Override
Expand Down
Loading

0 comments on commit f3a027b

Please sign in to comment.