Skip to content

Commit

Permalink
Delete publishers via web interface(#121)
Browse files Browse the repository at this point in the history
* Add deleted publishers to cache when setup the system

* Delete publisher notify to all nodes

* Add missing tests

* Refactor to functions

* Fix method name

* Refactor

* Change method name

* No send messages when publisher is deleted
  • Loading branch information
jjponz authored Mar 8, 2021
1 parent 34e2d61 commit c3b142f
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 36 deletions.
43 changes: 33 additions & 10 deletions lib/postoffice/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,56 @@ defmodule Postoffice.Cache do
@impl true
def init(_args) do
Cachex.start_link(:postoffice, [])
warm_up()
initialize()
{:ok, %{}}
end

@impl true
def handle_info({:publisher_updated, publisher}, state) do
publisher
|> publisher_updated

{:noreply, state}
end

@impl true
def handle_info({:publisher_deleted, publisher}, state) do
publisher
|> publisher_deleted

{:noreply, state}
end

def initialize() do
Messaging.list_disabled_publishers()
|> Enum.map(fn publisher -> publisher.id end)
|> add_publishers(:disabled)

Messaging.list_deleted_publishers()
|> Enum.map(fn publisher -> publisher.id end)
|> add_publishers(:deleted)
end

def publisher_updated(publisher) do
case publisher.active do
false ->
Cachex.put(:postoffice, publisher.id, :disabled)

true ->
Cachex.del(:postoffice, publisher.id)
end

{:noreply, state}
end

defp warm_up() do
Messaging.list_disabled_publishers()
|> Enum.map(fn publisher -> publisher.id end)
|> warm_publishers()
def publisher_deleted(publisher) do
Cachex.del(:postoffice, publisher.id)
Cachex.put(:postoffice, publisher.id, :deleted)
end

defp warm_publishers(disabled_publishers_ids) when disabled_publishers_ids == [], do: :ok

defp warm_publishers(disabled_publishers_ids) do
ids_tuples = Enum.map(disabled_publishers_ids, fn id -> {id, :disabled} end)
defp add_publishers(disabled_publishers_ids, state) when disabled_publishers_ids == [], do: :ok

defp add_publishers(disabled_publishers_ids, state) do
ids_tuples = Enum.map(disabled_publishers_ids, fn id -> {id, state} end)
Cachex.put_many(:postoffice, ids_tuples)
PubSub.subscribe(Postoffice.PubSub, "publishers")
end
Expand Down
29 changes: 26 additions & 3 deletions lib/postoffice/messaging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ defmodule Postoffice.Messaging do
|> Repo.all()
end

def list_publishers do
from(p in Publisher, preload: [:topic])
def list_no_deleted_publishers do
from(p in Publisher, preload: [:topic], where: p.deleted == false)
|> Repo.all()
end

@spec list_disabled_publishers :: any
def list_disabled_publishers do
from(p in Publisher, where: p.active == false)
from(p in Publisher, where: p.active == false and p.deleted == false)
|> Repo.all()
end

Expand Down Expand Up @@ -298,4 +298,27 @@ defmodule Postoffice.Messaging do

{:ok, publisher}
end

def delete_publisher(id) do
case get_publisher!(id) do
nil ->
{:deleting_error}

publisher ->
publisher
|> Publisher.changeset(%{deleted: true})
|> perform_delete_publisher()
end
end

defp perform_delete_publisher(changeset) do
Repo.update(changeset)
|> broadcast_publisher(:publisher_deleted)
end


def list_deleted_publishers do
from(p in Publisher, where: p.deleted == true)
|> Repo.all()
end
end
18 changes: 12 additions & 6 deletions lib/postoffice/workers/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ defmodule Postoffice.Workers.Http do
target: target
)

case check_publisher_active(consumer_id) do
true ->
case check_publisher_state(consumer_id) do
:deleted ->
{:discard, "Deleted publisher"}

:active ->
publish(id, args)

false ->
:disabled ->
Logger.info("Do not process task as publisher is disabled", publisher_id: consumer_id)
{:snooze, @snooze_seconds}
end
Expand Down Expand Up @@ -87,13 +90,16 @@ defmodule Postoffice.Workers.Http do
end
end

defp check_publisher_active(publisher_id) do
defp check_publisher_state(publisher_id) do
case Cachex.get(:postoffice, publisher_id) do
{:ok, :disabled} ->
false
:disabled

{:ok, :deleted} ->
:deleted

{:ok, nil} ->
true
:active
end
end

Expand Down
12 changes: 2 additions & 10 deletions lib/postoffice_web/controllers/api/publisher_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,8 @@ defmodule PostofficeWeb.Api.PublisherController do
end

def delete(conn, %{"id" => id}) do
case Messaging.get_publisher!(id) do
nil ->
{:deleting_error}

publisher ->
publisher
|> Publisher.changeset(%{deleted: true})
|> Messaging.update_publisher()

send_resp(conn, :no_content, "")
with {:ok, _} <- Messaging.delete_publisher(id) do
send_resp(conn, :no_content, "")
end
end
end
10 changes: 9 additions & 1 deletion lib/postoffice_web/controllers/publisher_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule PostofficeWeb.PublisherController do
end

def index(conn, _params) do
publishers = Messaging.list_publishers()
publishers = Messaging.list_no_deleted_publishers()
render(conn, "index.html", publishers: publishers, page_name: "Publishers")
end

Expand Down Expand Up @@ -60,4 +60,12 @@ defmodule PostofficeWeb.PublisherController do
render(conn, "edit.html", publisher: publisher, changeset: changeset)
end
end

def delete(conn, %{"id" => id}) do
with {:ok, _} <- Messaging.delete_publisher(id) do
conn
|> put_flash(:info, "http_consumer deleted successfully.")
|> redirect(to: Routes.publisher_path(conn, :index))
end
end
end
2 changes: 1 addition & 1 deletion lib/postoffice_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule PostofficeWeb.Router do

resources "/topics", TopicController, only: [:index, :new, :create]

resources "/publishers", PublisherController, only: [:index, :new, :create, :edit, :update]
resources "/publishers", PublisherController, only: [:index, :new, :create, :edit, :update, :delete]

resources "/messages", MessageController, only: [:index, :show]

Expand Down
1 change: 1 addition & 0 deletions lib/postoffice_web/templates/publisher/index.html.eex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<td><i class="material-icons" style="color:#cc0000">toggle_off</i></td>
<% end %>
<td><%= publisher.type %></td>
<td><%= link("Delete", to: Routes.publisher_path(@conn, :delete, publisher.id), method: :delete, data: [confirm: "Are you sure?"], class: "btn btn-danger") %></td>
</tr>
<% end %>
</tbody>
Expand Down
85 changes: 85 additions & 0 deletions test/postoffice/cache_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Postoffice.CacheTest do
use Postoffice.DataCase, async: true

alias Postoffice.Cache
alias Postoffice.Fixtures

@deleted_publisher_attrs %{
active: false,
target: "http://fake.target3",
initial_message: 0,
type: "http",
deleted: true
}

@disabled_publisher_attrs %{
active: false,
target: "http://fake.another.target",
initial_message: 0,
type: "http",
deleted: false
}

@active_publisher_attrs %{
active: true,
target: "http://fake.last.target3",
initial_message: 0,
type: "http",
deleted: false
}

describe "cache" do
test "initialize cache with publishers" do
topic = Fixtures.create_topic()
active_publisher = Fixtures.create_publisher(topic, @active_publisher_attrs)
deleted_publisher = Fixtures.create_publisher(topic, @deleted_publisher_attrs)
disabled_publisher = Fixtures.create_publisher(topic, @disabled_publisher_attrs)

Cache.initialize()

assert Cachex.get(:postoffice, active_publisher.id) == {:ok, nil}
assert Cachex.get(:postoffice, disabled_publisher.id) == {:ok, :disabled}
assert Cachex.get(:postoffice, deleted_publisher.id) == {:ok, :deleted}
end

test "add to cache disabled publisher" do
topic = Fixtures.create_topic()
disabled_publisher = Fixtures.create_publisher(topic, @disabled_publisher_attrs)

Cache.publisher_updated(disabled_publisher)

assert Cachex.get(:postoffice, disabled_publisher.id) == {:ok, :disabled}

end

test "Remove from cache when activate a disabled publisher" do
topic = Fixtures.create_topic()
active_publisher = Fixtures.create_publisher(topic, @active_publisher_attrs)
Cachex.put(:postoffice, active_publisher.id, :disabled)

Cache.publisher_updated(active_publisher)

assert Cachex.get(:postoffice, active_publisher.id) == {:ok, nil}

end

test "add to cache deleted publisher" do
topic = Fixtures.create_topic()
deleted_publisher = Fixtures.create_publisher(topic, @deleted_publisher_attrs)

Cache.publisher_deleted(deleted_publisher)

assert Cachex.get(:postoffice, deleted_publisher.id) == {:ok, :deleted}
end

test "add to cache deleted when is deactivated first publisher" do
topic = Fixtures.create_topic()
deleted_publisher = Fixtures.create_publisher(topic, @deleted_publisher_attrs)
Cachex.put(:postoffice, deleted_publisher.id, :disabled)

Cache.publisher_deleted(deleted_publisher)

assert Cachex.get(:postoffice, deleted_publisher.id) == {:ok, :deleted}
end
end
end
27 changes: 27 additions & 0 deletions test/postoffice/http_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ defmodule Postoffice.HttpWorkerTest do

setup [:set_mox_from_context, :verify_on_exit!]

@deleted_publisher_attrs %{
active: true,
target: "http://fake.target3",
initial_message: 0,
type: "http",
deleted: true
}

describe "HttpWorker tests" do
test "message is successfully sent" do
topic = Fixtures.create_topic()
Expand Down Expand Up @@ -141,4 +149,23 @@ defmodule Postoffice.HttpWorkerTest do

assert {:ok, sent} = perform_job(HttpWorker, args, attempt: 100)
end

test "not sent message when worker is deleted" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic, @deleted_publisher_attrs)
Cachex.put(:postoffice, publisher.id, :deleted)

args = %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"payload" => %{"action" => "test"},
"attributes" => %{"hive_id" => "vlc"}
}

assert {:discard, "Deleted publisher"} = perform_job(HttpWorker, args)

assert Kernel.length(all_enqueued(queue: :http)) == 0
assert Kernel.length(HistoricalData.list_sent_messages()) == 0
assert Kernel.length(HistoricalData.list_failed_messages()) == 0
end
end
16 changes: 11 additions & 5 deletions test/postoffice/messaging_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,19 @@ defmodule Postoffice.MessagingTest do
assert Messaging.list_topics() == []
end

test "list_publishers/0 returns empty list if no publisher exists" do
assert Messaging.list_publishers() == []
test "list_no_deleted_publishers/0 returns empty list if no publisher exists" do
assert Messaging.list_no_deleted_publishers() == []
end

test "list_publishers/0 returns all existing publishers" do
publisher = Fixtures.create_publisher(Fixtures.create_topic())
listed_publisher = List.first(Messaging.list_publishers())
test "list_no_deleted_publishers/0 returns all existing no deleted publishers" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic)
disabled_publisher = Fixtures.create_publisher(topic, @deleted_publisher_attrs)

publishers = Messaging.list_no_deleted_publishers()
assert length(publishers) == 1

listed_publisher = List.first(publishers)

assert publisher.id == listed_publisher.id
assert publisher.target == listed_publisher.target
Expand Down
13 changes: 13 additions & 0 deletions test/postoffice_web/controllers/api/publisher_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule PostofficeWeb.Api.PublisherControllerTest do
alias Postoffice.Messaging
alias Postoffice.Messaging.Publisher
alias Postoffice.Repo
alias Phoenix.PubSub

@valid_http_publisher_payload %{
active: true,
Expand Down Expand Up @@ -152,6 +153,18 @@ defmodule PostofficeWeb.Api.PublisherControllerTest do
assert created_publisher.deleted == true
end

test "Delete publisher broadcast the publisher updated", %{conn: conn} do
PubSub.subscribe(Postoffice.PubSub, "publishers")

publisher =
Fixtures.create_topic()
|> Fixtures.create_publisher()

conn = delete(conn, Routes.api_publisher_path(conn, :delete, publisher))

assert_receive {:publisher_deleted, publisher}
end

test "Returns 400 when can not delete publisher", %{conn: conn} do
Fixtures.create_topic()
|> Fixtures.create_publisher()
Expand Down
Loading

0 comments on commit c3b142f

Please sign in to comment.