@@ -140,7 +140,8 @@ func (conf *syncConfig) makeHotCache() {
140
140
return
141
141
} else {
142
142
for _ , node := range nodes {
143
- conf .cache .nodes [dc + "@" + node .Node ] = nil
143
+ nodeName := strings .Split (node .Node , "." )[0 ]
144
+ conf .cache .nodes [dc + "@" + nodeName ] = nil
144
145
}
145
146
}
146
147
}
@@ -154,9 +155,7 @@ func (conf *syncConfig) writeSyncData(data *syncData) {
154
155
defer conf .servicesWG .Done ()
155
156
path := fmt .Sprintf ("%s/%s" , data .service , data .value )
156
157
if data .node != "" {
157
- //
158
- nodeShortHostName := data .node [:strings .Index (data .node , "." )]
159
- path = fmt .Sprintf ("%s/%s" , nodeShortHostName , path )
158
+ path = fmt .Sprintf ("%s/%s" , data .node , path )
160
159
}
161
160
if data .dc != "" {
162
161
path = fmt .Sprintf ("%s/%s" , data .dc , path )
@@ -178,45 +177,91 @@ func (conf *syncConfig) writeSyncData(data *syncData) {
178
177
}
179
178
180
179
func (conf * syncConfig ) manageSession () {
181
- nodeName := fmt .Sprintf ("%s.%s" , conf .consulDC , conf .nodeName )
182
- // 1. register main node
183
- conf .consulClient .Catalog ().Register (& api.CatalogRegistration {
184
- Node : nodeName ,
185
- Address : conf .nodeAddr ,
186
- Datacenter : conf .consulDC ,
187
- SkipNodeUpdate : true ,
188
- }, nil )
189
- if conf .sessionID == "" {
190
- if sess , _ , e := conf .consulClient .Session ().CreateNoChecks (
191
- & api.SessionEntry {
192
- Node : nodeName ,
193
- Name : "befw-sync" ,
194
- TTL : "30s" ,
195
- }, nil ); e == nil {
196
- conf .sessionID = sess
180
+ //nodeName := fmt.Sprintf("%s.%s", conf.consulDC, conf.nodeName)
181
+ errcount := 0
182
+ for errcount < 10 {
183
+ //// 1. register main node
184
+ if _ , e := conf .consulClient .Catalog ().Register (& api.CatalogRegistration {
185
+ Node : conf .nodeName ,
186
+ Address : conf .nodeAddr ,
187
+ Datacenter : conf .consulDC ,
188
+ }, nil ); e != nil {
189
+ befw .LogWarning ("[Syncer] can't register a node!" )
197
190
}
198
- } else {
199
- if se , _ , e := conf .consulClient .Session ().Info (conf .sessionID , nil ); e != nil || se == nil {
200
- befw .LogDebug ("[Syncer] Can't find session:" , conf .sessionID )
201
- conf .sessionID = ""
202
- conf .manageSession () // a bit recursive
191
+ befw .LogDebug ("[Syncer] starting session creation" )
192
+ if conf .sessionID == "" {
193
+ if sess , _ , e := conf .consulClient .Session ().CreateNoChecks (
194
+ & api.SessionEntry {
195
+ Node : conf .nodeName ,
196
+ Name : "befw-sync" ,
197
+ TTL : "40s" ,
198
+ }, & api.WriteOptions {Datacenter : conf .consulDC }); e == nil {
199
+ conf .sessionID = sess
200
+ } else {
201
+ befw .LogWarning ("[Syncer] Can't create session: " , e .Error ())
202
+ errcount ++
203
+ continue
204
+ }
203
205
} else {
204
- conf .consulClient .Session ().Renew (conf .sessionID , nil )
206
+ if se , _ , e := conf .consulClient .Session ().Info (conf .sessionID , nil ); e != nil {
207
+ conf .sessionID = ""
208
+ befw .LogDebug ("[Syncer] error while getting session: " , conf .sessionID , ", " , e .Error ())
209
+ errcount ++
210
+ continue
211
+ } else if se == nil {
212
+ conf .sessionID = ""
213
+ befw .LogDebug ("[Syncer] Can't find session:" , conf .sessionID )
214
+ errcount ++
215
+ continue
216
+ }
217
+ if se , _ , e := conf .consulClient .Session ().Renew (conf .sessionID , nil ); e != nil {
218
+ conf .sessionID = ""
219
+ befw .LogDebug ("[Syncer] error while renewning session: " , conf .sessionID , ", " , e .Error ())
220
+ errcount ++
221
+ continue
222
+ } else if se == nil {
223
+ conf .sessionID = ""
224
+ befw .LogDebug ("[Syncer] Can't find session:" , conf .sessionID )
225
+ errcount ++
226
+ continue
227
+ }
205
228
}
229
+ break
206
230
}
231
+ befw .LogDebug ("[Syncer] got session " , conf .sessionID )
207
232
}
208
233
234
+ func (conf * syncConfig ) getSessionHolder (session string ) string {
235
+ if se , _ , e := conf .consulClient .Session ().Info (session , nil ); e == nil && se != nil {
236
+ return fmt .Sprintf ("%s@%s" , se .Name , se .Node )
237
+ }
238
+ return ""
239
+ }
240
+
241
+ var lastState bool
242
+
209
243
func (conf * syncConfig ) manageSessionLock () bool {
210
244
conf .manageSession ()
211
245
if conf .sessionID != "" {
212
246
if v , _ , e := conf .consulClient .KV ().Acquire (
213
247
& api.KVPair {Key : "befw/.lock" ,
214
- Value : []byte ("ok" ),
248
+ Value : []byte (conf . nodeName ),
215
249
Session : conf .sessionID ,
216
- }, nil ); e != nil {
250
+ }, & api. WriteOptions { Datacenter : conf . consulDC } ); e != nil {
217
251
befw .LogWarning ("[Syncer] Can't create lock:" , e .Error ())
218
252
return false
219
253
} else {
254
+ if ! v {
255
+ if kv , _ , e := conf .consulClient .KV ().Get ("befw/.lock" , nil ); e == nil {
256
+ if kv .Session != "" {
257
+ if si := conf .getSessionHolder (kv .Session ); si != "" {
258
+ befw .LogInfo ("[Syncer] key is locked by " , si )
259
+ }
260
+ }
261
+ }
262
+ } else {
263
+ befw .LogInfo ("[Syncer] Lock acquired by me" )
264
+ }
220
265
return v
221
266
}
222
267
}
0 commit comments