Skip to content

Commit

Permalink
[improve][connector] Add getSourceConfig method on SourceContext (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jul 13, 2022
1 parent 840e920 commit c122928
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
Expand All @@ -76,6 +77,7 @@
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -264,6 +266,11 @@ public String getOutputTopic() {
return config.getFunctionDetails().getSink().getTopic();
}

@Override
public SourceConfig getSourceConfig() {
return SourceConfigUtils.convertFromDetails(config.getFunctionDetails());
}

@Override
public String getOutputSchemaType() {
SinkSpec sink = config.getFunctionDetails().getSink();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ public void testGetSinkConfig() {
Assert.assertNotNull(sinkConfig);
}

@Test
public void testGetSourceConfig() {
SinkContext sourceContext = context;
SinkConfig sinkConfig = sourceContext.getSinkConfig();
Assert.assertNotNull(sinkConfig);
}

@Test
public void testIncrCounterStateEnabled() throws Exception {
context.defaultStateStore = mock(BKStateStoreImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
Expand Down Expand Up @@ -123,6 +124,11 @@ public String getOutputTopic() {
return null;
}

@Override
public SourceConfig getSourceConfig() {
return null;
}

@Override
public String getTenant() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.functions.api.BaseContext;

/**
Expand All @@ -47,6 +48,13 @@ public interface SourceContext extends BaseContext {
*/
String getOutputTopic();

/**
* Get the source config.
*
* @return source config
*/
SourceConfig getSourceConfig();

/**
* New output message using schema for serializing to the topic.
*
Expand Down

0 comments on commit c122928

Please sign in to comment.