-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslash.go
264 lines (218 loc) · 6.98 KB
/
slash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package slash
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/shafiqaimanx/slash/pkg/limiter"
"github.com/shafiqaimanx/slash/pkg/worker"
)
// SlashClient is the central client object: it bundles the HTTP client, rate
// limiter, worker pool, retry/cache settings and registered callbacks that are
// used when performing requests.
//
// Init fills in the defaults; NewSlashClient calls Init before applying options.
type SlashClient struct {
	// Client is the HTTP client used to send requests.
	// Init sets it to a zero-value http.Client.
	Client *http.Client
	// RateLimit is the limiter each request is routed through (see execute).
	// Init builds it with limiter.NewLimiter(200 * time.Millisecond).
	RateLimit *limiter.Limiter
	// Retries specifies the number of retry attempts for failed requests.
	// Default is 1, which allows a total of 2 attempts (initial request + 1 retry).
	// For example, setting it to 2 makes 3 total attempts.
	Retries int
	// Cache enables or disables response caching. Init sets it to true.
	Cache bool
	// WorkerPool is the pool that request execution is submitted to.
	// Init builds it with worker.NewWorkerPool(5).
	WorkerPool *worker.WorkerPool
	// Ready indicates if the client is ready to handle requests. Visit.Call
	// refuses to send a request while it is false. Prefer SetReady/IsReady,
	// which access it under mu.
	Ready bool
	// errorQueue holds errors reported through HandleError; OnError drains it
	// into the newly registered handler.
	errorQueue []error
	// visit performs the HTTP requests and stores the response/error callbacks.
	visit *Visit
	// cache stores *Response values keyed by request URL string; it is
	// consulted only when Cache is true.
	cache sync.Map
	// dCtx is the context attached to every request built by Visit.Call.
	// Init sets it to context.Background().
	dCtx context.Context
	// mu guards Ready (through SetReady/IsReady) and the error queue and error
	// callback (through OnError/HandleError).
	mu sync.Mutex
	// errorHandler is allocated by Init.
	// NOTE(review): nothing in the visible code invokes errorHandler.Handler;
	// errors are delivered through visit.onError instead — confirm whether
	// this field is still needed.
	errorHandler *ErrorCallback
	// requestHandler holds the callback registered with OnRequest; Visit.Call
	// runs it before each request is sent.
	requestHandler *RequestCallback
}
// NewSlashClient creates a new SlashClient, applies the defaults from Init and
// then applies each option in the order given. Nil options are skipped.
//
// The internal Visit is attached before the options run so that an option may
// safely register callbacks (OnResponse, OnError); those registrations are
// preserved rather than being overwritten after the option loop.
func NewSlashClient(options ...SlashClientOption) *SlashClient {
	sc := &SlashClient{}
	sc.Init()

	// Options may call OnResponse/OnError, which store their handlers on
	// sc.visit, so it must exist before the first option executes.
	sc.visit = &Visit{slashClient: sc}

	for _, opt := range options {
		if opt == nil {
			continue
		}
		opt(sc)
	}

	// Re-establish the invariant in case an option cleared or replaced visit.
	if sc.visit == nil {
		sc.visit = &Visit{}
	}
	sc.visit.slashClient = sc

	return sc
}
// Init resets the exported configuration, the default context and the
// request/error callback holders of sc to their default values and marks the
// client as ready.
func (sc *SlashClient) Init() {
	// Transport, throttling and concurrency plumbing.
	sc.Client = new(http.Client)
	sc.RateLimit = limiter.NewLimiter(200 * time.Millisecond)
	sc.WorkerPool = worker.NewWorkerPool(5)
	sc.dCtx = context.Background()

	// Request policy: one retry, caching on.
	sc.Retries, sc.Cache = 1, true

	// Empty callback holders so OnRequest can assign into them directly.
	sc.requestHandler = new(RequestCallback)
	sc.errorHandler = new(ErrorCallback)

	sc.Ready = true
}
// OnRequest registers a callback that is invoked with each outgoing request
// before it is sent.
func (sc *SlashClient) OnRequest(handler func(*Request)) {
	holder := sc.requestHandler
	holder.Handler = handler
}
// OnResponse registers a callback that is invoked with every response whose
// body was read successfully.
//
// Like OnError, it takes the client mutex and lazily creates the internal
// Visit, so it is safe to call on a client whose Visit has not been attached
// yet instead of panicking on a nil pointer.
func (sc *SlashClient) OnResponse(handler func(*Response)) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if sc.visit == nil {
		sc.visit = &Visit{slashClient: sc}
	}
	sc.visit.onResponse = handler
}
// OnError registers a callback for errors on SlashClient and then replays any
// errors that were queued before a handler existed. Replayed errors are
// delivered with a nil *Response.
//
// Passing a nil handler unregisters the callback and leaves the queue intact
// so the errors can be delivered to a handler registered later.
func (sc *SlashClient) OnError(handler func(r *Response, err error)) {
	sc.mu.Lock()
	if sc.visit == nil {
		// Keep the back-pointer so the lazily created Visit is usable by Call.
		sc.visit = &Visit{slashClient: sc}
	}
	sc.visit.onError = handler

	var pending []error
	if handler != nil {
		pending = sc.errorQueue
		sc.errorQueue = nil // Clear the queue; its contents are replayed below.
	}
	sc.mu.Unlock()

	// Run user code only after releasing the mutex: sync.Mutex is not
	// reentrant, so a handler that calls IsReady, SetReady or HandleError
	// would otherwise deadlock.
	for _, err := range pending {
		handler(nil, err)
	}
}
// HandleError centralizes error reporting. If an OnError callback is
// registered, err is passed to it (with a nil *Response); otherwise err is
// queued until OnError is called.
//
// It does not change the client's ready state.
func (sc *SlashClient) HandleError(err error) {
	sc.mu.Lock()
	var handler func(*Response, error)
	if sc.visit != nil {
		handler = sc.visit.onError
	}
	if handler == nil {
		// No consumer yet: keep the error for OnError to replay. Errors that
		// are delivered immediately are not queued, so the queue cannot grow
		// without bound or be delivered a second time.
		sc.errorQueue = append(sc.errorQueue, err)
	}
	sc.mu.Unlock()

	// Invoke the callback outside the lock so it may call back into the
	// client (IsReady, SetReady, ...) without deadlocking.
	if handler != nil {
		handler(nil, err)
	}
}
// Visit requests URL by delegating to the Call method of the client's
// internal Visit and returns its result unchanged.
func (sc *SlashClient) Visit(URL string) (*Response, error) {
	resp, err := sc.visit.Call(URL)
	return resp, err
}
// SetReady updates the client's ready flag while holding the client mutex.
func (sc *SlashClient) SetReady(ready bool) {
	sc.mu.Lock()
	sc.Ready = ready
	sc.mu.Unlock()
}
// IsReady reports the client's ready flag, reading it while holding the
// client mutex.
func (sc *SlashClient) IsReady() bool {
	sc.mu.Lock()
	ready := sc.Ready
	sc.mu.Unlock()
	return ready
}
// Response wraps http.Response and carries the response body as a string.
type Response struct {
	// Response is the underlying HTTP response. Its own Body stream is read
	// to the end and closed by do before the Response is handed out.
	*http.Response
	// Body is the full response body as text. Because this field is declared
	// directly on Response, the selector resp.Body refers to this string, not
	// to the embedded http.Response.Body reader.
	Body string
}
// Request wraps http.Request and is the value passed to request callbacks.
type Request struct {
	// Request is the underlying HTTP request; callbacks may modify it before
	// it is sent.
	*http.Request
	// slashClient is the client that created this request (set in Visit.Call).
	slashClient *SlashClient
}
// ErrorCallback holds an error handler function.
//
// NOTE(review): Init allocates one of these on SlashClient, but the visible
// code never calls Handler; errors are dispatched through Visit.onError —
// confirm whether this type is still in use.
type ErrorCallback struct {
	// Handler receives the response (may be nil) and the error.
	Handler func(r *Response, err error)
}
// RequestCallback holds the client-wide request handler function that is
// registered with SlashClient.OnRequest.
type RequestCallback struct {
	// Handler is invoked by Visit.Call with the request before it is sent;
	// nil means no handler is registered.
	Handler func(r *Request)
}
// Visit performs HTTP requests on behalf of a SlashClient and stores the
// per-visit callbacks. In the visible code only GET is issued (see Call).
type Visit struct {
	// slashClient is the owning client whose settings, pool and limiter are used.
	slashClient *SlashClient
	// onRequest, when non-nil, runs after the client-wide request handler.
	// NOTE(review): nothing in the visible code assigns this field.
	onRequest func(*Request)
	// onResponse, when non-nil, is called with each successfully read response.
	onResponse func(*Response)
	// onError, when non-nil, is called by SlashClient.HandleError.
	onError func(*Response, error)
}
// Call issues a GET request for URL using the owning client's default
// context and returns the wrapped response.
//
// Sequence:
//  1. Fail fast if the Visit is detached or the client is not ready.
//  2. Build the request and run the client-wide OnRequest handler, then the
//     Visit-level onRequest handler, if set.
//  3. Re-check readiness, since a handler may have called SetReady(false).
//  4. Submit the request to the worker pool and wait for the pool.
//
// On a request error the client is marked not ready (through SetReady, so the
// write is synchronized with IsReady) and the error is returned with a nil
// response.
func (v *Visit) Call(URL string) (*Response, error) {
	sc := v.slashClient
	if sc == nil {
		return nil, fmt.Errorf("visit is not attached to a client")
	}

	// Check if the client is ready before proceeding
	if !sc.IsReady() {
		return nil, fmt.Errorf("client is not ready for use")
	}

	req, err := http.NewRequestWithContext(sc.dCtx, http.MethodGet, URL, nil)
	if err != nil {
		return nil, err
	}
	raw := &Request{Request: req, slashClient: sc}

	// Call the client-wide OnRequest handler if defined
	if rh := sc.requestHandler; rh != nil && rh.Handler != nil {
		rh.Handler(raw)
	}
	// Then the handler set directly on this Visit, if any
	if v.onRequest != nil {
		v.onRequest(raw)
	}

	// Check readiness again before entering the worker pool: the handlers
	// above are user code and may have changed it.
	if !sc.IsReady() {
		return nil, fmt.Errorf("client is not ready for use")
	}

	var resp *Response
	var requestError error

	sc.WorkerPool.Run(func() {
		resp, requestError = sc.execute(raw.Request)
	})
	// NOTE(review): Wait is called on the shared pool, so it presumably also
	// waits for tasks submitted by other callers — confirm against the worker
	// package.
	sc.WorkerPool.Wait()

	if requestError != nil {
		// Use the locked setter; writing sc.Ready directly would race with
		// concurrent IsReady/SetReady calls.
		sc.SetReady(false)
		return nil, requestError
	}

	return resp, nil
}
// execute runs req through the response cache, the worker pool, the rate
// limiter and the retry loop, in that order.
//
// When caching is enabled, a response previously stored for the same URL
// string is returned without sending anything, and a successful response is
// stored under that URL string before returning.
//
// NOTE(review): Visit.Call already invokes execute from inside
// WorkerPool.Run, so the submission below is nested. If Run or Limit do not
// block until the task has finished, result and reqErr are read before they
// are assigned — confirm against the worker and limiter packages.
func (sc *SlashClient) execute(req *http.Request) (*Response, error) {
	if sc.Cache {
		cached, hit := sc.cache.Load(req.URL.String())
		if hit {
			return cached.(*Response), nil
		}
	}

	var (
		result *Response
		reqErr error
	)

	attempt := func() {
		result, reqErr = sc.doWithRetries(req)
	}
	sc.WorkerPool.Run(func() {
		sc.RateLimit.Limit(attempt)
	})

	// Only successful responses are cached, and only when caching is on.
	if reqErr != nil || !sc.Cache {
		return result, reqErr
	}
	sc.cache.Store(req.URL.String(), result)
	return result, reqErr
}
// doWithRetries sends req, retrying up to sc.Retries additional times on
// error, and returns the first successful response or the last error.
//
// Backoff between attempts is linear: after failed attempt i (0-based) it
// waits i seconds, so the first retry is immediate, the second follows after
// 1s, and so on.
//
// At least one attempt is always made, even if Retries is negative. No wait is
// performed after the final attempt, and the loop stops early with the last
// error if the request's context is cancelled.
func (sc *SlashClient) doWithRetries(req *http.Request) (*Response, error) {
	ctx := req.Context()

	var lastErr error
	for attempt := 0; ; attempt++ {
		resp, err := sc.do(req)
		if err == nil {
			return resp, nil
		}
		lastErr = err

		// Out of retries, or the caller gave up: report the failure now
		// rather than sleeping first.
		if attempt >= sc.Retries || ctx.Err() != nil {
			return nil, lastErr
		}

		delay := time.Duration(attempt) * time.Second
		if delay <= 0 {
			continue
		}
		// Wait for the backoff, but wake up immediately on cancellation.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, lastErr
		case <-timer.C:
		}
	}
}
// do sends req once with the configured HTTP client and returns the response
// with its body fully read into Response.Body.
//
// Transport errors and body-read errors are both reported through
// HandleError and returned to the caller, so a truncated body is never
// presented as a successful response. The underlying body stream is closed
// before do returns. The OnResponse callback, if registered, is invoked with
// the wrapped response before it is returned.
func (sc *SlashClient) do(req *http.Request) (*Response, error) {
	resp, err := sc.Client.Do(req)
	if err != nil {
		// %w keeps the cause reachable through errors.Is / errors.As.
		sc.HandleError(fmt.Errorf("request failed: %w", err))
		return nil, err
	}
	defer resp.Body.Close()

	// Read the whole body so it can be exposed as a string.
	bBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		sc.HandleError(fmt.Errorf("reading response body: %w", err))
		return nil, err
	}

	wResp := &Response{
		Response: resp,
		Body:     string(bBytes),
	}

	if sc.visit != nil && sc.visit.onResponse != nil {
		sc.visit.onResponse(wResp)
	}

	return wResp, nil
}