-
Notifications
You must be signed in to change notification settings - Fork 17
/
session.go
96 lines (82 loc) · 1.89 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package rabtap
import (
"context"
"crypto/tls"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// Adapted from the examples shipped with streadway's amqp library.
const (
	// retryDelay is how long redial waits after a failed connection attempt
	// before it tries again.
	retryDelay = time.Second * 3

	// FailEarly is a named "true" for the failEarly flag of redial.
	// NOTE(review): its callers are not visible in this file; confirm usage.
	FailEarly = true
)
// Session bundles an amqp.Connection with an amqp.Channel by embedding both
// pointers, so the methods of either type are promoted to Session. redial
// builds it positionally as Session{conn, ch}, so the field order matters.
// NOTE(review): a method name that exists on both embedded types is ambiguous
// on Session and has to be qualified, as NewChannel does with
// s.Channel.Close().
type Session struct {
// Broker connection; NewChannel opens its channels on it.
*amqp.Connection
// Currently active channel; replaced by NewChannel.
*amqp.Channel
}
// NewChannel replaces the session's channel with a freshly opened one on the
// same connection. It is meant to be called after the current channel was
// closed because of an error.
//
// A channel that is still set is closed first; the result of that Close is
// ignored. s.Channel is then overwritten with whatever Connection.Channel
// returns, and the error of that call is returned to the caller.
func (s *Session) NewChannel() error {
	if old := s.Channel; old != nil {
		// Best effort only: the old channel is being discarded anyway.
		old.Close()
	}

	var err error
	s.Channel, err = s.Connection.Channel()
	return err
}
// redial keeps handing out AMQP sessions (connection plus channel) until ctx
// is cancelled.
//
// It returns a channel of channels. Every receive from the outer channel
// tells the goroutine started here that a consumer wants a session and yields
// the inner channel. The goroutine then dials url through DialTLS with
// tlsConfig, opens a channel on the new connection and delivers both as one
// Session on the inner channel. The same inner channel is handed out in every
// round.
//
// Failed attempts are logged through logger and repeated after retryDelay.
// If failEarly is true, a failure in the first round is not retried and the
// goroutine stops instead. Once a connection has been established the flag is
// cleared, so later rounds retry until they succeed.
//
// When the goroutine stops (ctx cancelled or early failure) the inner channel
// is closed first and the outer channel second; receives on either then
// yield the zero value with ok == false.
func redial(ctx context.Context, url string, tlsConfig *tls.Config,
	logger Logger, failEarly bool) chan chan Session {
	sessions := make(chan chan Session)

	go func() {
		sess := make(chan Session)
		// Deferred calls run last-in-first-out: sess is closed before
		// sessions on every exit path.
		defer close(sessions)
		defer close(sess)

		for {
			// Block until a consumer asks for a session or we are cancelled.
			select {
			case sessions <- sess:
			case <-ctx.Done():
				logger.Debugf("session: shutting down factory (cancel)")
				return
			}

			// Try to connect. Fail early if the initial connection cannot be
			// established.
			var conn *amqp.Connection
			var ch *amqp.Channel
			var err error
			for {
				conn, err = DialTLS(url, tlsConfig)
				if err == nil {
					ch, err = conn.Channel()
					if err == nil {
						break
					}
					// The connection is up but unusable without a channel.
					// Close it so that a failed attempt does not leak a
					// connection. The channel error is the one reported
					// below, so the Close result is deliberately dropped.
					_ = conn.Close()
				}
				// NOTE(review): url is logged verbatim; if it can carry
				// credentials they end up in the log - confirm this is
				// acceptable.
				logger.Errorf("session: cannot (re-)dial: %v: %q", err, url)
				if failEarly {
					return
				}
				select {
				case <-ctx.Done():
					logger.Debugf("session: shutting down factory (cancel)")
					return
				case <-time.After(retryDelay):
				}
			}
			// A connection was established once; from now on always retry.
			failEarly = false

			select {
			case sess <- Session{conn, ch}:
			case <-ctx.Done():
				logger.Debugf("session: shutting down factory (cancel)")
				// The session was never handed to a consumer, so nobody
				// else can close it. Closing the connection releases it
				// instead of leaking it.
				_ = conn.Close()
				return
			}
		}
	}()

	return sessions
}