Skip to content

Commit

Permalink
Merge pull request spotify#134 from pettermahlen/improve-controller-r…
Browse files Browse the repository at this point in the history
…obustness

Observe and update race issues
  • Loading branch information
pettermahlen authored Sep 17, 2020
2 parents 9feff71 + edfdd80 commit b37d6fc
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 4 deletions.
3 changes: 3 additions & 0 deletions mobius-core/src/main/java/com/spotify/mobius/Loop.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public interface Loop<M, E, F> extends Disposable {
* notified of future changes to the model until the loop or the returned {@link Disposable} is
* disposed.
*
* <p>If the addition of the observer races with an event that changes the current model, the
* observer may get notified twice of the same model.
*
* @param observer a non-null observer of model changes
* @return a {@link Disposable} that can be used to stop further notifications to the observer
* @throws NullPointerException if the observer is null
Expand Down
4 changes: 2 additions & 2 deletions mobius-core/src/main/java/com/spotify/mobius/MobiusLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ public Disposable observe(final Consumer<M> observer) {
return () -> {};
}

modelObservers.add(checkNotNull(observer));

final M currentModel = mostRecentModel;
if (currentModel != null) {
// Start by emitting the most recently received model.
observer.accept(currentModel);
}

modelObservers.add(checkNotNull(observer));

return new Disposable() {
@Override
public void dispose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,16 @@ public synchronized boolean isRunning() {
return currentState.isRunning();
}

private synchronized void dispatchEvent(E event) {
// note on synchronization: since this method should never change the controller state, only that
// of the controlled loop, which has its own concurrency protection, it doesn't need to be
// synchronized.
private void dispatchEvent(E event) {
currentState.onDispatchEvent(event);
}

private synchronized void updateView(M model) {
// note on synchronization: since this method should never change the controller state, and will
// be called from the `mainThreadRunner` only, it doesn't need to be synchronized.
private void updateView(M model) {
currentState.onUpdateView(model);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,25 @@
*/
package com.spotify.mobius;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;

import com.spotify.mobius.disposables.Disposable;
import com.spotify.mobius.functions.Consumer;
import com.spotify.mobius.runners.ImmediateWorkRunner;
import com.spotify.mobius.test.RecordingModelObserver;
import com.spotify.mobius.testdomain.TestEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import org.junit.Test;

public class MobiusLoopObservabilityBehavior extends MobiusLoopTest {

@Test
public void shouldTransitionToNextStateBasedOnInput() throws Exception {
mobiusLoop.dispatchEvent(new TestEvent("first"));
Expand Down Expand Up @@ -55,4 +68,68 @@ public void shouldSupportUnregisteringObserver() throws Exception {

observer.assertStates("init", "init->active observer");
}

@Test
public void shouldNotReportModelsInIncorrectOrder() throws Exception {
// 1. create a loop with initial model A
// 2. concurrently, do:
// a. dispatch an event that changes the model to B
// b. add an observer
// 3. verify that the observer never saw B, A.

ExecutorService service1 = Executors.newSingleThreadExecutor();
ExecutorService service2 = Executors.newSingleThreadExecutor();

List<String> bad = new ArrayList<>();

// 100,000 iterations tended to lead to about 10 instances of 'B,A' before the underlying
// issue was fixed - but that takes a lot of time for little benefit now that the issue is
// fixed, so this iteration count was reduced. If another out-of-order error exists, or the
// error happens again, this test should become flaky.
for (int i = 0; i < 1000; i++) {
MobiusLoop<Integer, Integer, Integer> loop =
MobiusLoop.create(
(model, event) -> Next.next(event),
0,
emptyList(),
new NoopConnectable(),
new NoopConnectable(),
new ImmediateWorkRunner(),
new ImmediateWorkRunner());

List<Integer> observed = Collections.synchronizedList(new ArrayList<>());

final Future<Disposable> future2 = service2.submit(() -> loop.observe(observed::add));
final Future<?> future1 = service1.submit(() -> loop.dispatchEvent(1));

future1.get();
future2.get();

assertThat(observed.size()).isBetween(1, 2);

// two models, with model 0 at the end is bad - adding that to a list so that the test will
// give a clearer indication of how common this was.
if (observed.size() == 2 && observed.get(1) == 0) {
bad.add(observed.toString());
}
}

assertThat(bad).isEmpty();
}

private static class NoopConnectable implements Connectable<Integer, Integer> {

@Nonnull
@Override
public Connection<Integer> connect(Consumer<Integer> output)
throws ConnectionLimitExceededException {
return new Connection<Integer>() {
@Override
public void accept(Integer value) {}

@Override
public void dispose() {}
};
}
}
}

0 comments on commit b37d6fc

Please sign in to comment.