forked from ali-sdk/ali-rds
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.js
119 lines (105 loc) · 2.63 KB
/
client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
'use strict';
const util = require('util');
const mysql = require('mysql');
const wrap = require('co-wrap-all');
const Operator = require('./operator');
const RDSConnection = require('./connection');
const RDSTransaction = require('./transaction');
const promisify = require('pify');
module.exports = RDSClient;
module.exports.literals = require('./literals');
function RDSClient(options) {
if (!(this instanceof RDSClient)) {
return new RDSClient(options);
}
Operator.call(this);
this.pool = mysql.createPool(options);
[
'query',
'getConnection',
].forEach(method => {
this.pool[method] = promisify(this.pool[method]);
});
}
util.inherits(RDSClient, Operator);
const proto = RDSClient.prototype;
proto._query = function(sql) {
return this.pool.query(sql);
};
proto.getConnection = function() {
return this.pool.getConnection().then(onConnection, onError);
function onConnection(conn) {
return new RDSConnection(conn);
}
function onError(err) {
if (err.name === 'Error') {
err.name = 'RDSClientGetConnectionError';
}
throw err;
}
};
/**
* Begin a transaction
*
* @return {Transaction} transaction instance
*/
proto.beginTransaction = function* () {
const conn = yield this.getConnection();
try {
yield conn.beginTransaction();
} catch (err) {
conn.release();
throw err;
}
return new RDSTransaction(conn);
};
/**
* Auto commit or rollback on a transaction scope
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @return {Object} - scope return result
*/
proto.beginTransactionScope = function* (scope, ctx) {
ctx = ctx || {};
if (!ctx._transactionConnection) {
ctx._transactionConnection = yield this.beginTransaction();
ctx._transactionScopeCount = 1;
} else {
ctx._transactionScopeCount++;
}
const tran = ctx._transactionConnection;
try {
const result = yield scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
ctx._transactionConnection = null;
yield tran.commit();
}
return result;
} catch (err) {
if (ctx._transactionConnection) {
ctx._transactionConnection = null;
yield tran.rollback();
}
throw err;
}
};
proto.end = function(callback) {
// callback style
if (callback) {
return this.pool.end(callback);
}
// promise style
const that = this;
return new Promise(function(resolve, reject) {
that.pool.end(function(err) {
if (err) {
return reject(err);
}
resolve();
});
});
};
wrap(proto);