Skip to content

Commit

Permalink
Fixed the behavior of Function start/stop (apache#3477)
Browse files Browse the repository at this point in the history
* Added a state in the function metadata about what the state of the instances should be

* Have start api for sources/sinks

* Add missing pieces

* more checks while handling request

* Fixed bugs

* Added unittests

* Added unittest

* Fix the all instances side logic
  • Loading branch information
srkukarni authored Feb 1, 2019
1 parent 4151413 commit 020a1d5
Show file tree
Hide file tree
Showing 22 changed files with 781 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,37 @@ public void stopFunction(final @PathParam("tenant") String tenant,
functions.stopFunctionInstances(tenant, namespace, functionName);
}

@POST
@ApiOperation(value = "Start function instance", response = Void.class)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
}

@POST
@ApiOperation(value = "Start all function instances", response = Void.class)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
functions.startFunctionInstances(tenant, namespace, functionName);
}

@POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,37 @@ public void stopSink(final @PathParam("tenant") String tenant,
sink.stopFunctionInstances(tenant, namespace, sinkName);
}

@POST
@ApiOperation(value = "Start sink instance", response = Void.class)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
}

@POST
@ApiOperation(value = "Start all sink instances", response = Void.class)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{sinkName}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
sink.startFunctionInstances(tenant, namespace, sinkName);
}

@GET
@ApiOperation(
value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,35 @@ public void stopSource(final @PathParam("tenant") String tenant,
source.stopFunctionInstances(tenant, namespace, sourceName);
}

@POST
@ApiOperation(value = "Start source instance", response = Void.class)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
}

@POST
@ApiOperation(value = "Start all source instances", response = Void.class)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{sourceName}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
source.startFunctionInstances(tenant, namespace, sourceName);
}

@GET
@ApiOperation(
value = "Fetches a list of supported Pulsar IO source connectors currently running in cluster mode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,39 @@ FunctionStats getFunctionStats(String tenant, String namespace, String function)
*/
void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;

/**
* Start all function instances
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @throws PulsarAdminException
* Unexpected error
*/
void startFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Start function instance
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @param instanceId
* Function instanceId
*
* @throws PulsarAdminException
* Unexpected error
*/
void startFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;

/**
* Stop all function instances
*
Expand All @@ -299,6 +332,7 @@ FunctionStats getFunctionStats(String tenant, String namespace, String function)
*/
void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;


/**
* Triggers the function by writing to the input topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,40 @@ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant
*/
void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException;

/**
* Start sink instance
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param sink
* Sink name
*
* @param instanceId
* Sink instanceId
*
* @throws PulsarAdminException
* Unexpected error
*/
void startSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException;

/**
* Start all sink instances
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param sink
* Sink name
*
* @throws PulsarAdminException
* Unexpected error
*/
void startSink(String tenant, String namespace, String sink) throws PulsarAdminException;


/**
* Fetches a list of supported Pulsar IO sinks currently running in cluster mode
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,40 @@ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(Strin
*/
void stopSource(String tenant, String namespace, String source) throws PulsarAdminException;

/**
* Start source instance
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param source
* Source name
*
* @param instanceId
* Source instanceId
*
* @throws PulsarAdminException
* Unexpected error
*/
void startSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException;

/**
* Start all source instances
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param source
* Source name
*
* @throws PulsarAdminException
* Unexpected error
*/
void startSource(String tenant, String namespace, String source) throws PulsarAdminException;


/**
* Fetches a list of supported Pulsar IO sources currently running in cluster mode
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,27 @@ public void stopFunction(String tenant, String namespace, String functionName) t
}
}

@Override
public void startFunction(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
.path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void startFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
request(functions.path(tenant).path(namespace).path(functionName).path("start"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,27 @@ public void stopSink(String tenant, String namespace, String sinkName) throws Pu
}
}

@Override
public void startSink(String tenant, String namespace, String sinkName, int instanceId)
throws PulsarAdminException {
try {
request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
.path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void startSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
request(sink.path(tenant).path(namespace).path(sinkName).path("start"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,27 @@ public void stopSource(String tenant, String namespace, String sourceName) throw
}
}

@Override
public void startSource(String tenant, String namespace, String sourceName, int instanceId)
throws PulsarAdminException {
try {
request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
.path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void startSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
try {
request(source.path(tenant).path(namespace).path(sourceName).path("start"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,35 @@ public void stopFunctionInstances() throws Exception {
verify(functions, times(1)).stopFunction(tenant, namespace, fnName);
}

@Test
public void startFunction() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
String namespace = "ns1";
int instanceId = 0;
cmd.run(new String[] { "start", "--tenant", tenant, "--namespace", namespace, "--name", fnName,
"--instance-id", Integer.toString(instanceId)});

CmdFunctions.StartFunction stop = cmd.getStarter();
assertEquals(fnName, stop.getFunctionName());

verify(functions, times(1)).startFunction(tenant, namespace, fnName, instanceId);
}

@Test
public void startFunctionInstances() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
String namespace = "ns1";
cmd.run(new String[] { "start", "--tenant", tenant, "--namespace", namespace, "--name", fnName });

CmdFunctions.StartFunction stop = cmd.getStarter();
assertEquals(fnName, stop.getFunctionName());

verify(functions, times(1)).startFunction(tenant, namespace, fnName);
}


@Test
public void testGetFunctionStatus() throws Exception {
String fnName = TEST_NAME + "-function";
Expand Down
Loading

0 comments on commit 020a1d5

Please sign in to comment.