Skip to content

Commit

Permalink
Prefer rest path over Function/Source/Sink Config values (apache#2918)
Browse files Browse the repository at this point in the history
### Motivation

When users create/update sink/source/function, they call the rest endpoint that has a tenant/namespace/name components and pass it a Function/Source/SinkConfig. These configs also have tenant/namespace/name parameter that might be different from the rest path.
This pr gives preference to the rest path by overwriting the config fields.
  • Loading branch information
srkukarni authored and sijie committed Nov 3, 2018
1 parent 080107a commit 6b5f737
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ logs
pulsar-broker/tmp.*
pulsar-broker/src/test/resources/log4j2.yaml
pulsar-functions/worker/test-tenant/
pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar
*.log
*.nar

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,13 +1103,21 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp

if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
// The rest end points take precendence over whatever is there in functionconfig
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(componentName);
FunctionConfigUtils.inferMissingArguments(functionConfig);
ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile);
return FunctionConfigUtils.convert(functionConfig, clsLoader);
}
if (componentType.equals(SOURCE)) {
Path archivePath = null;
SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
// The rest end points take precendence over whatever is there in sourceconfig
sourceConfig.setTenant(tenant);
sourceConfig.setNamespace(namespace);
sourceConfig.setName(componentName);
SourceConfigUtils.inferMissingArguments(sourceConfig);
if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
String builtinArchive = sourceConfig.getArchive();
Expand All @@ -1128,6 +1136,10 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
if (componentType.equals(SINK)) {
Path archivePath = null;
SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
// The rest end points take precendence over whatever is there in sinkConfig
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
sinkConfig.setName(componentName);
SinkConfigUtils.inferMissingArguments(sinkConfig);
if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
String builtinArchive = sinkConfig.getArchive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,4 +1200,36 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {

assertEquals(Status.OK.getStatusCode(), response.getStatus());
}

@Test
public void testRegisterFunctionWithConflictingFields() throws IOException {
Configurator.setRootLevel(Level.DEBUG);
String actualTenant = "DIFFERENT_TENANT";
String actualNamespace = "DIFFERENT_NAMESPACE";
String actualName = "DIFFERENT_NAME";

String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
String filePackageUrl = "file://" + fileLocation;
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);

RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(function);
functionConfig.setClassName(className);
functionConfig.setParallelism(parallelism);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
Response response = resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null);

assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,48 @@ public void testRegisterSinkSuccess() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}

@Test
public void testRegisterSinkConflictingFields() throws Exception {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
String actualTenant = "DIFFERENT_TENANT";
String actualNamespace = "DIFFERENT_NAMESPACE";
String actualName = "DIFFERENT_NAME";

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);

RequestResult rr = new RequestResult()
.setSuccess(true)
.setMessage("source registered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);

SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
sinkConfig.setName(sink);
sinkConfig.setClassName(className);
sinkConfig.setParallelism(parallelism);
sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
Response response = resource.registerFunction(
actualTenant,
actualNamespace,
actualName,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
null,
new Gson().toJson(sinkConfig),
FunctionsImpl.SINK,
null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}

@Test
public void testRegisterSinkFailure() throws Exception {
mockStatic(Utils.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,49 @@ public void testRegisterSourceSuccess() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}

@Test
public void testRegisterSourceConflictingFields() throws Exception {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
String actualTenant = "DIFFERENT_TENANT";
String actualNamespace = "DIFFERENT_NAMESPACE";
String actualName = "DIFFERENT_NAME";

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);

RequestResult rr = new RequestResult()
.setSuccess(true)
.setMessage("source registered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);

SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setTenant(tenant);
sourceConfig.setNamespace(namespace);
sourceConfig.setName(source);
sourceConfig.setClassName(className);
sourceConfig.setParallelism(parallelism);
sourceConfig.setTopicName(outputTopic);
sourceConfig.setSerdeClassName(outputSerdeClassName);
Response response = resource.registerFunction(
actualTenant,
actualNamespace,
actualName,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
null,
new Gson().toJson(sourceConfig),
FunctionsImpl.SOURCE,
null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}

@Test
public void testRegisterSourceFailure() throws Exception {
mockStatic(Utils.class);
Expand Down

0 comments on commit 6b5f737

Please sign in to comment.