forked from lightningnetwork/lnd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
write.go
100 lines (84 loc) · 2.96 KB
/
write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package pool
import (
"bytes"
"time"
"github.com/lightningnetwork/lnd/buffer"
)
// Write is a worker pool specifically designed for sharing access to
// buffer.Write objects amongst a set of worker goroutines. This enables an
// application to limit the total number of buffer.Write objects allocated at
// any given time.
type Write struct {
	// workerPool is the underlying Worker pool to which Start, Stop, and
	// Submit delegate.
	workerPool *Worker
	// bufferPool is the WriteBuffer pool from which newWorkerState takes
	// the buffer.Write backing each worker state.
	bufferPool *WriteBuffer
}
// NewWrite builds a Write pool on top of the given WriteBuffer pool, which is
// used to recycle buffer.Write objects across the lifetime of the Write
// pool's workers.
//
// numWorkers and workerTimeout are forwarded unchanged to the underlying
// Worker pool's WorkerConfig, and the pool's newWorkerState method is
// registered as the config's NewWorkerState constructor. NewWrite itself does
// not call Start.
func NewWrite(writeBufferPool *WriteBuffer, numWorkers int,
	workerTimeout time.Duration) *Write {

	pool := new(Write)
	pool.bufferPool = writeBufferPool

	// The config refers back to the pool via the newWorkerState method
	// value, so the pool must exist before the config is assembled.
	cfg := WorkerConfig{
		NewWorkerState: pool.newWorkerState,
		NumWorkers:     numWorkers,
		WorkerTimeout:  workerTimeout,
	}
	pool.workerPool = NewWorker(&cfg)

	return pool
}
// Start spins up the Write pool by starting the underlying Worker pool, and
// returns whatever error that call reports.
func (w *Write) Start() error {
	workers := w.workerPool

	return workers.Start()
}
// Stop shuts down the Write pool by stopping the underlying Worker pool, and
// returns whatever error that call reports.
func (w *Write) Stop() error {
	workers := w.workerPool

	return workers.Stop()
}
// Submit hands inner to the underlying Worker pool for execution. The closure
// is invoked with the bytes.Buffer held in the executing worker's
// writeWorkerState, which newWorkerState backs with a buffer.Write object.
//
// The returned error is whatever the Worker pool's Submit returns. The worker
// state is type-asserted to *writeWorkerState without a comma-ok check, so a
// state of any other type would panic.
func (w *Write) Submit(inner func(*bytes.Buffer) error) error {
	task := func(s WorkerState) error {
		return inner(s.(*writeWorkerState).buf)
	}

	return w.workerPool.Submit(task)
}
// writeWorkerState is the per-goroutine state maintained by a Write pool's
// goroutines. Instances are created by Write.newWorkerState and released via
// Cleanup.
type writeWorkerState struct {
	// bufferPool is the pool to which the writeBuf is returned when
	// Cleanup is called.
	bufferPool *WriteBuffer

	// writeBuf is the buffer taken from the bufferPool on initialization,
	// which is used to back the buf object provided to any tasks that
	// the goroutine processes.
	writeBuf *buffer.Write

	// buf is a buffer backed by writeBuf, that can be written to by tasks
	// submitted to the Write pool. It allows the task submitters to
	// interact with the writeBuf as if it were an io.Writer. Reset empties
	// it; presumably the Worker pool calls Reset between tasks -- confirm
	// against the Worker implementation.
	buf *bytes.Buffer
}
// newWorkerState constructs a fresh writeWorkerState. It is registered as the
// NewWorkerState constructor of the Worker pool's config in NewWrite.
//
// A buffer.Write is taken from the pool's bufferPool and wrapped in a
// bytes.Buffer that starts out empty with the full buffer as its capacity.
func (w *Write) newWorkerState() WorkerState {
	pool := w.bufferPool
	writeBuf := pool.Take()

	// Zero length, full capacity: the bytes.Buffer starts empty and writes
	// land in writeBuf's storage until that capacity is exceeded.
	backing := writeBuf[:0:len(writeBuf)]

	return &writeWorkerState{
		bufferPool: pool,
		writeBuf:   writeBuf,
		buf:        bytes.NewBuffer(backing),
	}
}
// Cleanup returns the writeBuf to the underlying buffer pool, and then drops
// this state's references to both the writeBuf and the encapsulating buf.
func (w *writeWorkerState) Cleanup() {
	w.bufferPool.Return(w.writeBuf)

	// Clear both references only after the buffer has been handed back.
	w.writeBuf, w.buf = nil, nil
}
// Reset empties the state's bytes.Buffer so that it is zero-length, while
// retaining its underlying storage for use by subsequent writes.
func (w *writeWorkerState) Reset() {
	scratch := w.buf
	scratch.Reset()
}