Skip to content

Commit

Permalink
On publish failures, log error and count them as sys exceptions (apac…
Browse files Browse the repository at this point in the history
…he#3704)

* On publish failures, log error and count them as sys exceptions

* Took feedback
  • Loading branch information
srkukarni authored Mar 5, 2019
1 parent 9dc3df6 commit 07cebb1
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private StateContextImpl stateContext;
private Map<String, Object> userConfigs;

private ComponentStatsManager statsManager;

Map<String, String[]> userMetricsLabels = new HashMap<>();
private final String[] metricsLabels;
private final Summary userMetricsSummary;
Expand All @@ -103,12 +105,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Utils.ComponentType componentType) {
Utils.ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics;
this.topicSchema = new TopicSchema(client);
this.statsManager = statsManager;

this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -359,7 +362,13 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O>
}
}

return producer.sendAsync(object).thenApply(msgId -> null);
CompletableFuture<Void> future = producer.sendAsync(object).thenApply(msgId -> null);
future.exceptionally(e -> {
this.statsManager.incrSysExceptions(e);
logger.error("Failed to publish to topic {} with error {}", topicName, e);
return null;
});
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ ContextImpl setupContext() {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
collectorRegistry, metricsLabels, this.componentType);
collectorRegistry, metricsLabels, this.componentType, this.stats);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setup() {
client,
new ArrayList<>(),
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
Utils.ComponentType.FUNCTION);
Utils.ComponentType.FUNCTION, null);
}

@Test(expectedExceptions = IllegalStateException.class)
Expand Down

0 comments on commit 07cebb1

Please sign in to comment.