Skip to content

Commit

Permalink
Merge pull request #171 from astromechza/test-contrib
Browse files Browse the repository at this point in the history
chore: initial integration test
  • Loading branch information
wagslane authored Sep 5, 2024
2 parents be104ac + 6454487 commit cd74be0
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 2 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ jobs:
runs-on: ubuntu-latest
env:
GOPROXY: "https://proxy.golang.org,direct"
ENABLE_DOCKER_INTEGRATION_TESTS: TRUE

steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: "1.20"
go-version: "1.22"
id: go

- name: Check out code into the Go module directory
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ Close your publishers and consumers when you're done with them and do *not* atte

Note that the API is currently in `v0`. I don't plan on huge changes, but there may be some small breaking changes before we hit `v1`.

## Integration testing

By setting `ENABLE_DOCKER_INTEGRATION_TESTS=TRUE` during `go test -v ./...`, the integration tests will run. These launch a rabbitmq container in the local Docker daemon and test some publish/consume actions.

See [integration_test.go](integration_test.go).

## 💬 Contact

[![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/wagslane/go-rabbitmq

go 1.20
go 1.22.6

require github.com/rabbitmq/amqp091-go v1.10.0
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
156 changes: 156 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package rabbitmq

import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"testing"
"time"
)

const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`

func prepareDockerTest(t *testing.T) (connStr string) {
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output()
if err != nil {
t.Log("container id", string(out))
t.Fatalf("error launching rabbitmq in docker: %v", err)
}
t.Cleanup(func() {
containerId := strings.TrimSpace(string(out))
t.Logf("attempting to shutdown container '%s'", containerId)
if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil {
t.Logf("failed to stop: %v", err)
}
})
return "amqp://guest:guest@localhost:5672/"
}

func waitForHealthyAmqp(t *testing.T, connStr string) *Conn {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tkr := time.NewTicker(time.Second)

// only log connection-level logs when connection has succeeded
muted := true
connLogger := simpleLogF(func(s string, i ...interface{}) {
if !muted {
t.Logf(s, i...)
}
})

var lastErr error
for {
select {
case <-ctx.Done():
t.Fatal("timed out waiting for healthy amqp", lastErr)
return nil
case <-tkr.C:
t.Log("attempting connection")
conn, err := NewConn(connStr, WithConnectionOptionsLogger(connLogger))
if err != nil {
lastErr = err
t.Log("connection attempt failed - retrying")
} else {
if err := func() error {
pub, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
return fmt.Errorf("failed to setup publisher: %v", err)
}
t.Log("attempting publish")
return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange(""))
}(); err != nil {
_ = conn.Close()
t.Log("publish ping failed", err.Error())
} else {
t.Log("ping successful")
muted = true
return conn
}
}
}
}
}

// TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based
// rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking
// to ensure the result is as expected.
func TestSimplePubSub(t *testing.T) {
connStr := prepareDockerTest(t)
conn := waitForHealthyAmqp(t, connStr)
defer conn.Close()

t.Logf("new consumer")
consumerQueue := "my_queue"
consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
t.Fatal("error creating consumer", err)
}
defer consumer.CloseWithContext(context.Background())

// Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
// it does not block.
consumed := make(chan Delivery)
defer close(consumed)

go func() {
err = consumer.Run(func(d Delivery) Action {
t.Log("consumed")
select {
case consumed <- d:
default:
}
return Ack
})
if err != nil {
t.Log("consumer run failed", err)
}
}()

// Setup a publisher with notifications enabled
t.Logf("new publisher")
publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
t.Fatal("error creating publisher", err)
}
publisher.NotifyPublish(func(p Confirmation) {
})
defer publisher.Close()

// For test stability we cannot rely on the fact that the consumer go routines are up and running before the
// publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
// pass after we see the first message come through.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tkr := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
t.Fatal("timed out waiting for pub sub", ctx.Err())
case <-tkr.C:
t.Logf("new publish")
confirms, err := publisher.PublishWithDeferredConfirmWithContext(ctx, []byte("example"), []string{consumerQueue})
if err != nil {
// publish should always succeed since we've verified the ping previously
t.Fatal("failed to publish", err)
}
for _, confirm := range confirms {
if _, err := confirm.WaitContext(ctx); err != nil {
t.Fatal("failed to wait for publish", err)
}
}
case d := <-consumed:
t.Logf("successfully saw message round trip: '%s'", string(d.Body))
return
}
}
}
26 changes: 26 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmq
import (
"fmt"
"log"
"os"

"github.com/wagslane/go-rabbitmq/internal/logger"
)
Expand Down Expand Up @@ -39,3 +40,28 @@ func (l stdDebugLogger) Infof(format string, v ...interface{}) {
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}

// simpleLogF is used to support logging in the test functions.
// This could be exposed publicly for integration in more simple logging interfaces.
type simpleLogF func(string, ...interface{})

func (l simpleLogF) Fatalf(format string, v ...interface{}) {
l(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
os.Exit(1)
}

func (l simpleLogF) Errorf(format string, v ...interface{}) {
l(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
}

func (l simpleLogF) Warnf(format string, v ...interface{}) {
l(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
}

func (l simpleLogF) Infof(format string, v ...interface{}) {
l(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
}

func (l simpleLogF) Debugf(format string, v ...interface{}) {
l(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}

0 comments on commit cd74be0

Please sign in to comment.