Skip to content

Commit

Permalink
Refactor dispatch macro in composite router
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Apr 14, 2020
1 parent 0806bbf commit 04afe01
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 73 deletions.
4 changes: 2 additions & 2 deletions guides/Application.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Application

Commanded allows you to define, supervise, and start your own application module. To use Commanded you must create at least one application module. You can create multiple Commanded applications which will run independently, each using its own separately configured event store.
Commanded allows you to define, supervise, and start your own application module. To use Commanded you must create at least one application. You can create multiple Commanded applications which will run independently, each using its own separately configured event store.

The application expects at least an `:otp_app` option to be specified. It should point to an OTP application containing the application's configuration.

Expand Down Expand Up @@ -70,7 +70,7 @@ defmodule MyApp.Application do
end
```

Once you have defined a router, you can dispatch a command using the application module:
Once you have defined a router you can dispatch a command using the application module:

```elixir
:ok = MyApp.Application.dispatch(%RegisterCustomer{id: UUID.uuid4(), name: "Ben"})
Expand Down
44 changes: 36 additions & 8 deletions lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ defmodule Commanded.Application do
See `Commanded.Commands.CompositeRouter` for details.
## Command dispatch
Once a router has been configured you can dispatch a command via the
application:
:ok = MyApp.dispatch(command, opts)
See `c:dispatch/1` and `c:dispatch/2` for details.
## Dynamic named applications
An application can be provided with a name as an option to `start_link/1`.
Expand Down Expand Up @@ -105,7 +114,6 @@ defmodule Commanded.Application do
:ok = MyApp.Application.dispatch(command, application: :tenant1)
## Default dispatch options
An application can be configured with default command dispatch options such as
Expand Down Expand Up @@ -223,11 +231,28 @@ defmodule Commanded.Application do
- `command` is a command struct which must be registered with a
`Commanded.Commands.Router` and included in the application.
"""
@callback dispatch(command :: struct()) ::
:ok
| {:ok, aggregate_state :: struct()}
| {:ok, aggregate_version :: non_neg_integer()}
| {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()}
| {:error, :unregistered_command}
| {:error, :consistency_timeout}
| {:error, reason :: term()}

@doc """
Dispatch a registered command.
- `command` is a command struct which must be registered with a
`Commanded.Commands.Router` and included in the application.
- `timeout_or_opts` is either an integer timeout or a keyword list of
options. The timeout must be an integer greater than zero which
specifies how many milliseconds to allow the command to be handled, or
the atom `:infinity` to wait indefinitely. The default timeout value is
five seconds.
options.
The timeout must be an integer greater than zero which specifies how many
milliseconds to allow the command to be handled, or the atom `:infinity`
to wait indefinitely. The default timeout value is five seconds.
Alternatively, an options keyword list can be provided, it supports the
following options.
Expand Down Expand Up @@ -282,14 +307,17 @@ defmodule Commanded.Application do
:ok = BankApp.dispatch(command, timeout: 30_000)
"""
@callback dispatch(command :: struct, timeout_or_opts :: integer | :infinity | Keyword.t()) ::
@callback dispatch(
command :: struct(),
timeout_or_opts :: non_neg_integer() | :infinity | Keyword.t()
) ::
:ok
| {:ok, aggregate_state :: struct}
| {:ok, aggregate_state :: struct()}
| {:ok, aggregate_version :: non_neg_integer()}
| {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()}
| {:error, :unregistered_command}
| {:error, :consistency_timeout}
| {:error, reason :: term}
| {:error, reason :: term()}

alias Commanded.Application.Config

Expand Down
75 changes: 39 additions & 36 deletions lib/commanded/commands/composite_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,25 @@ defmodule Commanded.Commands.CompositeRouter do
router(Bank.MoneyTransfer.Router)
end
You can dispatch a command via the composite router which will be routed to
the associated router:
One or more routers or composite routers can be included in a
`Commanded.Application` since it is also a composite router:
alias Bank.AppRouter
defmodule BankApp do
use Commanded.Application
router(Bank.AppRouter)
end
You can dispatch a command via the application which will then be routed to
the associated child router:
command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000}
:ok = AppRouter.dispatch(command)
:ok = BankApp.dispatch(command)
Or via the composite router itself, specifying the application:
:ok = Bank.AppRouter.dispatch(command, application: BankApp)
A composite router can include composite routers.
"""
Expand All @@ -37,6 +48,10 @@ defmodule Commanded.Commands.CompositeRouter do

import unquote(__MODULE__)

@before_compile unquote(__MODULE__)

Module.register_attribute(__MODULE__, :registered_commands, accumulate: false)

application = Keyword.get(unquote(opts), :application)

default_dispatch_opts =
Expand All @@ -46,8 +61,6 @@ defmodule Commanded.Commands.CompositeRouter do

@default_dispatch_opts default_dispatch_opts
@registered_commands %{}

@before_compile unquote(__MODULE__)
end
end

Expand Down Expand Up @@ -77,47 +90,37 @@ defmodule Commanded.Commands.CompositeRouter do
quote generated: true do
@doc false
def __registered_commands__ do
Enum.map(@registered_commands, fn {command, _router} -> command end)
end

def __dispatch_opts__(opts) do
Keyword.merge(@default_dispatch_opts, opts)
Enum.map(@registered_commands, fn {command_module, _router} -> command_module end)
end

@doc false
def dispatch(command, opts \\ [])

Enum.map(@registered_commands, fn {command_module, router} ->
Module.eval_quoted(
__MODULE__,
quote do
@doc false
def dispatch(%unquote(command_module){} = command, :infinity) do
opts = __dispatch_opts__(timeout: :infinity)
@doc false
def dispatch(command, :infinity),
do: do_dispatch(command, timeout: :infinity)

unquote(router).dispatch(command, opts)
end
@doc false
def dispatch(command, timeout) when is_integer(timeout),
do: do_dispatch(command, timeout: timeout)

@doc false
def dispatch(%unquote(command_module){} = command, timeout)
when is_integer(timeout) do
opts = __dispatch_opts__(timeout: timeout)
@doc false
def dispatch(command, opts),
do: do_dispatch(command, opts)

unquote(router).dispatch(command, opts)
end
for {command_module, router} <- @registered_commands do
@command_module command_module
@router router

@doc false
def dispatch(%unquote(command_module){} = command, opts) do
opts = __dispatch_opts__(opts)
defp do_dispatch(%@command_module{} = command, opts) do
opts = Keyword.merge(@default_dispatch_opts, opts)

unquote(router).dispatch(command, opts)
end
end
)
end)
@router.dispatch(command, opts)
end
end

@doc false
def dispatch(command, _opts) do
# Catch unregistered commands, log and return an error.
defp do_dispatch(command, _opts) do
Logger.error(fn ->
"attempted to dispatch an unregistered command: " <> inspect(command)
end)
Expand Down
19 changes: 14 additions & 5 deletions lib/commanded/commands/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ defmodule Commanded.Commands.Handler do
defmodule OpenAccountHandler do
@behaviour Commanded.Commands.Handler
def handle(%BankAccount{} = aggregate, %OpenAccount{account_number: account_number, initial_balance: initial_balance}) do
def handle(%BankAccount{} = aggregate, %OpenAccount{} = command) do
%OpenAccount{account_number: account_number, initial_balance: initial_balance} = command
BankAccount.open_account(aggregate, account_number, initial_balance)
end
end
Expand All @@ -18,15 +20,22 @@ defmodule Commanded.Commands.Handler do
@type aggregate :: struct()
@type command :: struct()
@type domain_event :: struct
@type domain_events :: list(struct())
@type reason :: term()
@type reason :: any()

@doc """
Apply the given command to the event sourced aggregate.
You must return a list containing the pending events, or `nil` / `[]` when no events produced.
You must return a single domain event, a list containing the pending events,
or `nil`, `[]`, or `:ok` when no events are produced.
You should return `{:error, reason}` on failure.
"""
@callback handle(aggregate, command) :: domain_event | domain_events | nil | {:error, reason}
@callback handle(aggregate, command) ::
domain_event
| list(domain_event)
| {:ok, domain_event}
| {:ok, list(domain_event)}
| :ok
| nil
| {:error, reason}
end
41 changes: 21 additions & 20 deletions lib/commanded/commands/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,8 @@ defmodule Commanded.Commands.Router do
Module.register_attribute(__MODULE__, :registered_middleware, accumulate: true)
Module.register_attribute(__MODULE__, :registered_identities, accumulate: false)

@application Keyword.get(unquote(opts), :application)

@default [
@default_dispatch_opts [
application: Keyword.get(unquote(opts), :application),
consistency: Router.get_opt(unquote(opts), :default_consistency, :eventual),
returning: Router.get_default_dispatch_return(unquote(opts)),
timeout: 5_000,
Expand Down Expand Up @@ -362,7 +361,10 @@ defmodule Commanded.Commands.Router do
}`"
end

@registered_commands {unquote(command_module), Keyword.merge(@default, unquote(opts))}
@registered_commands {
unquote(command_module),
Keyword.merge(@default_dispatch_opts, unquote(opts))
}
end
end
end
Expand Down Expand Up @@ -393,14 +395,15 @@ defmodule Commanded.Commands.Router do
- `command` is a command struct which must be registered with the router.
- `timeout_or_opts` is either an integer timeout or a keyword list of
options. The timeout must be an integer greater than zero which
specifies how many milliseconds to allow the command to be handled, or
the atom `:infinity` to wait indefinitely. The default timeout value is
five seconds.
- `timeout_or_opts` is either an integer timeout, `:infinity`, or a keyword
list of options.
The timeout must be an integer greater than zero which specifies how many
milliseconds to allow the command to be handled, or the atom `:infinity`
to wait indefinitely. The default timeout value is five seconds.
Alternatively, an options keyword list can be provided, it supports the
following options.
Alternatively, an options keyword list can be provided with the following
options.
Options:
Expand Down Expand Up @@ -466,16 +469,10 @@ defmodule Commanded.Commands.Router do

defmacro __before_compile__(_env) do
quote generated: true do
@registered_command_modules Enum.map(@registered_commands, fn
{command_module, _command_opts} -> command_module
end)

@middleware Enum.reduce(@registered_middleware, @default_middleware, fn middleware, acc ->
[middleware | acc]
end)

@doc false
def __registered_commands__, do: @registered_command_modules
def __registered_commands__ do
Enum.map(@registered_commands, fn {command_module, _command_opts} -> command_module end)
end

@doc false
def dispatch(command, opts \\ [])
Expand All @@ -492,6 +489,10 @@ defmodule Commanded.Commands.Router do
def dispatch(command, opts),
do: do_dispatch(command, opts)

@middleware Enum.reduce(@registered_middleware, @default_middleware, fn middleware, acc ->
[middleware | acc]
end)

for {command_module, command_opts} <- @registered_commands do
@aggregate Keyword.fetch!(command_opts, :aggregate)
@handler Keyword.fetch!(command_opts, :to)
Expand Down
2 changes: 0 additions & 2 deletions test/support/mocked_app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ defmodule Commanded.MockedApp do
],
registry: :local,
pubsub: :local

# router(Commanded.Commands.MockRouter)
end

0 comments on commit 04afe01

Please sign in to comment.