Skip to content

Commit

Permalink
Allow functions to be triggered without specifying topic name (apache…
Browse files Browse the repository at this point in the history
…#1696)

* Re-added trigger functionality with no need for topic name

* Unified paths

* Fix
  • Loading branch information
srkukarni authored and sijie committed May 1, 2018
1 parent cbdce09 commit fb7198a
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ public Response getAssignments() {
public Response triggerFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("topic") String topic,
final @FormDataParam("data") String triggerValue,
final @FormDataParam("dataStream") InputStream triggerStream) {
final @FormDataParam("dataStream") InputStream triggerStream,
final @FormDataParam("topic") String topic) {

return functions.triggerFunction(
tenant, namespace, functionName, topic, triggerValue, triggerStream);
tenant, namespace, functionName, triggerValue, triggerStream, topic);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException;
String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException;

/**
* Upload Data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void updateFunction(FunctionDetails functionDetails, String fileName) thr
}

@Override
public String triggerFunction(String tenant, String namespace, String functionName, String triggerValue, String triggerFile) throws PulsarAdminException {
public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
if (triggerFile != null) {
Expand All @@ -160,9 +160,11 @@ public String triggerFunction(String tenant, String namespace, String functionNa
if (triggerValue != null) {
mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
}
String response = request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
if (topic != null && !topic.isEmpty()) {
mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
}
return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
return response;
} catch (Exception e) {
throw getApiException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,12 +776,14 @@ class TriggerFunction extends FunctionCommand {
protected String triggerValue;
@Parameter(names = "--triggerFile", description = "The path to the file that contains the data with which you'd like to trigger the function")
protected String triggerFile;
@Parameter(names = "--topic", description = "The specific topic name that the function consumes from that you want to inject the data to")
protected String topic;
@Override
void runCmd() throws Exception {
if (triggerFile == null && triggerValue == null) {
throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
}
String retval = admin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
System.out.println(retval);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,14 @@ public Response getAssignments() {
}

@POST
@Path("/{tenant}/{namespace}/{functionName}/{topic}/trigger")
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response triggerFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("name") String functionName,
final @PathParam("topic") String topic,
final @FormDataParam("data") String input,
final @FormDataParam("dataStream") InputStream uploadedInputStream) {
final @FormDataParam("dataStream") InputStream uploadedInputStream,
final @FormDataParam("topic") String topic) {
FunctionDetails functionDetails;
// validate parameters
try {
Expand All @@ -480,9 +480,15 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);

String inputTopicToWrite;
// only if the source is PulsarSource
if (functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) {
inputTopicToWrite = topic;
// only if the source is PulsarSource and if the function consumes only one topic
if (!functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) {
return Response.status(Status.BAD_REQUEST).build();
}
if (topic != null) {
inputTopicToWrite = topic;
} else if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().size() == 1) {
inputTopicToWrite =
functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().keySet().iterator().next();
} else {
return Response.status(Status.BAD_REQUEST).build();
}
Expand Down Expand Up @@ -707,9 +713,6 @@ private void validateTriggerRequestParams(String tenant,
if (functionName == null) {
throw new IllegalArgumentException("Function Name is not provided");
}
if (topic == null) {
throw new IllegalArgumentException("Topic Name is not provided");
}
if (uploadedInputStream == null && input == null) {
throw new IllegalArgumentException("Trigger Data is not provided");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ public Response getAssignments() {
public Response triggerFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("name") String functionName,
final @PathParam("topic") String topic,
final @FormDataParam("data") String input,
final @FormDataParam("dataStream") InputStream uploadedInputStream) {
return functions.triggerFunction(tenant, namespace, functionName, topic, input, uploadedInputStream);
final @FormDataParam("dataStream") InputStream uploadedInputStream,
final @FormDataParam("topic") String topic) {
return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
}

@POST
Expand Down

0 comments on commit fb7198a

Please sign in to comment.