Skip to content

Commit

Permalink
Provide handle to register a ListenerInvocationErrorHandler
Browse files Browse the repository at this point in the history
Add functionality to register a ListenerInvocationErrorHandler to the
SagaTestFixture, so that users can verify their custom handler works as
they expect during Saga execution.

Resolves AxonFramework#1223
  • Loading branch information
smcvb committed Sep 30, 2019
1 parent 5c966d6 commit 7806af7
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.deadline.DeadlineMessage;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.annotation.HandlerDefinition;
Expand Down Expand Up @@ -167,6 +168,19 @@ FixtureConfiguration registerDeadlineHandlerInterceptor(
*/
FixtureConfiguration registerStartRecordingCallback(Runnable onStartRecordingCallback);

/**
* Registers a {@link ListenerInvocationErrorHandler} to be set for the Saga to deal with exceptions being thrown
* from within Saga Event Handlers. Will be given to the
* {@link org.axonframework.modelling.saga.AnnotatedSagaManager} for the defined Saga type. Defaults to a
* {@link org.axonframework.eventhandling.LoggingErrorHandler}.
*
* @param listenerInvocationErrorHandler to be set for the Saga to deal with exceptions being thrown from within
* Saga Event Handlers
* @return the current FixtureConfiguration, for fluent interfacing
*/
FixtureConfiguration registerListenerInvocationErrorHandler(
ListenerInvocationErrorHandler listenerInvocationErrorHandler);

/**
* Sets the instance that defines the behavior of the Command Bus when a command is dispatched with a callback.
*
Expand Down
59 changes: 36 additions & 23 deletions test/src/main/java/org/axonframework/test/saga/SagaTestFixture.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.LoggingErrorHandler;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventBus;
import org.axonframework.modelling.saga.AnnotatedSagaManager;
import org.axonframework.modelling.saga.SagaRepository;
import org.axonframework.modelling.saga.repository.AnnotatedSagaRepository;
import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.ResultMessage;
Expand All @@ -41,6 +39,10 @@
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.annotation.SimpleResourceParameterResolverFactory;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.modelling.saga.AnnotatedSagaManager;
import org.axonframework.modelling.saga.SagaRepository;
import org.axonframework.modelling.saga.repository.AnnotatedSagaRepository;
import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore;
import org.axonframework.test.FixtureExecutionException;
import org.axonframework.test.deadline.StubDeadlineManager;
import org.axonframework.test.eventscheduler.StubEventScheduler;
Expand Down Expand Up @@ -78,19 +80,22 @@
*/
public class SagaTestFixture<T> implements FixtureConfiguration, ContinuedGivenState {

private final RecordingCommandBus commandBus;
private EventBus eventBus;
private final StubEventScheduler eventScheduler;
private final StubDeadlineManager deadlineManager;
private final LinkedList<Object> registeredResources = new LinkedList<>();
private final Map<Object, AggregateEventPublisherImpl> aggregatePublishers = new HashMap<>();
private final FixtureExecutionResultImpl<T> fixtureExecutionResult;
private final RecordingCommandBus commandBus;
private final MutableFieldFilter fieldFilters = new MutableFieldFilter();
private HandlerDefinition handlerDefinition;
private ListenerInvocationErrorHandler listenerInvocationErrorHandler;

private final Class<T> sagaType;
private final InMemorySagaStore sagaStore;
private AnnotatedSagaManager<T> sagaManager;
private SagaRepository<T> sagaRepository;
private EventBus eventBus;
private final LinkedList<Object> registeredResources = new LinkedList<>();

private final FixtureExecutionResultImpl<T> fixtureExecutionResult;
private final Map<Object, AggregateEventPublisherImpl> aggregatePublishers = new HashMap<>();
private final MutableFieldFilter fieldFilters = new MutableFieldFilter();

private boolean transienceCheckEnabled = true;
private boolean resourcesInitialized = false;

Expand All @@ -101,25 +106,25 @@ public class SagaTestFixture<T> implements FixtureConfiguration, ContinuedGivenS
*/
@SuppressWarnings({"unchecked"})
public SagaTestFixture(Class<T> sagaType) {
this.sagaType = sagaType;
commandBus = new RecordingCommandBus();
eventBus = SimpleEventBus.builder().build();
eventScheduler = new StubEventScheduler();
deadlineManager = new StubDeadlineManager();
eventBus = SimpleEventBus.builder().build();
handlerDefinition = ClasspathHandlerDefinition.forClass(sagaType);
listenerInvocationErrorHandler = new LoggingErrorHandler();

this.sagaType = sagaType;
sagaStore = new InMemorySagaStore();

registeredResources.add(eventBus);
commandBus = new RecordingCommandBus();
registeredResources.add(commandBus);
registeredResources.add(eventScheduler);
registeredResources.add(deadlineManager);
registeredResources.add(DefaultCommandGateway.builder().commandBus(commandBus).build());
fixtureExecutionResult = new FixtureExecutionResultImpl<>(sagaStore,
eventScheduler,
deadlineManager,
eventBus,
commandBus,
sagaType,
fieldFilters);
handlerDefinition = ClasspathHandlerDefinition.forClass(sagaType);

fixtureExecutionResult = new FixtureExecutionResultImpl<>(
sagaStore, eventScheduler, deadlineManager, eventBus, commandBus, sagaType, fieldFilters
);
}

/**
Expand Down Expand Up @@ -167,7 +172,7 @@ protected void ensureSagaResourcesInitialized() {
ClasspathParameterResolverFactory.forClass(sagaType)
);

sagaRepository = AnnotatedSagaRepository.<T>builder()
SagaRepository<T> sagaRepository = AnnotatedSagaRepository.<T>builder()
.sagaType(sagaType)
.parameterResolverFactory(parameterResolverFactory)
.handlerDefinition(handlerDefinition)
Expand All @@ -179,6 +184,7 @@ protected void ensureSagaResourcesInitialized() {
.sagaType(sagaType)
.parameterResolverFactory(parameterResolverFactory)
.handlerDefinition(handlerDefinition)
.listenerInvocationErrorHandler(listenerInvocationErrorHandler)
.build();
resourcesInitialized = true;
}
Expand Down Expand Up @@ -368,6 +374,13 @@ public FixtureConfiguration registerStartRecordingCallback(Runnable onStartRecor
return this;
}

@Override
public FixtureConfiguration registerListenerInvocationErrorHandler(
ListenerInvocationErrorHandler listenerInvocationErrorHandler) {
this.listenerInvocationErrorHandler = listenerInvocationErrorHandler;
return this;
}

private AggregateEventPublisherImpl getPublisherFor(String aggregateIdentifier) {
if (!aggregatePublishers.containsKey(aggregateIdentifier)) {
aggregatePublishers.put(aggregateIdentifier, new AggregateEventPublisherImpl(aggregateIdentifier));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.axonframework.test.saga;

import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.modelling.saga.SagaEventHandler;
import org.axonframework.modelling.saga.StartSaga;
import org.junit.*;

import java.util.concurrent.atomic.AtomicInteger;

import static java.time.Duration.ofSeconds;
import static java.time.Instant.now;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.*;

/**
* Test class dedicated to validating custom registered components on a {@link SagaTestFixture} instance.
*/
public class SagaTestFixtureRegistrationTest {

private SagaTestFixture<SomeTestSaga> fixture;
private AtomicInteger startRecordingCount;

@Before
public void setUp() {
fixture = new SagaTestFixture<>(SomeTestSaga.class);
startRecordingCount = new AtomicInteger();
fixture.registerStartRecordingCallback(startRecordingCount::getAndIncrement);
}

@Test
public void startRecordingCallbackIsInvokedOnWhenPublishingAnEvent() {
fixture.givenAPublished(new SomeTestSaga.SomeEvent());
assertThat(startRecordingCount.get(), equalTo(0));

fixture.whenPublishingA(new SomeTestSaga.SomeEvent());
assertThat(startRecordingCount.get(), equalTo(1));
}

@Test
public void startRecordingCallbackIsInvokedOnWhenTimeAdvances() {
fixture.givenAPublished(new SomeTestSaga.SomeEvent());
assertThat(startRecordingCount.get(), equalTo(0));

fixture.whenTimeAdvancesTo(now());
assertThat(startRecordingCount.get(), equalTo(1));
}

@Test
public void startRecordingCallbackIsInvokedOnWhenTimeElapses() {
fixture.givenAPublished(new SomeTestSaga.SomeEvent());
assertThat(startRecordingCount.get(), equalTo(0));

fixture.whenTimeElapses(ofSeconds(5));
assertThat(startRecordingCount.get(), equalTo(1));
}

@Test
public void testCustomListenerInvocationErrorHandlerIsUsed() {
SomeTestSaga.SomeEvent testEvent = new SomeTestSaga.SomeEvent("some-id", true);

ListenerInvocationErrorHandler testSubject = (exception, event, eventHandler) ->
assertEquals(testEvent.getException().getMessage(), exception.getMessage());

fixture.registerListenerInvocationErrorHandler(testSubject);
// This will trigger the test subject due to how the event is configured
fixture.givenAPublished(testEvent);
}

public static class SomeTestSaga {

@SuppressWarnings("unused")
@StartSaga
@SagaEventHandler(associationProperty = "id")
public void handle(SomeEvent event) throws Exception {
if (event.shouldThrowException()) {
throw event.getException();
}
}

public static class SomeEvent {

private final String id;
private final Boolean shouldThrowException;
private final Exception exception = new IllegalStateException("I was told to throw an exception");

public SomeEvent() {
this("42", false);
}

public SomeEvent(String id, Boolean shouldThrowException) {
this.id = id;
this.shouldThrowException = shouldThrowException;
}

public String getId() {
return id;
}

public Boolean shouldThrowException() {
return shouldThrowException;
}

public Exception getException() {
return exception;
}
}
}
}

This file was deleted.

0 comments on commit 7806af7

Please sign in to comment.