Skip to content

Commit

Permalink
Merge pull request wework#59 from wework/shutdown_fix
Browse files Browse the repository at this point in the history
Shutdown fix
  • Loading branch information
Guy Baron authored May 1, 2019
2 parents 96ebe02 + 7caa110 commit eea9d56
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
10 changes: 6 additions & 4 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,18 +323,20 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
b.log().WithError(err).Error("could not stop worker")
}
}

b.Outgoing.shutdown()
b.started = false

if b.IsTxnl {

err := b.Outbox.Stop()

if err != nil {
b.log().WithError(err).Error("could not shutdown outbox")
return err
}
b.TxProvider.Dispose()
b.TxProvider.Dispose()

}

return nil
}

Expand Down Expand Up @@ -639,7 +641,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
//send to the transactional outbox if the bus is transactional
//otherwise send directly to amqp
if b.IsTxnl && tx != nil {
b.log().WithField("message_id", msg.MessageId).Info("sending message to outbox")
b.log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, key, msg)
if saveErr != nil {
log.WithError(saveErr).Error("failed to save to transactional outbox")
Expand Down
2 changes: 1 addition & 1 deletion gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error {

//Stop forcess the transactional outbox to stop processing additional messages
func (outbox *TxOutbox) Stop() error {
outbox.exit <- true
close(outbox.exit)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery

func (worker *worker) ack(delivery amqp.Delivery) error {
ack := func(attempts uint) error { return delivery.Ack(false /*multiple*/) }
worker.log().WithField("message_id", delivery.MessageId).Info("acking message")
worker.log().WithField("message_id", delivery.MessageId).Debug("acking message")
err := retry.Retry(ack,
strategy.Wait(100*time.Millisecond))

if err != nil {
worker.log().WithError(err).Error("could not ack the message")
worker.span.LogFields(slog.Error(err))
} else {
worker.log().WithField("message_id", delivery.MessageId).Info("message acked")
worker.log().WithField("message_id", delivery.MessageId).Debug("message acked")
}

return err
Expand Down

0 comments on commit eea9d56

Please sign in to comment.