Skip to content

Commit

Permalink
add support for persistent replication slots
Browse files Browse the repository at this point in the history
  • Loading branch information
bbhoss committed Jul 20, 2019
1 parent 29dd31b commit 4c72091
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 14 deletions.
14 changes: 7 additions & 7 deletions lib/cainophile/adapters/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ defmodule Cainophile.Adapters.Postgres do

@impl true
def init(config) do
adapter_impl =
Keyword.get(config, :postgres_adapter, Cainophile.Adapters.Postgres.EpgsqlImplementation)

adapter_impl.init(config)
adapter_impl(config).init(config)
end

@impl true
Expand Down Expand Up @@ -91,11 +88,12 @@ defmodule Cainophile.Adapters.Postgres do
end

defp process_message(
%Commit{lsn: commit_lsn},
%Commit{lsn: commit_lsn, end_lsn: end_lsn},
%State{transaction: {current_txn_lsn, txn}} = state
)
when commit_lsn == current_txn_lsn do
notify_subscribers(txn, state.subscribers)
:ok = adapter_impl(state.config).acknowledge_lsn(state.connection, end_lsn)

%{state | transaction: nil}
end
Expand Down Expand Up @@ -170,8 +168,6 @@ defmodule Cainophile.Adapters.Postgres do
}
end

defp process_message(_, state), do: state

# TODO: Typecast to meaningful Elixir types here later
defp data_tuple_to_map(_columns, nil), do: %{}

Expand All @@ -190,6 +186,10 @@ defmodule Cainophile.Adapters.Postgres do
for sub <- subscribers, is_function(sub), do: sub.(txn)
end

defp adapter_impl(config) do
Keyword.get(config, :postgres_adapter, Cainophile.Adapters.Postgres.EpgsqlImplementation)
end

# Client

def subscribe(pid, receiver_pid) when is_pid(receiver_pid) do
Expand Down
2 changes: 2 additions & 0 deletions lib/cainophile/adapters/postgres/adapter_behavior.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule Cainophile.Adapters.Postgres.AdapterBehaviour do
@callback init(config :: term) ::
{:ok, %Cainophile.Adapters.Postgres.State{}} | {:stop, reason :: binary}

@callback acknowledge_lsn(connection :: pid, {xlog :: integer, offset :: integer}) :: :ok
end
31 changes: 29 additions & 2 deletions lib/cainophile/adapters/postgres/epgsql_implementation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,39 @@ defmodule Cainophile.Adapters.Postgres.EpgsqlImplementation do
end
end

@impl true
def acknowledge_lsn(epgsql, {xlog, offset} = lsn_tup) do
decimal_lsn = lsn_tuple_to_decimal(lsn_tup)

:epgsql.standby_status_update(epgsql, decimal_lsn, decimal_lsn)
end

defp lsn_tuple_to_decimal({xlog, offset}) do
<<decimal_lsn::integer-64>> = <<xlog::integer-32, offset::integer-32>>
decimal_lsn
end

defp create_replication_slot(epgsql_pid, slot) do
{slot_name, start_replication_command} =
case slot do
name when is_binary(name) ->
# TODO
{name, "SELECT 1;"}
# Simple query for replication mode so no prepared statements are supported
escaped_name = String.replace(name, "'", "\\'")

{:ok, _, [{existing_slot}]} =
:epgsql.squery(
epgsql_pid,
"SELECT COUNT(*) >= 1 FROM pg_replication_slots WHERE slot_name = '#{escaped_name}'"
)

case existing_slot do
"t" ->
# no-op
{name, "SELECT 1"}

"f" ->
{name, "CREATE_REPLICATION_SLOT #{escaped_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT"}
end

:temporary ->
slot_name = self_as_slot_name()
Expand Down
38 changes: 33 additions & 5 deletions test/cainophile/adapters/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,16 @@ defmodule Cainophile.Adapters.PostgresTest do
:ok
end

# Make sure mocks are verified when the test exits
setup :set_mox_global
setup :create_mocks

setup do
expect(PostgresMock, :init, fn config ->
{:ok, %State{connection: self(), config: config, subscribers: []}}
end)

{:ok, pid} = Postgres.start_link(postgres_adapter: PostgresMock)

%{processor: pid}
end

# Make sure mocks are verified when the test exits
setup :verify_on_exit!

test "allows subscribing to changes by pid", %{processor: processor} do
Expand Down Expand Up @@ -229,6 +227,22 @@ defmodule Cainophile.Adapters.PostgresTest do
# Use inspect as we don't care about microseconds
assert inspect(timestamp) == inspect(expected_dt)
end

test "acknowledges changes on commit", %{processor: processor} do
test_runner_pid = self()

expect(PostgresMock, :acknowledge_lsn, fn connection, lsn_tup ->
assert connection == test_runner_pid
assert lsn_tup == {2, 2_818_146_496}

send(test_runner_pid, {:acknowledged, connection, lsn_tup})
:ok
end)

for msg <- generate_truncate_transaction(), do: send(processor, msg)

assert_receive({:acknowledged, ^test_runner_pid, {2, 2_818_146_496}})
end
end

defp generate_insert_transaction() do
Expand All @@ -250,4 +264,18 @@ defmodule Cainophile.Adapters.PostgresTest do
defp generate_epgsql_message(binary) do
{:epgsql, self(), {:x_log_data, 0, 0, binary}}
end

defp create_mocks(ctx) do
test_runner_pid = self()

expect(PostgresMock, :init, fn config ->
{:ok, %State{connection: test_runner_pid, config: config, subscribers: []}}
end)

stub(PostgresMock, :acknowledge_lsn, fn _connection, _lsn_tup ->
:ok
end)

ctx
end
end

0 comments on commit 4c72091

Please sign in to comment.