37
37
import io .druid .query .lookup .namespace .ExtractionNamespace ;
38
38
import io .druid .query .lookup .namespace .ExtractionNamespaceCacheFactory ;
39
39
40
+ import javax .annotation .concurrent .GuardedBy ;
40
41
import java .util .Collection ;
41
42
import java .util .Map ;
42
43
import java .util .UUID ;
49
50
import java .util .concurrent .TimeUnit ;
50
51
import java .util .concurrent .atomic .AtomicBoolean ;
51
52
import java .util .concurrent .atomic .AtomicLong ;
53
+ import java .util .concurrent .atomic .AtomicReference ;
52
54
53
55
/**
54
56
*
@@ -71,16 +73,17 @@ public NamespaceImplData(
71
73
final ListenableFuture <?> future ;
72
74
final ExtractionNamespace namespace ;
73
75
final String name ;
76
+ final Object changeLock = new Object ();
74
77
final AtomicBoolean enabled = new AtomicBoolean (false );
75
78
final CountDownLatch firstRun = new CountDownLatch (1 );
79
+ final AtomicReference <String > latestVersion = new AtomicReference <>(null );
76
80
}
77
81
78
82
private static final Logger log = new Logger (NamespaceExtractionCacheManager .class );
79
83
private final ListeningScheduledExecutorService listeningScheduledExecutorService ;
80
84
protected final ConcurrentMap <String , NamespaceImplData > implData = new ConcurrentHashMap <>();
81
85
protected final AtomicLong tasksStarted = new AtomicLong (0 );
82
86
protected final ServiceEmitter serviceEmitter ;
83
- private final ConcurrentHashMap <String , String > lastVersion = new ConcurrentHashMap <>();
84
87
private final Map <Class <? extends ExtractionNamespace >, ExtractionNamespaceCacheFactory <?>> namespaceFunctionFactoryMap ;
85
88
86
89
public NamespaceExtractionCacheManager (
@@ -148,10 +151,8 @@ protected boolean waitForServiceToEnd(long time, TimeUnit unit) throws Interrupt
148
151
}
149
152
150
153
151
- protected < T extends ExtractionNamespace > Runnable getPostRunnable (
154
+ protected Runnable getPostRunnable (
152
155
final String id ,
153
- final T namespace ,
154
- final ExtractionNamespaceCacheFactory <T > factory ,
155
156
final String cacheId
156
157
)
157
158
{
@@ -165,17 +166,20 @@ public void run()
165
166
// was removed
166
167
return ;
167
168
}
168
- synchronized (namespaceDatum .enabled ) {
169
- try {
169
+ try {
170
+ if (!namespaceDatum .enabled .get ()) {
171
+ // skip because it was disabled
172
+ return ;
173
+ }
174
+ synchronized (namespaceDatum .enabled ) {
170
175
if (!namespaceDatum .enabled .get ()) {
171
- // skip because it was disabled
172
176
return ;
173
177
}
174
178
swapAndClearCache (id , cacheId );
175
179
}
176
- finally {
177
- namespaceDatum . firstRun . countDown ();
178
- }
180
+ }
181
+ finally {
182
+ namespaceDatum . firstRun . countDown ();
179
183
}
180
184
}
181
185
};
@@ -221,7 +225,10 @@ public boolean scheduleOrUpdate(
221
225
if (log .isDebugEnabled ()) {
222
226
log .debug ("Namespace [%s] needs updated to [%s]" , implDatum .namespace , namespace );
223
227
}
224
- removeNamespaceLocalMetadata (implDatum );
228
+ // Ensure it is not changing state right now.
229
+ synchronized (implDatum .changeLock ) {
230
+ removeNamespaceLocalMetadata (implDatum );
231
+ }
225
232
schedule (id , namespace );
226
233
return true ;
227
234
}
@@ -257,59 +264,59 @@ public boolean scheduleAndWait(
257
264
return success ;
258
265
}
259
266
267
+ @ GuardedBy ("implDatum.changeLock" )
260
268
private void cancelFuture (final NamespaceImplData implDatum )
261
269
{
262
- synchronized (implDatum .enabled ) {
263
- final CountDownLatch latch = new CountDownLatch (1 );
264
- final ListenableFuture <?> future = implDatum .future ;
265
- Futures .addCallback (
266
- future , new FutureCallback <Object >()
270
+ final CountDownLatch latch = new CountDownLatch (1 );
271
+ final ListenableFuture <?> future = implDatum .future ;
272
+ Futures .addCallback (
273
+ future , new FutureCallback <Object >()
274
+ {
275
+ @ Override
276
+ public void onSuccess (Object result )
267
277
{
268
- @ Override
269
- public void onSuccess (Object result )
270
- {
271
- latch .countDown ();
272
- }
278
+ latch .countDown ();
279
+ }
273
280
274
- @ Override
275
- public void onFailure (Throwable t )
276
- {
277
- // Expect CancellationException
278
- latch .countDown ();
279
- if (!(t instanceof CancellationException )) {
280
- log .error (t , "Error in namespace [%s]" , implDatum .name );
281
- }
281
+ @ Override
282
+ public void onFailure (Throwable t )
283
+ {
284
+ // Expect CancellationException
285
+ latch .countDown ();
286
+ if (!(t instanceof CancellationException )) {
287
+ log .error (t , "Error in namespace [%s]" , implDatum .name );
282
288
}
283
289
}
284
- );
285
- if (! future . isDone ()
286
- && !future .cancel ( true )) { // Interrupt to make sure we don't pollute stuff after we've already cleaned up
287
- throw new ISE ( "Future for namespace [%s] was not able to be canceled" , implDatum . name );
288
- }
289
- try {
290
- latch . await ();
291
- }
292
- catch ( InterruptedException e ) {
293
- Thread . currentThread (). interrupt ();
294
- throw Throwables . propagate ( e );
295
- }
290
+ }
291
+ );
292
+ if ( !future .isDone ()
293
+ && ! future . cancel ( true )) { // Interrupt to make sure we don't pollute stuff after we've already cleaned up
294
+ throw new ISE ( "Future for namespace [%s] was not able to be canceled" , implDatum . name );
295
+ }
296
+ try {
297
+ latch . await ();
298
+ }
299
+ catch ( InterruptedException e ) {
300
+ Thread . currentThread (). interrupt ( );
301
+ throw Throwables . propagate ( e );
296
302
}
297
303
}
298
304
305
+ // Not thread safe
306
+ @ GuardedBy ("implDatum.changeLock" )
299
307
private boolean removeNamespaceLocalMetadata (final NamespaceImplData implDatum )
300
308
{
301
309
if (implDatum == null ) {
302
310
return false ;
303
311
}
304
- synchronized (implDatum .enabled ) {
305
- if (!implDatum .enabled .compareAndSet (true , false )) {
306
- return false ;
307
- }
308
- if (!implDatum .future .isDone ()) {
309
- cancelFuture (implDatum );
310
- }
311
- return implData .remove (implDatum .name , implDatum );
312
+ // "Leader" election for doing the deletion
313
+ if (!implDatum .enabled .compareAndSet (true , false )) {
314
+ return false ;
315
+ }
316
+ if (!implDatum .future .isDone ()) {
317
+ cancelFuture (implDatum );
312
318
}
319
+ return implData .remove (implDatum .name , implDatum );
313
320
}
314
321
315
322
// Optimistic scheduling of updates to a namespace.
@@ -321,7 +328,7 @@ public <T extends ExtractionNamespace> ListenableFuture<?> schedule(final String
321
328
throw new ISE ("Cannot find factory for namespace [%s]" , namespace );
322
329
}
323
330
final String cacheId = String .format ("namespace-cache-%s-%s" , id , UUID .randomUUID ().toString ());
324
- return schedule (id , namespace , factory , getPostRunnable (id , namespace , factory , cacheId ), cacheId );
331
+ return schedule (id , namespace , factory , getPostRunnable (id , cacheId ), cacheId );
325
332
}
326
333
327
334
// For testing purposes this is protected
@@ -336,7 +343,7 @@ protected <T extends ExtractionNamespace> ListenableFuture<?> schedule(
336
343
log .debug ("Trying to update namespace [%s]" , id );
337
344
final NamespaceImplData implDatum = implData .get (id );
338
345
if (implDatum != null ) {
339
- synchronized (implDatum .enabled ) {
346
+ synchronized (implDatum .changeLock ) {
340
347
if (implDatum .enabled .get ()) {
341
348
// We also check at the end of the function, but fail fast here
342
349
throw new IAE ("Namespace [%s] already exists! Leaving prior running" , namespace .toString ());
@@ -345,6 +352,8 @@ protected <T extends ExtractionNamespace> ListenableFuture<?> schedule(
345
352
}
346
353
final long updateMs = namespace .getPollMs ();
347
354
final CountDownLatch startLatch = new CountDownLatch (1 );
355
+ // Must be set before leader election occurs or else runnable will fail
356
+ final AtomicReference <NamespaceImplData > implDataAtomicReference = new AtomicReference <>(null );
348
357
349
358
final Runnable command = new Runnable ()
350
359
{
@@ -354,8 +363,13 @@ public void run()
354
363
try {
355
364
startLatch .await (); // wait for "election" to leadership or cancellation
356
365
if (!Thread .currentThread ().isInterrupted ()) {
366
+ final NamespaceImplData implData = implDataAtomicReference .get ();
367
+ if (implData == null ) {
368
+ // should never happen
369
+ throw new NullPointerException (String .format ("No data for namespace [%s]" , id ));
370
+ }
357
371
final Map <String , String > cache = getCacheMap (cacheId );
358
- final String preVersion = lastVersion . get (id );
372
+ final String preVersion = implData . latestVersion . get ();
359
373
final Callable <String > runnable = factory .getCachePopulator (id , namespace , preVersion , cache );
360
374
361
375
tasksStarted .incrementAndGet ();
@@ -364,7 +378,9 @@ public void run()
364
378
throw new CancellationException (String .format ("Version `%s` already exists" , preVersion ));
365
379
}
366
380
if (newVersion != null ) {
367
- lastVersion .put (id , newVersion );
381
+ if (!implData .latestVersion .compareAndSet (preVersion , newVersion )) {
382
+ log .wtf ("Somehow multiple threads are updating the same implData for [%s]" , id );
383
+ }
368
384
}
369
385
postRunnable .run ();
370
386
log .debug ("Namespace [%s] successfully updated" , id );
@@ -392,7 +408,9 @@ public void run()
392
408
future = listeningScheduledExecutorService .schedule (command , 0 , TimeUnit .MILLISECONDS );
393
409
}
394
410
411
+ // Do not need to synchronize here as we haven't set enabled to true yet, and haven't released startLatch
395
412
final NamespaceImplData me = new NamespaceImplData (future , namespace , id );
413
+ implDataAtomicReference .set (me );
396
414
final NamespaceImplData other = implData .putIfAbsent (id , me );
397
415
if (other != null ) {
398
416
if (!future .isDone () && !future .cancel (true )) {
@@ -433,8 +451,6 @@ public void run()
433
451
434
452
/**
435
453
* Clears out resources used by the namespace such as threads. Implementations may override this and call super.delete(...) if they have resources of their own which need cleared.
436
- * <p/>
437
- * This particular method is NOT thread safe, and any impl which is intended to be thread safe should safe-guard calls to this method.
438
454
*
439
455
* @param ns The namespace to be deleted
440
456
*
@@ -445,25 +461,31 @@ public void run()
445
461
public boolean delete (final String ns )
446
462
{
447
463
final NamespaceImplData implDatum = implData .get (ns );
448
- final boolean deleted = removeNamespaceLocalMetadata (implDatum );
449
- // At this point we have won leader election on canceling this implDatum
450
- if (deleted ) {
451
- log .info ("Deleting namespace [%s]" , ns );
452
- lastVersion .remove (implDatum .name );
453
- return true ;
454
- } else {
455
- log .debug ("Did not delete namespace [%s]" , ns );
464
+ if (implDatum == null ) {
465
+ log .debug ("Found no running cache for [%s]" , ns );
456
466
return false ;
457
467
}
468
+ synchronized (implDatum .changeLock ) {
469
+ if (removeNamespaceLocalMetadata (implDatum )) {
470
+ log .info ("Deleted namespace [%s]" , ns );
471
+ return true ;
472
+ } else {
473
+ log .debug ("Did not delete namespace [%s]" , ns );
474
+ return false ;
475
+ }
476
+ }
458
477
}
459
478
460
479
public String getVersion (String namespace )
461
480
{
462
481
if (namespace == null ) {
463
482
return null ;
464
- } else {
465
- return lastVersion .get (namespace );
466
483
}
484
+ final NamespaceImplData implDatum = implData .get (namespace );
485
+ if (implDatum == null ) {
486
+ return null ;
487
+ }
488
+ return implDatum .latestVersion .get ();
467
489
}
468
490
469
491
public Collection <String > getKnownIDs ()
0 commit comments