package database

import (
	"sync"

	"github.com/activecm/rita/config"
	"github.com/globalsign/mgo"
	"github.com/globalsign/mgo/bson"
	log "github.com/sirupsen/logrus"
)
type (
	// BulkChange describes a single mgo upsert, update, or removal
	BulkChange struct {
		Selector  interface{} // The selector document
		Update    interface{} // The update document if updating the document
		Upsert    bool        // Whether to insert in case the document isn't found
		Remove    bool        // Whether to remove the document found rather than updating
		SelectAll bool        // Whether to use RemoveAll/UpdateAll
	}

	// BulkChanges is a map of collections to the changes that should be applied to each one
	BulkChanges map[string][]BulkChange

	// MgoBulkWriter is a pipeline worker which properly batches bulk updates for MongoDB
	MgoBulkWriter struct {
		db           *DB              // provides access to MongoDB
		conf         *config.Config   // contains details needed to access MongoDB
		log          *log.Logger      // main logger for RITA
		writeChannel chan BulkChanges // holds analyzed data
		writeWg      *sync.WaitGroup  // wait for writing to finish
		writerName   string           // used in error reporting
		unordered    bool             // if the operations can be applied in any order, MongoDB can run the updates in parallel
		maxBulkCount int              // max number of changes to include in each bulk update
		maxBulkSize  int              // max total size of BSON documents making up each bulk update
	}
)
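
// A BulkChanges value groups changes by the collection they target. A minimal
// sketch, assuming a hypothetical "host" collection and an illustrative
// update document (neither is taken from RITA's actual schema):
//
//	changes := BulkChanges{
//		"host": []BulkChange{{
//			Selector: bson.M{"ip": "192.168.1.1"},
//			Update:   bson.M{"$set": bson.M{"blacklisted": true}},
//			Upsert:   true, // insert if no document matches the selector
//		}},
//	}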
// Size serializes the changes to BSON using the provided buffer and returns the
// (possibly grown) buffer for reuse along with the total size of the BSON
// description of the changes. Note this method slightly underestimates the
// total amount of BSON needed to describe the changes since extra flags may be
// sent along.
func (m BulkChange) Size(buffer []byte) ([]byte, int) {
	size := 0
	if len(buffer) > 0 { // in case the byte slice has something in it already
		buffer = buffer[:0]
	}
	if m.Selector != nil {
		buffer, _ = bson.MarshalBuffer(m.Selector, buffer)
		size += len(buffer)
		buffer = buffer[:0]
	}
	if m.Update != nil {
		buffer, _ = bson.MarshalBuffer(m.Update, buffer)
		size += len(buffer)
		buffer = buffer[:0]
	}
	return buffer, size
}
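
// Because Size returns the scratch buffer, callers can thread a single buffer
// through repeated calls and avoid reallocating. A usage sketch (changeList is
// an assumed []BulkChange, not a name from the original source):
//
//	var scratch []byte
//	total := 0
//	for _, change := range changeList {
//		var n int
//		scratch, n = change.Size(scratch)
//		total += n
//	}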
// Apply adds the change described to a bulk buffer
func (m BulkChange) Apply(bulk *mgo.Bulk) {
	if m.Selector == nil {
		return // can't describe a change without a selector
	}
	if m.Remove && m.SelectAll {
		bulk.RemoveAll(m.Selector)
	} else if m.Remove /*&& !m.SelectAll*/ {
		bulk.Remove(m.Selector)
	} else if m.Update != nil && m.Upsert {
		bulk.Upsert(m.Selector, m.Update)
	} else if m.Update != nil && m.SelectAll {
		bulk.UpdateAll(m.Selector, m.Update)
	} else if m.Update != nil /*&& !m.Upsert && !m.SelectAll*/ {
		bulk.Update(m.Selector, m.Update)
	}
}
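
// The flag combinations above map onto mgo's bulk operations as follows;
// Remove takes precedence over Update, and a change with a nil Update and
// Remove unset is silently dropped:
//
//	Remove && SelectAll        -> bulk.RemoveAll(Selector)
//	Remove                     -> bulk.Remove(Selector)
//	Update != nil && Upsert    -> bulk.Upsert(Selector, Update)
//	Update != nil && SelectAll -> bulk.UpdateAll(Selector, Update)
//	Update != nil              -> bulk.Update(Selector, Update)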
// NewBulkWriter creates a new writer object to write output data to collections
func NewBulkWriter(db *DB, conf *config.Config, log *log.Logger, unorderedWritesOK bool, writerName string) *MgoBulkWriter {
	return &MgoBulkWriter{
		db:           db,
		conf:         conf,
		log:          log,
		writeChannel: make(chan BulkChanges),
		writeWg:      new(sync.WaitGroup),
		writerName:   writerName,
		unordered:    unorderedWritesOK,
		// Cap the bulk buffers at 500 updates. 1000 should theoretically work, but we've run into issues in the past, so we halved it.
		maxBulkCount: 500,
		// Cap the bulk buffers at 15MB. This cap ensures that our bulk transactions don't exceed the 16MB limit imposed on MongoDB documents/operations.
		maxBulkSize: 15 * 1000 * 1000,
	}
}
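
// Typical lifecycle of a writer, as a sketch (the db, conf, and logger values
// are assumed to come from the surrounding RITA pipeline):
//
//	writer := NewBulkWriter(db, conf, logger, true, "example-module")
//	writer.Start()          // spawn the background write thread
//	writer.Collect(changes) // send batched changes; blocks until the writer picks them up
//	writer.Close()          // flush all remaining buffers and wait for the thread to exit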
// Collect sends a group of results to the writer for writing out to the database
func (w *MgoBulkWriter) Collect(data BulkChanges) {
	w.writeChannel <- data
}
// Close stops the writer from accepting new data and waits for the write thread to finish flushing its buffers
func (w *MgoBulkWriter) Close() {
	close(w.writeChannel)
	w.writeWg.Wait()
}
// Start kicks off a new write thread
func (w *MgoBulkWriter) Start() {
	w.writeWg.Add(1)
	go func() {
		ssn := w.db.Session.Copy()
		defer ssn.Close()

		bulkBuffers := map[string]*mgo.Bulk{} // stores a mgo.Bulk buffer for each collection
		bulkBufferSizes := map[string]int{}   // stores the size in bytes of the BSON documents in each mgo.Bulk buffer
		bulkBufferLengths := map[string]int{} // stores the number of changes stored in each mgo.Bulk buffer
		var sizeBuffer []byte                 // used (and re-used) for BSON serialization in order to calculate the size of each BSON doc
		var changeSize int                    // holds the total size of each BSON serialized change before being added to bulkBufferSizes

		for data := range w.writeChannel { // process data as it streams into the writer
			for tgtColl, bulkChanges := range data { // loop through each collection that needs to be updated
				// initialize or grab the bulk buffer associated with this collection
				bulkBuffer, bufferExists := bulkBuffers[tgtColl]
				if !bufferExists {
					bulkBuffer = ssn.DB(w.db.GetSelectedDB()).C(tgtColl).Bulk()
					if w.unordered {
						// if the order in which the updates occur doesn't matter, allow MongoDB to apply the updates in parallel
						bulkBuffer.Unordered()
					}
					bulkBuffers[tgtColl] = bulkBuffer
				}

				for _, change := range bulkChanges { // loop through each change that needs to be applied to the collection
					sizeBuffer, changeSize = change.Size(sizeBuffer)

					// if the bulk buffer has already reached the max number of changes, or
					// if the total size of the bulk buffer would exceed the max size after inserting the current change,
					// run the existing bulk buffer against MongoDB
					if bulkBufferLengths[tgtColl] >= w.maxBulkCount || bulkBufferSizes[tgtColl]+changeSize >= w.maxBulkSize {
						info, err := bulkBuffer.Run()
						if err != nil {
							w.log.WithFields(log.Fields{
								"Module":     w.writerName,
								"Collection": tgtColl,
								"Info":       info,
							}).Error(err)
						}
						// make sure to reset the stats we are tracking about the bulk buffer
						bulkBufferLengths[tgtColl] = 0
						bulkBufferSizes[tgtColl] = 0
					}

					// insert the change into the bulk buffer and update the stats we are tracking about the bulk buffer
					change.Apply(bulkBuffer)
					bulkBufferLengths[tgtColl]++
					bulkBufferSizes[tgtColl] += changeSize
				}
			}
		}

		// after the writer is done receiving inputs, make sure to drain all of the buffers before exiting
		for tgtColl, bulkBuffer := range bulkBuffers {
			info, err := bulkBuffer.Run()
			if err != nil {
				w.log.WithFields(log.Fields{
					"Module":     w.writerName,
					"Collection": tgtColl,
					"Info":       info,
				}).Error(err)
			}
			bulkBufferLengths[tgtColl] = 0
			bulkBufferSizes[tgtColl] = 0
		}
		w.writeWg.Done()
	}()
}