From 0f3bb55dbfb12da6e968cf7f31dcb79c022c319b Mon Sep 17 00:00:00 2001 From: Brett Allred Date: Fri, 28 Oct 2016 09:46:19 -0600 Subject: [PATCH] Fix GoLint, Adds method for PrefixQueueInDev (#9) --- publisher.go | 2 ++ rabbit.go | 2 +- slice.go | 10 ++++++++++ subscriber.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ subsriber_test.go | 27 +++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 slice.go create mode 100644 subsriber_test.go diff --git a/publisher.go b/publisher.go index ada2e2f..651f275 100644 --- a/publisher.go +++ b/publisher.go @@ -9,6 +9,7 @@ import ( "time" ) +//Publisher allows you to publish events to RabbitMQ type Publisher struct { _connection *amqp.Connection _channel *amqp.Channel @@ -158,6 +159,7 @@ func (p *Publisher) PublishBytes(message []byte, subscriber *Subscriber) error { }) } +//Close will close the connection and channel for the Publisher func (p *Publisher) Close() { p.lock.Lock() defer p.lock.Unlock() diff --git a/rabbit.go b/rabbit.go index d84ea57..c85d4bf 100644 --- a/rabbit.go +++ b/rabbit.go @@ -7,7 +7,7 @@ import ( "github.com/streadway/amqp" ) -var _connection *amqp.Connection = nil +var _connection *amqp.Connection func failOnError(err error, msg string) { if err != nil { diff --git a/slice.go b/slice.go new file mode 100644 index 0000000..2e3bfa1 --- /dev/null +++ b/slice.go @@ -0,0 +1,10 @@ +package rabbit + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} diff --git a/subscriber.go b/subscriber.go index d7e4f92..abd2f6f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,7 +2,10 @@ package rabbit import ( "errors" + "fmt" "log" + "os" + "os/user" "sync" ) @@ -93,6 +96,7 @@ func Register(s Subscriber, handler func(b []byte) bool) { Handlers[s.RoutingKey] = handler } +// CloseSubscribers removes all subscribers, handlers, and closes the amqp connection func CloseSubscribers() { lock.Lock() defer lock.Unlock() @@ -106,6 +110,7 @@ func CloseSubscribers() { } } +//DeleteQueue does what it says, deletes a queue in rabbit func DeleteQueue(s Subscriber) error { conn := connection() if conn == nil { @@ -120,3 +125,43 @@ func DeleteQueue(s Subscriber) error { } return deleteQueue(channel, &s) } + +// PrefixQueueInDev will prefix the queue name with the name of the APP_ENV variable. +// This is used for running a worker in your local environment but connecting to a stage +// or prodution rabbit server. +func (s *Subscriber) PrefixQueueInDev() { + env := appEnv() + nonDevEnvironments := []string{"production", "prod", "staging", "stage"} + + if stringInSlice(env, nonDevEnvironments) { + return + } + + username := currentUsersName() + + if env == "test" { + username = "test_" + username + } + + s.Queue = fmt.Sprintf("%s_%s", username, s.Queue) +} + +func appEnv() string { + env := os.Getenv("APP_ENV") + + // Check PLATFORM_ENV for backwards compatibility + if len(env) == 0 { + env = os.Getenv("PLATFORM_ENV") + } + return env +} + +func currentUsersName() string { + username := "unknown" + + if userData, err := user.Current(); err == nil { + username = userData.Username + } + + return username +} diff --git a/subsriber_test.go b/subsriber_test.go new file mode 100644 index 0000000..ca14c98 --- /dev/null +++ b/subsriber_test.go @@ -0,0 +1,27 @@ +package rabbit_test + +import ( + "fmt" + "os" + "os/user" + "testing" + + "github.com/brettallred/rabbit-go" + "github.com/stretchr/testify/assert" +) + +func TestPrefixQueueInDev(t *testing.T) { + var subscriber = rabbit.Subscriber{ + Concurrency: 5, + Durable: true, + Exchange: "events", + Queue: "test.sample.event.created", + RoutingKey: "sample.event.created", + } + + os.Setenv("APP_ENV", "development") + subscriber.PrefixQueueInDev() + userData, _ := user.Current() + + assert.Equal(t, subscriber.Queue, fmt.Sprintf("%s_test.sample.event.created", userData.Username)) +}