forked from thrasher-corp/gocryptotrader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogger_multiwriter.go
152 lines (142 loc) · 4.51 KB
/
logger_multiwriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package log
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"time"
)
// Errors surfaced by the multi-writer and the job staging path.
var (
	errWriterAlreadyLoaded = errors.New("io.Writer already loaded")
	errJobsChannelIsFull   = errors.New("logger jobs channel is filled")
	errWriterIsNil         = errors.New("io writer is nil")
)

// Keys the worker always sets on a structured log entry. Caller supplied
// structured fields are not allowed to overwrite them.
var (
	message       Key = "message"
	timestamp     Key = "timestamp"
	severity      Key = "severity"
	subLoggerName Key = "sublogger"
	botName       Key = "botname"
)
// loggerWorker drains jobsChannel and writes every staged log event to the
// io.Writer(s) attached to that job. The loop ends when jobsChannel is closed.
//
// A job carrying a non-nil Passback channel is a synchronisation marker: an
// empty struct is sent on Passback and nothing is written. Any other job is
// rendered either as one JSON object (structured logging) or as a plain text
// line, terminated by a newline, and then returned to jobsPool.
//
// If a structured entry cannot be marshalled the failure is reported through
// the standard library logger and the event is dropped, rather than emitting
// an empty line to the writers.
func loggerWorker() {
	// Persistent scratch buffer: it is truncated, never replaced, so its
	// capacity is retained from one job to the next.
	buffer := make([]byte, 0, defaultBufferCapacity)
	// Reused across jobs. The reserved keys are overwritten on every
	// structured job and per-job fields are removed again after marshalling.
	structuredOutbound := ExtraFields{}

	for j := range jobsChannel {
		if j.Passback != nil {
			j.Passback <- struct{}{}
			continue
		}

		msg := j.fn()
		skipWrite := false

		if j.StructuredLogging {
			structuredOutbound[message] = msg
			structuredOutbound[timestamp] = time.Now().UnixMilli()
			structuredOutbound[severity] = j.Severity
			structuredOutbound[subLoggerName] = j.SubLoggerName
			structuredOutbound[botName] = j.Instance
			for k, v := range j.StructuredFields {
				if _, taken := structuredOutbound[k]; taken {
					// Disallow overwriting of key values
					displayError(fmt.Errorf("structured logging: cannot overwrite key [%s]", k))
					continue
				}
				structuredOutbound[k] = v
			}

			payload, err := json.Marshal(structuredOutbound)

			// Delete non-persistent structured fields. Reserved keys that
			// happen to be removed here are set again by the next job.
			for k := range j.StructuredFields {
				delete(structuredOutbound, k)
			}

			if err != nil {
				log.Println("log: failed to marshal structured log data:", err)
				skipWrite = true
			} else {
				// Copy into the scratch buffer instead of adopting the slice
				// returned by Marshal, which would discard the buffer above.
				buffer = append(buffer, payload...)
				buffer = append(buffer, '\n')
			}
		} else {
			buffer = append(buffer, j.Header...)
			if j.ShowLogSystemName {
				buffer = append(buffer, j.Spacer...)
				buffer = append(buffer, j.SubLoggerName...)
			}
			buffer = append(buffer, j.Spacer...)
			if j.TimestampFormat != "" {
				buffer = time.Now().AppendFormat(buffer, j.TimestampFormat)
			}
			buffer = append(buffer, j.Spacer...)
			buffer = append(buffer, msg...)
			// Guarantee exactly one trailing newline on the rendered line.
			if msg == "" || msg[len(msg)-1] != '\n' {
				buffer = append(buffer, '\n')
			}
		}

		if !skipWrite {
			for _, w := range j.Writers {
				// NOTE: byte slice is not copied, the writer receives the
				// scratch buffer itself. This is only safe if the buffer is
				// not modified until the Write call has returned.
				n, err := w.Write(buffer)
				if err != nil {
					displayError(fmt.Errorf("%T %w", w, err))
				} else if n != len(buffer) {
					displayError(fmt.Errorf("%T %w", w, io.ErrShortWrite))
				}
			}
		}

		buffer = buffer[:0] // Clean buffer for next use
		jobsPool.Put(j)
	}
}
// deferral is a function that produces the log message string when called.
// Passing message construction around as a deferral lets the caller stage a
// log event without building the string itself; the logger worker calls the
// function when it processes the job.
type deferral func() string
// StageLogEvent takes a job from the pool, fills it with the supplied event
// details and the holder's current writers, and queues it on the jobs channel.
// fn is only stored on the job here, it is not called, so the cost of building
// the message and of writing it is kept off the caller.
//
// When the jobs channel has no free capacity the call blocks until the job can
// be queued. A warning is logged first unless bypassWarning is set.
func (mw *multiWriterHolder) StageLogEvent(fn deferral, header, slName, spacer, timestampFormat, instance, level string, showLogSystemName, bypassWarning, structuredLog bool, fields map[Key]interface{}) {
	staged := jobsPool.Get().(*job) //nolint:forcetypeassert // Not necessary from a pool

	// Output destinations and the deferred message.
	staged.Writers = mw.writers
	staged.fn = fn

	// Plain text layout.
	staged.Header = header
	staged.Spacer = spacer
	staged.TimestampFormat = timestampFormat
	staged.SubLoggerName = slName
	staged.ShowLogSystemName = showLogSystemName

	// Structured logging details.
	staged.StructuredLogging = structuredLog
	staged.StructuredFields = fields
	staged.Severity = level
	staged.Instance = instance

	// Fast path: queue without blocking when there is room.
	select {
	case jobsChannel <- staged:
		return
	default:
	}

	// This will cause temporary caller impedance, which can have a knock
	// on effect in processing.
	if !bypassWarning {
		log.Printf("Logger warning: %v\n", errJobsChannelIsFull)
	}
	jobsChannel <- staged
}
// multiWriter builds a new multiWriterHolder holding the supplied writers.
// Every writer is registered through add; the first one that add rejects
// aborts construction and its error is returned together with a nil holder.
func multiWriter(writers ...io.Writer) (*multiWriterHolder, error) {
	holder := new(multiWriterHolder)
	for _, w := range writers {
		if err := holder.add(w); err != nil {
			return nil, err
		}
	}
	return holder, nil
}
// add registers writer with the holder by appending it to the writers slice.
// It returns errWriterIsNil when writer is nil and errWriterAlreadyLoaded when
// an equal writer has already been registered.
func (mw *multiWriterHolder) add(writer io.Writer) error {
	if writer == nil {
		return errWriterIsNil
	}
	for _, existing := range mw.writers {
		if existing == writer {
			return errWriterAlreadyLoaded
		}
	}
	mw.writers = append(mw.writers, writer)
	return nil
}