Skip to content

Commit

Permalink
Fix GoLint, Adds method for PrefixQueueInDev (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
brettallred authored Oct 28, 2016
1 parent 96ac5e4 commit 0f3bb55
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 1 deletion.
2 changes: 2 additions & 0 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"
)

//Publisher allows you to publish events to RabbitMQ
type Publisher struct {
_connection *amqp.Connection
_channel *amqp.Channel
Expand Down Expand Up @@ -158,6 +159,7 @@ func (p *Publisher) PublishBytes(message []byte, subscriber *Subscriber) error {
})
}

//Close will close the connection and channel for the Publisher
func (p *Publisher) Close() {
p.lock.Lock()
defer p.lock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/streadway/amqp"
)

var _connection *amqp.Connection = nil
var _connection *amqp.Connection

func failOnError(err error, msg string) {
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package rabbit

func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
45 changes: 45 additions & 0 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package rabbit

import (
"errors"
"fmt"
"log"
"os"
"os/user"
"sync"
)

Expand Down Expand Up @@ -93,6 +96,7 @@ func Register(s Subscriber, handler func(b []byte) bool) {
Handlers[s.RoutingKey] = handler
}

// CloseSubscribers removes all subscribers, handlers, and closes the amqp connection
func CloseSubscribers() {
lock.Lock()
defer lock.Unlock()
Expand All @@ -106,6 +110,7 @@ func CloseSubscribers() {
}
}

//DeleteQueue does what it says, deletes a queue in rabbit
func DeleteQueue(s Subscriber) error {
conn := connection()
if conn == nil {
Expand All @@ -120,3 +125,43 @@ func DeleteQueue(s Subscriber) error {
}
return deleteQueue(channel, &s)
}

// PrefixQueueInDev will prefix the queue name with the name of the APP_ENV variable.
// This is used for running a worker in your local environment but connecting to a stage
// or prodution rabbit server.
func (s *Subscriber) PrefixQueueInDev() {
env := appEnv()
nonDevEnvironments := []string{"production", "prod", "staging", "stage"}

if stringInSlice(env, nonDevEnvironments) {
return
}

username := currentUsersName()

if env == "test" {
username = "test_" + username
}

s.Queue = fmt.Sprintf("%s_%s", username, s.Queue)
}

func appEnv() string {
env := os.Getenv("APP_ENV")

// Check PLATFORM_ENV for backwards compatibility
if len(env) == 0 {
env = os.Getenv("PLATFORM_ENV")
}
return env
}

func currentUsersName() string {
username := "unknown"

if userData, err := user.Current(); err == nil {
username = userData.Username
}

return username
}
27 changes: 27 additions & 0 deletions subsriber_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rabbit_test

import (
"fmt"
"os"
"os/user"
"testing"

"github.com/brettallred/rabbit-go"
"github.com/stretchr/testify/assert"
)

func TestPrefixQueueInDev(t *testing.T) {
var subscriber = rabbit.Subscriber{
Concurrency: 5,
Durable: true,
Exchange: "events",
Queue: "test.sample.event.created",
RoutingKey: "sample.event.created",
}

os.Setenv("APP_ENV", "development")
subscriber.PrefixQueueInDev()
userData, _ := user.Current()

assert.Equal(t, subscriber.Queue, fmt.Sprintf("%s_test.sample.event.created", userData.Username))
}

0 comments on commit 0f3bb55

Please sign in to comment.