Skip to content

Commit

Permalink
Add permanent error check (Trendyol#55)
Browse files Browse the repository at this point in the history
* Add permanent error check
  • Loading branch information
mhmtszr authored Jun 1, 2023
1 parent 623c17c commit cf137b2
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package producer
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"syscall"
"time"

"github.com/Trendyol/go-dcp-client/logger"
Expand Down Expand Up @@ -85,6 +89,9 @@ func (b *producerBatch) FlushMessages() {
startedTime := time.Now()
err := b.Writer.WriteMessages(context.Background(), b.messages...)
if err != nil {
if isFatalError(err) {
panic(fmt.Errorf("permanent error on Kafka side %e", err))
}
b.errorLogger.Printf("batch producer flush error %v", err)
return
}
Expand All @@ -96,3 +103,16 @@ func (b *producerBatch) FlushMessages() {
}
b.dcpCheckpointCommit()
}

func isFatalError(err error) bool {
e, ok := err.(kafka.Error)

if (ok && e.Temporary()) ||
errors.Is(err, io.ErrUnexpectedEOF) ||
errors.Is(err, syscall.ECONNREFUSED) ||
errors.Is(err, syscall.ECONNRESET) ||
errors.Is(err, syscall.EPIPE) {
return false
}
return true
}

0 comments on commit cf137b2

Please sign in to comment.