Skip to content

Commit

Permalink
Merge pull request commanded#311 from commanded/feature/adapters
Browse files Browse the repository at this point in the history
Define adapter behaviour modules for event store, pubsub, and registry
  • Loading branch information
slashdotdash authored Nov 5, 2019
2 parents dd650ab + b459ee3 commit fd3b8a4
Showing 36 changed files with 874 additions and 961 deletions.
2 changes: 1 addition & 1 deletion guides/Choosing an Event Store.md
Original file line number Diff line number Diff line change
@@ -36,6 +36,6 @@ Use the `--run-projections=all --start-standard-projections=true` flags when run

## Writing your own event store provider

To use an alternative event store with Commanded you will need to implement the `Commanded.EventStore` behaviour. This defines the contract to be implemented by an adapter module to allow an event store to be used with Commanded. Tests to verify an adapter conforms to the behaviour are provided in `test/event_store_adapter`.
To use an alternative event store with Commanded you will need to implement the `Commanded.EventStore.Adapter` behaviour. This defines the contract to be implemented by an adapter module to allow an event store to be used with Commanded. Tests to verify an adapter conforms to the behaviour are provided in `test/event_store_adapter`.

You can use one of the existing adapters ([commanded_eventstore_adapter](https://github.com/commanded/commanded-eventstore-adapter) or [commanded_extreme_adapter](https://github.com/commanded/commanded-extreme-adapter)) to understand what is required.
43 changes: 10 additions & 33 deletions lib/application.ex
Original file line number Diff line number Diff line change
@@ -51,31 +51,11 @@ defmodule Commanded.Application do

{otp_app, config} = Commanded.Application.Supervisor.compile_config(__MODULE__, opts)

event_store_adapter = Commanded.EventStore.adapter(__MODULE__, config)
pubsub_adapter = Commanded.PubSub.pubsub_provider(__MODULE__, config)
registry_adapter = Commanded.Registration.registry_provider(__MODULE__, config)

@otp_app otp_app
@config config
@name Keyword.get(opts, :name, __MODULE__)

use Commanded.Commands.CompositeRouter,
application: __MODULE__

defmodule EventStore do
use Commanded.EventStore.Adapter,
adapter: event_store_adapter
end

defmodule PubSub do
use Commanded.PubSub.Adapter,
adapter: pubsub_adapter
end

defmodule Registration do
use Commanded.Registration.Adapter,
adapter: registry_adapter
end
use Commanded.Commands.CompositeRouter, application: __MODULE__

def config do
{:ok, config} =
@@ -93,15 +73,7 @@ defmodule Commanded.Application do
end

def start_link(opts \\ []) do
Commanded.Application.Supervisor.start_link(
__MODULE__,
@otp_app,
EventStore,
PubSub,
Registration,
@config,
opts
)
Commanded.Application.Supervisor.start_link(__MODULE__, @otp_app, @config, opts)
end

def stop(pid, timeout \\ 5000) do
@@ -154,12 +126,17 @@ defmodule Commanded.Application do
| {:error, :consistency_timeout}
| {:error, reason :: term}

alias Commanded.Application.Config

@doc false
def event_store_adapter(application), do: Module.concat([application, EventStore])
@spec event_store_adapter(Commanded.Application.t()) :: {module, map}
def event_store_adapter(application), do: Config.get(application, :event_store)

@doc false
def pubsub_adapter(application), do: Module.concat([application, PubSub])
@spec pubsub_adapter(Commanded.Application.t()) :: {module, map}
def pubsub_adapter(application), do: Config.get(application, :pubsub)

@doc false
def registry_adapter(application), do: Module.concat([application, Registration])
@spec registry_adapter(Commanded.Application.t()) :: {module, map}
def registry_adapter(application), do: Config.get(application, :registry)
end
4 changes: 3 additions & 1 deletion lib/commanded/aggregates/aggregate.ex
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ defmodule Commanded.Aggregates.Aggregate do
alias Commanded.Event.Upcast
alias Commanded.EventStore
alias Commanded.EventStore.{RecordedEvent, SnapshotData}
alias Commanded.Registration
alias Commanded.Snapshotting

@read_event_batch_size 100
@@ -548,7 +549,8 @@ defmodule Commanded.Aggregates.Aggregate do

defp via_name(application, aggregate_module, aggregate_uuid) do
name = name(application, aggregate_module, aggregate_uuid)
via_tuple(application, name)

Registration.via_tuple(application, name)
end

defp describe(%Aggregate{} = aggregate) do
76 changes: 58 additions & 18 deletions lib/commanded/application/supervisor.ex
Original file line number Diff line number Diff line change
@@ -35,37 +35,30 @@ defmodule Commanded.Application.Supervisor do
@doc """
Starts the application supervisor.
"""
def start_link(application, otp_app, event_store, pubsub, registry, config, opts) do
def start_link(application, otp_app, config, opts) do
sup_opts = if name = Keyword.get(opts, :name, application), do: [name: name], else: []

Supervisor.start_link(
__MODULE__,
{application, otp_app, event_store, registry, pubsub, config, opts},
{application, otp_app, config, opts},
sup_opts
)
end

def init({application, otp_app, event_store, pubsub, registry, config, opts}) do
def init({application, otp_app, config, opts}) do
case runtime_config(application, otp_app, config, opts) do
{:ok, config} ->
:ok = Config.associate(self(), config)
{event_store_child_spec, config} = event_store_child_spec(application, config)
{pubsub_child_spec, config} = pubsub_child_spec(application, config)
{registry_child_spec, config} = registry_child_spec(application, config)

task_dispatcher_name = Module.concat([application, Commanded.Commands.TaskDispatcher])
subscriptions_name = Module.concat([application, Commanded.Subscriptions])
registry_name = Module.concat([application, Commanded.Subscriptions.Registry])
snapshotting = Keyword.get(config, :snapshotting, %{})
:ok = Config.associate(self(), config)

children =
event_store.child_spec(Keyword.get(config, :event_store, [])) ++
pubsub.child_spec(Keyword.get(config, :pubsub, [])) ++
registry.child_spec(Keyword.get(config, :registry, [])) ++
[
{Task.Supervisor, name: task_dispatcher_name},
{Commanded.Aggregates.Supervisor,
application: application, snapshotting: snapshotting},
{Commanded.Subscriptions.Registry, application: application, name: registry_name},
{Commanded.Subscriptions, application: application, name: subscriptions_name}
]
event_store_child_spec ++
pubsub_child_spec ++
registry_child_spec ++
commanded_child_spec(application, config)

Supervisor.init(children, strategy: :one_for_one)

@@ -74,6 +67,53 @@ defmodule Commanded.Application.Supervisor do
end
end

defp commanded_child_spec(application, config) do
task_dispatcher_name = Module.concat([application, Commanded.Commands.TaskDispatcher])
subscriptions_name = Module.concat([application, Commanded.Subscriptions])
registry_name = Module.concat([application, Commanded.Subscriptions.Registry])
snapshotting = Keyword.get(config, :snapshotting, %{})

[
{Task.Supervisor, name: task_dispatcher_name},
{Commanded.Aggregates.Supervisor, application: application, snapshotting: snapshotting},
{Commanded.Subscriptions.Registry, application: application, name: registry_name},
{Commanded.Subscriptions, application: application, name: subscriptions_name}
]
end

defp event_store_child_spec(application, config) do
{adapter, adapter_config} =
Commanded.EventStore.adapter(application, Keyword.get(config, :event_store))

{:ok, child_spec, adapter_meta} = adapter.child_spec(application, adapter_config)

config = Keyword.put(config, :event_store, {adapter, adapter_meta})

{child_spec, config}
end

defp pubsub_child_spec(application, config) do
{adapter, adapter_config} =
Commanded.PubSub.adapter(application, Keyword.get(config, :pubsub, :local))

{:ok, child_spec, adapter_meta} = adapter.child_spec(application, adapter_config)

config = Keyword.put(config, :pubsub, {adapter, adapter_meta})

{child_spec, config}
end

defp registry_child_spec(application, config) do
{adapter, adapter_config} =
Commanded.Registration.adapter(application, Keyword.get(config, :registry, :local))

{:ok, child_spec, adapter_meta} = adapter.child_spec(application, adapter_config)

config = Keyword.put(config, :registry, {adapter, adapter_meta})

{child_spec, config}
end

defp application_init(application, config) do
if Code.ensure_loaded?(application) and function_exported?(application, :init, 1) do
application.init(config)
Loading

0 comments on commit fd3b8a4

Please sign in to comment.