Skip to content

Commit

Permalink
[Issue 10579] Fix inputs to return a list of topic (apache#11094)
Browse files Browse the repository at this point in the history
  • Loading branch information
murong00 authored Jun 29, 2021
1 parent a6ab3af commit 0ee3956
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -250,6 +251,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setParallelism(functionDetails.getParallelism());
sinkConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
List<String> inputs = new ArrayList<>();
for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
ConsumerConfig consumerConfig = new ConsumerConfig();
if (!isEmpty(input.getValue().getSerdeClassName())) {
Expand All @@ -267,7 +269,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
consumerConfig.setConsumerProperties(input.getValue().getConsumerPropertiesMap());
consumerConfigMap.put(input.getKey(), consumerConfig);
inputs.add(input.getKey());
}
sinkConfig.setInputs(inputs);
sinkConfig.setInputSpecs(consumerConfigMap);
if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void testConvertBackFidelity() throws IOException {
sinkConfig.setSourceSubscriptionName("test-subscription");
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).receiverQueueSize(532).serdeClassName("test-serde").build());
sinkConfig.setInputs(Collections.singleton("test-input"));
sinkConfig.setInputSpecs(inputSpecs);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -41,6 +42,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -1461,6 +1463,10 @@ public void testGetSinkSuccess() {
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData);

getDefaultSinkInfo();

assertNotNull(getDefaultSinkInfo().getInputs());
assertEquals(getDefaultSinkInfo().getInputs(), Collections.singleton("input"));

assertEquals(
SinkConfigUtils.convertFromDetails(functionDetails),
getDefaultSinkInfo());
Expand Down

0 comments on commit 0ee3956

Please sign in to comment.