Skip to content

Commit

Permalink
Support Phoenix.PubSub v1.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Aug 26, 2018
1 parent a4890b3 commit 10c9f88
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Enhancements

- Do not start new process manager instance on `:continue` ([#181](https://github.com/commanded/commanded/pull/181)).
- Support [`Phoenix.PubSub` v1.1.0](https://hexdocs.pm/phoenix/1.1.0/Phoenix.PubSub.html).


## v0.17.0

Expand Down
4 changes: 2 additions & 2 deletions lib/commanded/pubsub/phoenix_pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ if Code.ensure_loaded?(Phoenix.PubSub) do

def start_link(opts) do
opts = Keyword.merge([name: __MODULE__], opts)
GenServer.start_link(Phoenix.Tracker, [__MODULE__, opts, opts], name: __MODULE__)
Phoenix.Tracker.start_link(__MODULE__, opts, opts)
end

def init(opts) do
Expand Down Expand Up @@ -76,7 +76,7 @@ if Code.ensure_loaded?(Phoenix.PubSub) do
%{
id: Tracker,
start: {Tracker, :start_link, [[pubsub_server: __MODULE__]]},
type: :worker
type: :supervisor
}
]
end
Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ defmodule Commanded.Mixfile do

# Build & test tools
{:dialyxir, "~> 0.5", only: :dev, runtime: false},
{:ex_doc, "~> 0.18", only: :dev},
{:mix_test_watch, "~> 0.7", only: :dev},
{:ex_doc, "~> 0.19", only: :dev},
{:mix_test_watch, "~> 0.8", only: :dev},
{:mox, "~> 0.4", only: :test},

# Optional dependencies
Expand Down
12 changes: 7 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
%{
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.4", "4406b8891cecf1352f49975c6d554e62e4341ceb41b9338949077b0d4a97b949", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"file_system": {:hex, :file_system, "0.2.6", "fd4dc3af89b9ab1dc8ccbcc214a0e60c41f34be251d9307920748a14bf41f1d3", [:mix], [], "hexpm"},
"fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], [], "hexpm"},
"mix_test_watch": {:hex, :mix_test_watch, "0.7.0", "205f77063ed9b81ca41a2ab78486c653f9bfb1e5a341b5cf8fc2747dae67b0df", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "0.5.1", "966c5c2296da272d42f1de178c1d135e432662eca795d6dc12e5e8787514edf7", [:mix], [{:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.8.0", "1204a2f5b4f181775a0e456154830524cf2207cf4f9112215c05e0b76e4eca8b", [:mix], [{:makeup, "~> 0.5.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"mix_test_watch": {:hex, :mix_test_watch, "0.8.0", "acf97da2abc66532e7dc1aa66a5d6c9fc4442d7992d5d7eb4faeaeb964c2580e", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm"},
"mox": {:hex, :mox, "0.4.0", "7f120840f7d626184a3d65de36189ca6f37d432e5d63acd80045198e4c5f7e6e", [:mix], [], "hexpm"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "1.0.2", "bfa7fd52788b5eaa09cb51ff9fcad1d9edfeb68251add458523f839392f034c1", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.2.2", "d526b23bdceb04c7ad15b33c57c4526bf5f50aaa70c7c141b4b4624555c68259", [:mix], [], "hexpm"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.0", "d55e25ff1ff8ea2f9964638366dfd6e361c52dedfd50019353598d11d4441d14", [:mix], [], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"},
}
151 changes: 122 additions & 29 deletions test/subscriptions/subscriptions_test.exs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
defmodule Commanded.SubscriptionsTest do
use ExUnit.Case
use Commanded.StorageCase

alias Commanded.EventStore.RecordedEvent
alias Commanded.Subscriptions

setup do
Subscriptions.reset()
end

describe "register event handler" do
test "should be registered" do
:ok = Subscriptions.register("handler1", :strong)
Expand All @@ -19,7 +15,12 @@ defmodule Commanded.SubscriptionsTest do

test "should ack event" do
:ok = Subscriptions.register("handler1", :strong)
:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})

:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

assert Subscriptions.handled?("stream1", 1)
assert Subscriptions.handled?("stream1", 2)
Expand All @@ -29,14 +30,29 @@ defmodule Commanded.SubscriptionsTest do
:ok = Subscriptions.register("handler1", :strong)
:ok = Subscriptions.register("handler2", :strong)

:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

refute Subscriptions.handled?("stream1", 1)

:ok = Subscriptions.ack_event("handler2", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})
:ok =
Subscriptions.ack_event("handler2", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

assert Subscriptions.handled?("stream1", 1)
refute Subscriptions.handled?("stream1", 2)

:ok = Subscriptions.ack_event("handler2", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler2", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

assert Subscriptions.handled?("stream1", 1)
assert Subscriptions.handled?("stream1", 2)
end
Expand All @@ -57,8 +73,17 @@ defmodule Commanded.SubscriptionsTest do
test "should immediately succeed when waited event has already been ack'd" do
:ok = Subscriptions.register("handler", :strong)

:ok = Subscriptions.ack_event("handler", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})
:ok = Subscriptions.ack_event("handler", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

:ok =
Subscriptions.ack_event("handler", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

assert :ok == Subscriptions.wait_for("stream1", 2)
end
Expand All @@ -72,12 +97,22 @@ defmodule Commanded.SubscriptionsTest do
test "should succeed when waited event is ack'd" do
:ok = Subscriptions.register("handler", :strong)

wait_task = Task.async(fn ->
Subscriptions.wait_for("stream1", 2, [], 1_000)
end)
wait_task =
Task.async(fn ->
Subscriptions.wait_for("stream1", 2, [], 1_000)
end)

:ok =
Subscriptions.ack_event("handler", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

:ok = Subscriptions.ack_event("handler", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})
:ok = Subscriptions.ack_event("handler", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

assert :ok == Task.await(wait_task, 1_000)
end
Expand All @@ -97,13 +132,31 @@ defmodule Commanded.SubscriptionsTest do

refute Subscriptions.handled?("stream1", 2)

:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})
:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

refute Subscriptions.handled?("stream1", 2)

:ok = Subscriptions.ack_event("handler2", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})
:ok = Subscriptions.ack_event("handler2", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler2", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

:ok =
Subscriptions.ack_event("handler2", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

assert Subscriptions.handled?("stream1", 2)
end
Expand All @@ -113,7 +166,11 @@ defmodule Commanded.SubscriptionsTest do

refute Subscriptions.handled?("stream1", 2)

:ok = Subscriptions.ack_event("handler", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 4})
:ok =
Subscriptions.ack_event("handler", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 4
})

assert Subscriptions.handled?("stream1", 2)
end
Expand All @@ -122,7 +179,11 @@ defmodule Commanded.SubscriptionsTest do
:ok = Subscriptions.register("handler1", :strong)
:ok = Subscriptions.register("handler2", :strong)

:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

refute Subscriptions.handled?("stream1", 2)
assert :ok == Subscriptions.wait_for("stream1", 2, consistency: ["handler1"])
Expand All @@ -134,23 +195,50 @@ defmodule Commanded.SubscriptionsTest do
:ok = Subscriptions.register("handler3", :strong)
:ok = Subscriptions.register("handler3", :eventual)

:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok = Subscriptions.ack_event("handler2", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 2})
:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

:ok =
Subscriptions.ack_event("handler2", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 2
})

refute Subscriptions.handled?("stream1", 2)
assert Subscriptions.handled?("stream1", 2, consistency: ["handler1", "handler2"])
refute Subscriptions.handled?("stream1", 2, consistency: ["handler1", "handler2", "handler3"])
assert Subscriptions.handled?("stream1", 2, consistency: ["handler1", "handler2", "handler4"])

refute Subscriptions.handled?("stream1", 2,
consistency: ["handler1", "handler2", "handler3"]
)

assert Subscriptions.handled?("stream1", 2,
consistency: ["handler1", "handler2", "handler4"]
)

assert :ok == Subscriptions.wait_for("stream1", 2, consistency: ["handler1", "handler2"])
assert {:error, :timeout} == Subscriptions.wait_for("stream1", 2, [consistency: ["handler1", "handler2", "handler3"]], 100)

assert {:error, :timeout} ==
Subscriptions.wait_for(
"stream1",
2,
[consistency: ["handler1", "handler2", "handler3"]],
100
)
end
end

describe "expire stream acks" do
test "should expire stale acks" do
:ok = Subscriptions.register("handler1", :strong)
:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})

:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

assert Subscriptions.handled?("stream1", 1)

Expand All @@ -162,7 +250,12 @@ defmodule Commanded.SubscriptionsTest do

test "should not expire fresh acks" do
:ok = Subscriptions.register("handler1", :strong)
:ok = Subscriptions.ack_event("handler1", :strong, %RecordedEvent{stream_id: "stream1", stream_version: 1})

:ok =
Subscriptions.ack_event("handler1", :strong, %RecordedEvent{
stream_id: "stream1",
stream_version: 1
})

assert Subscriptions.handled?("stream1", 1)

Expand Down

0 comments on commit 10c9f88

Please sign in to comment.