Skip to content

Commit

Permalink
Include stacktrace in event handler and process manager error callb…
Browse files Browse the repository at this point in the history
…ack functions

- Include `__STACKTRACE__` in event handler failure context.
- Include `__STACKTRACE__` in process manager failure context.
- Log aggregate and process manager exceptions and stacktrace.
- Extend aggregate lifespan to support `{:stop, reason}`.
  • Loading branch information
slashdotdash committed Apr 30, 2020
1 parent a4f0a02 commit 8ceab54
Show file tree
Hide file tree
Showing 53 changed files with 499 additions and 333 deletions.
2 changes: 1 addition & 1 deletion guides/Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ end

The inactivity timeout is specified in milliseconds, after which time the aggregate process will be stopped if no other messages are received.

Return `:stop` to immediately shutdown the aggregate process. Return `:infinity` to prevent the aggregate instance from shutting down.
Return `:stop` or `{:stop, reason}` to immediately shutdown the aggregate process. Return `:infinity` to prevent the aggregate instance from shutting down.

You can also return `:hibernate` and the process is hibernated, it will continue its loop once a message is in its message queue. Hibernating an aggregate causes garbage collection and minimises the memory used by the process. Hibernating should not be used aggressively as too much time could be spent garbage collecting.

Expand Down
62 changes: 29 additions & 33 deletions lib/commanded/aggregates/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ defmodule Commanded.Aggregates.Aggregate do
end

case lifespan_timeout do
:stop ->
{:stop, :normal, state}
{:stop, reason} ->
{:stop, reason, state}

lifespan_timeout ->
{:noreply, state, lifespan_timeout}
Expand Down Expand Up @@ -251,7 +251,7 @@ defmodule Commanded.Aggregates.Aggregate do
{:reply, reply, state}
else
case lifespan_timeout do
:stop -> {:stop, :normal, reply, state}
{:stop, reason} -> {:stop, reason, reply, state}
lifespan_timeout -> {:reply, reply, state, lifespan_timeout}
end
end
Expand Down Expand Up @@ -391,51 +391,42 @@ defmodule Commanded.Aggregates.Aggregate do
end
end

# Rebuild aggregate state from a `Stream` of its events
# Rebuild aggregate state from a `Stream` of its events.
defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do
%Aggregate{aggregate_module: aggregate_module} = state

event_stream
|> Stream.map(fn event ->
{event.data, event.stream_version}
end)
|> Enum.reduce_while(state, fn {event, stream_version}, state ->
case event do
nil ->
{:halt, state}
Enum.reduce(event_stream, state, fn event, state ->
%RecordedEvent{data: data, stream_version: stream_version} = event
%Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state

event ->
state = %Aggregate{
state
| aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(state.aggregate_state, event)
}

{:cont, state}
end
%Aggregate{
state
| aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(aggregate_state, data)
}
end)
|> case do
nil -> state
state -> state
end
end

defp aggregate_lifespan_timeout(lifespan, timeout_function_name, args) do
# Take the last event or the command/error
defp aggregate_lifespan_timeout(lifespan, function_name, args) do
# Take the last event or the command or error
args = args |> List.wrap() |> Enum.take(-1)

case apply(lifespan, timeout_function_name, args) do
timeout when timeout in [:infinity, :hibernate, :stop] ->
case apply(lifespan, function_name, args) do
timeout when timeout in [:infinity, :hibernate] ->
timeout

:stop ->
{:stop, :normal}

{:stop, _reason} = reply ->
reply

timeout when is_integer(timeout) and timeout >= 0 ->
timeout

invalid ->
Logger.warn(fn ->
"Invalid timeout for aggregate lifespan " <>
inspect(lifespan) <>
", expected a non-negative integer, `:infinity`, `:hibernate`, or `:stop` but got: " <>
", expected a non-negative integer, `:infinity`, `:hibernate`, `:stop`, or `{:stop, reason}` but got: " <>
inspect(invalid)
end)

Expand All @@ -447,7 +438,7 @@ defmodule Commanded.Aggregates.Aggregate do
%ExecutionContext{command: command, handler: handler, function: function} = context
%Aggregate{aggregate_state: aggregate_state} = state

Logger.debug(fn -> describe(state) <> " executing command: #{inspect(command)}" end)
Logger.debug(fn -> describe(state) <> " executing command: " <> inspect(command) end)

case Kernel.apply(handler, function, [aggregate_state, command]) do
{:error, _error} = reply ->
Expand All @@ -471,6 +462,11 @@ defmodule Commanded.Aggregates.Aggregate do
pending_events ->
apply_and_persist_events(pending_events, context, state)
end
rescue
error ->
Logger.error(fn -> Exception.format(:error, error, __STACKTRACE__) end)

{{:error, error}, state}
end

defp apply_and_persist_events(pending_events, context, %Aggregate{} = state) do
Expand Down
7 changes: 5 additions & 2 deletions lib/commanded/aggregates/aggregate_lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ defmodule Commanded.Aggregates.AggregateLifespan do
- Non-negative integer - specify an inactivity timeout, in millisconds.
- `:infinity` - prevent the aggregate instance from shutting down.
- `:hibernate` - send the process into hibernation.
- `:stop` - immediately shutdown the aggregate process.
- `:stop` - immediately shutdown the aggregate process with a `:normal` exit
reason.
- `{:stop, reason}` - immediately shutdown the aggregate process with the
given reason.
### Hibernation
Expand Down Expand Up @@ -68,7 +71,7 @@ defmodule Commanded.Aggregates.AggregateLifespan do
"""

@type lifespan :: timeout | :hibernate | :stop
@type lifespan :: timeout | :hibernate | :stop | {:stop, reason :: term()}

@doc """
Aggregate process will be stopped after specified inactivity timeout unless
Expand Down
13 changes: 10 additions & 3 deletions lib/commanded/aggregates/default_lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Commanded.Aggregates.DefaultLifespan do
behaviour.
It will ensure that an aggregate instance process runs indefinitely once
started.
started, unless an exception is encountered.
"""

@behaviour Commanded.Aggregates.AggregateLifespan
Expand All @@ -20,7 +20,14 @@ defmodule Commanded.Aggregates.DefaultLifespan do
def after_command(_command), do: :infinity

@doc """
Aggregate will run indefinitely once started.
Aggregate is stopped on exception, but will run indefinitely for any non-
exception error.
"""
def after_error(_error), do: :infinity
def after_error(error) do
if Exception.exception?(error) do
{:stop, error}
else
:infinity
end
end
end
8 changes: 2 additions & 6 deletions lib/commanded/aggregates/multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ defmodule Commanded.Aggregate.Multi do
updated balance is used to check whether the account is overdrawn.
defmodule BankAccount do
defstruct [
account_number: nil,
balance: 0,
state: nil,
]
alias Commanded.Aggregate.Multi
defstruct [:account_number, :state, balance: 0]
def withdraw(
%BankAccount{state: :active} = account,
%WithdrawMoney{amount: amount})
Expand Down
17 changes: 10 additions & 7 deletions lib/commanded/event/failure_context.ex
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
defmodule Commanded.Event.FailureContext do
@moduledoc """
Contains the data related to an event handling failure.
Data related to an event handling failure.
The available fields are:
- `context` - a map that is passed between each failure. Use it to store any
transient state between failures. As an example it could be used to count
error failures and stop or skip the problematic event after too many.
- `metadata` - the metadata associated with the failed event.
- `context` - a map that is passed between each failure. Use it to store any
transient state between failures. As an example it could be used to count
error failures and stop or skip the problematic event after too many.
- `metadata` - the metadata associated with the failed event.
- `stacktrace` - the stacktrace if the error was an unhandled exception.
"""

@type t :: %__MODULE__{
context: map(),
metadata: map()
metadata: map(),
stacktrace: Exception.stacktrace() | nil
}

defstruct [
:context,
:metadata
:metadata,
:stacktrace
]
end
54 changes: 31 additions & 23 deletions lib/commanded/event/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -634,27 +634,15 @@ defmodule Commanded.Event.Handler do
{:error, :already_seen_event} ->
confirm_receipt(event, state)

{:error, reason} = error ->
Logger.error(fn ->
describe(state) <>
" failed to handle event #{inspect(event, pretty: true)} due to: #{
inspect(reason, pretty: true)
}"
end)
{:error, _reason} = error ->
failure_context = build_failure_context(event, context)

handle_event_error(error, event, state, context)
handle_event_error(error, event, failure_context, state)

{:error, reason, stacktrace} ->
Logger.error(fn ->
describe(state) <>
" failed to handle event #{inspect(event, pretty: true)} due to: #{
inspect(reason, pretty: true)
}"
end)
failure_context = build_failure_context(event, context, stacktrace)

Logger.error(fn -> Exception.format(:error, reason, stacktrace) end)

handle_event_error({:error, reason}, event, state, context)
handle_event_error({:error, reason}, event, failure_context, state)
end
end

Expand All @@ -667,18 +655,38 @@ defmodule Commanded.Event.Handler do
try do
handler_module.handle(data, metadata)
rescue
e -> {:error, e, __STACKTRACE__}
error ->
stacktrace = __STACKTRACE__
Logger.error(fn -> Exception.format(:error, error, stacktrace) end)

{:error, error, stacktrace}
end
end

defp handle_event_error(error, %RecordedEvent{} = failed_event, %Handler{} = state, context) do
%RecordedEvent{data: data} = failed_event
%Handler{handler_module: handler_module} = state

failure_context = %FailureContext{
defp build_failure_context(%RecordedEvent{} = failed_event, context, stacktrace \\ nil) do
%FailureContext{
context: context,
stacktrace: stacktrace,
metadata: enrich_metadata(failed_event)
}
end

defp handle_event_error(
{:error, reason} = error,
%RecordedEvent{} = failed_event,
%FailureContext{} = failure_context,
%Handler{} = state
) do
%RecordedEvent{data: data} = failed_event
%Handler{handler_module: handler_module} = state

Logger.error(fn ->
describe(state) <>
" failed to handle event " <>
inspect(failed_event, pretty: true) <>
" due to: " <>
inspect(reason, pretty: true)
end)

case handler_module.error(error, data, failure_context) do
{:retry, context} when is_map(context) ->
Expand Down
22 changes: 13 additions & 9 deletions lib/commanded/process_managers/failure_context.ex
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
defmodule Commanded.ProcessManagers.FailureContext do
@moduledoc """
Contains the data related to a failure.
Data related to a process manager event handling or command dispatch failure.
The available fields are:
- `pending_commands` - the pending commands that were not executed yet
- `process_manager_state` - the state the process manager would be in
if the command would not fail.
- `last_event` - the last event the process manager received
- `context` - the context of failures (to be passed between each failure),
used in example to count retries.
- `context` - the context map passed between each failure and may be used
to track state between retries, such as to count failures.
- `last_event` - the last event the process manager received
- `pending_commands` - the pending commands that were not executed yet
- `process_manager_state` - the state the process manager would be in
if the command would not fail.
- `stacktrace` - the stacktrace if the error was an unhandled exception.
"""

@type t :: %__MODULE__{
context: map(),
last_event: struct(),
pending_commands: [struct()],
process_manager_state: struct(),
last_event: struct(),
context: map()
stacktrace: Exception.stacktrace() | nil
}

defstruct [
:process_manager_state,
:last_event,
:stacktrace,
context: %{},
pending_commands: []
]
Expand Down
Loading

0 comments on commit 8ceab54

Please sign in to comment.