diff --git a/geode-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java index bc321f1c4e15..7a8347b14b50 100644 --- a/geode-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java +++ b/geode-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java @@ -43,11 +43,15 @@ public ConcurrentLoopingThreads(int iterationCount, * Start the operations asynchronously. Use {@link #await()} to wait for completion. */ public ConcurrentLoopingThreads start() { + return start(false); + } + + private ConcurrentLoopingThreads start(boolean lockstep) { CyclicBarrier latch = new CyclicBarrier(functions.length); loopingFutures = Arrays .stream(functions) - .map(r -> new LoopingThread(r, iterationCount, latch)) + .map(r -> new LoopingThread(r, iterationCount, latch, lockstep)) .map(t -> executorService.submit(t)) .collect(Collectors.toList()); @@ -72,7 +76,15 @@ public void await() { * Start operations and only return once all are complete. */ public void run() { - start(); + start(false); + await(); + } + + /** + * Start operations and run each iteration in lockstep + */ + public void runInLockstep() { + start(true); await(); } @@ -80,32 +92,41 @@ private static class LoopingRunnable implements Runnable { private final Consumer runnable; private final int iterationCount; private final CyclicBarrier barrier; + private final boolean lockstep; public LoopingRunnable(Consumer runnable, int iterationCount, - CyclicBarrier barrier) { + CyclicBarrier barrier, boolean lockstep) { this.runnable = runnable; this.iterationCount = iterationCount; this.barrier = barrier; + this.lockstep = lockstep; } @Override public void run() { + waitForBarrier(); + for (int i = 0; i < iterationCount; i++) { + if (lockstep) { + waitForBarrier(); + } + runnable.accept(i); + Thread.yield(); + } + } + + private void waitForBarrier() { try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } - for (int i = 0; i < iterationCount; i++) { - runnable.accept(i); - Thread.yield(); - } } } private static class LoopingThread extends Thread { public LoopingThread(Consumer runnable, int iterationCount, - CyclicBarrier barrier) { - super(new LoopingRunnable(runnable, iterationCount, barrier)); + CyclicBarrier barrier, boolean lockstep) { + super(new LoopingRunnable(runnable, iterationCount, barrier, lockstep)); } } }