diff --git a/lib/cainophile/adapters/postgres.ex b/lib/cainophile/adapters/postgres.ex index f7aa8d0..694dd06 100644 --- a/lib/cainophile/adapters/postgres.ex +++ b/lib/cainophile/adapters/postgres.ex @@ -14,7 +14,7 @@ defmodule Cainophile.Adapters.Postgres do use GenServer require Logger - alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord} + alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord, DeletedRecord} alias PgoutputDecoder.Messages.{ Begin, @@ -99,10 +99,10 @@ defmodule Cainophile.Adapters.Postgres do defp process_message(%Insert{} = msg, state) do relation = Map.get(state.relations, msg.relation_id) - # TODO: Typecast to meaningful Elixir types here later data = data_tuple_to_map(relation.columns, msg.tuple_data) new_record = %NewRecord{relation: {relation.namespace, relation.name}, record: data} + {lsn, txn} = state.transaction %{state | transaction: {lsn, %{txn | changes: Enum.reverse([new_record | txn.changes])}}} end @@ -110,7 +110,6 @@ defmodule Cainophile.Adapters.Postgres do defp process_message(%Update{} = msg, state) do relation = Map.get(state.relations, msg.relation_id) - # TODO: Typecast to meaningful Elixir types here later old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data) data = data_tuple_to_map(relation.columns, msg.tuple_data) @@ -124,8 +123,24 @@ defmodule Cainophile.Adapters.Postgres do %{state | transaction: {lsn, %{txn | changes: Enum.reverse([new_record | txn.changes])}}} end + defp process_message(%Delete{} = msg, state) do + relation = Map.get(state.relations, msg.relation_id) + + data = + data_tuple_to_map( + relation.columns, + msg.old_tuple_data || msg.changed_key_tuple_data + ) + + new_record = %DeletedRecord{relation: {relation.namespace, relation.name}, old_record: data} + + {lsn, txn} = state.transaction + %{state | transaction: {lsn, %{txn | changes: Enum.reverse([new_record | txn.changes])}}} + end + defp process_message(_, state), do: state + # TODO: Typecast to meaningful Elixir types here later defp data_tuple_to_map(columns, tuple_data) do for {column, index} <- Enum.with_index(columns, 1), do: {column.name, :erlang.element(index, tuple_data)}, diff --git a/lib/cainophile/changes.ex b/lib/cainophile/changes.ex index a3bb502..5142b15 100644 --- a/lib/cainophile/changes.ex +++ b/lib/cainophile/changes.ex @@ -2,4 +2,5 @@ defmodule Cainophile.Changes do defmodule(Transaction, do: defstruct([:changes, :commit_timestamp])) defmodule(NewRecord, do: defstruct([:relation, :record])) defmodule(UpdatedRecord, do: defstruct([:relation, :old_record, :record])) + defmodule(DeletedRecord, do: defstruct([:relation, :old_record])) end diff --git a/test/cainophile/adapters/postgres_test.exs b/test/cainophile/adapters/postgres_test.exs index 4198e46..977ac8b 100644 --- a/test/cainophile/adapters/postgres_test.exs +++ b/test/cainophile/adapters/postgres_test.exs @@ -2,7 +2,7 @@ defmodule Cainophile.Adapters.PostgresTest do use ExUnit.Case import Mox alias Cainophile.Adapters.{Postgres, Postgres.State} - alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord} + alias Cainophile.Changes.{Transaction, NewRecord, UpdatedRecord, DeletedRecord} # TODO: Ideally abstract this out so we can mock out pgdecoder with higher level constructs @insert_txn_bins [ @@ -46,6 +46,24 @@ defmodule Cainophile.Adapters.PostgresTest do 201, 23, 156>> ] + @delete_txn_bins [ + # Begin + <<66, 0, 0, 0, 2, 167, 249, 128, 144, 0, 2, 49, 15, 72, 201, 23, 156, 0, 0, 2, 173>>, + # Type + <<89, 0, 0, 128, 52, 112, 117, 98, 108, 105, 99, 0, 101, 120, 97, 109, 112, 108, 101, 95, 116, + 121, 112, 101, 0>>, + # Relation + <<82, 0, 0, 96, 0, 112, 117, 98, 108, 105, 99, 0, 102, 111, 111, 0, 102, 0, 3, 1, 98, 97, 114, + 0, 0, 0, 0, 25, 255, 255, 255, 255, 1, 105, 100, 0, 0, 0, 0, 23, 255, 255, 255, 255, 1, 99, + 117, 115, 116, 111, 109, 95, 116, 121, 112, 101, 0, 0, 0, 128, 52, 255, 255, 255, 255>>, + # Delete + <<68, 0, 0, 96, 0, 79, 0, 3, 116, 0, 0, 0, 7, 99, 104, 97, 110, 103, 101, 100, 116, 0, 0, 0, + 3, 53, 56, 51, 116, 0, 0, 0, 8, 40, 97, 98, 99, 100, 101, 102, 41>>, + # Commit + <<67, 0, 0, 0, 0, 2, 167, 249, 128, 144, 0, 0, 0, 2, 167, 249, 128, 192, 0, 2, 49, 15, 72, + 201, 23, 156>> + ] + doctest Cainophile.Adapters.Postgres setup_all do @@ -146,6 +164,29 @@ defmodule Cainophile.Adapters.PostgresTest do # Use inspect as we don't care about microseconds assert inspect(timestamp) == inspect(expected_dt) end + + test "publishes delete transaction to pid subscribers", %{processor: processor} do + for msg <- generate_delete_transaction(), do: send(processor, msg) + + assert_receive(%Transaction{ + commit_timestamp: timestamp, + changes: [ + %DeletedRecord{ + relation: {"public", "foo"}, + old_record: %{ + "bar" => "changed", + "id" => "583", + "custom_type" => "(abcdef)" + } + } + ] + }) + + {:ok, expected_dt, _} = DateTime.from_iso8601("2019-07-19T22:47:48Z") + + # Use inspect as we don't care about microseconds + assert inspect(timestamp) == inspect(expected_dt) + end end defp generate_insert_transaction() do @@ -156,6 +197,10 @@ defmodule Cainophile.Adapters.PostgresTest do for bin <- @insert_and_update_txn_bins, do: generate_epgsql_message(bin) end + defp generate_delete_transaction() do + for bin <- @delete_txn_bins, do: generate_epgsql_message(bin) + end + defp generate_epgsql_message(binary) do {:epgsql, self(), {:x_log_data, 0, 0, binary}} end