-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathrabbitmq_rest_client.go
217 lines (188 loc) · 7.44 KB
/
rabbitmq_rest_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// rabbitmq http api client
// Copyright (C) 2017-2022 Jan Delgado
package rabtap
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"time"
"net/http"
"net/url"
"reflect"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/sync/errgroup"
)
// RabbitHTTPClient is a minimal client to the rabbitmq management REST api.
// It implements only functions needed by this tool (i.e. GET on some of the
// resources). The messages structs were generated using json-to-go (
// https://mholt.github.io/json-to-go/).
type RabbitHTTPClient struct {
url *url.URL // base URL
client *http.Client
}
// NewRabbitHTTPClient returns a new instance of an RabbitHTTPClient. url
// is the base API URL of the REST server.
func NewRabbitHTTPClient(url *url.URL, tlsConfig *tls.Config) *RabbitHTTPClient {
tr := &http.Transport{
TLSClientConfig: tlsConfig,
DisableCompression: false,
Dial: Dialer}
client := &http.Client{Transport: tr, Timeout: time.Duration(2 * time.Second)}
return &RabbitHTTPClient{url, client}
}
type httpRequest struct {
path string // relative path
t reflect.Type // type of expected result
}
// getResource gets resource constructed from s.url and equest.url and
// deserialized the resource into an request.t type, which is returned.
func (s *RabbitHTTPClient) getResource(ctx context.Context, request httpRequest) (interface{}, error) {
r := reflect.New(request.t).Interface()
url := s.url.String() + "/" + request.path
resp, err := ctxhttp.Get(ctx, s.client, url)
if err != nil {
return r, err
}
if resp.StatusCode != 200 {
return r, errors.New(resp.Status)
}
defer resp.Body.Close()
err = json.NewDecoder(resp.Body).Decode(r)
return r, err
}
// delResource make DELETE request to given relative path
func (s *RabbitHTTPClient) delResource(ctx context.Context, path string) error {
url := s.url.String() + "/" + path
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return err
}
resp, err := ctxhttp.Do(ctx, s.client, req)
if err != nil {
return err
}
if resp.StatusCode != 200 && resp.StatusCode != 204 {
return errors.New(resp.Status)
}
defer resp.Body.Close()
return nil
}
// BrokerInfo represents the state of various RabbitMQ ressources as
// returned by the RabbitMQ REST API
type BrokerInfo struct {
Overview RabbitOverview
Connections []RabbitConnection
Exchanges []RabbitExchange
Queues []RabbitQueue
Consumers []RabbitConsumer
Bindings []RabbitBinding
Channels []RabbitChannel
Vhosts []RabbitVhost
}
// Overview returns the /overview resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Overview(ctx context.Context) (RabbitOverview, error) {
res, err := s.getResource(ctx, httpRequest{"overview", reflect.TypeOf(RabbitOverview{})})
return *res.(*RabbitOverview), err
}
// Connections returns the /connections resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Connections(ctx context.Context) ([]RabbitConnection, error) {
res, err := s.getResource(ctx, httpRequest{"connections", reflect.TypeOf([]RabbitConnection{})})
return *res.(*[]RabbitConnection), err
}
// Channels returns the /channels resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Channels(ctx context.Context) ([]RabbitChannel, error) {
res, err := s.getResource(ctx, httpRequest{"channels", reflect.TypeOf([]RabbitChannel{})})
return *res.(*[]RabbitChannel), err
}
// Exchanges returns the /exchanges resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Exchanges(ctx context.Context) ([]RabbitExchange, error) {
res, err := s.getResource(ctx, httpRequest{"exchanges", reflect.TypeOf([]RabbitExchange{})})
return *res.(*[]RabbitExchange), err
}
// Queues returns the /queues resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Queues(ctx context.Context) ([]RabbitQueue, error) {
res, err := s.getResource(ctx, httpRequest{"queues", reflect.TypeOf([]RabbitQueue{})})
return *res.(*[]RabbitQueue), err
}
// Consumers returns the /consumers resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Consumers(ctx context.Context) ([]RabbitConsumer, error) {
res, err := s.getResource(ctx, httpRequest{"consumers", reflect.TypeOf([]RabbitConsumer{})})
return *res.(*[]RabbitConsumer), err
}
// Bindings returns the /bindings resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Bindings(ctx context.Context) ([]RabbitBinding, error) {
res, err := s.getResource(ctx, httpRequest{"bindings", reflect.TypeOf([]RabbitBinding{})})
return *res.(*[]RabbitBinding), err
}
// Vhosts returns the /vhosts resource of the RabbitMQ REST API
func (s *RabbitHTTPClient) Vhosts(ctx context.Context) ([]RabbitVhost, error) {
res, err := s.getResource(ctx, httpRequest{"vhosts", reflect.TypeOf([]RabbitVhost{})})
return *res.(*[]RabbitVhost), err
}
// BrokerInfo gets all resources of the broker in parallel
func (s *RabbitHTTPClient) BrokerInfo(ctx context.Context) (BrokerInfo, error) {
g, ctx := errgroup.WithContext(ctx)
var r BrokerInfo
g.Go(func() (err error) { r.Overview, err = s.Overview(ctx); return })
g.Go(func() (err error) { r.Connections, err = s.Connections(ctx); return })
g.Go(func() (err error) { r.Exchanges, err = s.Exchanges(ctx); return })
g.Go(func() (err error) { r.Queues, err = s.Queues(ctx); return })
g.Go(func() (err error) { r.Consumers, err = s.Consumers(ctx); return })
g.Go(func() (err error) { r.Bindings, err = s.Bindings(ctx); return })
g.Go(func() (err error) { r.Channels, err = s.Channels(ctx); return })
g.Go(func() (err error) { r.Vhosts, err = s.Vhosts(ctx); return })
return r, g.Wait()
}
// CloseConnection closes a connection by DELETING the associated resource
func (s *RabbitHTTPClient) CloseConnection(ctx context.Context, conn, reason string) error {
return s.delResource(ctx, "connections/"+conn)
}
// UnmarshalJSON is a workaround to deserialize int attributes in the
// RabbitMQ API which are sometimes returned as strings, (i.e. the
// value "undefined").
func (d *OptInt) UnmarshalJSON(data []byte) error {
if data[0] == '"' {
return nil
}
type Alias int
aux := (*Alias)(d)
return json.Unmarshal(data, aux)
}
// helper for UnmarshalJSON. Unfortunately we can not use generic here and define
// an "type Alias T" here (see://go101.org/generics/888-the-status-quo-of-go-custom-generics.html)
// So some boiler plate is left in the UnmarshalJSON functions.
func unmarshalEmptyArrayOrObject(data []byte, v any) error {
if data[0] == '[' {
// JSON array detected
return nil
}
return json.Unmarshal(data, v)
}
// UnmarshalJSON is a custom unmarshaler as a WORKAROUND for RabbitMQ API
// returning "[]" instead of null. To make sure deserialization does not
// break, we catch this case, and return an empty ChannelDetails struct.
// see e.g. https://github.com/rabbitmq/rabbitmq-management/issues/424
func (d *ChannelDetails) UnmarshalJSON(data []byte) error {
// alias ChannelDetails to avoid recursion when calling Unmarshal
type Alias ChannelDetails
aux := &struct {
*Alias
}{
Alias: (*Alias)(d),
}
return unmarshalEmptyArrayOrObject(data, &aux)
}
// UnmarshalJSON is a custom unmarshaler as a WORKAROUND for RabbitMQ API
// returning "[]" instead of null. To make sure deserialization does not
// break, we catch this case, and return an empty ChannelDetails struct.
// see e.g. https://github.com/rabbitmq/rabbitmq-management/issues/424
func (d *ConnectionDetails) UnmarshalJSON(data []byte) error {
type Alias ConnectionDetails
aux := &struct {
*Alias
}{
Alias: (*Alias)(d),
}
return unmarshalEmptyArrayOrObject(data, &aux)
}