Skip to content

Commit

Permalink
GEODE-8913: Add ability for ConcurrentLoopingThreads to run in lockst…
Browse files Browse the repository at this point in the history
…ep (apache#5997)

- Can be invoked by calling `.runInLockstep()`.
- This simply adds a CyclicBarrier before the invocation of each
  iteration and should produce a bit more contention.
  • Loading branch information
jdeppe-pivotal authored Feb 3, 2021
1 parent 4988d83 commit d44c07f
Showing 1 changed file with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ public ConcurrentLoopingThreads(int iterationCount,
* Start the operations asynchronously. Use {@link #await()} to wait for completion.
*/
public ConcurrentLoopingThreads start() {
return start(false);
}

private ConcurrentLoopingThreads start(boolean lockstep) {
CyclicBarrier latch = new CyclicBarrier(functions.length);

loopingFutures = Arrays
.stream(functions)
.map(r -> new LoopingThread(r, iterationCount, latch))
.map(r -> new LoopingThread(r, iterationCount, latch, lockstep))
.map(t -> executorService.submit(t))
.collect(Collectors.toList());

Expand All @@ -72,40 +76,57 @@ public void await() {
* Start operations and only return once all are complete.
*/
public void run() {
start();
start(false);
await();
}

/**
* Start operations and run each iteration in lockstep
*/
public void runInLockstep() {
start(true);
await();
}

private static class LoopingRunnable implements Runnable {
private final Consumer<Integer> runnable;
private final int iterationCount;
private final CyclicBarrier barrier;
private final boolean lockstep;

public LoopingRunnable(Consumer<Integer> runnable, int iterationCount,
CyclicBarrier barrier) {
CyclicBarrier barrier, boolean lockstep) {
this.runnable = runnable;
this.iterationCount = iterationCount;
this.barrier = barrier;
this.lockstep = lockstep;
}

@Override
public void run() {
waitForBarrier();
for (int i = 0; i < iterationCount; i++) {
if (lockstep) {
waitForBarrier();
}
runnable.accept(i);
Thread.yield();
}
}

private void waitForBarrier() {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < iterationCount; i++) {
runnable.accept(i);
Thread.yield();
}
}
}

private static class LoopingThread extends Thread {
public LoopingThread(Consumer<Integer> runnable, int iterationCount,
CyclicBarrier barrier) {
super(new LoopingRunnable(runnable, iterationCount, barrier));
CyclicBarrier barrier, boolean lockstep) {
super(new LoopingRunnable(runnable, iterationCount, barrier, lockstep));
}
}
}

0 comments on commit d44c07f

Please sign in to comment.