Skip to content

Commit

Permalink
Merge branch 'releases/v1.0.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Apr 23, 2020
2 parents a729117 + 8cfb938 commit 76832a2
Show file tree
Hide file tree
Showing 31 changed files with 912 additions and 320 deletions.
7 changes: 5 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ elixir:
otp_release:
- 22.0

before_script:
- epmd -daemon

script:
- mix test
- mix format --check-formatted
- travis_wait 30 mix dialyzer --halt-exit-status
- mix test --include distributed
- travis_wait 30 mix dialyzer
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@
- Fixes the typespec for command dispatch ([#325](https://github.com/commanded/commanded/pull/325)).
- Process manager stops if `interested?/1` returns an empty list ([#335](https://github.com/commanded/commanded/pull/335)).

## v1.0.1

### Enhancements

- Global registry using Erlang's `:global` module ([#344](https://github.com/commanded/commanded/pull/344)).
- Command dispatch return ([#331](https://github.com/commanded/commanded/pull/331)).

### Bug fixes

- Fix distributed subscription registration bug ([#345](https://github.com/commanded/commanded/pull/345)).
- Retry event handler and process manager suscriptions on error ([#348](https://github.com/commanded/commanded/pull/348)).

## v1.0.0

### Breaking changes
Expand Down
6 changes: 4 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ alias Commanded.Serialization.JsonSerializer
config :logger, :console, level: :debug, format: "[$level] $message\n"

config :ex_unit,
assert_receive_timeout: 1_000,
capture_log: true,
assert_receive_timeout: 1_000
exclude: [:distributed]

config :commanded,
assert_receive_event_timeout: 100,
Expand All @@ -21,7 +22,8 @@ default_app_config = [
]

config :commanded, Commanded.Commands.ConsistencyApp, default_app_config
config :commanded, Commanded.DefaultApp, default_app_config
config :commanded, Commanded.DefaultApp, []
config :commanded, Commanded.DistributedApp, []
config :commanded, Commanded.Event.Upcast.ProcessManager.Application, default_app_config
config :commanded, Commanded.Middleware.TenantApp, default_app_config
config :commanded, Commanded.ProcessManagers.ErrorApp, default_app_config
Expand Down
44 changes: 35 additions & 9 deletions guides/Deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,45 @@ Commanded supports running on a single node, or multiple nodes run as either a [

Running your app using Commanded on a single node requires no configuration as local is the default setting.

## Multi node cluster deployment
## Multi node distributed Erlang deployment

To support deployment to a cluster of nodes you must use the [Commanded Swarm registry](https://github.com/commanded/commanded-swarm-registry) library and [Phoenix's distributed pub/sub and presence platform](https://hex.pm/packages/phoenix_pubsub) to allow process distribution and communication amongst all nodes in the cluster.
To support deployment to a cluster of nodes and distributed Erlang you must configure:

1. A registry which supports distributed Erlang. The `:global` registry provided by Commanded or the [Commanded Swarm registry](https://github.com/commanded/commanded-swarm-registry) library.
2. [Phoenix's distributed pub/sub and presence platform](https://hex.pm/packages/phoenix_pubsub) to allow process distribution and communication amongst all nodes in the cluster.

### `:global` registry

Use Erlang's [`:global`](http://erlang.org/doc/man/global.html) name registration facility with distributed Erlang. The global name server starts automatically when a node is started. The registered names are stored in replica global name tables on every node. Thus, the translation of a name to a pid is fast, as it is always done locally.

Define the `:global` registry for your application:

```elixir
defmodule MyApp.Application do
use Commanded.Application,
otp_app: :my_app,
event_store: [
adapter: Commanded.EventStore.Adapters.EventStore,
event_store: MyApp.EventStore
],
registry: :global
end
```

Or configure your application to use the `:global` registry in config:

```elixir
# config/config.exs
config :my_app, MyApp.Application, registry: :global
```

### Commanded Swarm registry

Add `commanded_swarm_registry` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:commanded_swarm_registry, "~> 1.0.0"}
]
[{:commanded_swarm_registry, "~> 1.0"}]
end
```

Expand All @@ -43,14 +69,14 @@ First, add it as a dependency to your project's `mix.exs` file:

```elixir
defp deps do
[{:phoenix_pubsub, "~> 1.0"}]
[{:phoenix_pubsub, "~> 1.1"}]
end
```

Fetch mix deps and configure the pubsub settings for your application in the environment config file:

```elixir
# `config/config.exs`
# config/config.exs
config :my_app, MyApp.Application,
pubsub: [
phoenix_pubsub: [
Expand All @@ -66,9 +92,9 @@ The PG2 adapter is preferable for cluster deployment since it is provided by Erl

If using PostgreSQL-based Elixir EventStore please also refer to its documentation about [running on a clustering of nodes](https://hexdocs.pm/eventstore/cluster.html).

## Multi node, but not clustered deployment
## Multi node, but not distributed Erlang deployment

Running multiple nodes, but choosing not to connect the nodes together to form a cluster, requires that you use the local registry and Phoenix's pub/sub library with its Redis adapter.
Running multiple nodes, but choosing not to connect the nodes together to form a distributed Erlang cluster, requires that you use the local registry and Phoenix's pub/sub library with its Redis adapter.

You must install and use Phoenix's pub/sub library, [as described above](#phoenix-pub-sub).

Expand Down
2 changes: 1 addition & 1 deletion guides/Getting Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Commanded can be installed from hex as follows.

```elixir
def deps do
[{:commanded, "~> 1.0.0"}]
[{:commanded, "~> 1.0"}]
end
```

Expand Down
137 changes: 65 additions & 72 deletions lib/commanded/event/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ defmodule Commanded.Event.Handler do
alias Commanded.Event.FailureContext
alias Commanded.Event.Handler
alias Commanded.Event.Upcast
alias Commanded.EventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.EventStore.Subscription
alias Commanded.Subscriptions

@type domain_event :: struct()
Expand Down Expand Up @@ -418,35 +418,44 @@ defmodule Commanded.Event.Handler do
:handler_name,
:handler_module,
:last_seen_event,
:subscribe_from,
:subscribe_to,
:subscription,
:subscription_ref
:subscribe_timer
]

@doc false
def start_link(application, handler_name, handler_module, opts \\ []) do
name = name(application, handler_name)
consistency = consistency(opts)

subscription =
Subscription.new(
application: application,
subscription_name: handler_name,
subscribe_from: Keyword.get(opts, :start_from, :origin),
subscribe_to: Keyword.get(opts, :subscribe_to, :all)
)

handler = %Handler{
application: application,
handler_name: handler_name,
handler_module: handler_module,
consistency: consistency(opts),
subscribe_from: start_from(opts),
subscribe_to: subscribe_to(opts)
consistency: consistency,
subscription: subscription
}

Registration.start_link(application, name, __MODULE__, handler)
with {:ok, pid} <- Registration.start_link(application, name, __MODULE__, handler) do
# Register the started event handler as a subscription with the given consistency
:ok = Subscriptions.register(application, handler_name, pid, consistency)

{:ok, pid}
end
end

def name(application, handler_name), do: {application, __MODULE__, handler_name}

@doc false
@impl GenServer
def init(%Handler{} = state) do
:ok = register_subscription(state)

{:ok, state, {:continue, :subscribe_to_events}}
end

Expand All @@ -467,8 +476,10 @@ defmodule Commanded.Event.Handler do
@doc false
@impl GenServer
def handle_call(:config, _from, %Handler{} = state) do
%Handler{consistency: consistency, subscribe_from: subscribe_from, subscribe_to: subscribe_to} =
state
%Handler{
consistency: consistency,
subscription: %Subscription{subscribe_from: subscribe_from, subscribe_to: subscribe_to}
} = state

config = [consistency: consistency, start_from: subscribe_from, subscribe_to: subscribe_to]

Expand Down Expand Up @@ -501,10 +512,19 @@ defmodule Commanded.Event.Handler do
end
end

@doc false
@impl GenServer
def handle_info(:subscribe_to_events, %Handler{} = state) do
{:noreply, subscribe_to_events(state)}
end

@doc false
# Subscription to event store has successfully subscribed, init event handler
@impl GenServer
def handle_info({:subscribed, subscription}, %Handler{subscription: subscription} = state) do
def handle_info(
{:subscribed, subscription},
%Handler{subscription: %Subscription{subscription_pid: subscription}} = state
) do
Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end)

%Handler{handler_module: handler_module} = state
Expand Down Expand Up @@ -541,7 +561,10 @@ defmodule Commanded.Event.Handler do

@doc false
@impl GenServer
def handle_info({:DOWN, ref, :process, _pid, reason}, %Handler{subscription_ref: ref} = state) do
def handle_info(
{:DOWN, ref, :process, _pid, reason},
%Handler{subscription: %Subscription{subscription_ref: ref}} = state
) do
Logger.debug(fn -> describe(state) <> " subscription DOWN due to: #{inspect(reason)}" end)

# Stop event handler when event store subscription process terminates.
Expand All @@ -558,44 +581,34 @@ defmodule Commanded.Event.Handler do
{:noreply, state}
end

# Register this event handler as a subscription with the given consistency.
defp register_subscription(%Handler{} = state) do
%Handler{application: application, consistency: consistency, handler_name: name} = state

Subscriptions.register(application, name, consistency)
end

defp reset_subscription(%Handler{} = state) do
%Handler{
application: application,
handler_name: handler_name,
subscribe_to: subscribe_to,
subscription_ref: subscription_ref,
subscription: subscription
} = state
%Handler{subscription: subscription} = state

Process.demonitor(subscription_ref)
subscription = Subscription.reset(subscription)

:ok = EventStore.unsubscribe(application, subscription)
:ok = EventStore.delete_subscription(application, subscribe_to, handler_name)

%Handler{state | last_seen_event: nil, subscription: nil, subscription_ref: nil}
%Handler{state | last_seen_event: nil, subscription: subscription, subscribe_timer: nil}
end

defp subscribe_to_events(%Handler{} = state) do
%Handler{
application: application,
handler_name: handler_name,
subscribe_from: subscribe_from,
subscribe_to: subscribe_to
} = state
%Handler{subscription: subscription} = state

{:ok, subscription} =
EventStore.subscribe_to(application, subscribe_to, handler_name, self(), subscribe_from)
case Subscription.subscribe(subscription, self()) do
{:ok, subscription} ->
%Handler{state | subscription: subscription, subscribe_timer: nil}

subscription_ref = Process.monitor(subscription)
{:error, error} ->
{backoff, subscription} = Subscription.backoff(subscription)

%Handler{state | subscription: subscription, subscription_ref: subscription_ref}
Logger.info(fn ->
describe(state) <>
" failed to subscribe to event store due to: " <>
inspect(error) <> ", retrying in " <> inspect(backoff) <> "ms"
end)

subscribe_timer = Process.send_after(self(), :subscribe_to_events, backoff)

%Handler{state | subscription: subscription, subscribe_timer: subscribe_timer}
end
end

defp handle_event(event, handler, context \\ %{})
Expand Down Expand Up @@ -708,27 +721,23 @@ defmodule Commanded.Event.Handler do

# Confirm receipt of event
defp confirm_receipt(%RecordedEvent{} = event, %Handler{} = state) do
%RecordedEvent{event_number: event_number} = event

Logger.debug(fn ->
describe(state) <> " confirming receipt of event ##{inspect(event_number)}"
end)

ack_event(event, state)

%Handler{state | last_seen_event: event_number}
end

defp ack_event(event, %Handler{} = state) do
%Handler{
application: application,
consistency: consistency,
handler_name: handler_name,
subscription: subscription
} = state

:ok = EventStore.ack_event(application, subscription, event)
%RecordedEvent{event_number: event_number} = event

Logger.debug(fn ->
describe(state) <> " confirming receipt of event ##{inspect(event_number)}"
end)

:ok = Subscription.ack_event(subscription, event)
:ok = Subscriptions.ack_event(application, handler_name, consistency, event)

%Handler{state | last_seen_event: event_number}
end

@enrich_metadata_fields [
Expand Down Expand Up @@ -757,22 +766,6 @@ defmodule Commanded.Event.Handler do
end
end

defp start_from(opts) do
case opts[:start_from] || :origin do
start_from when start_from in [:origin, :current] -> start_from
start_from when is_integer(start_from) -> start_from
invalid -> "Invalid `start_from` option: #{inspect(invalid)}"
end
end

defp subscribe_to(opts) do
case opts[:subscribe_to] || :all do
:all -> :all
stream when is_binary(stream) -> stream
invalid -> "Invalid `subscribe_to` option: #{inspect(invalid)}"
end
end

defp describe(%Handler{handler_module: handler_module}),
do: inspect(handler_module)
end
Loading

0 comments on commit 76832a2

Please sign in to comment.