Skip to content

Commit

Permalink
Oban cron to clean historical data from sent_messages table. (#148)
Browse files Browse the repository at this point in the history
* Naive implementation

* Use env variables

* Add crontab env variable

* Update README

* Format code
  • Loading branch information
elreplicante authored Sep 29, 2021
1 parent 532eb2d commit d4ee134
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 1 deletion.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ To start your Phoenix server:
* Start Phoenix endpoint with `mix phx.server`
Now you can visit [`localhost:4000`](http://localhost:4000) from your browser or run tests with `mix test`

## Environment variables

* `GOOGLE_APPLICATION_CREDENTIALS` is the Google Cloud Platform service account json path.
* `GCLOUD_PUBSUB_PROJECT_ID` is the Google Cloud Platform project where your PubSub topics/subscriptions are located.
* `MAX_BULK_MESSAGES` is the maximum number of messages that Postoffice can insert in bulk.
* `CLEAN_MESSAGES_THRESHOLD` is the retention period, in seconds, for the `sent_messages` table: rows older than this are deleted when the cleanup job runs.
* `CLEAN_MESSAGES_CRONTAB` defines when the Oban cronjob to clean historical data from the `sent_messages` table should be run. Must be a valid crontab declaration.

## Clustering
Postoffice is designed to run as a cluster. We use [libcluster](https://github.com/bitwalker/libcluster) under the hood to form the cluster; see its documentation if you want to tune the settings.

Expand Down
6 changes: 5 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ config :libcluster,
# Oban configuration: job pruning, the periodic `sent_messages` cleanup and
# the queue concurrency limits.
config :postoffice, Oban,
  repo: Postoffice.Repo,
  plugins: [
    {Oban.Plugins.Pruner,
     max_age: String.to_integer(System.get_env("OBAN_PRUNER_MAX_AGE", "60"))},
    {Oban.Plugins.Cron,
     crontab: [
       # Fall back to a weekly run (Sundays at 12:00) when the variable is
       # unset; without a default the expression would be `nil`, which is not
       # a valid crontab entry.
       {System.get_env("CLEAN_MESSAGES_CRONTAB", "0 12 * * 0"),
        Postoffice.Workers.CleanMessages}
     ]}
  ],
  queues: [default: 10, http: 100, pubsub: 15]

Expand Down
2 changes: 2 additions & 0 deletions config/releases.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ config :postoffice, PostofficeWeb.Endpoint,

# Application-level settings. Each `{:system, VAR, default: ...}` tuple names an
# environment variable and a fallback value.
# NOTE(review): presumably these tuples are resolved by the code that reads
# them — confirm how `{:system, ...}` is expanded before relying on the defaults.
config :postoffice,
  pubsub_project_name: {:system, "GCLOUD_PUBSUB_PROJECT_ID", default: "test"},
  max_bulk_messages: {:system, "MAX_BULK_MESSAGES", default: 3000},
  clean_messages_threshold: {:system, "CLEAN_MESSAGES_THRESHOLD", default: "7890000"},
  clean_messages_crontab: {:system, "CLEAN_MESSAGES_CRONTAB", default: "0 12 * * 0"}

config :postoffice, Postoffice.Repo,
username: {:system, "DB_USERNAME", default: "postgres"},
Expand Down
11 changes: 11 additions & 0 deletions lib/postoffice/historical_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,15 @@ defmodule Postoffice.HistoricalData do
# Delegates to `FailedMessages.changeset/2`; `attrs` defaults to an empty map.
def change_failed_messages(%FailedMessages{} = failed_messages, attrs \\ %{}),
  do: FailedMessages.changeset(failed_messages, attrs)

@doc """
Deletes the `SentMessages` rows that fall outside the retention window.

The window length is read from the `:clean_messages_threshold` key of the
`:postoffice` application environment and is expressed in seconds. Both an
integer and a string of digits are accepted. Rows whose `inserted_at` is
earlier than "now (UTC) minus the threshold" are removed.

Returns the result of `Repo.delete_all/1`.

Raises `ArgumentError` when the setting is missing or is a string that does
not represent an integer.
"""
def clean_sent_messages() do
  # fetch_env!/2 fails with a message naming the missing key, instead of the
  # opaque error produced by converting `nil` to an integer.
  threshold =
    :postoffice
    |> Application.fetch_env!(:clean_messages_threshold)
    |> parse_threshold()

  maintain_messages_from = DateTime.add(DateTime.utc_now(), -threshold, :second)

  expired = from(message in SentMessages, where: message.inserted_at < ^maintain_messages_from)

  Repo.delete_all(expired)
end

# Normalises the configured threshold (integer or numeric string) to an integer.
defp parse_threshold(seconds) when is_integer(seconds), do: seconds
defp parse_threshold(seconds) when is_binary(seconds), do: String.to_integer(seconds)
end
13 changes: 13 additions & 0 deletions lib/postoffice/workers/clean_messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Postoffice.Workers.CleanMessages do
  @moduledoc """
  Oban worker that triggers the clean-up of historical sent messages.

  The job arguments are ignored; every run calls
  `Postoffice.HistoricalData.clean_sent_messages/0` and reports success.
  """

  use Oban.Worker

  @impl Oban.Worker
  def perform(_job) do
    # The return value of the clean-up is discarded: the job always answers `:ok`.
    Postoffice.HistoricalData.clean_sent_messages()
    :ok
  end
end
51 changes: 51 additions & 0 deletions test/postoffice/historical_data_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Postoffice.HistoricalDataTest do
use Postoffice.DataCase

alias Postoffice.HistoricalData
alias Postoffice.FakeClock

describe "sent_messages" do
alias Postoffice.HistoricalData.SentMessages
Expand Down Expand Up @@ -137,4 +138,54 @@ defmodule Postoffice.HistoricalDataTest do
assert %Ecto.Changeset{} = HistoricalData.change_failed_messages(failed_messages)
end
end

describe "clean_messages" do
  alias Postoffice.HistoricalData.SentMessages

  # Row that the test backdates so that it falls outside the retention window.
  @message_to_delete %{
    attributes: %{"key" => "some attributes"},
    consumer_id: 42,
    message_id: 42,
    payload: [%{"key" => "some payload"}]
  }
  # Row that keeps its real insertion timestamp and must survive the clean-up.
  @message_to_preserve %{
    attributes: %{"key" => "some attributes"},
    consumer_id: 43,
    message_id: 43,
    payload: [%{"key" => "some payload"}]
  }

  # Inserts both fixture rows, failing the match if either insert is rejected.
  # The created records are not bound to names: the test looks them up by
  # `message_id`, and binding them would only produce unused-variable warnings.
  def clean_messages_fixture(attrs \\ %{}) do
    {:ok, _to_delete} =
      attrs
      |> Enum.into(@message_to_delete)
      |> HistoricalData.create_sent_messages()

    {:ok, _to_preserve} =
      attrs
      |> Enum.into(@message_to_preserve)
      |> HistoricalData.create_sent_messages()
  end

  test "cleans historical data" do
    clean_messages_fixture()

    # NOTE(review): FakeClock.freeze/1 only stores the value in the process
    # dictionary, while HistoricalData.clean_sent_messages/0 calls
    # DateTime.utc_now/0 directly, so the cutoff is still computed from the
    # real clock. The test passes because the backdated row is older than the
    # configured threshold relative to any current date.
    FakeClock.freeze(~U[2021-09-02 23:00:07Z])

    # Match on the result so a failed backdate breaks the test here rather
    # than surfacing later as a confusing row count.
    {:ok, _backdated} =
      SentMessages
      |> Repo.get_by(message_id: 42)
      |> change(%{inserted_at: ~N[2021-01-03 23:00:07]})
      |> Repo.update()

    message_to_preserve = Repo.get_by(SentMessages, message_id: 43)

    HistoricalData.clean_sent_messages()

    messages_count = Repo.one(from data in SentMessages, select: count(data.id))

    assert messages_count == 1
    assert Repo.one(SentMessages) == message_to_preserve
  end
end
end
14 changes: 14 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,17 @@ Application.put_env(
:max_bulk_messages,
3
)


# Threshold used by the test suite for the sent_messages clean-up, stored as a string.
Application.put_env(:postoffice, :clean_messages_threshold, "7890000")


defmodule Postoffice.FakeClock do
  @moduledoc """
  Test helper that records a "frozen" instant for the calling process.

  The instant is stored in the process dictionary under `:mock_utc_now`.
  This module only writes the value; nothing here reads it back.
  """

  @frozen_key :mock_utc_now

  @doc """
  Stores `on` in the process dictionary of the calling process.

  Only `DateTime` structs are accepted. Returns the value previously stored
  under the same key, or `nil` when there was none.
  """
  def freeze(%DateTime{} = on), do: Process.put(@frozen_key, on)
end

0 comments on commit d4ee134

Please sign in to comment.