Skip to content

Commit

Permalink
Merge pull request commanded#334 from commanded/feature/process-manag…
Browse files Browse the repository at this point in the history
…er-uuid

Process manager `indentity/0` function
  • Loading branch information
slashdotdash authored Jan 10, 2020
2 parents 4b45fb6 + 47dc09c commit 3b1e8cd
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Commanded scheduler:
- Handle custom type serialization in snapshot source type ([#165](https://github.com/commanded/commanded/pull/165)).
- Fix compiler warnings in generated code (routers, event handlers, and process managers).
- Add `InMemory.reset!/0` for testing purposes ([#175](https://github.com/commanded/commanded/pull/175)).
- Process manager `indentity/0` function ([#334](https://github.com/commanded/commanded/pull/334)).

### Bug fixes

Expand Down
27 changes: 27 additions & 0 deletions lib/commanded/process_managers/process_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,33 @@ defmodule Commanded.ProcessManagers.ProcessManager do
end
end

@doc """
Get the identity of the current process instance.
This must only be called within a process manager's `handle/2` or `apply/2`
callback function.
## Example
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
application: MyApp.Application,
name: __MODULE__
def interested?(%ProcessStarted{uuids: uuids}), do: {:start, uuids}
def handle(%IdentityProcessManager{}, %ProcessStarted{} = event) do
# Identify which uuid is associated with the current instance from the
# list of uuids in the event.
uuid = Commanded.ProcessManagers.ProcessManager.identity()
# ...
end
end
"""
defdelegate identity, to: Commanded.ProcessManagers.ProcessManagerInstance

def compile_config(module, opts) do
application = Keyword.get(opts, :application)

Expand Down
22 changes: 13 additions & 9 deletions lib/commanded/process_managers/process_manager_instance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
GenServer.call(instance, :process_state)
end

@doc """
Get the current process manager instance's identity.
"""
def identity, do: Process.get(:process_uuid)

@doc false
@impl GenServer
def init(%State{} = state) do
Expand All @@ -82,10 +87,10 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
"""
@impl GenServer
def handle_continue(:fetch_state, %State{} = state) do
%State{application: application} = state
%State{application: application, process_uuid: process_uuid} = state

state =
case EventStore.read_snapshot(application, process_state_uuid(state)) do
case EventStore.read_snapshot(application, snapshot_uuid(state)) do
{:ok, snapshot} ->
%State{
state
Expand All @@ -97,6 +102,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
state
end

Process.put(:process_uuid, process_uuid)

{:noreply, state}
end

Expand Down Expand Up @@ -386,7 +393,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
} = state

snapshot = %SnapshotData{
source_uuid: process_state_uuid(state),
source_uuid: snapshot_uuid(state),
source_version: source_version,
source_type: Atom.to_string(process_manager_module),
data: process_state
Expand All @@ -398,7 +405,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
defp delete_state(%State{} = state) do
%State{application: application} = state

EventStore.delete_snapshot(application, process_state_uuid(state))
EventStore.delete_snapshot(application, snapshot_uuid(state))
end

defp ack_event(%RecordedEvent{} = event, %State{} = state) do
Expand All @@ -407,11 +414,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
ProcessRouter.ack_event(process_router, event, self())
end

defp process_state_uuid(%State{} = state) do
%State{
process_manager_name: process_manager_name,
process_uuid: process_uuid
} = state
defp snapshot_uuid(%State{} = state) do
%State{process_manager_name: process_manager_name, process_uuid: process_uuid} = state

inspect(process_manager_name) <> "-" <> inspect(process_uuid)
end
Expand Down
50 changes: 42 additions & 8 deletions test/process_managers/process_manager_instance_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do
alias Commanded.ExampleDomain.MoneyTransfer.Events.MoneyTransferRequested
alias Commanded.ExampleDomain.TransferMoneyProcessManager
alias Commanded.EventStore.Adapters.Mock, as: MockEventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.EventStore.SnapshotData
alias Commanded.ProcessManagers.ProcessManagerInstance
alias Commanded.ProcessManagers.ProcessRouter

setup :set_mox_global
setup :verify_on_exit!
Expand Down Expand Up @@ -63,24 +63,52 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do

{:ok, instance} = start_process_manager_instance(transfer_uuid)

event = %RecordedEvent{
event_number: 1,
stream_id: "stream-id",
stream_version: 1,
data: %MoneyTransferRequested{
event =
to_recorded_event(%MoneyTransferRequested{
transfer_uuid: transfer_uuid,
debit_account: debit_account,
credit_account: credit_account,
amount: 100
}
}
})

:ok = ProcessManagerInstance.process_event(instance, event)

# Should send ack to process router after processing event
assert_receive({:"$gen_cast", {:ack_event, ^event, _instance}}, 1_000)
end

test "get current process identity" do
alias Commanded.ProcessManagers.IdentityProcessManager
alias Commanded.ProcessManagers.IdentityProcessManager.AnEvent
alias Commanded.DefaultApp
alias Commanded.Helpers.Wait

start_supervised!(DefaultApp)

{:ok, process_router} = start_supervised(IdentityProcessManager)

process_uuids = Enum.sort([UUID.uuid4(), UUID.uuid4(), UUID.uuid4()])

event = to_recorded_event(%AnEvent{uuids: process_uuids, reply_to: self()})

send(process_router, {:events, [event]})

process_instances =
Wait.until(fn ->
process_instances = ProcessRouter.process_instances(process_router)

assert process_instances
|> Enum.map(fn {process_uuid, _pid} -> process_uuid end)
|> Enum.sort() == process_uuids

process_instances
end)

for {process_uuid, pid} <- process_instances do
assert_receive {:identity, ^process_uuid, ^pid}
end
end

test "ignore unexpected messages" do
import ExUnit.CaptureLog

Expand Down Expand Up @@ -151,4 +179,10 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do
process_uuid: transfer_uuid}
)
end

defp to_recorded_event(event) do
alias Commanded.EventStore.RecordedEvent

%RecordedEvent{event_number: 1, stream_id: "stream-id", stream_version: 1, data: event}
end
end
32 changes: 32 additions & 0 deletions test/process_managers/support/identity/identity_process_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Commanded.ProcessManagers.IdentityProcessManager do
@moduledoc false

alias Commanded.DefaultApp
alias Commanded.ProcessManagers.IdentityProcessManager
alias Commanded.ProcessManagers.ProcessManager

use Commanded.ProcessManagers.ProcessManager,
application: DefaultApp,
name: __MODULE__

@derive Jason.Encoder
defstruct [:uuid]

defmodule AnEvent do
defstruct [:uuids, :reply_to]
end

def interested?(%AnEvent{uuids: uuids}), do: {:start, uuids}

def handle(%IdentityProcessManager{}, %AnEvent{} = event) do
%AnEvent{reply_to: reply_to} = event

uuid = ProcessManager.identity()

send(reply_to, {:identity, uuid, self()})

[]
end

def apply(%IdentityProcessManager{} = state, _event), do: state
end

0 comments on commit 3b1e8cd

Please sign in to comment.