-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhealth.go
158 lines (135 loc) · 4.21 KB
/
health.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"context"
"sync/atomic"
"time"
"github.com/valyala/fasthttp"
kclock "k8s.io/utils/clock"
)
const (
initialDelay = time.Second * 1
failureThreshold = 2
requestTimeout = time.Second * 2
interval = time.Second * 5
successStatusCode = 200
)
// Option is a function that applies a health check option.
type Option func(o *healthCheckOptions)
type healthCheckOptions struct {
initialDelay time.Duration
requestTimeout time.Duration
failureThreshold int32
interval time.Duration
successStatusCode int
clock kclock.WithTicker
}
// StartEndpointHealthCheck starts a health check on the specified address with the given options.
// It returns a channel that will emit true if the endpoint is healthy and false if the failure conditions
// Have been met.
func StartEndpointHealthCheck(ctx context.Context, endpointAddress string, opts ...Option) <-chan bool {
options := &healthCheckOptions{}
applyDefaults(options)
for _, o := range opts {
o(options)
}
signalChan := make(chan bool, 1)
ticker := options.clock.NewTicker(options.interval)
ch := ticker.C()
go func() {
failureCount := &atomic.Int32{}
client := &fasthttp.Client{
MaxConnsPerHost: 5, // Limit Keep-Alive connections
ReadTimeout: options.requestTimeout,
MaxIdemponentCallAttempts: 1,
}
if options.initialDelay > 0 {
select {
case <-options.clock.After(options.initialDelay):
case <-ctx.Done():
}
}
for {
select {
case <-ch:
go doHealthCheck(client, endpointAddress, signalChan, failureCount, options)
case <-ctx.Done():
return
}
}
}()
return signalChan
}
func doHealthCheck(client *fasthttp.Client, endpointAddress string, signalChan chan bool, failureCount *atomic.Int32, options *healthCheckOptions) {
req := fasthttp.AcquireRequest()
req.SetRequestURI(endpointAddress)
req.Header.SetMethod(fasthttp.MethodGet)
defer fasthttp.ReleaseRequest(req)
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
err := client.DoTimeout(req, resp, options.requestTimeout)
if err != nil || resp.StatusCode() != options.successStatusCode {
if failureCount.Add(1) >= options.failureThreshold {
failureCount.Store(options.failureThreshold - 1)
signalChan <- false
}
} else {
signalChan <- true
failureCount.Store(0)
}
}
func applyDefaults(o *healthCheckOptions) {
o.failureThreshold = failureThreshold
o.initialDelay = initialDelay
o.requestTimeout = requestTimeout
o.successStatusCode = successStatusCode
o.interval = interval
o.clock = &kclock.RealClock{}
}
// WithInitialDelay sets the initial delay for the health check.
func WithInitialDelay(delay time.Duration) Option {
return func(o *healthCheckOptions) {
o.initialDelay = delay
}
}
// WithFailureThreshold sets the failure threshold for the health check.
func WithFailureThreshold(threshold int32) Option {
return func(o *healthCheckOptions) {
o.failureThreshold = threshold
}
}
// WithRequestTimeout sets the request timeout for the health check.
func WithRequestTimeout(timeout time.Duration) Option {
return func(o *healthCheckOptions) {
o.requestTimeout = timeout
}
}
// WithSuccessStatusCode sets the status code for the health check.
func WithSuccessStatusCode(code int) Option {
return func(o *healthCheckOptions) {
o.successStatusCode = code
}
}
// WithInterval sets the interval for the health check.
func WithInterval(interval time.Duration) Option {
return func(o *healthCheckOptions) {
o.interval = interval
}
}
// WithClock sets a custom clock (for mocking time).
func WithClock(clock kclock.WithTicker) Option {
return func(o *healthCheckOptions) {
o.clock = clock
}
}