-
Notifications
You must be signed in to change notification settings - Fork 63
/
mutex.go
97 lines (74 loc) · 1.93 KB
/
mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package yiigo
import (
"context"
"runtime/debug"
"time"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// MutexHandler the function to execute after lock acquired.
type MutexHandler func(ctx context.Context) error
// Mutex is a reader/writer mutual exclusion lock.
type Mutex interface {
// Acquire attempt to acquire lock at regular intervals.
Acquire(ctx context.Context, callback MutexHandler, interval, timeout time.Duration) error
}
type distributed struct {
pool RedisPool
key string
expire int64
}
func (d *distributed) Acquire(ctx context.Context, callback MutexHandler, interval, timeout time.Duration) error {
mutexCtx := ctx
if timeout > 0 {
var cancel context.CancelFunc
mutexCtx, cancel = context.WithTimeout(mutexCtx, timeout)
defer cancel()
}
conn, err := d.pool.Get(mutexCtx)
if err != nil {
return errors.Wrap(err, "redis conn")
}
defer d.pool.Put(conn)
for {
select {
case <-mutexCtx.Done():
// timeout or canceled
return errors.Wrap(mutexCtx.Err(), "mutex context")
default:
}
// attempt to acquire lock with `setnx`
reply, err := redis.String(conn.Do("SET", d.key, time.Now().Nanosecond(), "EX", d.expire, "NX"))
if err != nil && err != redis.ErrNil {
return errors.Wrap(err, "redis setnx")
}
if reply == "OK" {
break
}
time.Sleep(interval)
}
// release lock
defer func() {
defer conn.Do("DEL", d.key)
if err := recover(); err != nil {
logger.Error("mutex callback panic",
zap.Any("error", err),
zap.ByteString("stack", debug.Stack()),
)
}
}()
return callback(ctx)
}
// DistributedMutex returns a simple distributed mutual exclusion lock.
func DistributedMutex(redisName, mutexKey string, expire time.Duration) Mutex {
mutex := &distributed{
pool: Redis(redisName),
key: mutexKey,
expire: 10,
}
if seconds := expire.Seconds(); seconds > 0 {
mutex.expire = int64(seconds)
}
return mutex
}