-
Notifications
You must be signed in to change notification settings - Fork 41
/
service.go
137 lines (121 loc) · 3.22 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package main
import (
"errors"
"fmt"
"snowflake/etcdclient"
pb "snowflake/proto"
"strconv"
"sync"
"time"
cli "gopkg.in/urfave/cli.v2"
log "github.com/Sirupsen/logrus"
etcd "github.com/coreos/etcd/client"
"golang.org/x/net/context"
)
const (
BACKOFF = 100 // max backoff delay millisecond
CONCURRENT = 128 // max concurrent connections to etcd
UUID_QUEUE = 1024 // uuid process queue
)
const (
TS_MASK = 0x1FFFFFFFFFF // 41bit
SN_MASK = 0xFFF // 12bit
MACHINE_ID_MASK = 0x3FF // 10bit
)
type server struct {
pkroot string
uuidkey string
machine_id uint64 // 10-bit machine id
ch_proc chan chan uint64
muNext sync.Mutex
}
func (s *server) init(c *cli.Context) {
etcdclient.Init(c)
s.ch_proc = make(chan chan uint64, UUID_QUEUE)
// shifted machine id
s.machine_id = (uint64(c.Int("machine-id")) & MACHINE_ID_MASK) << 12
s.pkroot = c.String("pk-root")
s.uuidkey = c.String("uuid-key")
go s.uuid_task()
}
// get next value of a key, like auto-increment in mysql
func (s *server) Next(ctx context.Context, in *pb.Snowflake_Key) (*pb.Snowflake_Value, error) {
s.muNext.Lock()
defer s.muNext.Unlock()
client := etcdclient.KeysAPI()
key := s.pkroot + "/" + in.Name
for {
// get the key
resp, err := client.Get(context.Background(), key, nil)
if err != nil {
log.Error(err)
return nil, errors.New("Key not exists, need to create first")
}
// get prevValue & prevIndex
prevValue, err := strconv.Atoi(resp.Node.Value)
if err != nil {
log.Error(err)
return nil, errors.New("marlformed value")
}
prevIndex := resp.Node.ModifiedIndex
// CompareAndSwap
resp, err = client.Set(context.Background(), key, fmt.Sprint(prevValue+1), &etcd.SetOptions{PrevIndex: prevIndex})
if err != nil {
log.Warn(err)
continue
}
return &pb.Snowflake_Value{int64(prevValue + 1)}, nil
}
}
// generate an unique uuid
func (s *server) GetUUID(context.Context, *pb.Snowflake_NullRequest) (*pb.Snowflake_UUID, error) {
req := make(chan uint64, 1)
s.ch_proc <- req
return &pb.Snowflake_UUID{<-req}, nil
}
// uuid generator
func (s *server) uuid_task() {
var sn uint64 // 12-bit serial no
var last_ts int64 // last timestamp
for {
ret := <-s.ch_proc
// get a correct serial number
t := ts()
if t < last_ts { // clock shift backward
log.Warn("clock shift happened, waiting until the clock moving to the next millisecond.")
t = s.wait_ms(last_ts)
}
if last_ts == t { // same millisecond
sn = (sn + 1) & SN_MASK
if sn == 0 { // serial number overflows, wait until next ms
t = s.wait_ms(last_ts)
}
} else { // new millsecond, reset serial number to 0
sn = 0
}
// remember last timestamp
last_ts = t
// generate uuid, format:
//
// 0 0.................0 0..............0 0........0
// 1-bit 41bit timestamp 10bit machine-id 12bit sn
var uuid uint64
uuid |= (uint64(t) & TS_MASK) << 22
uuid |= s.machine_id
uuid |= sn
ret <- uuid
}
}
// wait_ms will wait untill last_ts
func (s *server) wait_ms(last_ts int64) int64 {
t := ts()
for t < last_ts {
time.Sleep(time.Duration(last_ts-t) * time.Millisecond)
t = ts()
}
return t
}
// get timestamp
func ts() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}