Skip to content

Commit

Permalink
Fix: handle AMQP reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
Camille Meulien committed Jun 7, 2020
1 parent 9fc48ab commit 6c78ff9
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 78 deletions.
3 changes: 1 addition & 2 deletions cmd/rango/rango.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func main() {
log.Fatal().Msgf("creating new AMQP session failed: %s", err.Error())
return
}
ach, err := mq.Stream(*exName, qName)
err = mq.Stream(*exName, qName, hub.ReceiveMsg)
defer mq.Close(qName)

if err != nil {
Expand All @@ -154,7 +154,6 @@ func main() {
}

go hub.ListenWebsocketEvents()
go hub.ListenAMQP(ach)

wsHandler := func(w http.ResponseWriter, r *http.Request) {
routing.NewClient(hub, w, r)
Expand Down
73 changes: 35 additions & 38 deletions pkg/routing/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,52 +103,49 @@ func (h *Hub) ListenWebsocketEvents() {
}
}

func (h *Hub) ListenAMQP(q <-chan amqp.Delivery) {
for delivery := range q {
if isTrace() {
log.Trace().Msgf("AMQP msg received: %s -> %s", delivery.RoutingKey, delivery.Body)
}
s := strings.Split(delivery.RoutingKey, ".")
// ReceiveMsg handles AMQP messages
func (h *Hub) ReceiveMsg(delivery amqp.Delivery) {
if isTrace() {
log.Trace().Msgf("AMQP msg received: %s -> %s", delivery.RoutingKey, delivery.Body)
}
s := strings.Split(delivery.RoutingKey, ".")

var o interface{}
err := json.Unmarshal(delivery.Body, &o)
var o interface{}
err := json.Unmarshal(delivery.Body, &o)

if err != nil {
log.Error().Msgf("JSON parse error: %s, msg: %s", err.Error(), delivery.Body)
delivery.Ack(true)
continue
}
if err != nil {
log.Error().Msgf("JSON parse error: %s, msg: %s", err.Error(), delivery.Body)
delivery.Ack(true)
return
}

switch len(s) {
case 2:
msg := Event{
Scope: s[0],
Stream: "",
Type: s[1],
Topic: getTopic(s[0], s[0], s[1]),
Body: o,
}
switch len(s) {
case 2:
msg := Event{
Scope: s[0],
Stream: "",
Type: s[1],
Topic: getTopic(s[0], s[0], s[1]),
Body: o,
}

h.routeMessage(&msg)
h.routeMessage(&msg)

case 3:
msg := Event{
Scope: s[0],
Stream: s[1],
Type: s[2],
Topic: getTopic(s[0], s[1], s[2]),
Body: o,
}
case 3:
msg := Event{
Scope: s[0],
Stream: s[1],
Type: s[2],
Topic: getTopic(s[0], s[1], s[2]),
Body: o,
}

h.routeMessage(&msg)
h.routeMessage(&msg)

default:
log.Error().Msgf("Bad routing key: %s", delivery.RoutingKey)
}
delivery.Ack(true)
default:
log.Error().Msgf("Bad routing key: %s", delivery.RoutingKey)
}

panic("Unexpected end of AMQP events")
delivery.Ack(true)
}

func (h *Hub) handleSnapshot(msg *Event) (string, error) {
Expand Down
Loading

0 comments on commit 6c78ff9

Please sign in to comment.