forked from redis/go-redis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
multi.go
132 lines (110 loc) · 2.31 KB
/
multi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package redis
import (
"fmt"
"sync"
)
type MultiClient struct {
*Client
execMtx sync.Mutex
}
func (c *Client) MultiClient() (*MultiClient, error) {
return &MultiClient{
Client: &Client{
BaseClient: &BaseClient{
ConnPool: NewSingleConnPool(c.ConnPool, true),
InitConn: c.InitConn,
},
},
}, nil
}
func (c *MultiClient) Close() error {
c.Unwatch()
return c.Client.Close()
}
func (c *MultiClient) Watch(keys ...string) *StatusReq {
args := append([]string{"WATCH"}, keys...)
req := NewStatusReq(args...)
c.Process(req)
return req
}
func (c *MultiClient) Unwatch(keys ...string) *StatusReq {
args := append([]string{"UNWATCH"}, keys...)
req := NewStatusReq(args...)
c.Process(req)
return req
}
func (c *MultiClient) Discard() {
c.reqsMtx.Lock()
if c.reqs == nil {
panic("Discard can be used only inside Exec")
}
c.reqs = c.reqs[:1]
c.reqsMtx.Unlock()
}
func (c *MultiClient) Exec(do func()) ([]Req, error) {
c.reqsMtx.Lock()
c.reqs = []Req{NewStatusReq("MULTI")}
c.reqsMtx.Unlock()
do()
c.Queue(NewIfaceSliceReq("EXEC"))
c.reqsMtx.Lock()
reqs := c.reqs
c.reqs = nil
c.reqsMtx.Unlock()
if len(reqs) == 2 {
return []Req{}, nil
}
conn, err := c.conn()
if err != nil {
return nil, err
}
// Synchronize writes and reads to the connection using mutex.
c.execMtx.Lock()
err = c.ExecReqs(reqs, conn)
c.execMtx.Unlock()
if err != nil {
c.ConnPool.Remove(conn)
return nil, err
}
c.ConnPool.Add(conn)
return reqs[1 : len(reqs)-1], nil
}
func (c *MultiClient) ExecReqs(reqs []Req, conn *Conn) error {
err := c.WriteReq(conn, reqs...)
if err != nil {
return err
}
statusReq := NewStatusReq()
// Omit last request (EXEC).
reqsLen := len(reqs) - 1
// Parse queued replies.
for i := 0; i < reqsLen; i++ {
_, err = statusReq.ParseReply(conn.Rd)
if err != nil {
return err
}
}
// Parse number of replies.
line, err := readLine(conn.Rd)
if err != nil {
return err
}
if line[0] != '*' {
return fmt.Errorf("Expected '*', but got line %q", line)
}
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
return Nil
}
// Parse replies.
// Loop starts from 1 to omit first request (MULTI).
for i := 1; i < reqsLen; i++ {
req := reqs[i]
val, err := req.ParseReply(conn.Rd)
if err != nil {
req.SetErr(err)
} else {
req.SetVal(val)
}
}
return nil
}