Skip to content

Commit

Permalink
Ensure AMQP conns are dropped under partial errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Aug 31, 2020
1 parent daf9b4f commit dfc6925
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 18 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.

## Unreleased

### Fixed

- Eliminated situations where an `amqp_0_9` or `amqp_1` component would abandon
a connection reset due to partial errors.

## 3.26.0 - 2020-08-30

### Added
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ lint:
@go vet $(GO_FLAGS) ./...
@golint -min_confidence 0.5 ./cmd/... ./lib/...

test:
test: $(APPS)
@go test $(GO_FLAGS) -timeout 300s -race ./...
@go run $(GO_FLAGS) ./cmd/benthos/main.go lint ./config/...
@go run $(GO_FLAGS) ./cmd/benthos/main.go test ./config/test/...
@$(PATHINSTBIN)/benthos lint ./config/...
@$(PATHINSTBIN)/benthos test ./config/test/...

test-wasm-build:
@GOOS=js GOARCH=wasm go build -ldflags="-s -w" -o ./target/wasm_test ./cmd/benthos
Expand Down
14 changes: 6 additions & 8 deletions lib/input/reader/amqp_0_9.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,16 @@ func (a *AMQP09) disconnect() error {
defer a.m.Unlock()

if a.amqpChan != nil {
err := a.amqpChan.Cancel(a.conf.ConsumerTag, true)
a.amqpChan = nil
if err != nil {
return fmt.Errorf("consumer cancel failed: %s", err)
if err := a.amqpChan.Cancel(a.conf.ConsumerTag, true); err != nil {
a.log.Errorf("Failed to cancel consumer: %v\n", err)
}
a.amqpChan = nil
}
if a.conn != nil {
err := a.conn.Close()
a.conn = nil
if err != nil {
return fmt.Errorf("AMQP 0.9 connection close error: %s", err)
if err := a.conn.Close(); err != nil {
a.log.Errorf("Failed to close connection cleanly: %v\n", err)
}
a.conn = nil
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions lib/input/reader/amqp_1.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ func (a *AMQP1) disconnect(ctx context.Context) error {
}

if err := a.receiver.Close(ctx); err != nil {
return err
a.log.Errorf("Failed to cleanly close receiver: %v\n", err)
}
if err := a.session.Close(ctx); err != nil {
return err
a.log.Errorf("Failed to cleanly close session: %v\n", err)
}
if err := a.client.Close(); err != nil {
return err
a.log.Errorf("Failed to cleanly close client: %v\n", err)
}
a.client = nil
a.session = nil
Expand Down
2 changes: 1 addition & 1 deletion lib/output/writer/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (a *AMQP) disconnect() error {
}
if a.conn != nil {
if err := a.conn.Close(); err != nil {
return fmt.Errorf("AMQP connection close error: %s", err)
a.log.Errorf("Failed to close connection cleanly: %v\n", err)
}
a.conn = nil
}
Expand Down
6 changes: 3 additions & 3 deletions lib/output/writer/amqp_1.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ func (a *AMQP1) disconnect(ctx context.Context) error {
}

if err := a.sender.Close(ctx); err != nil {
return err
a.log.Errorf("Failed to cleanly close sender: %v\n", err)
}
if err := a.session.Close(ctx); err != nil {
return err
a.log.Errorf("Failed to cleanly close session: %v\n", err)
}
if err := a.client.Close(); err != nil {
return err
a.log.Errorf("Failed to cleanly close client: %v\n", err)
}
a.client = nil
a.session = nil
Expand Down
1 change: 1 addition & 0 deletions website/src/pages/community.module.css
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.button {
text-decoration: none !important;
padding: 20px;
margin: 10px;
border-radius: 5px;
border: 1px solid var(--ifm-heading-color);
display: inline-block;
Expand Down

0 comments on commit dfc6925

Please sign in to comment.