Skip to content

Commit

Permalink
Fix validation of function's update (apache#6888)
Browse files Browse the repository at this point in the history
### Motivation

The validation of parameters for function's update was not properly implemented for the outputSerdeClassName parameter. It was checking the outputSchemaType field instead.

### Modifications

Updated the if conditions and added tests.
  • Loading branch information
vzhikserg authored May 7, 2020
1 parent 464df75 commit bfec523
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName()) && !newConfig.getOutputSerdeClassName().equals(existingConfig.getOutputSerdeClassName())) {
throw new IllegalArgumentException("Output Serde mismatch");
}
if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ private FunctionConfig createFunctionConfig() {
functionConfig.setInputSpecs(inputSpecs);
functionConfig.setOutput("test-output");
functionConfig.setOutputSerdeClassName("test-serde");
functionConfig.setOutputSchemaType("json");
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
Expand Down Expand Up @@ -506,4 +507,18 @@ public void testFunctionConfigConvertFromDetails() {
assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
assertEquals(functionConfig.getCleanupSubscription().booleanValue(), sourceSpec.getCleanupSubscription());
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output Serde mismatch")
public void testMergeDifferentSerde() {
FunctionConfig functionConfig = createFunctionConfig();
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSerdeClassName", "test-updated-serde");
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output Schema mismatch")
public void testMergeDifferentOutputSchemaTypes() {
FunctionConfig functionConfig = createFunctionConfig();
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSchemaType", "avro");
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}
}

0 comments on commit bfec523

Please sign in to comment.