Skip to content

Commit

Permalink
update rabbit
Browse files Browse the repository at this point in the history
  • Loading branch information
Antonboom committed Jun 24, 2021
1 parent 65ac69c commit 8bdb87a
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 14 deletions.
5 changes: 5 additions & 0 deletions 29-queues/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.PHONY: rabbit

rabbit:
# http://localhost:15672/ guest:guest
docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
9 changes: 7 additions & 2 deletions 29-queues/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package simpleconsumer
import (
"context"
"fmt"
"github.com/streadway/amqp"
"log"

"github.com/streadway/amqp"
)

type RMQConnection interface {
Expand Down Expand Up @@ -49,6 +50,11 @@ func (c *Consumer) Consume(ctx context.Context, queue string) (<-chan Message, e
}

go func() {
defer func() {
close(messages)
log.Println("close messages channel")
}()

for {
select {
case <-ctx.Done():
Expand All @@ -74,4 +80,3 @@ func (c *Consumer) Consume(ctx context.Context, queue string) (<-chan Message, e

return messages, nil
}

3 changes: 1 addition & 2 deletions 29-queues/consumer/reconnect/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"context"
"errors"
"fmt"
"github.com/cenkalti/backoff/v3"
"log"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -114,7 +114,6 @@ func (c *Consumer) announceQueue() (<-chan amqp.Delivery, error) {
false,
nil,
)

if err != nil {
return nil, fmt.Errorf("queue Declare: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion 29-queues/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/OtusGolang/webinars_practical_part/29-queues

go 1.15
go 1.16

require (
github.com/cenkalti/backoff/v3 v3.2.2
Expand Down
2 changes: 0 additions & 2 deletions 29-queues/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/cenkalti/backoff v1.1.0 h1:QnvVp8ikKCDWOsFheytRCoYWYPO/ObCTBGxT19Hc+yE=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
Expand Down
16 changes: 11 additions & 5 deletions 29-queues/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,38 @@ package main
import (
"context"
"flag"
"log"
"os/signal"
"syscall"

simpleconsumer "github.com/OtusGolang/webinars_practical_part/29-queues/consumer"
"github.com/streadway/amqp"
"log"
)

var (
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
)
var uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")

func init() {
flag.Parse()
}

// http://localhost:15672/ guest:guest
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

conn, err := amqp.Dial(*uri)
failOnErr(err)

c := simpleconsumer.New("simple consumer", conn)
msgs, err := c.Consume(context.Background(), "hello")
msgs, err := c.Consume(ctx, "hello")

log.Println("start consuming...")

for m := range msgs {
log.Println("receive new message: ", string(m.Data))
}

log.Println("teardown")
}

func failOnErr(err error) {
Expand Down
3 changes: 2 additions & 1 deletion 29-queues/official_examples/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package main
import (
"flag"
"fmt"
"github.com/streadway/amqp"
"log"
"time"

"github.com/streadway/amqp"
)

var (
Expand Down
1 change: 0 additions & 1 deletion 29-queues/official_examples/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func main() {
}

func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error {

// This function dials, connects, declares, publishes, and tears down,
// all in one go. In a real service, you probably want to maintain a
// long-lived connection as state, and publish against that.
Expand Down

0 comments on commit 8bdb87a

Please sign in to comment.