Skip to content

Commit

Permalink
Merge pull request apache#1883 from rinuaby13/master
Browse files Browse the repository at this point in the history
[ISSUE apache#1566]Added logger to record the catched exception
close apache#1566
  • Loading branch information
xwm1992 authored Oct 26, 2022
2 parents f322985 + 2dafa87 commit 0ad1972
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerImpl {
public static final Logger logger = LoggerFactory.getLogger(ConsumerImpl.class);
private final KafkaConsumer<String, CloudEvent> kafkaConsumer;
private final Properties properties;
private AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -102,6 +106,7 @@ public synchronized void subscribe(String topic) {
List<String> topics = new ArrayList<>(topicsSet);
this.kafkaConsumer.subscribe(topics);
} catch (Exception e) {
logger.error("Error while subscribing the Kafka consumer to topic: ",e);
throw new ConnectorRuntimeException(
String.format("Kafka consumer can't attach to %s.", topic));
}
Expand All @@ -115,6 +120,7 @@ public synchronized void unsubscribe(String topic) {
List<String> topics = new ArrayList<>(topicsSet);
this.kafkaConsumer.subscribe(topics);
} catch (Exception e) {
logger.error("Error while unsubscribing the Kafka consumer: ",e);
throw new ConnectorRuntimeException(String.format("kafka push consumer fails to unsubscribe topic: %s", topic));
}
}
Expand Down

0 comments on commit 0ad1972

Please sign in to comment.