Skip to content

Commit

Permalink
Misc Function fixes:- (apache#3907)
Browse files Browse the repository at this point in the history
1) Collect input topics from the function details spec
2) Catch all errors during source/sink close since its user code
  • Loading branch information
srkukarni authored and sijie committed Mar 28, 2019
1 parent a4a53c7 commit 19dd255
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private Map<String, Producer<?>> publishProducers;
private ProducerBuilderImpl<?> producerBuilder;

private final List<String> inputTopics;

private final TopicSchema topicSchema;

private final SecretsProvider secretsProvider;
Expand All @@ -102,13 +100,12 @@ class ContextImpl implements Context, SinkContext, SourceContext {
}
private final Utils.ComponentType componentType;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Utils.ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics;
this.topicSchema = new TopicSchema(client);
this.statsManager = statsManager;

Expand Down Expand Up @@ -169,7 +166,7 @@ public Record<?> getCurrentRecord() {

@Override
public Collection<String> getInputTopics() {
return inputTopics;
return config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,9 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
}

ContextImpl setupContext() {
List<String> inputTopics = null;
if (source instanceof PulsarSource) {
inputTopics = ((PulsarSource<?>) source).getInputTopics();
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats);
}

Expand Down Expand Up @@ -470,7 +466,7 @@ public void close() {
if (source != null) {
try {
source.close();
} catch (Exception e) {
} catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);

}
Expand All @@ -479,7 +475,7 @@ public void close() {
if (sink != null) {
try {
sink.close();
} catch (Exception e) {
} catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void setup() {
config,
logger,
client,
new ArrayList<>(),
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
Utils.ComponentType.FUNCTION, null);
}
Expand Down

0 comments on commit 19dd255

Please sign in to comment.