Skip to content

Commit

Permalink
DIV-8274 | certificate_processor enabled services error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
egov-joy committed Feb 17, 2022
1 parent 3b631f7 commit e999f1d
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions backend/vaccination_api/cmd/certificate_processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func main() {

func initializeCreateUserInKeycloak() {
log.Infof("Using kafka for certificate_processor %s", config.Config.Kafka.BootstrapServers)
c := createConsumer("certificate_processor", "earliest", "false")
c.SubscribeTopics([]string{config.Config.Kafka.CertifyTopic}, nil)

c, err := createConsumer("certificate_processor", "earliest", "false")
err = c.SubscribeTopics([]string{config.Config.Kafka.CertifyTopic}, nil)
if err != nil {
panic(err)
}
for {
msg, err := c.ReadMessage(-1)
if err == nil {
Expand Down Expand Up @@ -116,13 +118,15 @@ func initializeRevokeCertificate() {
log.Infof("Using kafka for revoke_cert %s", config.Config.Kafka.BootstrapServers)

servers := config.Config.Kafka.BootstrapServers
producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": servers})
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": servers})
services.InitializeKafkaForRevocationService(producer)
services.InitRedis()
startRevokeCertificateErrorTopicProducer(producer)
c := createConsumer("revoke_cert", "earliest", "false")
c.SubscribeTopics([]string{config.Config.Kafka.RevokeCertTopic}, nil)

c, err := createConsumer("revoke_cert", "earliest", "false")
err = c.SubscribeTopics([]string{config.Config.Kafka.RevokeCertTopic}, nil)
if err != nil {
panic(err)
}
for {
msg, err := c.ReadMessage(-1)
if err == nil {
Expand Down Expand Up @@ -151,17 +155,13 @@ func initializeRevokeCertificate() {
c.Close()
}

func createConsumer(groupId string, autoOffsetReset string, enableAutoCommit string) *kafka.Consumer {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
func createConsumer(groupId string, autoOffsetReset string, enableAutoCommit string) (*kafka.Consumer, error) {
return kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": config.Config.Kafka.BootstrapServers,
"group.id": groupId,
"auto.offset.reset": autoOffsetReset,
"enable.auto.commit": enableAutoCommit,
})
if err != nil {
panic(err)
}
return c
}

func startRevokeCertificateErrorTopicProducer(producer *kafka.Producer) {
Expand Down

0 comments on commit e999f1d

Please sign in to comment.