Skip to content

Commit

Permalink
Refactoring Function Component implementation (apache#4541)
Browse files Browse the repository at this point in the history
* Refactoring Function Component implementation

* cleaning up
  • Loading branch information
jerrypeng authored Jun 24, 2019
1 parent d53d336 commit 95df092
Show file tree
Hide file tree
Showing 14 changed files with 1,290 additions and 584 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
*/
package org.apache.pulsar.broker.admin.impl;

import io.swagger.annotations.*;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -163,10 +168,10 @@ public void registerFunction(
)
)
)
final @FormDataParam("functionConfig") String functionConfigJson) {
final @FormDataParam("functionConfig") FunctionConfig functionConfig) {

functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData());
functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
}

@PUT
Expand Down Expand Up @@ -270,12 +275,12 @@ public void updateFunction(
)
)
)
final @FormDataParam("functionConfig") String functionConfigJson,
final @FormDataParam("functionConfig") FunctionConfig functionConfig,
@ApiParam(value = "The update options is for the Pulsar Function that needs to be updated.")
final @FormDataParam("updateOptions") UpdateOptions updateOptions) throws IOException {

functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData(), updateOptions);
functionPkgUrl, functionConfig, clientAppId(), clientAuthData(), updateOptions);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void registerSink(@ApiParam(value = "The sink's tenant")
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("url") String sinkPkgUrl,
@ApiParam(
value =
"A JSON value presenting a sink config playload. All available configuration options are: \n" +
Expand Down Expand Up @@ -136,10 +136,9 @@ public void registerSink(@ApiParam(value = "The sink's tenant")
)
)
)
final @FormDataParam("sinkConfig") String sinkConfigJson) {

sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData());
final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData());
}

@PUT
Expand All @@ -162,7 +161,8 @@ public void updateSink(@ApiParam(value = "The sink's tenant")
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
@ApiParam(value = "URL of sink's archive")
final @FormDataParam("url") String sinkPkgUrl,
@ApiParam(
value =
"A JSON value presenting a sink config playload. All available configuration options are: \n" +
Expand Down Expand Up @@ -221,12 +221,11 @@ public void updateSink(@ApiParam(value = "The sink's tenant")
)
)
)
final @FormDataParam("sinkConfig") String sinkConfigJson,
@ApiParam()
final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
@ApiParam(value = "Update options for sink")
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {

sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions);
sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData(), updateOptions);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void registerSource(
final @PathParam("sourceName") String sourceName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
value = "A JSON value presenting source configuration payload. An example of the expected functions can be found here. \n" +
"classname \n" +
Expand Down Expand Up @@ -126,10 +126,9 @@ public void registerSource(
)
)
)
final @FormDataParam("sourceConfig") String sourceConfigJson) {

source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
source.registerSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData());
}

@PUT
Expand All @@ -154,7 +153,8 @@ public void updateSource(
final @PathParam("sourceName") String sourceName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
@ApiParam(value = "URL of sources' archive")
final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
value = "A JSON value presenting source configuration payload. An example of the expected functions can be found here. \n" +
"classname \n" +
Expand Down Expand Up @@ -201,11 +201,11 @@ public void updateSource(
)
)
)
final @FormDataParam("sourceConfig") String sourceConfigJson,
final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
@ApiParam(value = "Update options for source")
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {

source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
source.updateSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData(), updateOptions);
}


Expand Down
Loading

0 comments on commit 95df092

Please sign in to comment.