forked from elasticsearch-dump/elasticsearch-dump
-
Notifications
You must be signed in to change notification settings - Fork 0
/
elasticdump.js
87 lines (74 loc) · 2.89 KB
/
elasticdump.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const http = require('http')
const https = require('https')
const TransportProcessor = require('./lib/processor')
const vm = require('vm')
const { promisify } = require('util')
const ioHelper = require('./lib/ioHelper')
const path = require('path')
class ElasticDump extends TransportProcessor {
constructor (input, output, options) {
super()
this.input = input
this.output = output
this.options = options
this.modifiers = []
if (output !== '$' && (this.options.toLog === null || this.options.toLog === undefined)) {
this.options.toLog = true
}
this.validationErrors = this.validateOptions()
if (options.maxSockets) {
this.log(`globally setting maxSockets=${options.maxSockets}`)
http.globalAgent.maxSockets = options.maxSockets
https.globalAgent.maxSockets = options.maxSockets
}
ioHelper(this, 'input')
ioHelper(this, 'output')
if (this.options.type === 'data' && this.options.transform) {
if (!(this.options.transform instanceof Array)) {
this.options.transform = [this.options.transform]
}
this.modifiers = this.options.transform.map(transform => {
if (transform[0] === '@') {
return doc => {
const filePath = transform.slice(1).split('?')
const resolvedFilePath = path.resolve(process.cwd(), filePath[0])
return require(resolvedFilePath)(doc, ElasticDump.getParams(filePath[1]))
}
} else {
const modificationScriptText = `(function(doc) { ${transform} })`
return new vm.Script(modificationScriptText).runInThisContext()
}
})
}
}
dump (callback, continuing, limit, offset, totalWrites) {
if (this.validationErrors.length > 0) {
this.emit('error', { errors: this.validationErrors })
callback(new Error('There was an error starting this dump'))
return
}
// promisify helpers
this.get = promisify(this.output.get).bind(this.input)
this.set = promisify(this.output.set).bind(this.output)
if (!limit) { limit = this.options.limit }
if (!offset) { offset = this.options.offset }
if (!totalWrites) { totalWrites = 0 }
if (continuing !== true) {
this.log('starting dump')
if (this.options.offset) {
this.log(`Warning: offsetting ${this.options.offset} rows.`)
this.log(' * Using an offset doesn\'t guarantee that the offset rows have already been written, please refer to the HELP text.')
}
if (this.modifiers.length) {
this.log(`Will modify documents using these scripts: ${this.options.transform}`)
}
}
this._loop(limit, offset, totalWrites)
.then((totalWrites) => {
if (typeof callback === 'function') { return callback(null, totalWrites) }
}, (error) => {
if (typeof callback === 'function') { return callback(error/*, totalWrites */) }
})
}
}
module.exports = ElasticDump