Skip to content

Commit

Permalink
publish deleted records to subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
bbhoss committed Jul 19, 2019
1 parent c4ae49f commit 5151a3b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
21 changes: 18 additions & 3 deletions lib/cainophile/adapters/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Cainophile.Adapters.Postgres do
use GenServer
require Logger

alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord}
alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord, DeletedRecord}

alias PgoutputDecoder.Messages.{
Begin,
Expand Down Expand Up @@ -99,18 +99,17 @@ defmodule Cainophile.Adapters.Postgres do
defp process_message(%Insert{} = msg, state) do
relation = Map.get(state.relations, msg.relation_id)

# TODO: Typecast to meaningful Elixir types here later
data = data_tuple_to_map(relation.columns, msg.tuple_data)

new_record = %NewRecord{relation: {relation.namespace, relation.name}, record: data}

{lsn, txn} = state.transaction
%{state | transaction: {lsn, %{txn | changes: Enum.reverse([new_record | txn.changes])}}}
end

defp process_message(%Update{} = msg, state) do
relation = Map.get(state.relations, msg.relation_id)

# TODO: Typecast to meaningful Elixir types here later
old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data)
data = data_tuple_to_map(relation.columns, msg.tuple_data)

Expand All @@ -124,8 +123,24 @@ defmodule Cainophile.Adapters.Postgres do
%{state | transaction: {lsn, %{txn | changes: Enum.reverse([new_record | txn.changes])}}}
end

defp process_message(%Delete{} = msg, state) do
relation = Map.get(state.relations, msg.relation_id)

data =
data_tuple_to_map(
relation.columns,
msg.old_tuple_data || msg.changed_key_tuple_data
)

new_record = %DeletedRecord{relation: {relation.namespace, relation.name}, old_record: data}

{lsn, txn} = state.transaction
%{state | transaction: {lsn, %{txn | changes: Enum.reverse([new_record | txn.changes])}}}
end

defp process_message(_, state), do: state

# TODO: Typecast to meaningful Elixir types here later
defp data_tuple_to_map(columns, tuple_data) do
for {column, index} <- Enum.with_index(columns, 1),
do: {column.name, :erlang.element(index, tuple_data)},
Expand Down
1 change: 1 addition & 0 deletions lib/cainophile/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ defmodule Cainophile.Changes do
defmodule(Transaction, do: defstruct([:changes, :commit_timestamp]))
defmodule(NewRecord, do: defstruct([:relation, :record]))
defmodule(UpdatedRecord, do: defstruct([:relation, :old_record, :record]))
defmodule(DeletedRecord, do: defstruct([:relation, :old_record]))
end
47 changes: 46 additions & 1 deletion test/cainophile/adapters/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Cainophile.Adapters.PostgresTest do
use ExUnit.Case
import Mox
alias Cainophile.Adapters.{Postgres, Postgres.State}
alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord}
alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord, DeletedRecord}

# TODO: Ideally abstract this out so we can mock out pgdecoder with higher level constructs
@insert_txn_bins [
Expand Down Expand Up @@ -46,6 +46,24 @@ defmodule Cainophile.Adapters.PostgresTest do
201, 23, 156>>
]

@delete_txn_bins [
# Begin
<<66, 0, 0, 0, 2, 167, 249, 128, 144, 0, 2, 49, 15, 72, 201, 23, 156, 0, 0, 2, 173>>,
# Type
<<89, 0, 0, 128, 52, 112, 117, 98, 108, 105, 99, 0, 101, 120, 97, 109, 112, 108, 101, 95, 116,
121, 112, 101, 0>>,
# Relation
<<82, 0, 0, 96, 0, 112, 117, 98, 108, 105, 99, 0, 102, 111, 111, 0, 102, 0, 3, 1, 98, 97, 114,
0, 0, 0, 0, 25, 255, 255, 255, 255, 1, 105, 100, 0, 0, 0, 0, 23, 255, 255, 255, 255, 1, 99,
117, 115, 116, 111, 109, 95, 116, 121, 112, 101, 0, 0, 0, 128, 52, 255, 255, 255, 255>>,
# Delete
<<68, 0, 0, 96, 0, 79, 0, 3, 116, 0, 0, 0, 7, 99, 104, 97, 110, 103, 101, 100, 116, 0, 0, 0,
3, 53, 56, 51, 116, 0, 0, 0, 8, 40, 97, 98, 99, 100, 101, 102, 41>>,
# Commit
<<67, 0, 0, 0, 0, 2, 167, 249, 128, 144, 0, 0, 0, 2, 167, 249, 128, 192, 0, 2, 49, 15, 72,
201, 23, 156>>
]

doctest Cainophile.Adapters.Postgres

setup_all do
Expand Down Expand Up @@ -146,6 +164,29 @@ defmodule Cainophile.Adapters.PostgresTest do
# Use inspect as we don't care about microseconds
assert inspect(timestamp) == inspect(expected_dt)
end

test "publishes delete transaction to pid subscribers", %{processor: processor} do
for msg <- generate_delete_transaction(), do: send(processor, msg)

assert_receive(%Transaction{
commit_timestamp: timestamp,
changes: [
%DeletedRecord{
relation: {"public", "foo"},
old_record: %{
"bar" => "changed",
"id" => "583",
"custom_type" => "(abcdef)"
}
}
]
})

{:ok, expected_dt, _} = DateTime.from_iso8601("2019-07-19T22:47:48Z")

# Use inspect as we don't care about microseconds
assert inspect(timestamp) == inspect(expected_dt)
end
end

defp generate_insert_transaction() do
Expand All @@ -156,6 +197,10 @@ defmodule Cainophile.Adapters.PostgresTest do
for bin <- @insert_and_update_txn_bins, do: generate_epgsql_message(bin)
end

defp generate_delete_transaction() do
for bin <- @delete_txn_bins, do: generate_epgsql_message(bin)
end

defp generate_epgsql_message(binary) do
{:epgsql, self(), {:x_log_data, 0, 0, binary}}
end
Expand Down

0 comments on commit 5151a3b

Please sign in to comment.