From d8b9c5497549e5c726fe233e008822fa17bb1061 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 30 Dec 2019 16:45:47 +0000 Subject: [PATCH 01/11] Fix process manager `error/3` callback documentation --- guides/Process Managers.md | 12 ++++++--- .../process_managers/process_manager.ex | 27 ++++++++++++------- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/guides/Process Managers.md b/guides/Process Managers.md index de38e84c..7ee7102d 100644 --- a/guides/Process Managers.md +++ b/guides/Process Managers.md @@ -58,13 +58,19 @@ The `error/3` callback function must return one of the following responses depen - `{:retry, delay, context}` - retry the failed command, after sleeping for the requested delay (in milliseconds). Context is a map as described in `{:retry, context}` above. -- `{:skip, :discard_pending}` - discard the failed command and any pending commands. +- `{:stop, reason}` - stop the process manager with the given reason. -- `{:skip, :continue_pending}` - skip the failed command, but continue dispatching any pending commands. +For event handling failures, when failure source is an event, you can also return: + +- `:skip` - to skip the problematic event. No commands will be dispatched. + +For command dispatch failures, when failure source is a command, you can also return: - `{:continue, commands, context}` - continue dispatching the given commands. This allows you to retry the failed command, modify it and retry, drop it, or drop all pending commands by passing an empty list `[]`. -- `{:stop, reason}` - stop the process manager with the given reason. +- `{:skip, :discard_pending}` - discard the failed command and any pending commands. + +- `{:skip, :continue_pending}` - skip the failed command, but continue dispatching any pending commands. ## Supervision diff --git a/lib/commanded/process_managers/process_manager.ex b/lib/commanded/process_managers/process_manager.ex index ff92d77a..b2144173 100644 --- a/lib/commanded/process_managers/process_manager.ex +++ b/lib/commanded/process_managers/process_manager.ex @@ -237,10 +237,10 @@ defmodule Commanded.ProcessManagers.ProcessManager do @doc """ Called when a command dispatch or event handling returns an error. - The `c:error/3` function allows you to control how command dispatch and event - handling failures are handled. The function is passed the error (e.g. - `{:error, :failure}`), the failed command (during failed dispatch) or failed - event (during failed event handling), and a failure context struct (see + The `c:error/3` function allows you to control how event handling and command + dispatch and failures are handled. The function is passed the error (e.g. + `{:error, :failure}`), the failed event (during failed event handling) or + failed command (during failed dispatch), and a failure context struct (see `Commanded.ProcessManagers.FailureContext` for details). The failure context contains a context map you can use to pass transient state @@ -257,6 +257,16 @@ defmodule Commanded.ProcessManagers.ProcessManager do the requested delay (in milliseconds). Context is a map as described in `{:retry, context}` above. + - `{:stop, reason}` - stop the process manager with the given reason. + + For event handling failures, when failure source is an event, you can also + return: + + - `:skip` - to skip the problematic event. No commands will be dispatched. + + For command dispatch failures, when failure source is a command, you can also + return: + - `{:skip, :discard_pending}` - discard the failed command and any pending commands. @@ -264,23 +274,22 @@ defmodule Commanded.ProcessManagers.ProcessManager do dispatching any pending commands. - `{:continue, commands, context}` - continue dispatching the given commands. - This allows you to retry the failed command, modify it and retry, drop it, + This allows you to retry the failed command, modify it and retry, drop it or drop all pending commands by passing an empty list `[]`. Context is a map as described in `{:retry, context}` above. - - `{:stop, reason}` - stop the process manager with the given reason. - """ @callback error( error :: {:error, term()}, failure_source :: command | domain_event, failure_context :: FailureContext.t() ) :: - {:retry, context :: map()} + {:continue, commands :: list(command), context :: map()} + | {:retry, context :: map()} | {:retry, delay :: non_neg_integer(), context :: map()} + | :skip | {:skip, :discard_pending} | {:skip, :continue_pending} - | {:continue, commands :: list(command), context :: map()} | {:stop, reason :: term()} alias Commanded.Event.Handler From 61cc7cda3ee50d5fad87df6b9a482ae3bc8c3254 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Tue, 18 Feb 2020 10:22:49 +0000 Subject: [PATCH 02/11] Global registry using Erlang's `:global` module --- .tool-versions | 1 + guides/Deployment.md | 44 +++++-- lib/commanded/registration.ex | 3 + lib/commanded/registration/global_registry.ex | 121 ++++++++++++++++++ mix.lock | 28 ++-- test/registration/global_registry_test.exs | 14 ++ test/registration/local_registry_test.exs | 13 +- .../support/registered_supervisor.ex | 5 +- .../support/registration_test_case.ex | 68 ++++++---- 9 files changed, 243 insertions(+), 54 deletions(-) create mode 100644 .tool-versions create mode 100644 lib/commanded/registration/global_registry.ex create mode 100644 test/registration/global_registry_test.exs diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 00000000..faafbd4b --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +elixir 1.9.4-otp-22 diff --git a/guides/Deployment.md b/guides/Deployment.md index 528f6f62..d16f6047 100644 --- a/guides/Deployment.md +++ b/guides/Deployment.md @@ -6,9 +6,37 @@ Commanded supports running on a single node, or multiple nodes run as either a [ Running your app using Commanded on a single node requires no configuration as local is the default setting. -## Multi node cluster deployment +## Multi node distributed Erlang deployment -To support deployment to a cluster of nodes you must use the [Commanded Swarm registry](https://github.com/commanded/commanded-swarm-registry) library and [Phoenix's distributed pub/sub and presence platform](https://hex.pm/packages/phoenix_pubsub) to allow process distribution and communication amongst all nodes in the cluster. +To support deployment to a cluster of nodes and distributed Erlang you must configure: + +1. A registry which supports distributed Erlang. The `:global` registry provided by Commanded or the [Commanded Swarm registry](https://github.com/commanded/commanded-swarm-registry) library. +2. [Phoenix's distributed pub/sub and presence platform](https://hex.pm/packages/phoenix_pubsub) to allow process distribution and communication amongst all nodes in the cluster. + +### `:global` registry + +Use Erlang's [`:global`](http://erlang.org/doc/man/global.html) name registration facility with distributed Erlang. The global name server starts automatically when a node is started. The registered names are stored in replica global name tables on every node. Thus, the translation of a name to a pid is fast, as it is always done locally. + +Define the `:global` registry for your application: + +```elixir +defmodule MyApp.Application do + use Commanded.Application, + otp_app: :my_app, + event_store: [ + adapter: Commanded.EventStore.Adapters.EventStore, + event_store: MyApp.EventStore + ], + registry: :global +end +``` + +Or configure your application to use the `:global` registry in config: + +```elixir +# config/config.exs +config :my_app, MyApp.Application, registry: :global +``` ### Commanded Swarm registry @@ -16,9 +44,7 @@ Add `commanded_swarm_registry` to your list of dependencies in `mix.exs`: ```elixir def deps do - [ - {:commanded_swarm_registry, "~> 0.1"} - ] + [{:commanded_swarm_registry, "~> 0.1"}] end ``` @@ -43,14 +69,14 @@ First, add it as a dependency to your project's `mix.exs` file: ```elixir defp deps do - [{:phoenix_pubsub, "~> 1.0"}] + [{:phoenix_pubsub, "~> 1.1"}] end ``` Fetch mix deps and configure the pubsub settings for your application in the environment config file: ```elixir -# `config/config.exs` +# config/config.exs config :my_app, MyApp.Application, pubsub: [ phoenix_pubsub: [ @@ -62,9 +88,9 @@ config :my_app, MyApp.Application, The PG2 adapter is preferable for cluster deployment since it is provided by Erlang and requires no further dependencies. -## Multi node, but not clustered deployment +## Multi node, but not distributed Erlang deployment -Running multiple nodes, but choosing not to connect the nodes together to form a cluster, requires that you use the local registry and Phoenix's pub/sub library with its Redis adapter. +Running multiple nodes, but choosing not to connect the nodes together to form a distributed Erlang cluster, requires that you use the local registry and Phoenix's pub/sub library with its Redis adapter. You must install and use Phoenix's pub/sub library, [as described above](#phoenix-pub-sub). diff --git a/lib/commanded/registration.ex b/lib/commanded/registration.ex index 0499ab0a..eecbd3d4 100644 --- a/lib/commanded/registration.ex +++ b/lib/commanded/registration.ex @@ -54,6 +54,9 @@ defmodule Commanded.Registration do :local -> {Commanded.Registration.LocalRegistry, []} + :global -> + {Commanded.Registration.GlobalRegistry, []} + adapter when is_atom(adapter) -> {adapter, []} diff --git a/lib/commanded/registration/global_registry.ex b/lib/commanded/registration/global_registry.ex new file mode 100644 index 00000000..78c6f67a --- /dev/null +++ b/lib/commanded/registration/global_registry.ex @@ -0,0 +1,121 @@ +defmodule Commanded.Registration.GlobalRegistry do + @moduledoc """ + Distributed process registration using Erlangs `:global` registry[1]. + + [1] http://erlang.org/doc/man/global.html + """ + + @behaviour Commanded.Registration.Adapter + + @doc """ + Return an optional supervisor spec for the registry + """ + @impl Commanded.Registration.Adapter + def child_spec(application, _config) do + {:ok, [], %{application: application}} + end + + @doc """ + Starts a supervisor. + """ + @impl Commanded.Registration.Adapter + def supervisor_child_spec(_adapter_meta, module, arg) do + spec = %{ + id: module, + start: {module, :start_link, [arg]}, + type: :supervisor + } + + Supervisor.child_spec(spec, []) + end + + @doc """ + Starts a uniquely named child process of a supervisor using the given module + and args. + + Registers the pid with the given name. + """ + @impl Commanded.Registration.Adapter + def start_child(adapter_meta, name, supervisor, child_spec) do + via_name = via_tuple(adapter_meta, name) + + child_spec = + case child_spec do + module when is_atom(module) -> + {module, name: via_name} + + {module, args} when is_atom(module) and is_list(args) -> + {module, Keyword.put(args, :name, via_name)} + end + + case DynamicSupervisor.start_child(supervisor, child_spec) do + {:error, {:already_started, pid}} -> {:ok, pid} + reply -> reply + end + end + + @doc """ + Starts a uniquely named `GenServer` process for the given module and args. + + Registers the pid with the given name. + """ + @impl Commanded.Registration.Adapter + def start_link(adapter_meta, name, module, args) do + via_name = via_tuple(adapter_meta, name) + + case GenServer.start_link(module, args, name: via_name) do + {:error, {:already_started, pid}} -> {:ok, pid} + reply -> reply + end + end + + @doc """ + Get the pid of a registered name. + + Returns `:undefined` if the name is unregistered. + """ + @impl Commanded.Registration.Adapter + def whereis_name(adapter_meta, name) do + global_name = global_name(adapter_meta, name) + + :global.whereis_name(global_name) + end + + @doc """ + Return a `:via` tuple to route a message to a process by its registered name + """ + @impl Commanded.Registration.Adapter + def via_tuple(adapter_meta, name) do + global_name = global_name(adapter_meta, name) + + {:via, :global, global_name} + end + + @doc false + def handle_call(_request, _from, _state) do + raise "attempted to call GenServer #{inspect(proc())} but no handle_call/3 clause was provided" + end + + @doc false + def handle_cast(_request, _state) do + raise "attempted to cast GenServer #{inspect(proc())} but no handle_cast/2 clause was provided" + end + + @doc false + def handle_info(_msg, state) do + {:noreply, state} + end + + defp proc do + case Process.info(self(), :registered_name) do + {_, []} -> self() + {_, name} -> name + end + end + + defp global_name(adapter_meta, name) do + application = Map.fetch!(adapter_meta, :application) + + {application, name} + end +end diff --git a/mix.lock b/mix.lock index fca76ff7..be1fcd0e 100644 --- a/mix.lock +++ b/mix.lock @@ -1,16 +1,16 @@ %{ - "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm"}, - "dialyxir": {:hex, :dialyxir, "1.0.0-rc.7", "6287f8f2cb45df8584317a4be1075b8c9b8a69de8eeb82b4d9e6c761cf2664cd", [:mix], [{:erlex, ">= 0.2.5", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"}, - "earmark": {:hex, :earmark, "1.4.2", "3aa0bd23bc4c61cf2f1e5d752d1bb470560a6f8539974f767a38923bb20e1d7f", [:mix], [], "hexpm"}, - "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm"}, - "erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, - "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm"}, - "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, - "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, - "mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm"}, - "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.2", "1d71150d5293d703a9c38d4329da57d3935faed2031d64bc19e77b654ef2d177", [:mix], [], "hexpm"}, - "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm"}, + "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, + "dialyxir": {:hex, :dialyxir, "1.0.0-rc.7", "6287f8f2cb45df8584317a4be1075b8c9b8a69de8eeb82b4d9e6c761cf2664cd", [:mix], [{:erlex, ">= 0.2.5", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "506294d6c543e4e5282d4852aead19ace8a35bedeb043f9256a06a6336827122"}, + "earmark": {:hex, :earmark, "1.4.2", "3aa0bd23bc4c61cf2f1e5d752d1bb470560a6f8539974f767a38923bb20e1d7f", [:mix], [], "hexpm", "5e8806285d8a3a8999bd38e4a73c58d28534c856bc38c44818e5ba85bbda16fb"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, + "erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"}, + "ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f1155337ae17ff7a1255217b4c1ceefcd1860b7ceb1a1874031e7a861b052e39"}, + "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm", "b4cfa2d69c7f0b18fd06db222b2398abeef743a72504e6bd7df9c52f171b047f"}, + "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, + "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, + "mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"}, + "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm", "052346cf322311c49a0f22789f3698eea030eec09b8c47367f0686ef2634ae14"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.2", "1d71150d5293d703a9c38d4329da57d3935faed2031d64bc19e77b654ef2d177", [:mix], [], "hexpm", "51aa192e0941313c394956718bdb1e59325874f88f45871cff90345b97f60bba"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm", "1f13f9f0f3e769a667a6b6828d29dec37497a082d195cc52dbef401a9b69bf38"}, } diff --git a/test/registration/global_registry_test.exs b/test/registration/global_registry_test.exs new file mode 100644 index 00000000..6d9081d0 --- /dev/null +++ b/test/registration/global_registry_test.exs @@ -0,0 +1,14 @@ +defmodule Commanded.GlobalRegistryTest do + alias Commanded.Registration.GlobalRegistry + alias Commanded.RegistrationTestCase + + use RegistrationTestCase, registry: GlobalRegistry + + setup do + {:ok, child_spec, registry_meta} = GlobalRegistry.child_spec(GlobalRegistry, []) + + for child <- child_spec, do: start_supervised!(child) + + [registry_meta: registry_meta] + end +end diff --git a/test/registration/local_registry_test.exs b/test/registration/local_registry_test.exs index 72141c5f..9a537850 100644 --- a/test/registration/local_registry_test.exs +++ b/test/registration/local_registry_test.exs @@ -1,7 +1,14 @@ defmodule Commanded.LocalRegistryTest do - alias Commanded.DefaultApp - alias Commanded.RegistrationTestCase alias Commanded.Registration.LocalRegistry + alias Commanded.RegistrationTestCase + + use RegistrationTestCase, registry: LocalRegistry + + setup do + {:ok, child_spec, registry_meta} = LocalRegistry.child_spec(LocalRegistry, []) + + for child <- child_spec, do: start_supervised!(child) - use RegistrationTestCase, application: DefaultApp, registry: LocalRegistry + [registry_meta: registry_meta] + end end diff --git a/test/registration/support/registered_supervisor.ex b/test/registration/support/registered_supervisor.ex index 5afccbf7..7e729617 100644 --- a/test/registration/support/registered_supervisor.ex +++ b/test/registration/support/registered_supervisor.ex @@ -1,15 +1,14 @@ defmodule Commanded.Registration.RegisteredSupervisor do use DynamicSupervisor - alias Commanded.Registration alias Commanded.Registration.RegisteredServer def start_link(arg) do DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__) end - def start_child(application, name) do - Registration.start_child(application, name, __MODULE__, {RegisteredServer, []}) + def start_child(registry, registry_meta, name) do + registry.start_child(registry_meta, name, __MODULE__, {RegisteredServer, []}) end def init(_arg) do diff --git a/test/registration/support/registration_test_case.ex b/test/registration/support/registration_test_case.ex index d50fb2b7..13977250 100644 --- a/test/registration/support/registration_test_case.ex +++ b/test/registration/support/registration_test_case.ex @@ -2,60 +2,78 @@ defmodule Commanded.RegistrationTestCase do import Commanded.SharedTestCase define_tests do - alias Commanded.Registration alias Commanded.Registration.{RegisteredServer, RegisteredSupervisor} - setup %{application: application} do - start_supervised!(application) + setup do supervisor = start_supervised!(RegisteredSupervisor) [supervisor: supervisor] end describe "`start_child/3`" do - test "should return child process PID on success", %{application: application} do - assert {:ok, _pid} = RegisteredSupervisor.start_child(application, "child") + test "should return child process PID on success", %{ + registry: registry, + registry_meta: registry_meta + } do + assert {:ok, pid} = RegisteredSupervisor.start_child(registry, registry_meta, "child") + assert is_pid(pid) end test "should return existing child process when already started", %{ - application: application + registry: registry, + registry_meta: registry_meta } do - assert {:ok, pid} = RegisteredSupervisor.start_child(application, "child") - assert {:ok, ^pid} = RegisteredSupervisor.start_child(application, "child") + assert {:ok, pid} = RegisteredSupervisor.start_child(registry, registry_meta, "child") + assert {:ok, ^pid} = RegisteredSupervisor.start_child(registry, registry_meta, "child") end end describe "`start_link/3`" do - test "should return process PID on success", %{application: application} do - assert {:ok, pid} = start_link(application, "registered") + test "should return process PID on success", %{ + registry: registry, + registry_meta: registry_meta + } do + assert {:ok, pid} = start_link(registry, registry_meta, "registered") assert is_pid(pid) end - test "should return existing process when already started", %{application: application} do - assert {:ok, pid} = start_link(application, "registered") - assert {:ok, ^pid} = start_link(application, "registered") + test "should return existing process when already started", %{ + registry: registry, + registry_meta: registry_meta + } do + assert {:ok, pid} = start_link(registry, registry_meta, "registered") + assert {:ok, ^pid} = start_link(registry, registry_meta, "registered") end end describe "`whereis_name/1`" do - test "should return `:undefined` when not registered", %{application: application} do - assert Registration.whereis_name(application, "notregistered") == :undefined + test "should return `:undefined` when not registered", %{ + registry: registry, + registry_meta: registry_meta + } do + assert registry.whereis_name(registry_meta, "notregistered") == :undefined end - test "should return `PID` when child registered", %{application: application} do - assert {:ok, pid} = RegisteredSupervisor.start_child(application, "child") - assert Registration.whereis_name(application, "child") == pid + test "should return `PID` when child registered", %{ + registry: registry, + registry_meta: registry_meta + } do + assert {:ok, pid} = RegisteredSupervisor.start_child(registry, registry_meta, "child") + assert registry.whereis_name(registry_meta, "child") == pid end - test "should return `PID` when process registered", %{application: application} do - assert {:ok, pid} = start_link(application, "registered") - assert Registration.whereis_name(application, "registered") == pid + test "should return `PID` when process registered", %{ + registry: registry, + registry_meta: registry_meta + } do + assert {:ok, pid} = start_link(registry, registry_meta, "registered") + assert registry.whereis_name(registry_meta, "registered") == pid end end describe "`supervisor_child_spec/2`" do - test "should return a valid child_spec", %{application: application} do - assert Registration.supervisor_child_spec(application, RegisteredSupervisor, "child") == + test "should return a valid child_spec", %{registry: registry, registry_meta: registry_meta} do + assert registry.supervisor_child_spec(registry_meta, RegisteredSupervisor, "child") == %{ id: Commanded.Registration.RegisteredSupervisor, start: {Commanded.Registration.RegisteredSupervisor, :start_link, ["child"]}, @@ -64,8 +82,8 @@ defmodule Commanded.RegistrationTestCase do end end - defp start_link(application, name) do - Registration.start_link(application, name, RegisteredServer, []) + defp start_link(registry, registry_meta, name) do + registry.start_link(registry_meta, name, RegisteredServer, []) end end end From 3ecfeb13e6659b02e3c2ea0a42fb634a0c032217 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Tue, 18 Feb 2020 10:27:09 +0000 Subject: [PATCH 03/11] Include #344 in CHANGELOG --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f9c8fd9..52e377c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Next release + +### Enhancements + +- Global registry using Erlang's `:global` module ([#344](https://github.com/commanded/commanded/pull/344)). + ## v1.0.0 ### Breaking changes From 354d0d901d6c00beef0e0408e7e0aa1fd549da83 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 17 Feb 2020 23:15:50 +0000 Subject: [PATCH 04/11] Fix distributed subscription registration bug - Register event handler and process manager subscriptions on all nodes, after process started. - Distributed subscriptions test. - Start epmd daemon before running tests in Travis CI config. --- .travis.yml | 5 +- config/test.exs | 6 +- lib/commanded/event/handler.ex | 19 +++-- .../process_managers/process_router.ex | 19 +++-- lib/commanded/subscriptions.ex | 2 +- lib/commanded/subscriptions/registry.ex | 11 +-- mix.exs | 2 + mix.lock | 10 +-- test/commands/dispatch_consistency_test.exs | 63 +++++++--------- .../eventually_consistent_event_handler.ex | 25 +++---- .../optional_strongly_consistent_handler.ex | 21 ++---- .../strongly_consistent_event_handler.ex | 23 +++--- .../strongly_consistent_process_manager.ex | 6 +- .../event/event_handler_subscription_test.exs | 2 +- .../process_manager_subscription_test.exs | 2 +- .../distributed_subscriptions_test.exs | 29 ++++++++ test/subscriptions/subscriptions_test.exs | 2 +- .../support/distributed_subscribers.ex | 71 +++++++++++++++++++ test/support/distributed_app.ex | 18 +++++ 19 files changed, 214 insertions(+), 122 deletions(-) create mode 100644 test/subscriptions/distributed_subscriptions_test.exs create mode 100644 test/subscriptions/support/distributed_subscribers.ex create mode 100644 test/support/distributed_app.ex diff --git a/.travis.yml b/.travis.yml index caf508af..3fd1cf56 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,9 @@ elixir: otp_release: - 22.0 +before_script: + - epmd -daemon + script: - - mix test + - mix test --include distributed - travis_wait 30 mix dialyzer --halt-exit-status diff --git a/config/test.exs b/config/test.exs index 341e3886..6d1bcef3 100644 --- a/config/test.exs +++ b/config/test.exs @@ -6,8 +6,9 @@ alias Commanded.Serialization.JsonSerializer config :logger, :console, level: :warn, format: "[$level] $message\n" config :ex_unit, + assert_receive_timeout: 1_000, capture_log: true, - assert_receive_timeout: 1_000 + exclude: [:distributed] config :commanded, assert_receive_event_timeout: 100, @@ -21,7 +22,8 @@ default_app_config = [ ] config :commanded, Commanded.Commands.ConsistencyApp, default_app_config -config :commanded, Commanded.DefaultApp, default_app_config +config :commanded, Commanded.DefaultApp, [] +config :commanded, Commanded.DistributedApp, [] config :commanded, Commanded.Event.Upcast.ProcessManager.Application, default_app_config config :commanded, Commanded.ProcessManagers.ErrorApp, default_app_config config :commanded, Commanded.ProcessManagers.ExampleApp, default_app_config diff --git a/lib/commanded/event/handler.ex b/lib/commanded/event/handler.ex index 457d9c05..8926ab9c 100644 --- a/lib/commanded/event/handler.ex +++ b/lib/commanded/event/handler.ex @@ -388,17 +388,23 @@ defmodule Commanded.Event.Handler do @doc false def start_link(application, handler_name, handler_module, opts \\ []) do name = name(application, handler_name) + consistency = consistency(opts) handler = %Handler{ application: application, handler_name: handler_name, handler_module: handler_module, - consistency: consistency(opts), + consistency: consistency, subscribe_from: start_from(opts), subscribe_to: subscribe_to(opts) } - Registration.start_link(application, name, __MODULE__, handler) + with {:ok, pid} <- Registration.start_link(application, name, __MODULE__, handler) do + # Register the started event handler as a subscription with the given consistency + :ok = Subscriptions.register(application, handler_name, pid, consistency) + + {:ok, pid} + end end def name(application, handler_name), do: {application, __MODULE__, handler_name} @@ -406,8 +412,6 @@ defmodule Commanded.Event.Handler do @doc false @impl GenServer def init(%Handler{} = state) do - :ok = register_subscription(state) - {:ok, state, {:continue, :subscribe_to_events}} end @@ -509,13 +513,6 @@ defmodule Commanded.Event.Handler do {:stop, reason, state} end - # Register this event handler as a subscription with the given consistency. - defp register_subscription(%Handler{} = state) do - %Handler{application: application, consistency: consistency, handler_name: name} = state - - Subscriptions.register(application, name, consistency) - end - defp reset_subscription(%Handler{} = state) do %Handler{ application: application, diff --git a/lib/commanded/process_managers/process_router.ex b/lib/commanded/process_managers/process_router.ex index 882f0373..8a0ab113 100644 --- a/lib/commanded/process_managers/process_router.ex +++ b/lib/commanded/process_managers/process_router.ex @@ -39,24 +39,28 @@ defmodule Commanded.ProcessManagers.ProcessRouter do def start_link(application, process_name, process_module, opts \\ []) do name = {application, ProcessRouter, process_name} + consistency = Keyword.get(opts, :consistency, :eventual) state = %State{ application: application, process_manager_name: process_name, process_manager_module: process_module, - consistency: Keyword.get(opts, :consistency, :eventual), + consistency: consistency, subscribe_from: Keyword.get(opts, :start_from, :origin), event_timeout: Keyword.get(opts, :event_timeout), idle_timeout: Keyword.get(opts, :idle_timeout, :infinity) } - Registration.start_link(application, name, __MODULE__, state) + with {:ok, pid} <- Registration.start_link(application, name, __MODULE__, state) do + # Register the process manager as a subscription with the given consistency. + :ok = Subscriptions.register(application, process_name, pid, consistency) + + {:ok, pid} + end end @impl GenServer def init(%State{} = state) do - :ok = register_subscription(state) - {:ok, state, {:continue, :subscribe_to_events}} end @@ -256,13 +260,6 @@ defmodule Commanded.ProcessManagers.ProcessRouter do {:stop, reason, state} end - # Register this process manager as a subscription with the given consistency. - defp register_subscription(%State{} = state) do - %State{application: application, consistency: consistency, process_manager_name: name} = state - - Subscriptions.register(application, name, consistency) - end - defp subscribe_to_events(%State{} = state) do %State{ application: application, diff --git a/lib/commanded/subscriptions.ex b/lib/commanded/subscriptions.ex index e17912a6..558795cd 100644 --- a/lib/commanded/subscriptions.ex +++ b/lib/commanded/subscriptions.ex @@ -22,7 +22,7 @@ defmodule Commanded.Subscriptions do GenServer.start_link(__MODULE__, subscriptions_opts, start_opts) end - defdelegate register(application, name, consistency), to: Subscriptions.Registry + defdelegate register(application, name, pid \\ self(), consistency), to: Subscriptions.Registry defdelegate all(application), to: Subscriptions.Registry @doc """ diff --git a/lib/commanded/subscriptions/registry.ex b/lib/commanded/subscriptions/registry.ex index 1af8992e..777d0af0 100644 --- a/lib/commanded/subscriptions/registry.ex +++ b/lib/commanded/subscriptions/registry.ex @@ -16,13 +16,16 @@ defmodule Commanded.Subscriptions.Registry do @doc """ Register an event store subscription with the given consistency guarantee. """ - def register(application, name, consistency) - def register(_application, _name, :eventual), do: :ok + def register(application, name, pid, consistency) - def register(application, name, :strong) do + # Ignore subscriptions with `:eventual` consistency + def register(_application, _name, _pid, :eventual), do: :ok + + # Register subscriptions with `:strong` consistency + def register(application, name, pid, :strong) do table_name = table_name(application) - true = :ets.insert(table_name, {name, self()}) + true = :ets.insert(table_name, {name, pid}) :ok end diff --git a/mix.exs b/mix.exs index a9742cd9..8ecf445a 100644 --- a/mix.exs +++ b/mix.exs @@ -44,6 +44,7 @@ defmodule Commanded.Mixfile do "test/process_managers/support", "test/pubsub/support", "test/registration/support", + "test/subscriptions/support", "test/support" ] @@ -61,6 +62,7 @@ defmodule Commanded.Mixfile do {:benchfella, "~> 0.3", only: :bench}, {:dialyxir, "~> 1.0.0-rc.7", only: :dev, runtime: false}, {:ex_doc, "~> 0.21", only: :dev}, + {:local_cluster, "~> 1.1", only: [:test]}, {:mix_test_watch, "~> 1.0", only: :dev}, {:mox, "~> 0.5", only: [:bench, :test]} ] diff --git a/mix.lock b/mix.lock index be1fcd0e..6caa0f00 100644 --- a/mix.lock +++ b/mix.lock @@ -1,16 +1,18 @@ %{ "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, "dialyxir": {:hex, :dialyxir, "1.0.0-rc.7", "6287f8f2cb45df8584317a4be1075b8c9b8a69de8eeb82b4d9e6c761cf2664cd", [:mix], [{:erlex, ">= 0.2.5", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "506294d6c543e4e5282d4852aead19ace8a35bedeb043f9256a06a6336827122"}, - "earmark": {:hex, :earmark, "1.4.2", "3aa0bd23bc4c61cf2f1e5d752d1bb470560a6f8539974f767a38923bb20e1d7f", [:mix], [], "hexpm", "5e8806285d8a3a8999bd38e4a73c58d28534c856bc38c44818e5ba85bbda16fb"}, + "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"}, - "ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f1155337ae17ff7a1255217b4c1ceefcd1860b7ceb1a1874031e7a861b052e39"}, - "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm", "b4cfa2d69c7f0b18fd06db222b2398abeef743a72504e6bd7df9c52f171b047f"}, + "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"}, + "file_system": {:hex, :file_system, "0.2.8", "f632bd287927a1eed2b718f22af727c5aeaccc9a98d8c2bd7bff709e851dc986", [:mix], [], "hexpm", "97a3b6f8d63ef53bd0113070102db2ce05352ecf0d25390eb8d747c2bde98bca"}, + "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, + "local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "fef6476083cf6f4c0526bb682de7ff75cd8b4bd4b7e20b3be60c1dab05f28ca7"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, "mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"}, "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm", "052346cf322311c49a0f22789f3698eea030eec09b8c47367f0686ef2634ae14"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.2", "1d71150d5293d703a9c38d4329da57d3935faed2031d64bc19e77b654ef2d177", [:mix], [], "hexpm", "51aa192e0941313c394956718bdb1e59325874f88f45871cff90345b97f60bba"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm", "1f13f9f0f3e769a667a6b6828d29dec37497a082d195cc52dbef401a9b69bf38"}, } diff --git a/test/commands/dispatch_consistency_test.exs b/test/commands/dispatch_consistency_test.exs index 04d00ded..c518fc87 100644 --- a/test/commands/dispatch_consistency_test.exs +++ b/test/commands/dispatch_consistency_test.exs @@ -3,7 +3,6 @@ defmodule Commanded.Commands.DispatchConsistencyTest do alias Commanded.Commands.{ ConsistencyAggregateRoot, - ConsistencyRouter, ConsistencyPrefixRouter, EventuallyConsistentEventHandler, ExecutionResult, @@ -13,28 +12,24 @@ defmodule Commanded.Commands.DispatchConsistencyTest do } alias Commanded.Commands.ConsistencyApp - alias Commanded.DefaultApp alias Commanded.EventStore - alias Commanded.Helpers.ProcessHelper alias ConsistencyAggregateRoot.ConsistencyCommand alias ConsistencyAggregateRoot.NoOpCommand alias ConsistencyAggregateRoot.RequestDispatchCommand setup do - start_supervised!(DefaultApp) start_supervised!(ConsistencyApp) :ok end describe "event handlers" do - setup :start_event_handlers + setup [:start_event_handlers] test "should wait for strongly consistent event handler to handle event" do command = %ConsistencyCommand{uuid: UUID.uuid4(), delay: 0} - opts = [application: DefaultApp, consistency: :strong] - assert :ok = ConsistencyRouter.dispatch(command, opts) + assert :ok = ConsistencyApp.dispatch(command, consistency: :strong) end # default consistency timeout set to 100ms test config @@ -42,28 +37,26 @@ defmodule Commanded.Commands.DispatchConsistencyTest do command = %ConsistencyCommand{uuid: UUID.uuid4(), delay: 5_000} assert {:error, :consistency_timeout} = - ConsistencyRouter.dispatch(command, application: DefaultApp, consistency: :strong) + ConsistencyApp.dispatch(command, consistency: :strong) end test "should not wait when command creates no events" do command = %NoOpCommand{uuid: UUID.uuid4()} - assert :ok = - ConsistencyRouter.dispatch(command, application: DefaultApp, consistency: :strong) + assert :ok = ConsistencyApp.dispatch(command, consistency: :strong) end test "should allow strongly consistent event handler to dispatch a command" do command = %RequestDispatchCommand{uuid: UUID.uuid4(), delay: 0} - assert :ok = - ConsistencyRouter.dispatch(command, application: DefaultApp, consistency: :strong) + assert :ok = ConsistencyApp.dispatch(command, consistency: :strong) end test "should timeout waiting for strongly consistent handler dispatching a command" do command = %RequestDispatchCommand{uuid: UUID.uuid4(), delay: 5_000} assert {:error, :consistency_timeout} = - ConsistencyRouter.dispatch(command, application: DefaultApp, consistency: :strong) + ConsistencyApp.dispatch(command, consistency: :strong) end end @@ -72,17 +65,21 @@ defmodule Commanded.Commands.DispatchConsistencyTest do test "should only wait for opt-in strongly consistent event handler to handle event" do command = %ConsistencyCommand{uuid: UUID.uuid4(), delay: 100} - opts = [application: DefaultApp, consistency: [OptionalStronglyConsistentEventHandler]] + opts = [application: ConsistencyApp, consistency: [OptionalStronglyConsistentEventHandler]] - assert :ok = ConsistencyRouter.dispatch(command, opts) + assert :ok = ConsistencyApp.dispatch(command, opts) end # Default consistency timeout set to 100ms test config test "should timeout waiting for strongly consistent event handler to handle event" do command = %ConsistencyCommand{uuid: UUID.uuid4(), delay: 5_000} - opts = [application: DefaultApp, consistency: ["OptionalStronglyConsistentEventHandler"]] - assert {:error, :consistency_timeout} = ConsistencyRouter.dispatch(command, opts) + opts = [ + application: ConsistencyApp, + consistency: ["OptionalStronglyConsistentEventHandler"] + ] + + assert {:error, :consistency_timeout} = ConsistencyApp.dispatch(command, opts) end end @@ -95,7 +92,7 @@ defmodule Commanded.Commands.DispatchConsistencyTest do assert :ok = ConsistencyPrefixRouter.dispatch(command, - application: DefaultApp, + application: ConsistencyApp, consistency: :strong ) end @@ -106,14 +103,16 @@ defmodule Commanded.Commands.DispatchConsistencyTest do assert {:ok, %ExecutionResult{aggregate_uuid: aggregate_uuid}} = ConsistencyPrefixRouter.dispatch(command, - application: DefaultApp, + application: ConsistencyApp, consistency: :strong, include_execution_result: true ) assert aggregate_uuid == "example-prefix-" <> uuid - recorded_events = EventStore.stream_forward(DefaultApp, aggregate_uuid) |> Enum.to_list() + recorded_events = + EventStore.stream_forward(ConsistencyApp, aggregate_uuid) |> Enum.to_list() + assert length(recorded_events) == 1 end end @@ -124,39 +123,25 @@ defmodule Commanded.Commands.DispatchConsistencyTest do test "should successfully dispatch command" do command = %RequestDispatchCommand{uuid: UUID.uuid4(), delay: 5_000} - assert :ok = - ConsistencyRouter.dispatch(command, application: DefaultApp, consistency: :strong) + assert :ok = ConsistencyApp.dispatch(command, consistency: :strong) end end def start_event_handlers(_context) do - {:ok, handler1} = StronglyConsistentEventHandler.start_link() - {:ok, handler2} = EventuallyConsistentEventHandler.start_link() - - on_exit(fn -> - ProcessHelper.shutdown(handler1) - ProcessHelper.shutdown(handler2) - end) + start_supervised!(StronglyConsistentEventHandler) + start_supervised!(EventuallyConsistentEventHandler) :ok end def start_optional_handler(_context) do - {:ok, handler3} = OptionalStronglyConsistentEventHandler.start_link() - - on_exit(fn -> - ProcessHelper.shutdown(handler3) - end) + start_supervised!(OptionalStronglyConsistentEventHandler) :ok end def start_process_manager(_context) do - {:ok, pm} = StronglyConsistentProcessManager.start_link() - - on_exit(fn -> - ProcessHelper.shutdown(pm) - end) + start_supervised!(StronglyConsistentProcessManager) :ok end diff --git a/test/commands/support/consistency/eventually_consistent_event_handler.ex b/test/commands/support/consistency/eventually_consistent_event_handler.ex index 3b13993c..d54af1d2 100644 --- a/test/commands/support/consistency/eventually_consistent_event_handler.ex +++ b/test/commands/support/consistency/eventually_consistent_event_handler.ex @@ -1,31 +1,28 @@ defmodule Commanded.Commands.EventuallyConsistentEventHandler do use Commanded.Event.Handler, - application: Commanded.DefaultApp, + application: Commanded.Commands.ConsistencyApp, name: "EventuallyConsistentEventHandler", consistency: :eventual - alias Commanded.Commands.{ - ConsistencyAggregateRoot, - ConsistencyRouter - } - - alias ConsistencyAggregateRoot.{ - ConsistencyCommand, - ConsistencyEvent, - DispatchRequestedEvent - } + alias Commanded.Commands.ConsistencyApp + alias Commanded.Commands.ConsistencyAggregateRoot + alias ConsistencyAggregateRoot.{ConsistencyCommand, ConsistencyEvent, DispatchRequestedEvent} + @doc """ + Simulate slow event handler. + """ def handle(%ConsistencyEvent{}, _metadata) do - # Simulate slow event handler :timer.sleep(:infinity) :ok end - # Handle event by dispatching a command. + @doc """ + Dispatch a command. + """ def handle(%DispatchRequestedEvent{uuid: uuid, delay: delay}, _metadata) do command = %ConsistencyCommand{uuid: uuid, delay: delay} - ConsistencyRouter.dispatch(command, application: Commanded.DefaultApp) + ConsistencyApp.dispatch(command) end end diff --git a/test/commands/support/consistency/optional_strongly_consistent_handler.ex b/test/commands/support/consistency/optional_strongly_consistent_handler.ex index 0da2962a..d8201166 100644 --- a/test/commands/support/consistency/optional_strongly_consistent_handler.ex +++ b/test/commands/support/consistency/optional_strongly_consistent_handler.ex @@ -1,19 +1,12 @@ defmodule Commanded.Commands.OptionalStronglyConsistentEventHandler do use Commanded.Event.Handler, - application: Commanded.DefaultApp, + application: Commanded.Commands.ConsistencyApp, name: "OptionalStronglyConsistentEventHandler", consistency: :strong - alias Commanded.Commands.{ - ConsistencyAggregateRoot, - ConsistencyRouter - } - - alias ConsistencyAggregateRoot.{ - ConsistencyCommand, - ConsistencyEvent, - DispatchRequestedEvent - } + alias Commanded.Commands.ConsistencyApp + alias Commanded.Commands.ConsistencyAggregateRoot + alias ConsistencyAggregateRoot.{ConsistencyCommand, ConsistencyEvent, DispatchRequestedEvent} def handle(%ConsistencyEvent{delay: delay}, _metadata) do :timer.sleep(round(delay / 10)) @@ -26,10 +19,6 @@ defmodule Commanded.Commands.OptionalStronglyConsistentEventHandler do command = %ConsistencyCommand{uuid: uuid, delay: delay} - ConsistencyRouter.dispatch( - command, - application: Commanded.DefaultApp, - consistency: :strong - ) + ConsistencyApp.dispatch(command, consistency: :strong) end end diff --git a/test/commands/support/consistency/strongly_consistent_event_handler.ex b/test/commands/support/consistency/strongly_consistent_event_handler.ex index f90b54a4..e650490d 100644 --- a/test/commands/support/consistency/strongly_consistent_event_handler.ex +++ b/test/commands/support/consistency/strongly_consistent_event_handler.ex @@ -1,29 +1,26 @@ defmodule Commanded.Commands.StronglyConsistentEventHandler do use Commanded.Event.Handler, - application: Commanded.DefaultApp, + application: Commanded.Commands.ConsistencyApp, name: "StronglyConsistentEventHandler", consistency: :strong - alias Commanded.Commands.{ - ConsistencyAggregateRoot, - ConsistencyRouter - } + alias Commanded.Commands.{ConsistencyAggregateRoot, ConsistencyApp} + alias ConsistencyAggregateRoot.{ConsistencyCommand, ConsistencyEvent, DispatchRequestedEvent} - alias ConsistencyAggregateRoot.{ - ConsistencyCommand, - ConsistencyEvent, - DispatchRequestedEvent - } - - # handle event by dispatching a command + @doc """ + Dispatch a command with consistency `:strong` after an optional delay. + """ def handle(%DispatchRequestedEvent{uuid: uuid, delay: delay}, _metadata) do :timer.sleep(delay) command = %ConsistencyCommand{uuid: uuid, delay: delay} - ConsistencyRouter.dispatch(command, application: Commanded.DefaultApp, consistency: :strong) + ConsistencyApp.dispatch(command, consistency: :strong) end + @doc """ + Block for a requested delay. + """ def handle(%ConsistencyEvent{delay: delay}, _metadata) do :timer.sleep(delay) diff --git a/test/commands/support/consistency/strongly_consistent_process_manager.ex b/test/commands/support/consistency/strongly_consistent_process_manager.ex index 9f9b92a4..ba180714 100644 --- a/test/commands/support/consistency/strongly_consistent_process_manager.ex +++ b/test/commands/support/consistency/strongly_consistent_process_manager.ex @@ -1,8 +1,6 @@ defmodule Commanded.Commands.StronglyConsistentProcessManager do - alias Commanded.Commands.{ConsistencyApp, StronglyConsistentProcessManager} - use Commanded.ProcessManagers.ProcessManager, - application: ConsistencyApp, + application: Commanded.Commands.ConsistencyApp, name: __MODULE__, consistency: :strong @@ -18,6 +16,8 @@ defmodule Commanded.Commands.StronglyConsistentProcessManager do NoOpCommand } + alias Commanded.Commands.StronglyConsistentProcessManager + def interested?(%DispatchRequestedEvent{uuid: uuid}), do: {:start, uuid} def interested?(%ConsistencyEvent{uuid: uuid}), do: {:continue, uuid} diff --git a/test/event/event_handler_subscription_test.exs b/test/event/event_handler_subscription_test.exs index 896a8840..9b3cd842 100644 --- a/test/event/event_handler_subscription_test.exs +++ b/test/event/event_handler_subscription_test.exs @@ -27,7 +27,7 @@ defmodule Commanded.Event.EventHandlerSubscriptionTest do shutdown_subscription(subscription) - assert_receive {:DOWN, ^ref, :process, ^handler, :normal} + assert_receive {:DOWN, ^ref, :process, ^handler, _reason} end end diff --git a/test/process_managers/process_manager_subscription_test.exs b/test/process_managers/process_manager_subscription_test.exs index 58e0ef63..c69ff119 100644 --- a/test/process_managers/process_manager_subscription_test.exs +++ b/test/process_managers/process_manager_subscription_test.exs @@ -32,7 +32,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do shutdown_subscription(subscription) - assert_receive {:DOWN, ^ref, :process, ^pm, :normal} + assert_receive {:DOWN, ^ref, :process, ^pm, _reason} end end diff --git a/test/subscriptions/distributed_subscriptions_test.exs b/test/subscriptions/distributed_subscriptions_test.exs new file mode 100644 index 00000000..ed6096b2 --- /dev/null +++ b/test/subscriptions/distributed_subscriptions_test.exs @@ -0,0 +1,29 @@ +defmodule Commanded.DistributedSubscriptionsTest do + use Commanded.StorageCase + + alias Commanded.Subscriptions.DistributedSubscribers + + @moduletag :distributed + + setup do + :ok = LocalCluster.start() + + nodes = LocalCluster.start_nodes("commanded", 3) + + [nodes: nodes] + end + + describe "distributed subscriptions" do + test "should be registered on all nodes", %{nodes: nodes} do + DistributedSubscribers.start_subscribers(nodes) + DistributedSubscribers.query_subscriptions(nodes) + + expected_subscribers = + DistributedSubscribers.all() |> Enum.map(& &1.__name__()) |> Enum.sort() + + for node <- nodes do + assert_receive {:subscriptions, ^node, ^expected_subscribers} + end + end + end +end diff --git a/test/subscriptions/subscriptions_test.exs b/test/subscriptions/subscriptions_test.exs index 9abd58f1..6c0612f6 100644 --- a/test/subscriptions/subscriptions_test.exs +++ b/test/subscriptions/subscriptions_test.exs @@ -10,7 +10,7 @@ defmodule Commanded.SubscriptionsTest do :ok end - describe "register event handler" do + describe "register subscription" do test "should be registered" do :ok = Subscriptions.register(DefaultApp, "handler1", :strong) :ok = Subscriptions.register(DefaultApp, "handler2", :eventual) diff --git a/test/subscriptions/support/distributed_subscribers.ex b/test/subscriptions/support/distributed_subscribers.ex new file mode 100644 index 00000000..d52fa9a2 --- /dev/null +++ b/test/subscriptions/support/distributed_subscribers.ex @@ -0,0 +1,71 @@ +defmodule Commanded.Subscriptions.DistributedSubscribers do + import ExUnit.Assertions + + alias Commanded.DistributedApp + alias Commanded.Subscriptions + + @event_handlers [EventHandler1, EventHandler2, EventHandler3] + @process_managers [ProcessManager1, ProcessManager2, ProcessManager3] + + for event_handler <- @event_handlers do + defmodule event_handler do + use Commanded.Event.Handler, + application: DistributedApp, + name: __MODULE__, + consistency: :strong + end + end + + for process_manager <- @process_managers do + defmodule process_manager do + use Commanded.ProcessManagers.ProcessManager, + application: DistributedApp, + name: __MODULE__, + consistency: :strong + end + end + + def all, do: @event_handlers ++ @process_managers + + def start_subscribers(nodes) do + reply_to = self() + + for node <- nodes do + Node.spawn_link(node, fn -> + Logger.configure(level: :error) + + {:ok, _pid} = DistributedApp.start_link() + + for subscriber <- Enum.shuffle(all()) do + {:ok, _pid} = subscriber.start_link() + + # Sleep to allow subscribers to be distributed amongst all nodes + :timer.sleep(100) + end + + send(reply_to, {:started, node}) + + :timer.sleep(:infinity) + end) + end + + for node <- nodes do + assert_receive {:started, ^node}, 5_000 + end + end + + def query_subscriptions(nodes) do + reply_to = self() + + for node <- nodes do + Node.spawn_link(node, fn -> + subscriptions = + Subscriptions.all(DistributedApp) + |> Enum.map(fn {name, _pid} -> name end) + |> Enum.sort() + + send(reply_to, {:subscriptions, node, subscriptions}) + end) + end + end +end diff --git a/test/support/distributed_app.ex b/test/support/distributed_app.ex new file mode 100644 index 00000000..3038f8ab --- /dev/null +++ b/test/support/distributed_app.ex @@ -0,0 +1,18 @@ +defmodule Commanded.DistributedApp do + alias Commanded.EventStore.Adapters.InMemory + alias Commanded.Serialization.JsonSerializer + + use Commanded.Application, + otp_app: :commanded, + event_store: [ + adapter: InMemory, + serializer: JsonSerializer + ], + pubsub: [ + phoenix_pubsub: [ + adapter: Phoenix.PubSub.PG2, + pool_size: 1 + ] + ], + registry: :global +end From e70708ff7ae8beeb7dbb8365fb3362298a8f181e Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Tue, 18 Feb 2020 22:54:41 +0000 Subject: [PATCH 05/11] Include #345 in CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52e377c8..b31beca9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ - Global registry using Erlang's `:global` module ([#344](https://github.com/commanded/commanded/pull/344)). +### Bug fixes + +- Fix distributed subscription registration bug ([#345](https://github.com/commanded/commanded/pull/345)). + ## v1.0.0 ### Breaking changes From 5058e665d3c9c43202e66fdc3db266bd72ac0d88 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 19 Feb 2020 13:32:45 +0000 Subject: [PATCH 06/11] Update Commanded Swam registry in documentation to v1.0 --- guides/Deployment.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/guides/Deployment.md b/guides/Deployment.md index d16f6047..b67f84cd 100644 --- a/guides/Deployment.md +++ b/guides/Deployment.md @@ -44,7 +44,7 @@ Add `commanded_swarm_registry` to your list of dependencies in `mix.exs`: ```elixir def deps do - [{:commanded_swarm_registry, "~> 0.1"}] + [{:commanded_swarm_registry, "~> 1.0"}] end ``` From 7e9607f8dba4cc815efb2dbbc4957d6c3af3a222 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 19 Feb 2020 22:00:09 +0000 Subject: [PATCH 07/11] Link processes started by global registry To ensure restart on termination. --- lib/commanded/registration/global_registry.ex | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/commanded/registration/global_registry.ex b/lib/commanded/registration/global_registry.ex index 78c6f67a..af923455 100644 --- a/lib/commanded/registration/global_registry.ex +++ b/lib/commanded/registration/global_registry.ex @@ -64,8 +64,18 @@ defmodule Commanded.Registration.GlobalRegistry do via_name = via_tuple(adapter_meta, name) case GenServer.start_link(module, args, name: via_name) do - {:error, {:already_started, pid}} -> {:ok, pid} - reply -> reply + {:error, {:already_started, pid}} -> + true = Process.link(pid) + + {:ok, pid} + + {:error, :killed} -> + # Process may be killed due to `:global` name registation when another node connects. + # Attempting to start again should link to the other named existing process. + start_link(adapter_meta, name, module, args) + + reply -> + reply end end From 8c4483671159a58646314b18c6cf7c8acfacca96 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Thu, 20 Feb 2020 10:29:41 +0000 Subject: [PATCH 08/11] Retry event handler and process manager subscription on error Restarting an event handler or process manager may encounter an error attempting to subscribe to the event store. Instead of crashing, the process will attempt to resubscribe with an exponentially increasing delay, including random jitter. The maximum delay between attempts is one minute. --- lib/commanded/event/handler.ex | 127 ++++++++--------- .../event_store/adapters/in_memory.ex | 118 +++++++++++----- lib/commanded/event_store/recorded_event.ex | 8 +- lib/commanded/event_store/subscription.ex | 131 ++++++++++++++++++ .../process_managers/process_router.ex | 87 +++++++----- mix.exs | 1 + mix.lock | 1 + .../event/event_handler_subscription_test.exs | 78 ++++++++--- .../process_manager_subscription_test.exs | 73 ++++++++-- 9 files changed, 452 insertions(+), 172 deletions(-) create mode 100644 lib/commanded/event_store/subscription.ex diff --git a/lib/commanded/event/handler.ex b/lib/commanded/event/handler.ex index 8926ab9c..8d0a3483 100644 --- a/lib/commanded/event/handler.ex +++ b/lib/commanded/event/handler.ex @@ -201,13 +201,12 @@ defmodule Commanded.Event.Handler do alias Commanded.Event.FailureContext alias Commanded.Event.Handler alias Commanded.Event.Upcast - alias Commanded.EventStore alias Commanded.EventStore.RecordedEvent + alias Commanded.EventStore.Subscription alias Commanded.Subscriptions @type domain_event :: struct() @type metadata :: map() - @type subscribe_from :: :origin | :current | non_neg_integer() @type consistency :: :eventual | :strong @doc """ @@ -379,10 +378,8 @@ defmodule Commanded.Event.Handler do :handler_name, :handler_module, :last_seen_event, - :subscribe_from, - :subscribe_to, :subscription, - :subscription_ref + :subscribe_timer ] @doc false @@ -390,13 +387,20 @@ defmodule Commanded.Event.Handler do name = name(application, handler_name) consistency = consistency(opts) + subscription = + Subscription.new( + application: application, + subscription_name: handler_name, + subscribe_from: Keyword.get(opts, :start_from, :origin), + subscribe_to: Keyword.get(opts, :subscribe_to, :all) + ) + handler = %Handler{ application: application, handler_name: handler_name, handler_module: handler_module, consistency: consistency, - subscribe_from: start_from(opts), - subscribe_to: subscribe_to(opts) + subscription: subscription } with {:ok, pid} <- Registration.start_link(application, name, __MODULE__, handler) do @@ -412,13 +416,9 @@ defmodule Commanded.Event.Handler do @doc false @impl GenServer def init(%Handler{} = state) do - {:ok, state, {:continue, :subscribe_to_events}} - end + :ok = Process.send(self(), :subscribe_to_events, []) - @doc false - @impl GenServer - def handle_continue(:subscribe_to_events, %Handler{} = state) do - {:noreply, subscribe_to_events(state)} + {:ok, state} end @doc false @@ -432,8 +432,10 @@ defmodule Commanded.Event.Handler do @doc false @impl GenServer def handle_call(:config, _from, %Handler{} = state) do - %Handler{consistency: consistency, subscribe_from: subscribe_from, subscribe_to: subscribe_to} = - state + %Handler{ + consistency: consistency, + subscription: %Subscription{subscribe_from: subscribe_from, subscribe_to: subscribe_to} + } = state config = [consistency: consistency, start_from: subscribe_from, subscribe_to: subscribe_to] @@ -466,10 +468,19 @@ defmodule Commanded.Event.Handler do end end + @doc false + @impl GenServer + def handle_info(:subscribe_to_events, %Handler{} = state) do + {:noreply, subscribe_to_events(state)} + end + @doc false # Subscription to event store has successfully subscribed, init event handler @impl GenServer - def handle_info({:subscribed, subscription}, %Handler{subscription: subscription} = state) do + def handle_info( + {:subscribed, subscription}, + %Handler{subscription: %Subscription{subscription_pid: subscription}} = state + ) do Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end) %Handler{handler_module: handler_module} = state @@ -506,7 +517,10 @@ defmodule Commanded.Event.Handler do @doc false @impl GenServer - def handle_info({:DOWN, ref, :process, _pid, reason}, %Handler{subscription_ref: ref} = state) do + def handle_info( + {:DOWN, ref, :process, _pid, reason}, + %Handler{subscription: %Subscription{subscription_ref: ref}} = state + ) do Logger.debug(fn -> describe(state) <> " subscription DOWN due to: #{inspect(reason)}" end) # Stop event handler when event store subscription process terminates. @@ -514,36 +528,33 @@ defmodule Commanded.Event.Handler do end defp reset_subscription(%Handler{} = state) do - %Handler{ - application: application, - handler_name: handler_name, - subscribe_to: subscribe_to, - subscription_ref: subscription_ref, - subscription: subscription - } = state - - Process.demonitor(subscription_ref) + %Handler{subscription: subscription} = state - :ok = EventStore.unsubscribe(application, subscription) - :ok = EventStore.delete_subscription(application, subscribe_to, handler_name) + subscription = Subscription.reset(subscription) - %Handler{state | last_seen_event: nil, subscription: nil, subscription_ref: nil} + %Handler{state | last_seen_event: nil, subscription: subscription, subscribe_timer: nil} end defp subscribe_to_events(%Handler{} = state) do - %Handler{ - application: application, - handler_name: handler_name, - subscribe_from: subscribe_from, - subscribe_to: subscribe_to - } = state + %Handler{subscription: subscription} = state - {:ok, subscription} = - EventStore.subscribe_to(application, subscribe_to, handler_name, self(), subscribe_from) + case Subscription.subscribe(subscription, self()) do + {:ok, subscription} -> + %Handler{state | subscription: subscription, subscribe_timer: nil} + + {:error, error} -> + {backoff, subscription} = Subscription.backoff(subscription) + + Logger.info(fn -> + describe(state) <> + " failed to subscribe to event store due to: " <> + inspect(error) <> ", retrying in " <> inspect(backoff) <> "ms" + end) - subscription_ref = Process.monitor(subscription) + subscribe_timer = Process.send_after(self(), :subscribe_to_events, backoff) - %Handler{state | subscription: subscription, subscription_ref: subscription_ref} + %Handler{state | subscription: subscription, subscribe_timer: subscribe_timer} + end end defp handle_event(event, handler, context \\ %{}) @@ -642,18 +653,6 @@ defmodule Commanded.Event.Handler do # Confirm receipt of event defp confirm_receipt(%RecordedEvent{} = event, %Handler{} = state) do - %RecordedEvent{event_number: event_number} = event - - Logger.debug(fn -> - describe(state) <> " confirming receipt of event ##{inspect(event_number)}" - end) - - ack_event(event, state) - - %Handler{state | last_seen_event: event_number} - end - - defp ack_event(event, %Handler{} = state) do %Handler{ application: application, consistency: consistency, @@ -661,8 +660,16 @@ defmodule Commanded.Event.Handler do subscription: subscription } = state - :ok = EventStore.ack_event(application, subscription, event) + %RecordedEvent{event_number: event_number} = event + + Logger.debug(fn -> + describe(state) <> " confirming receipt of event ##{inspect(event_number)}" + end) + + :ok = Subscription.ack_event(subscription, event) :ok = Subscriptions.ack_event(application, handler_name, consistency, event) + + %Handler{state | last_seen_event: event_number} end @enrich_metadata_fields [ @@ -691,22 +698,6 @@ defmodule Commanded.Event.Handler do end end - defp start_from(opts) do - case opts[:start_from] || :origin do - start_from when start_from in [:origin, :current] -> start_from - start_from when is_integer(start_from) -> start_from - invalid -> "Invalid `start_from` option: #{inspect(invalid)}" - end - end - - defp subscribe_to(opts) do - case opts[:subscribe_to] || :all do - :all -> :all - stream when is_binary(stream) -> stream - invalid -> "Invalid `subscribe_to` option: #{inspect(invalid)}" - end - end - defp describe(%Handler{handler_module: handler_module}), do: inspect(handler_module) end diff --git a/lib/commanded/event_store/adapters/in_memory.ex b/lib/commanded/event_store/adapters/in_memory.ex index cc1bd101..f0f3c4ea 100644 --- a/lib/commanded/event_store/adapters/in_memory.ex +++ b/lib/commanded/event_store/adapters/in_memory.ex @@ -100,14 +100,14 @@ defmodule Commanded.EventStore.Adapters.InMemory do end @impl Commanded.EventStore.Adapter - def ack_event(adapter_meta, pid, event) do + def ack_event(adapter_meta, subscription, %RecordedEvent{} = event) when is_pid(subscription) do event_store = event_store_name(adapter_meta) - GenServer.cast(event_store, {:ack_event, event, pid}) + GenServer.call(event_store, {:ack_event, event, subscription}) end @impl Commanded.EventStore.Adapter - def unsubscribe(adapter_meta, subscription) do + def unsubscribe(adapter_meta, subscription) when is_pid(subscription) do event_store = event_store_name(adapter_meta) GenServer.call(event_store, {:unsubscribe, subscription}) @@ -371,15 +371,10 @@ defmodule Commanded.EventStore.Adapters.InMemory do end @impl GenServer - def handle_cast({:ack_event, event, subscriber}, %State{} = state) do - %State{persistent_subscriptions: subscriptions} = state + def handle_call({:ack_event, event, subscriber}, _from, %State{} = state) do + state = ack_subscription_by_pid(event, subscriber, state) - state = %State{ - state - | persistent_subscriptions: ack_subscription_by_pid(subscriptions, event, subscriber) - } - - {:noreply, state} + {:reply, :ok, state} end @impl GenServer @@ -483,7 +478,9 @@ defmodule Commanded.EventStore.Adapters.InMemory do Process.monitor(pid) - subscription = %Subscription{subscription | subscriber: pid} + last_seen = start_from(subscription, state) + + subscription = %Subscription{subscription | subscriber: pid, last_seen: last_seen} catch_up(subscription, state) @@ -513,18 +510,26 @@ defmodule Commanded.EventStore.Adapters.InMemory do end) end - defp ack_subscription_by_pid(subscriptions, %RecordedEvent{} = event, pid) do + defp ack_subscription_by_pid(%RecordedEvent{} = event, pid, %State{} = state) do + %State{persistent_subscriptions: subscriptions} = state + %RecordedEvent{event_number: event_number} = event - Enum.reduce(subscriptions, subscriptions, fn - {name, %Subscription{subscriber: subscriber} = subscription}, acc when subscriber == pid -> - subscription = %Subscription{subscription | last_seen: event_number} + persistent_subscriptions = + Enum.reduce(subscriptions, subscriptions, fn + {name, %Subscription{subscriber: subscriber} = subscription}, acc + when subscriber == pid -> + subscription = %Subscription{subscription | last_seen: event_number} - Map.put(acc, name, subscription) + send_unseen_events(subscription, state) - _, acc -> - acc - end) + Map.put(acc, name, subscription) + + _, acc -> + acc + end) + + %State{state | persistent_subscriptions: persistent_subscriptions} end defp remove_transient_subscriber_by_pid(transient_subscriptions, pid) do @@ -534,25 +539,55 @@ defmodule Commanded.EventStore.Adapters.InMemory do end) end + defp start_from(%Subscription{last_seen: last_seen}, _state) when last_seen > 0, do: last_seen + + defp start_from(%Subscription{start_from: :origin}, _state), do: 0 + + defp start_from(%Subscription{start_from: position}, _state) when is_integer(position), + do: position + + defp start_from(%Subscription{start_from: :current, stream_uuid: :all}, %State{} = state) do + %State{persisted_events: persisted_events} = state + + length(persisted_events) + end + + defp start_from(%Subscription{start_from: :current} = subscription, %State{} = state) do + %Subscription{stream_uuid: stream_uuid} = subscription + %State{streams: streams} = state + + Map.get(streams, stream_uuid, []) |> length() + end + defp catch_up(%Subscription{start_from: :current}, _state), do: :ok defp catch_up(%Subscription{stream_uuid: :all} = subscription, %State{} = state) do + send_unseen_events(subscription, state) + end + + defp catch_up(%Subscription{} = subscription, %State{} = state) do + send_unseen_events(subscription, state) + end + + defp send_unseen_events(%Subscription{stream_uuid: :all} = subscription, %State{} = state) do %Subscription{subscriber: subscriber, last_seen: last_seen} = subscription %State{persisted_events: persisted_events} = state - unseen_events = - persisted_events - |> Enum.reverse() - |> Enum.drop(last_seen) - |> Enum.map(&deserialize(&1, state)) - |> Enum.chunk_by(fn %RecordedEvent{stream_id: stream_id} -> stream_id end) + persisted_events + |> Enum.reverse() + |> Enum.drop(last_seen) + |> Enum.map(&deserialize(&1, state)) + |> Enum.take(1) + |> case do + [] -> + :ok - for events <- unseen_events do - send(subscriber, {:events, events}) + unseen_events -> + send(subscriber, {:events, unseen_events}) end end - defp catch_up(%Subscription{} = subscription, %State{} = state) do + defp send_unseen_events(%Subscription{} = subscription, %State{} = state) do %Subscription{subscriber: subscriber, stream_uuid: stream_uuid, last_seen: last_seen} = subscription @@ -564,6 +599,7 @@ defmodule Commanded.EventStore.Adapters.InMemory do |> Enum.drop(last_seen) |> Enum.map(&deserialize(&1, state)) |> set_event_number_from_version() + |> Enum.take(1) |> case do [] -> :ok @@ -583,13 +619,29 @@ defmodule Commanded.EventStore.Adapters.InMemory do end end + defp publish_to_persistent_subscriptions(:all, events, %State{} = state) do + %State{persistent_subscriptions: subscriptions} = state + [%RecordedEvent{event_number: event_number} = event | _events] = events + + for {_name, %Subscription{stream_uuid: :all} = subscription} <- subscriptions do + %Subscription{subscriber: subscriber, last_seen: last_seen} = subscription + + if is_pid(subscriber) && (is_nil(last_seen) || event_number == last_seen + 1) do + send(subscriber, {:events, [event]}) + end + end + end + defp publish_to_persistent_subscriptions(stream_uuid, events, %State{} = state) do %State{persistent_subscriptions: subscriptions} = state + [%RecordedEvent{stream_version: stream_version} = event | _events] = events - for {_name, %Subscription{subscriber: subscriber, stream_uuid: ^stream_uuid}} <- - subscriptions, - is_pid(subscriber) do - send(subscriber, {:events, events}) + for {_name, %Subscription{stream_uuid: ^stream_uuid} = subscription} <- subscriptions do + %Subscription{subscriber: subscriber, last_seen: last_seen} = subscription + + if is_pid(subscriber) && (is_nil(last_seen) || stream_version == last_seen + 1) do + send(subscriber, {:events, [event]}) + end end end diff --git a/lib/commanded/event_store/recorded_event.ex b/lib/commanded/event_store/recorded_event.ex index b414bca8..c4af05cc 100644 --- a/lib/commanded/event_store/recorded_event.ex +++ b/lib/commanded/event_store/recorded_event.ex @@ -21,9 +21,9 @@ defmodule Commanded.EventStore.RecordedEvent do - `correlation_id` - an optional UUID identifier used to correlate related messages. - - `data` - the serialized event as binary data. + - `data` - the event data deserialized into a struct. - - `metadata` - the serialized event metadata as binary data. + - `metadata` - a string keyed map of metadata associated with the event. - `created_at` - the datetime, in UTC, indicating when the event was created. @@ -40,8 +40,8 @@ defmodule Commanded.EventStore.RecordedEvent do causation_id: uuid() | nil, correlation_id: uuid() | nil, event_type: String.t(), - data: binary(), - metadata: binary(), + data: struct(), + metadata: map(), created_at: DateTime.t() } diff --git a/lib/commanded/event_store/subscription.ex b/lib/commanded/event_store/subscription.ex new file mode 100644 index 00000000..5bbc0e2a --- /dev/null +++ b/lib/commanded/event_store/subscription.ex @@ -0,0 +1,131 @@ +defmodule Commanded.EventStore.Subscription do + @moduledoc false + + alias Commanded.EventStore + alias Commanded.EventStore + alias Commanded.EventStore.RecordedEvent + alias Commanded.EventStore.Subscription + + @enforce_keys [:application, :subscribe_to, :subscribe_from, :subscription_name] + + @type t :: %Subscription{ + application: Commanded.Application.t(), + backoff: any(), + subscribe_to: EventStore.Adapter.stream_uuid() | :all, + subscribe_from: EventStore.Adapter.start_from(), + subscription_name: EventStore.Adapter.subscription_name(), + subscription_pid: nil | pid(), + subscription_ref: nil | reference() + } + + defstruct [ + :application, + :backoff, + :subscribe_to, + :subscribe_from, + :subscription_name, + :subscription_pid, + :subscription_ref + ] + + def new(opts) do + %Subscription{ + application: Keyword.fetch!(opts, :application), + backoff: init_backoff(), + subscription_name: Keyword.fetch!(opts, :subscription_name), + subscribe_to: parse_subscribe_to(opts), + subscribe_from: parse_subscribe_from(opts) + } + end + + @spec subscribe(Subscription.t(), pid()) :: {:ok, Subscription.t()} | {:error, any()} + def subscribe(%Subscription{} = subscription, pid) do + with {:ok, pid} <- subscribe_to(subscription, pid) do + subscription_ref = Process.monitor(pid) + + subscription = %Subscription{ + subscription + | subscription_pid: pid, + subscription_ref: subscription_ref + } + + {:ok, subscription} + end + end + + @spec backoff(Subscription.t()) :: {non_neg_integer(), Subscription.t()} + def backoff(%Subscription{} = subscription) do + %Subscription{backoff: backoff} = subscription + + {next, backoff} = :backoff.fail(backoff) + + subscription = %Subscription{subscription | backoff: backoff} + + {next, subscription} + end + + @spec ack_event(Subscription.t(), RecordedEvent.t()) :: :ok + def ack_event(%Subscription{} = subscription, %RecordedEvent{} = event) do + %Subscription{application: application, subscription_pid: subscription_pid} = subscription + + EventStore.ack_event(application, subscription_pid, event) + end + + @spec reset(Subscription.t()) :: Subscription.t() + def reset(%Subscription{} = subscription) do + %Subscription{ + application: application, + subscribe_to: subscribe_to, + subscription_name: subscription_name, + subscription_pid: subscription_pid, + subscription_ref: subscription_ref + } = subscription + + Process.demonitor(subscription_ref) + + :ok = EventStore.unsubscribe(application, subscription_pid) + :ok = EventStore.delete_subscription(application, subscribe_to, subscription_name) + + %Subscription{ + subscription + | backoff: init_backoff(), + subscription_pid: nil, + subscription_ref: nil + } + end + + defp subscribe_to(%Subscription{} = subscription, pid) do + %Subscription{ + application: application, + subscribe_to: subscribe_to, + subscription_name: subscription_name, + subscribe_from: subscribe_from + } = subscription + + EventStore.subscribe_to(application, subscribe_to, subscription_name, pid, subscribe_from) + end + + defp parse_subscribe_to(opts) do + case opts[:subscribe_to] || :all do + :all -> :all + stream when is_binary(stream) -> stream + invalid -> "Invalid `subscribe_to` option: #{inspect(invalid)}" + end + end + + defp parse_subscribe_from(opts) do + case opts[:subscribe_from] || :origin do + start_from when start_from in [:origin, :current] -> start_from + start_from when is_integer(start_from) -> start_from + invalid -> "Invalid `start_from` option: #{inspect(invalid)}" + end + end + + @backoff_min :timer.seconds(1) + @backoff_max :timer.minutes(1) + + # Exponential backoff with jitter + defp init_backoff do + :backoff.init(@backoff_min, @backoff_max) |> :backoff.type(:jitter) + end +end diff --git a/lib/commanded/process_managers/process_router.ex b/lib/commanded/process_managers/process_router.ex index 8a0ab113..f7dd043f 100644 --- a/lib/commanded/process_managers/process_router.ex +++ b/lib/commanded/process_managers/process_router.ex @@ -7,8 +7,8 @@ defmodule Commanded.ProcessManagers.ProcessRouter do require Logger alias Commanded.Event.Upcast - alias Commanded.EventStore alias Commanded.EventStore.RecordedEvent + alias Commanded.EventStore.Subscription alias Commanded.ProcessManagers.FailureContext alias Commanded.ProcessManagers.ProcessManagerInstance alias Commanded.ProcessManagers.ProcessRouter @@ -25,10 +25,9 @@ defmodule Commanded.ProcessManagers.ProcessRouter do :idle_timeout, :process_manager_name, :process_manager_module, - :subscribe_from, :supervisor, :subscription, - :subscription_ref, + :subscribe_timer, :last_seen_event, :process_event_timer, process_managers: %{}, @@ -41,12 +40,20 @@ defmodule Commanded.ProcessManagers.ProcessRouter do name = {application, ProcessRouter, process_name} consistency = Keyword.get(opts, :consistency, :eventual) + subscription = + Subscription.new( + application: application, + subscription_name: process_name, + subscribe_from: Keyword.get(opts, :start_from, :origin), + subscribe_to: Keyword.get(opts, :subscribe_to, :all) + ) + state = %State{ application: application, process_manager_name: process_name, process_manager_module: process_module, consistency: consistency, - subscribe_from: Keyword.get(opts, :start_from, :origin), + subscription: subscription, event_timeout: Keyword.get(opts, :event_timeout), idle_timeout: Keyword.get(opts, :idle_timeout, :infinity) } @@ -61,7 +68,9 @@ defmodule Commanded.ProcessManagers.ProcessRouter do @impl GenServer def init(%State{} = state) do - {:ok, state, {:continue, :subscribe_to_events}} + :ok = Process.send(self(), :subscribe_to_events, []) + + {:ok, state} end @doc """ @@ -81,12 +90,6 @@ defmodule Commanded.ProcessManagers.ProcessRouter do GenServer.call(process_router, :process_instances) end - @doc false - @impl GenServer - def handle_continue(:subscribe_to_events, %State{} = state) do - {:noreply, subscribe_to_events(state)} - end - @doc false @impl GenServer def handle_call(:process_instances, _from, %State{} = state) do @@ -163,10 +166,19 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end end + @doc false + @impl GenServer + def handle_info(:subscribe_to_events, %State{} = state) do + {:noreply, subscribe_to_events(state)} + end + @doc false # Subscription to event store has successfully subscribed, init process router @impl GenServer - def handle_info({:subscribed, subscription}, %State{subscription: subscription} = state) do + def handle_info( + {:subscribed, subscription}, + %State{subscription: %Subscription{subscription_pid: subscription}} = state + ) do Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end) {:ok, supervisor} = Supervisor.start_link() @@ -232,8 +244,8 @@ defmodule Commanded.ProcessManagers.ProcessRouter do # Stop process manager when event store subscription process terminates. @impl GenServer def handle_info( - {:DOWN, ref, :process, pid, reason}, - %State{subscription_ref: ref, subscription: pid} = state + {:DOWN, ref, :process, _pid, reason}, + %State{subscription: %Subscription{subscription_ref: ref}} = state ) do Logger.debug(fn -> describe(state) <> " subscription DOWN due to: #{inspect(reason)}" end) @@ -261,18 +273,25 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end defp subscribe_to_events(%State{} = state) do - %State{ - application: application, - process_manager_name: process_manager_name, - subscribe_from: subscribe_from - } = state + %State{subscription: subscription} = state + + case Subscription.subscribe(subscription, self()) do + {:ok, subscription} -> + %State{state | subscription: subscription, subscribe_timer: nil} - {:ok, subscription} = - EventStore.subscribe_to(application, :all, process_manager_name, self(), subscribe_from) + {:error, error} -> + {backoff, subscription} = Subscription.backoff(subscription) - subscription_ref = Process.monitor(subscription) + Logger.info(fn -> + describe(state) <> + " failed to subscribe to event store due to: " <> + inspect(error) <> ", retrying in " <> inspect(backoff) <> "ms" + end) - %State{state | subscription: subscription, subscription_ref: subscription_ref} + subscribe_timer = Process.send_after(self(), :subscribe_to_events, backoff) + + %State{state | subscription: subscription, subscribe_timer: subscribe_timer} + end end # Ignore already seen event @@ -425,11 +444,19 @@ defmodule Commanded.ProcessManagers.ProcessRouter do # Confirm receipt of given event defp confirm_receipt(%RecordedEvent{event_number: event_number} = event, %State{} = state) do + %State{ + application: application, + consistency: consistency, + process_manager_name: name, + subscription: subscription + } = state + Logger.debug(fn -> describe(state) <> " confirming receipt of event: #{inspect(event_number)}" end) - do_ack_event(event, state) + :ok = Subscription.ack_event(subscription, event) + :ok = Subscriptions.ack_event(application, name, consistency, event) %State{state | last_seen_event: event_number} end @@ -501,18 +528,6 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end) end - defp do_ack_event(event, %State{} = state) do - %State{ - application: application, - consistency: consistency, - process_manager_name: name, - subscription: subscription - } = state - - :ok = EventStore.ack_event(application, subscription, event) - :ok = Subscriptions.ack_event(application, name, consistency, event) - end - # Delegate event to process instance who will ack event processing on success defp delegate_event(process_instance, %RecordedEvent{} = event, %State{} = state) do %State{pending_acks: pending_acks} = state diff --git a/mix.exs b/mix.exs index 8ecf445a..72a3385e 100644 --- a/mix.exs +++ b/mix.exs @@ -52,6 +52,7 @@ defmodule Commanded.Mixfile do defp deps do [ + {:backoff, "~> 1.1"}, {:elixir_uuid, "~> 1.2"}, # Optional dependencies diff --git a/mix.lock b/mix.lock index 6caa0f00..27201778 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ %{ + "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, "dialyxir": {:hex, :dialyxir, "1.0.0-rc.7", "6287f8f2cb45df8584317a4be1075b8c9b8a69de8eeb82b4d9e6c761cf2664cd", [:mix], [{:erlex, ">= 0.2.5", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "506294d6c543e4e5282d4852aead19ace8a35bedeb043f9256a06a6336827122"}, "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"}, diff --git a/test/event/event_handler_subscription_test.exs b/test/event/event_handler_subscription_test.exs index 9b3cd842..4629d504 100644 --- a/test/event/event_handler_subscription_test.exs +++ b/test/event/event_handler_subscription_test.exs @@ -1,27 +1,24 @@ defmodule Commanded.Event.EventHandlerSubscriptionTest do use Commanded.MockEventStoreCase + alias Commanded.Event.Handler + defmodule ExampleHandler do use Commanded.Event.Handler, application: Commanded.MockedApp, name: "ExampleHandler" end - setup do - {:ok, subscription} = start_subscription() - {:ok, handler} = start_handler(subscription) + describe "event handler subscription" do + setup [:verify_on_exit!] - [ - handler: handler, - subscription: subscription - ] - end + test "should monitor subscription and terminate handler on shutdown" do + {:ok, subscription} = start_subscription() + + expect_subscribe_to(subscription) + + {:ok, handler} = start_handler(subscription) - describe "event handler subscription" do - test "should monitor subscription and terminate handler on shutdown", %{ - handler: handler, - subscription: subscription - } do Process.unlink(handler) ref = Process.monitor(handler) @@ -29,11 +26,58 @@ defmodule Commanded.Event.EventHandlerSubscriptionTest do assert_receive {:DOWN, ^ref, :process, ^handler, _reason} end + + test "should retry subscription on error" do + {:ok, subscription} = start_subscription() + + reply_to = self() + + # First and second subscription attempts fail + expect(MockEventStore, :subscribe_to, 2, fn _event_store_meta, + :all, + "ExampleHandler", + handler, + :origin -> + send(reply_to, {:subscribe_to, handler}) + + {:error, :subscription_already_exists} + end) + + # Third subscription attempt succeeds + expect_subscribe_to(subscription) + + {:ok, handler} = ExampleHandler.start_link() + + assert_receive {:subscribe_to, ^handler} + assert_handler_subscription_timer(handler, 1..3_000) + refute_receive {:subscribed, ^subscription} + + send(handler, :subscribe_to_events) + + assert_receive {:subscribe_to, ^handler} + assert_handler_subscription_timer(handler, 1_000..9_000) + refute_receive {:subscribed, ^subscription} + + send(handler, :subscribe_to_events) + + assert_receive {:subscribed, ^subscription} + end + end + + defp assert_handler_subscription_timer(handler, expected_timer_range) do + %Handler{subscribe_timer: subscribe_timer} = :sys.get_state(handler) + + assert is_reference(subscribe_timer) + + timer = Process.read_timer(subscribe_timer) + + assert is_integer(timer) + assert timer in expected_timer_range end defp start_subscription do pid = - spawn(fn -> + spawn_link(fn -> receive do :shutdown -> :ok end @@ -42,7 +86,7 @@ defmodule Commanded.Event.EventHandlerSubscriptionTest do {:ok, pid} end - defp start_handler(subscription) do + defp expect_subscribe_to(subscription) do reply_to = self() expect(MockEventStore, :subscribe_to, fn _event_store_meta, @@ -55,7 +99,9 @@ defmodule Commanded.Event.EventHandlerSubscriptionTest do {:ok, subscription} end) + end + defp start_handler(subscription) do {:ok, pid} = ExampleHandler.start_link() assert_receive {:subscribed, ^subscription} @@ -68,6 +114,6 @@ defmodule Commanded.Event.EventHandlerSubscriptionTest do send(subscription, :shutdown) - assert_receive {:DOWN, ^ref, :process, _, _} + assert_receive {:DOWN, ^ref, :process, ^subscription, _reason} end end diff --git a/test/process_managers/process_manager_subscription_test.exs b/test/process_managers/process_manager_subscription_test.exs index c69ff119..70e18836 100644 --- a/test/process_managers/process_manager_subscription_test.exs +++ b/test/process_managers/process_manager_subscription_test.exs @@ -2,6 +2,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do use Commanded.MockEventStoreCase alias Commanded.MockedApp + alias Commanded.ProcessManagers.ProcessRouter defmodule ExampleProcessManager do use Commanded.ProcessManagers.ProcessManager, @@ -12,21 +13,14 @@ defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do defstruct [:data] end - setup do - {:ok, subscription} = start_subscription() - {:ok, pm} = start_process_manager(subscription) + describe "process manager subscription" do + test "should monitor subscription and terminate process manager on shutdown" do + {:ok, subscription} = start_subscription() - [ - pm: pm, - subscription: subscription - ] - end + expect_subscribe_to(subscription) + + {:ok, pm} = start_process_manager(subscription) - describe "process manager subscription" do - test "should monitor subscription and terminate process manager on shutdown", %{ - pm: pm, - subscription: subscription - } do Process.unlink(pm) ref = Process.monitor(pm) @@ -36,9 +30,56 @@ defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do end end + test "should retry subscription on error" do + {:ok, subscription} = start_subscription() + + reply_to = self() + + # First and second subscription attempts fail + expect(MockEventStore, :subscribe_to, 2, fn _event_store_meta, + :all, + "ExampleProcessManager", + pm, + :origin -> + send(reply_to, {:subscribe_to, pm}) + + {:error, :subscription_already_exists} + end) + + # Third subscription attempt succeeds + expect_subscribe_to(subscription) + + {:ok, pm} = ExampleProcessManager.start_link() + + assert_receive {:subscribe_to, ^pm} + assert_subscription_timer(pm, 1..3_000) + refute_receive {:subscribed, ^subscription} + + send(pm, :subscribe_to_events) + + assert_receive {:subscribe_to, ^pm} + assert_subscription_timer(pm, 1_000..9_000) + refute_receive {:subscribed, ^subscription} + + send(pm, :subscribe_to_events) + + assert_receive {:subscribed, ^subscription} + end + + defp assert_subscription_timer(pm, expected_timer_range) do + %ProcessRouter.State{subscribe_timer: subscribe_timer} = :sys.get_state(pm) + + assert is_reference(subscribe_timer) + + timer = Process.read_timer(subscribe_timer) + + assert is_integer(timer) + assert timer in expected_timer_range + end + defp start_subscription do pid = - spawn(fn -> + spawn_link(fn -> receive do :shutdown -> :ok end @@ -47,7 +88,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do {:ok, pid} end - defp start_process_manager(subscription) do + defp expect_subscribe_to(subscription) do reply_to = self() expect(MockEventStore, :subscribe_to, fn _event_store_meta, @@ -60,7 +101,9 @@ defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do {:ok, subscription} end) + end + defp start_process_manager(subscription) do {:ok, pid} = ExampleProcessManager.start_link() assert_receive {:subscribed, ^subscription} From f0c667bcc0b5fa794d73cc8d03d2e6675e0be1f7 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Sat, 22 Feb 2020 23:20:36 +0000 Subject: [PATCH 09/11] Include #348 in CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b31beca9..89f0f2dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ### Bug fixes - Fix distributed subscription registration bug ([#345](https://github.com/commanded/commanded/pull/345)). +- Retry event handler and process manager suscriptions on error ([#348](https://github.com/commanded/commanded/pull/348)). ## v1.0.0 From e793de9185fc84867adac78e2bca99890cc1a482 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Thu, 27 Feb 2020 20:19:11 +0000 Subject: [PATCH 10/11] Command dispatch `returning` option Allow aggregate state to be returned from a command dispatch. --- CHANGELOG.md | 1 + lib/application.ex | 56 +++++- lib/commanded/aggregates/aggregate.ex | 94 +++++---- lib/commanded/aggregates/execution_context.ex | 37 ++++ lib/commanded/commands/composite_router.ex | 53 +++-- lib/commanded/commands/dispatcher.ex | 56 +++--- lib/commanded/commands/execution_result.ex | 44 +++-- lib/commanded/commands/router.ex | 179 ++++++++++++----- .../process_manager_instance.ex | 8 +- .../aggregates/aggregate_concurrency_test.exs | 6 +- test/aggregates/execute_command_test.exs | 6 +- test/commands/dispatch_consistency_test.exs | 1 - test/commands/dispatch_return_test.exs | 187 ++++++++++++++++++ test/commands/routing_commands_test.exs | 59 +----- .../returning/default_dispatch_return_app.ex | 18 ++ 15 files changed, 584 insertions(+), 221 deletions(-) create mode 100644 test/commands/dispatch_return_test.exs create mode 100644 test/commands/support/returning/default_dispatch_return_app.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 89f0f2dd..6ea4d611 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Enhancements - Global registry using Erlang's `:global` module ([#344](https://github.com/commanded/commanded/pull/344)). +- Command dispatch return ([#331](https://github.com/commanded/commanded/pull/331)). ### Bug fixes diff --git a/lib/application.ex b/lib/application.ex index f81b17d4..ae92e29c 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -40,6 +40,55 @@ defmodule Commanded.Application do router(MyApp.Router) end + A Commanded application must be started before it can be used: + + {:ok, _pid} = MyApp.Application.start_link() + + Instead of starting the application manually, you should use a + [Supervisor](supervision.html). + + ## Supervision + + Use a supervisor to start your Commanded application: + + Supervisor.start_link([ + MyApp.Application + ], strategy: :one_for_one) + + ## Command routing + + Commanded applications are also composite routers allowing you to include + one or more routers within an application. + + ### Example + + defmodule MyApp.Application do + use Commanded.Application, otp_app: :my_app + + router(MyApp.Accounts.Router) + router(MyApp.Billing.Router) + router(MyApp.Notifications.Router) + end + + See `Commanded.Commands.CompositeRouter` for details. + + ## Default dispatch options + + An application can be configured with default command dispatch options such as + `:consistency`, `:timeout`, and `:returning`. Any defaults will be used + unless overridden by options provided to the dispatch function. + + defmodule MyApp.Application do + use Commanded.Application, + otp_app: :my_app, + default_dispatch_opts: [ + consistency: :eventual, + returning: :aggregate_version + ] + end + + See the `Commanded.Commands.Router` module for more details about the + supported options. """ @type t :: module @@ -55,7 +104,9 @@ defmodule Commanded.Application do @config config @name Keyword.get(opts, :name, __MODULE__) - use Commanded.Commands.CompositeRouter, application: __MODULE__ + use Commanded.Commands.CompositeRouter, + application: __MODULE__, + default_dispatch_opts: Keyword.get(opts, :default_dispatch_opts, []) def config do {:ok, config} = @@ -120,8 +171,9 @@ defmodule Commanded.Application do """ @callback dispatch(command :: struct, timeout_or_opts :: integer | :infinity | Keyword.t()) :: :ok - | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()} + | {:ok, aggregate_state :: struct} | {:ok, aggregate_version :: non_neg_integer()} + | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()} | {:error, :unregistered_command} | {:error, :consistency_timeout} | {:error, reason :: term} diff --git a/lib/commanded/aggregates/aggregate.ex b/lib/commanded/aggregates/aggregate.ex index 9b86e846..3726ad03 100644 --- a/lib/commanded/aggregates/aggregate.ex +++ b/lib/commanded/aggregates/aggregate.ex @@ -47,11 +47,13 @@ defmodule Commanded.Aggregates.Aggregate do require Logger - alias Commanded.Aggregates.{Aggregate, ExecutionContext} + alias Commanded.Aggregates.Aggregate + alias Commanded.Aggregates.ExecutionContext alias Commanded.Event.Mapper alias Commanded.Event.Upcast alias Commanded.EventStore - alias Commanded.EventStore.{RecordedEvent, SnapshotData} + alias Commanded.EventStore.RecordedEvent + alias Commanded.EventStore.SnapshotData alias Commanded.Registration alias Commanded.Snapshotting @@ -227,16 +229,18 @@ defmodule Commanded.Aggregates.Aggregate do lifespan_timeout = case reply do - {:ok, _stream_version, []} -> + {:ok, []} -> aggregate_lifespan_timeout(lifespan, :after_command, command) - {:ok, _stream_version, events} -> + {:ok, events} -> aggregate_lifespan_timeout(lifespan, :after_event, events) {:error, error} -> aggregate_lifespan_timeout(lifespan, :after_error, error) end + reply = ExecutionContext.format_reply(reply, context, state) + state = %Aggregate{state | lifespan_timeout: lifespan_timeout} %Aggregate{aggregate_version: aggregate_version, snapshotting: snapshotting} = state @@ -443,60 +447,35 @@ defmodule Commanded.Aggregates.Aggregate do defp execute_command(%ExecutionContext{} = context, %Aggregate{} = state) do %ExecutionContext{command: command, handler: handler, function: function} = context - %Aggregate{aggregate_version: expected_version, aggregate_state: aggregate_state} = state + %Aggregate{aggregate_state: aggregate_state} = state Logger.debug(fn -> describe(state) <> " executing command: #{inspect(command)}" end) - {reply, state} = - case Kernel.apply(handler, function, [aggregate_state, command]) do - {:error, _error} = reply -> - {reply, state} - - none when none in [:ok, nil, []] -> - {{:ok, expected_version, []}, state} - - %Commanded.Aggregate.Multi{} = multi -> - case Commanded.Aggregate.Multi.run(multi) do - {:error, _error} = reply -> - {reply, state} - - {aggregate_state, pending_events} -> - persist_events(pending_events, aggregate_state, context, state) - end - - {:ok, pending_events} -> - record_events(pending_events, context, state) - - pending_events -> - record_events(pending_events, context, state) - end - - case reply do - {:error, :wrong_expected_version} -> - # Fetch missing events from event store - state = rebuild_from_events(state) - - # Retry command if there are any attempts left - case ExecutionContext.retry(context) do - {:ok, context} -> - Logger.debug(fn -> describe(state) <> " wrong expected version, retrying command" end) + case Kernel.apply(handler, function, [aggregate_state, command]) do + {:error, _error} = reply -> + {reply, state} - execute_command(context, state) - - reply -> - Logger.debug(fn -> - describe(state) <> " wrong expected version, but not retrying command" - end) + none when none in [:ok, nil, []] -> + {{:ok, []}, state} + %Commanded.Aggregate.Multi{} = multi -> + case Commanded.Aggregate.Multi.run(multi) do + {:error, _error} = reply -> {reply, state} + + {aggregate_state, pending_events} -> + persist_events(pending_events, aggregate_state, context, state) end - reply -> - {reply, state} + {:ok, pending_events} -> + apply_and_persist_events(pending_events, context, state) + + pending_events -> + apply_and_persist_events(pending_events, context, state) end end - defp record_events(pending_events, context, %Aggregate{} = state) do + defp apply_and_persist_events(pending_events, context, %Aggregate{} = state) do %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state pending_events = List.wrap(pending_events) @@ -521,8 +500,27 @@ defmodule Commanded.Aggregates.Aggregate do aggregate_version: aggregate_version } - {{:ok, aggregate_version, pending_events}, state} + {{:ok, pending_events}, state} else + {:error, :wrong_expected_version} -> + # Fetch missing events from event store + state = rebuild_from_events(state) + + # Retry command if there are any attempts left + case ExecutionContext.retry(context) do + {:ok, context} -> + Logger.debug(fn -> describe(state) <> " wrong expected version, retrying command" end) + + execute_command(context, state) + + reply -> + Logger.debug(fn -> + describe(state) <> " wrong expected version, but not retrying command" + end) + + {reply, state} + end + {:error, _error} = reply -> {reply, state} end diff --git a/lib/commanded/aggregates/execution_context.ex b/lib/commanded/aggregates/execution_context.ex index dea0ce86..67797f11 100644 --- a/lib/commanded/aggregates/execution_context.ex +++ b/lib/commanded/aggregates/execution_context.ex @@ -32,8 +32,10 @@ defmodule Commanded.Aggregates.ExecutionContext do """ + alias Commanded.Aggregates.Aggregate alias Commanded.Aggregates.DefaultLifespan alias Commanded.Aggregates.ExecutionContext + alias Commanded.Commands.ExecutionResult defstruct [ :command, @@ -42,6 +44,7 @@ defmodule Commanded.Aggregates.ExecutionContext do :function, :handler, retry_attempts: 0, + returning: false, lifespan: DefaultLifespan, metadata: %{} ] @@ -59,4 +62,38 @@ defmodule Commanded.Aggregates.ExecutionContext do {:ok, context} end + + def format_reply(reply, %ExecutionContext{} = context, %Aggregate{} = aggregate) do + %Aggregate{ + aggregate_uuid: aggregate_uuid, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version + } = aggregate + + %ExecutionContext{metadata: metadata, returning: returning} = context + + with {:ok, events} <- reply do + case returning do + :aggregate_state -> + {:ok, aggregate_version, events, aggregate_state} + + :aggregate_version -> + {:ok, aggregate_version, events, aggregate_version} + + :execution_result -> + result = %ExecutionResult{ + aggregate_uuid: aggregate_uuid, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + events: events, + metadata: metadata + } + + {:ok, aggregate_version, events, result} + + false -> + {:ok, aggregate_version, events} + end + end + end end diff --git a/lib/commanded/commands/composite_router.ex b/lib/commanded/commands/composite_router.ex index 6896fc06..5dd83dfc 100644 --- a/lib/commanded/commands/composite_router.ex +++ b/lib/commanded/commands/composite_router.ex @@ -1,20 +1,34 @@ defmodule Commanded.Commands.CompositeRouter do @moduledoc """ Composite router allows you to combine multiple router modules into a single - router able to dispatch any registered command using its corresponding router. + router able to dispatch any registered command from an included child router. - ## Example + One example usage is to define a router per context and then combine each + context's router into a single top-level composite app router used for all + command dispatching. - defmodule ExampleCompositeRouter do - use Commanded.Commands.CompositeRouter + ### Example - router BankAccountRouter - router MoneyTransferRouter + Define a composite router module which imports the commands from each included + router: + + defmodule Bank.AppRouter do + use Commanded.Commands.CompositeRouter, application: Bank.App + + router(Bank.Accounts.Router) + router(Bank.MoneyTransfer.Router) end + You can dispatch a command via the composite router which will be routed to + the appropriate child router: + + alias Bank.AppRouter + command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000} - :ok = ExampleCompositeRouter.dispatch(command) + :ok = AppRouter.dispatch(command) + + A composite router can include composite routers. """ defmacro __using__(opts) do @@ -23,8 +37,15 @@ defmodule Commanded.Commands.CompositeRouter do import unquote(__MODULE__) + application = Keyword.get(unquote(opts), :application) + + default_dispatch_opts = + unquote(opts) + |> Keyword.get(:default_dispatch_opts, []) + |> Keyword.put(:application, application) + + @default_dispatch_opts default_dispatch_opts @registered_commands %{} - @application Keyword.get(unquote(opts), :application) @before_compile unquote(__MODULE__) end @@ -53,6 +74,10 @@ defmodule Commanded.Commands.CompositeRouter do Enum.map(@registered_commands, fn {command, _router} -> command end) end + def __dispatch_opts__(opts) do + Keyword.merge(@default_dispatch_opts, opts) + end + @doc false def dispatch(command, opts \\ []) @@ -62,18 +87,24 @@ defmodule Commanded.Commands.CompositeRouter do quote do @doc false def dispatch(%unquote(command_module){} = command, :infinity) do - unquote(router).dispatch(command, application: @application, timeout: :infinity) + opts = __dispatch_opts__(timeout: :infinity) + + unquote(router).dispatch(command, opts) end @doc false def dispatch(%unquote(command_module){} = command, timeout) when is_integer(timeout) do - unquote(router).dispatch(command, application: @application, timeout: timeout) + opts = __dispatch_opts__(timeout: timeout) + + unquote(router).dispatch(command, opts) end @doc false def dispatch(%unquote(command_module){} = command, opts) do - unquote(router).dispatch(command, Keyword.put_new(opts, :application, @application)) + opts = __dispatch_opts__(opts) + + unquote(router).dispatch(command, opts) end end ) diff --git a/lib/commanded/commands/dispatcher.ex b/lib/commanded/commands/dispatcher.ex index da10b228..eecb7257 100644 --- a/lib/commanded/commands/dispatcher.ex +++ b/lib/commanded/commands/dispatcher.ex @@ -5,7 +5,6 @@ defmodule Commanded.Commands.Dispatcher do alias Commanded.Aggregates.Aggregate alias Commanded.Aggregates.ExecutionContext - alias Commanded.Commands.ExecutionResult alias Commanded.Middleware.Pipeline defmodule Payload do @@ -21,24 +20,28 @@ defmodule Commanded.Commands.Dispatcher do :handler_module, :handler_function, :aggregate_module, - :include_aggregate_version, - :include_execution_result, :identity, :identity_prefix, :timeout, :lifespan, :metadata, :retry_attempts, + :returning, middleware: [] ] end @doc """ - Dispatch the given command to the handler module for the aggregate as identified - - Returns `:ok` on success, or `{:error, error}` on failure. + Dispatch the given command to the handler module for the aggregate as + identified. """ - @spec dispatch(payload :: struct) :: :ok | {:error, error :: term} + @spec dispatch(payload :: struct) :: + :ok + | {:ok, aggregate_state :: struct} + | {:ok, aggregate_version :: non_neg_integer()} + | {:ok, events :: list(struct)} + | {:ok, Commanded.Commands.ExecutionResult.t()} + | {:error, error :: term} def dispatch(%Payload{} = payload) do pipeline = payload @@ -100,7 +103,14 @@ defmodule Commanded.Commands.Dispatcher do |> Pipeline.assign(:aggregate_version, aggregate_version) |> Pipeline.assign(:events, events) |> after_dispatch(payload) - |> respond_with_success(payload, events) + |> Pipeline.respond(:ok) + + {:ok, aggregate_version, events, reply} -> + pipeline + |> Pipeline.assign(:aggregate_version, aggregate_version) + |> Pipeline.assign(:events, events) + |> after_dispatch(payload) + |> Pipeline.respond({:ok, reply}) {:exit, {:normal, :aggregate_stopped}} -> # Maybe retry command when aggregate process stopped by lifespan timeout @@ -133,7 +143,8 @@ defmodule Commanded.Commands.Dispatcher do handler_module: handler_module, handler_function: handler_function, lifespan: lifespan, - retry_attempts: retry_attempts + retry_attempts: retry_attempts, + returning: returning } = payload %ExecutionContext{ @@ -144,34 +155,11 @@ defmodule Commanded.Commands.Dispatcher do handler: handler_module, function: handler_function, lifespan: lifespan, - retry_attempts: retry_attempts + retry_attempts: retry_attempts, + returning: returning } end - defp respond_with_success(%Pipeline{} = pipeline, payload, events) do - response = - case payload do - %{include_execution_result: true} -> - { - :ok, - %ExecutionResult{ - aggregate_uuid: pipeline.assigns.aggregate_uuid, - aggregate_version: pipeline.assigns.aggregate_version, - events: events, - metadata: pipeline.metadata - } - } - - %{include_aggregate_version: true} -> - {:ok, pipeline.assigns.aggregate_version} - - _ -> - :ok - end - - Pipeline.respond(pipeline, response) - end - defp before_dispatch(%Pipeline{} = pipeline, %Payload{middleware: middleware}) do Pipeline.chain(pipeline, :before_dispatch, middleware) end diff --git a/lib/commanded/commands/execution_result.ex b/lib/commanded/commands/execution_result.ex index acc7a8e6..81a07e26 100644 --- a/lib/commanded/commands/execution_result.ex +++ b/lib/commanded/commands/execution_result.ex @@ -1,30 +1,40 @@ defmodule Commanded.Commands.ExecutionResult do @moduledoc """ - Contains the events and metadata created by a command succesfully executed - against an aggregate. + Contains the aggregate, events, and metadata created by a successfully + executed command. The available fields are: - - `aggregate_uuid` - identity of the aggregate instance. - - `aggregate_version` - resultant version of the aggregate after executing - the command. - - `events` - a list of the created events, may be an empty list. - - `metadata` - an optional map containing the metadata associated with the - command dispatch. + - `aggregate_uuid` - identity of the aggregate instance. + + - `aggregate_state` - resultant state of the aggregate after executing + the command. + + - `aggregate_version` - resultant version of the aggregate after executing + the command. + + - `events` - a list of the created events, it may be an empty list. + + - `metadata` - an map containing the metadata associated with the command + dispatch. """ @type t :: %__MODULE__{ - aggregate_uuid: String.t, - aggregate_version: non_neg_integer(), - events: list(struct()), - metadata: struct(), - } + aggregate_uuid: String.t(), + aggregate_state: struct, + aggregate_version: non_neg_integer(), + events: list(struct()), + metadata: struct() + } + + @enforce_keys [:aggregate_uuid, :aggregate_state, :aggregate_version, :events, :metadata] defstruct [ - aggregate_uuid: nil, - aggregate_version: nil, - events: nil, - metadata: nil, + :aggregate_uuid, + :aggregate_state, + :aggregate_version, + :events, + :metadata ] end diff --git a/lib/commanded/commands/router.ex b/lib/commanded/commands/router.ex index 7a76f42c..acde4443 100644 --- a/lib/commanded/commands/router.ex +++ b/lib/commanded/commands/router.ex @@ -119,30 +119,64 @@ defmodule Commanded.Commands.Router do Note you cannot opt-in to strong consistency for a handler that has been configured as eventually consistent. - ## Aggregate version + ## Dispatch return - You can optionally choose to include the aggregate's version as part of the - dispatch result by setting `include_aggregate_version` true. + By default a successful command dispatch will return `:ok`. You can change + this behaviour by specifying a `returning` option. - {:ok, aggregate_version} = BankApp.dispatch(command, include_aggregate_version: true) + The supported options are: - This is useful when you need to wait for an event handler (e.g. a read model - projection) to be up-to-date before continuing or querying its data. + - `:aggregate_state` - to return the update aggregate state. - ## Execution results + - `:aggregate_version` - to return only the aggregate version. - You can also choose to include the execution result as part of the dispatch result by - setting `include_execution_result` true. + - `:execution_result` - to return a `Commanded.Commands.ExecutionResult` + struct containing the aggregate's identity, state, version, and any events + produced from the command along with their associated metadata. - {:ok, execution_result} = BankApp.dispatch(command, include_execution_result: true) + - `false` - don't return anything except an `:ok`. - Or by setting `include_execution_result` in your application config file: + ### Aggregate state + + Return the updated aggregate state as part of the dispatch result: + + {:ok, %BankAccount{}} = BankApp.dispatch(command, returning: :aggregate_state) + + This is useful when you want to immediately return fields from the aggregate's + state without requiring an read model projection and waiting for the event(s) + to be projected. It may also be appropriate to use this feature for unit + tests. + + However, be warned that tightly coupling an aggregate's state with read + requests may be harmful. It's why CQRS enforces the separation of reads from + writes by defining two separate and specialised models. + + ### Aggregate version + + You can optionally choose to return the aggregate's version as part of the + dispatch result: + + {:ok, aggregate_version} = BankApp.dispatch(command, returning: :aggregate_version) + + This is useful when you need to wait for an event handler, such as a read model + projection, to be up-to-date before querying its data. + + ### Execution results + + You can also choose to return the execution result as part of the dispatch + result: + + alias Commanded.Commands.ExecutionResult + + {:ok, %ExecutionResult{} = result} = BankApp.dispatch(command, returning: :execution_result) + + Or by setting the `default_dispatch_return` in your application config file: # config/config.exs - config :commanded, include_execution_result: true + config :commanded, default_dispatch_return: :execution_result - Use this if you need to get information from the events produced by the aggregate - but you don't want to wait for the events to be projected. + Use the execution result struct to get information from the events produced + from the command. ## Metadata @@ -174,9 +208,8 @@ defmodule Commanded.Commands.Router do Commanded.Middleware.ExtractAggregateIdentity, Commanded.Middleware.ConsistencyGuarantee ], - consistency: get_env(:default_consistency, :eventual), - include_aggregate_version: get_env(:include_aggregate_version, false), - include_execution_result: get_env(:include_execution_result, false), + consistency: get_opt(unquote(opts), :default_consistency, :eventual), + returning: get_default_dispatch_return(unquote(opts)), dispatch_timeout: 5_000, lifespan: Commanded.Aggregates.DefaultLifespan, metadata: %{}, @@ -195,15 +228,16 @@ defmodule Commanded.Commands.Router do ## Example - defmodule BankingRouter do + defmodule BankRouter do use Commanded.Commands.Router middleware CommandLogger middleware MyCommandValidator middleware AuthorizeCommand - dispatch [OpenAccount,DepositMoney] to: BankAccount, identity: :account_number + dispatch [OpenAccount, DepositMoney], to: BankAccount, identity: :account_number end + """ defmacro middleware(middleware_module) do quote do @@ -283,6 +317,15 @@ defmodule Commanded.Commands.Router do @doc """ Configure the command, or list of commands, to be dispatched to the corresponding handler for a given aggregate. + + ## Example + + defmodule BankRouter do + use Commanded.Commands.Router + + dispatch [OpenAccount, DepositMoney], to: BankAccount, identity: :account_number + end + """ defmacro dispatch(command_module_or_modules, opts) do opts = parse_opts(opts, []) @@ -402,11 +445,25 @@ defmodule Commanded.Commands.Router do metadata = Keyword.get(opts, :metadata) || @default[:metadata] timeout = Keyword.get(opts, :timeout) || unquote(timeout) || @default[:dispatch_timeout] - include_aggregate_version = - Keyword.get(opts, :include_aggregate_version) || @default[:include_aggregate_version] + returning = + cond do + (returning = Keyword.get(opts, :returning)) in [ + :aggregate_state, + :aggregate_version, + :execution_result, + false + ] -> + returning + + Keyword.get(opts, :include_execution_result) == true -> + :execution_result + + Keyword.get(opts, :include_aggregate_version) == true -> + :aggregate_version - include_execution_result = - Keyword.get(opts, :include_execution_result) || @default[:include_execution_result] + true -> + @default[:returning] + end lifespan = Keyword.get(opts, :lifespan) || unquote(lifespan) || @default[:lifespan] retry_attempts = Keyword.get(opts, :retry_attempts) || @default[:retry_attempts] @@ -438,8 +495,7 @@ defmodule Commanded.Commands.Router do aggregate_module: unquote(aggregate), identity: identity, identity_prefix: identity_prefix, - include_aggregate_version: include_aggregate_version, - include_execution_result: include_execution_result, + returning: returning, timeout: timeout, lifespan: lifespan, metadata: metadata, @@ -464,8 +520,9 @@ defmodule Commanded.Commands.Router do """ @spec dispatch(command :: struct) :: :ok + | {:ok, aggregate_state :: struct} + | {:ok, aggregate_version :: non_neg_integer()} | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()} - | {:ok, aggregate_version :: integer} | {:error, :unregistered_command} | {:error, :consistency_timeout} | {:error, reason :: term} @@ -496,31 +553,41 @@ defmodule Commanded.Commands.Router do will block until all strongly consistent event handlers and process managers have handled all events created by the command. - - `timeout` - as described above. - - - `include_aggregate_version` - set to true to include the aggregate - stream version in the success response: `{:ok, aggregate_version}` - The default is false, to return just `:ok`. - - - `include_execution_result` - set to true to include more - information about the dispatch, like the aggregate name, uuid, and - the produced events. Overrides `include_aggregate_version`. The - default is false to return `:ok`. See - `Commanded.Commands.ExecutionResult`. - - `metadata` - an optional map containing key/value pairs comprising the metadata to be associated with all events created by the command. - Returns `:ok` on success, unless `:include_aggregate_version` or - `:include_execution_result` is enabled, where it respectively returns - `{:ok, aggregate_version}` or `{:ok, %ExecutionResult{..}}`. Returns - `{:error, reason}` on failure. + - `returning` - to choose what response is returned from a successful + command dispatch. The default is to return an `:ok`. + + The available options are: + + - `:aggregate_state` - to return the update aggregate state in the + successful response: `{:ok, aggregate_state}`. + + - `:aggregate_version` - to include the aggregate stream version + in the successful response: `{:ok, aggregate_version}`. + + - `:execution_result` - to return a `Commanded.Commands.ExecutionResult` + struct containing the aggregate's identity, version, and any + events produced from the command along with their associated + metadata. + + - `false` - don't return anything except an `:ok`. + + - `timeout` - as described above. + + Returns `:ok` on success unless the `:returning` option is specified where + it returns one of `{:ok, aggregate_state}`, `{:ok, aggregate_version}`, or + `{:ok, %Commanded.Commands.ExecutionResult{}}`. + + Returns `{:error, reason}` on failure. """ @spec dispatch(command :: struct, timeout_or_opts :: integer | :infinity | keyword()) :: :ok + | {:ok, aggregate_state :: struct} + | {:ok, aggregate_version :: non_neg_integer()} | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()} - | {:ok, aggregate_version :: integer} | {:error, :unregistered_command} | {:error, :consistency_timeout} | {:error, reason :: term} @@ -545,7 +612,31 @@ defmodule Commanded.Commands.Router do end @doc false - def get_env(name, default \\ nil), do: Application.get_env(:commanded, name, default) + def get_opt(opts, name, default \\ nil) do + Keyword.get(opts, name) || Application.get_env(:commanded, name) || default + end + + @doc false + def get_default_dispatch_return(opts) do + cond do + (default_dispatch_return = get_opt(opts, :default_dispatch_return)) in [ + :aggregate_state, + :aggregate_version, + :execution_result, + false + ] -> + default_dispatch_return + + get_opt(opts, :include_execution_result) == true -> + :execution_result + + get_opt(opts, :include_aggregate_version) == true -> + :aggregate_version + + true -> + false + end + end defp parse_opts([{:to, aggregate_or_handler} | opts], result) do case Keyword.pop(opts, :aggregate) do diff --git a/lib/commanded/process_managers/process_manager_instance.ex b/lib/commanded/process_managers/process_manager_instance.ex index 08f85216..f6273b26 100644 --- a/lib/commanded/process_managers/process_manager_instance.ex +++ b/lib/commanded/process_managers/process_manager_instance.ex @@ -166,7 +166,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do %RecordedEvent{correlation_id: correlation_id, event_id: event_id, event_number: event_number} = event - %State{application: application, idle_timeout: idle_timeout} = state + %State{idle_timeout: idle_timeout} = state case handle_event(event, state) do {:error, error} -> @@ -182,7 +182,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do commands -> # Copy event id, as causation id, and correlation id from handled event. - opts = [application: application, causation_id: event_id, correlation_id: correlation_id] + opts = [causation_id: event_id, correlation_id: correlation_id, returning: false] with :ok <- commands |> List.wrap() |> dispatch_commands(opts, state, event) do process_state = mutate_state(event, state) @@ -303,10 +303,6 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do :ok -> dispatch_commands(pending_commands, opts, state, last_event) - # when include_execution_result is set to true, the dispatcher returns an :ok tuple - {:ok, _} -> - dispatch_commands(pending_commands, opts, state, last_event) - error -> Logger.warn(fn -> describe(state) <> diff --git a/test/aggregates/aggregate_concurrency_test.exs b/test/aggregates/aggregate_concurrency_test.exs index 5dcbdfd3..33fdc6e8 100644 --- a/test/aggregates/aggregate_concurrency_test.exs +++ b/test/aggregates/aggregate_concurrency_test.exs @@ -96,10 +96,8 @@ defmodule Commanded.Aggregates.AggregateConcurrencyTest do } end - test "should error after too many attempts", context do - %{account_number: account_number} = context - - # fail to append to stream + test "should error after too many attempts", %{account_number: account_number} do + # Fail to append to stream expect(MockEventStore, :append_to_stream, 6, fn _event_store_meta, ^account_number, 1, diff --git a/test/aggregates/execute_command_test.exs b/test/aggregates/execute_command_test.exs index 090c895a..9af0fca3 100644 --- a/test/aggregates/execute_command_test.exs +++ b/test/aggregates/execute_command_test.exs @@ -26,7 +26,7 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do {:ok, 1, events} = Aggregate.execute(BankApp, BankAccount, account_number, context) - assert events == [%BankAccountOpened{account_number: account_number, initial_balance: 1000}] + assert events == [%BankAccountOpened{account_number: account_number, initial_balance: 1_000}] shutdown_aggregate(BankApp, BankAccount, account_number) @@ -52,11 +52,11 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do {:ok, 1, events} = Aggregate.execute(BankApp, BankAccount, account_number, context) - assert events == [%BankAccountOpened{account_number: account_number, initial_balance: 1000}] + assert events == [%BankAccountOpened{account_number: account_number, initial_balance: 1_000}] shutdown_aggregate(BankApp, BankAccount, account_number) - # reload aggregate to fetch persisted events from event store and rebuild state by applying saved events + # Reload aggregate to fetch persisted events from event store and rebuild state by applying saved events {:ok, ^account_number} = open_aggregate(BankAccount, account_number) assert Aggregate.aggregate_version(BankApp, BankAccount, account_number) == 1 diff --git a/test/commands/dispatch_consistency_test.exs b/test/commands/dispatch_consistency_test.exs index c518fc87..a04809aa 100644 --- a/test/commands/dispatch_consistency_test.exs +++ b/test/commands/dispatch_consistency_test.exs @@ -19,7 +19,6 @@ defmodule Commanded.Commands.DispatchConsistencyTest do setup do start_supervised!(ConsistencyApp) - :ok end diff --git a/test/commands/dispatch_return_test.exs b/test/commands/dispatch_return_test.exs new file mode 100644 index 00000000..d2f8ef74 --- /dev/null +++ b/test/commands/dispatch_return_test.exs @@ -0,0 +1,187 @@ +defmodule Commanded.Commands.DispatchReturnTest do + use Commanded.StorageCase + + alias Commanded.Commands.ExecutionResult + alias Commanded.ExampleDomain.BankApp + alias Commanded.ExampleDomain.BankAccount + alias Commanded.ExampleDomain.BankAccount.Commands.DepositMoney + alias Commanded.ExampleDomain.BankAccount.Commands.OpenAccount + alias Commanded.ExampleDomain.BankAccount.Events.BankAccountOpened + alias Commanded.Helpers.CommandAuditMiddleware + + setup do + start_supervised!(BankApp) + start_supervised!(CommandAuditMiddleware) + :ok + end + + describe "dispatch return disabled" do + test "should return `:ok`" do + command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000} + + assert :ok == BankApp.dispatch(command, returning: false) + end + + test "should return an error on failure" do + command = %OpenAccount{account_number: "ACC123", initial_balance: -1} + + assert {:error, :invalid_initial_balance} == + BankApp.dispatch(command, returning: false) + end + end + + describe "dispatch return aggregate state" do + test "should return aggregate's updated state" do + assert {:ok, %BankAccount{account_number: "ACC123", balance: 1_000, state: :active}} == + BankApp.dispatch( + %OpenAccount{account_number: "ACC123", initial_balance: 1_000}, + returning: :aggregate_state + ) + + assert {:ok, %BankAccount{account_number: "ACC123", balance: 1_100, state: :active}} == + BankApp.dispatch( + %DepositMoney{account_number: "ACC123", amount: 100}, + returning: :aggregate_state + ) + end + + test "should return an error on failure" do + command = %OpenAccount{account_number: "ACC123", initial_balance: -1} + + assert {:error, :invalid_initial_balance} == + BankApp.dispatch(command, returning: :aggregate_state) + end + end + + describe "dispatch return aggregate version" do + test "should return aggregate's updated version" do + assert {:ok, 1} == + BankApp.dispatch( + %OpenAccount{account_number: "ACC123", initial_balance: 1_000}, + returning: :aggregate_version + ) + + assert {:ok, 2} == + BankApp.dispatch( + %DepositMoney{account_number: "ACC123", amount: 100}, + returning: :aggregate_version + ) + end + + test "should return an error on failure" do + command = %OpenAccount{account_number: "ACC123", initial_balance: -1} + + assert {:error, :invalid_initial_balance} == + BankApp.dispatch(command, returning: :aggregate_version) + end + end + + describe "dispatch return execution result" do + test "should return created events" do + metadata = %{"ip_address" => "127.0.0.1"} + command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000} + + assert BankApp.dispatch(command, metadata: metadata, returning: :execution_result) == + { + :ok, + %ExecutionResult{ + aggregate_uuid: "ACC123", + aggregate_state: %BankAccount{ + account_number: "ACC123", + balance: 1_000, + state: :active + }, + aggregate_version: 1, + events: [%BankAccountOpened{account_number: "ACC123", initial_balance: 1_000}], + metadata: metadata + } + } + end + + test "should return an error on failure" do + command = %OpenAccount{account_number: "ACC123", initial_balance: -1} + + assert {:error, :invalid_initial_balance} == + BankApp.dispatch(command, returning: :execution_result) + end + end + + describe "dispatch include aggregate version" do + test "should return aggregate's updated version" do + assert {:ok, 1} == + BankApp.dispatch( + %OpenAccount{account_number: "ACC123", initial_balance: 1_000}, + include_aggregate_version: true + ) + + assert {:ok, 2} == + BankApp.dispatch( + %DepositMoney{account_number: "ACC123", amount: 100}, + include_aggregate_version: true + ) + end + end + + describe "dispatch include execution result" do + test "should return created events" do + metadata = %{"ip_address" => "127.0.0.1"} + command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000} + + assert BankApp.dispatch(command, + metadata: metadata, + include_execution_result: true + ) == + { + :ok, + %ExecutionResult{ + aggregate_uuid: "ACC123", + aggregate_state: %BankAccount{ + account_number: "ACC123", + balance: 1_000, + state: :active + }, + aggregate_version: 1, + events: [%BankAccountOpened{account_number: "ACC123", initial_balance: 1_000}], + metadata: metadata + } + } + end + end + + describe "application dispatch return aggregate state" do + alias Commanded.Commands.DefaultDispatchReturnApp + + setup do + start_supervised!(DefaultDispatchReturnApp) + :ok + end + + test "should return aggregate's updated version" do + assert {:ok, 1} == + DefaultDispatchReturnApp.dispatch(%OpenAccount{ + account_number: "ACC123", + initial_balance: 1_000 + }) + + assert {:ok, 2} == + DefaultDispatchReturnApp.dispatch(%DepositMoney{ + account_number: "ACC123", + amount: 100 + }) + end + + test "should allow default to be overridden during dispatch" do + assert {:ok, 1} == + DefaultDispatchReturnApp.dispatch(%OpenAccount{ + account_number: "ACC123", + initial_balance: 1_000 + }) + + assert {:ok, 2} == + DefaultDispatchReturnApp.dispatch(%DepositMoney{ + account_number: "ACC123", + amount: 100 + }) + end + end +end diff --git a/test/commands/routing_commands_test.exs b/test/commands/routing_commands_test.exs index b8ceea22..152cb6e4 100644 --- a/test/commands/routing_commands_test.exs +++ b/test/commands/routing_commands_test.exs @@ -2,19 +2,16 @@ defmodule Commanded.Commands.RoutingCommandsTest do use Commanded.StorageCase alias Commanded.DefaultApp - alias Commanded.Commands.{ExecutionResult, UnregisteredCommand} + alias Commanded.Commands.UnregisteredCommand alias Commanded.EventStore alias Commanded.ExampleDomain.BankAccount - alias Commanded.ExampleDomain.{OpenAccountHandler, DepositMoneyHandler, WithdrawMoneyHandler} - - alias Commanded.ExampleDomain.BankAccount.Commands.{ - OpenAccount, - CloseAccount, - DepositMoney, - WithdrawMoney - } - - alias Commanded.ExampleDomain.BankAccount.Events.BankAccountOpened + alias Commanded.ExampleDomain.OpenAccountHandler + alias Commanded.ExampleDomain.DepositMoneyHandler + alias Commanded.ExampleDomain.WithdrawMoneyHandler + alias Commanded.ExampleDomain.BankAccount.Commands.OpenAccount + alias Commanded.ExampleDomain.BankAccount.Commands.CloseAccount + alias Commanded.ExampleDomain.BankAccount.Commands.DepositMoney + alias Commanded.ExampleDomain.BankAccount.Commands.WithdrawMoney @dispatch_opts [application: DefaultApp] @@ -368,24 +365,6 @@ defmodule Commanded.Commands.RoutingCommandsTest do ) end - describe "include aggregate version" do - test "should return aggregate's updated stream version" do - assert {:ok, 1} == - MultiCommandHandlerRouter.dispatch( - %OpenAccount{account_number: "ACC123", initial_balance: 1_000}, - application: DefaultApp, - include_aggregate_version: true - ) - - assert {:ok, 2} == - MultiCommandHandlerRouter.dispatch( - %DepositMoney{account_number: "ACC123", amount: 100}, - application: DefaultApp, - include_aggregate_version: true - ) - end - end - test "should allow setting metadata" do metadata = %{"ip_address" => "127.0.0.1"} @@ -410,26 +389,4 @@ defmodule Commanded.Commands.RoutingCommandsTest do assert event.metadata == metadata end) end - - describe "include execution result" do - test "should return created events" do - metadata = %{"ip_address" => "127.0.0.1"} - command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000} - - assert MultiCommandHandlerRouter.dispatch(command, - application: DefaultApp, - metadata: metadata, - include_execution_result: true - ) == - { - :ok, - %ExecutionResult{ - aggregate_uuid: "ACC123", - aggregate_version: 1, - events: [%BankAccountOpened{account_number: "ACC123", initial_balance: 1000}], - metadata: metadata - } - } - end - end end diff --git a/test/commands/support/returning/default_dispatch_return_app.ex b/test/commands/support/returning/default_dispatch_return_app.ex new file mode 100644 index 00000000..d82cbe9e --- /dev/null +++ b/test/commands/support/returning/default_dispatch_return_app.ex @@ -0,0 +1,18 @@ +defmodule Commanded.Commands.DefaultDispatchReturnApp do + alias Commanded.ExampleDomain.BankRouter + + use Commanded.Application, + otp_app: :commanded, + event_store: [ + adapter: Commanded.EventStore.Adapters.InMemory, + serializer: Commanded.Serialization.JsonSerializer + ], + pubsub: :local, + registry: :local, + default_dispatch_opts: [ + consistency: :eventual, + returning: :aggregate_version + ] + + router(BankRouter) +end From 8cfb938ddd621d55f90e8b829a623069b2cc51cb Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Thu, 27 Feb 2020 21:07:30 +0000 Subject: [PATCH 11/11] Release v1.0.1 --- CHANGELOG.md | 2 +- guides/Getting Started.md | 2 +- mix.exs | 2 +- mix.lock | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ea4d611..64f44bd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## Next release +## v1.0.1 ### Enhancements diff --git a/guides/Getting Started.md b/guides/Getting Started.md index 0effdaaa..723f74e0 100644 --- a/guides/Getting Started.md +++ b/guides/Getting Started.md @@ -6,7 +6,7 @@ Commanded can be installed from hex as follows. ```elixir def deps do - [{:commanded, "~> 1.0.0"}] + [{:commanded, "~> 1.0"}] end ``` diff --git a/mix.exs b/mix.exs index 72a3385e..55611971 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Commanded.Mixfile do use Mix.Project - @version "1.0.0" + @version "1.0.1" def project do [ diff --git a/mix.lock b/mix.lock index 27201778..71e06049 100644 --- a/mix.lock +++ b/mix.lock @@ -13,7 +13,7 @@ "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, "mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"}, - "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm", "052346cf322311c49a0f22789f3698eea030eec09b8c47367f0686ef2634ae14"}, + "mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm", "1f13f9f0f3e769a667a6b6828d29dec37497a082d195cc52dbef401a9b69bf38"}, }