Skip to content

Commit

Permalink
Better error Handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Adriano Santos committed Jan 24, 2023
1 parent 3edf875 commit 3637248
Show file tree
Hide file tree
Showing 23 changed files with 1,158 additions and 165 deletions.
6 changes: 3 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ config :logger,
backends: [:console],
truncate: 65536

# compile_time_purge_matching: [
# [level_lower_than: :info]
# ]
# compile_time_purge_matching: [
# [level_lower_than: :info]
# ]

# Our Console Backend-specific configuration
config :logger, :console,
Expand Down
124 changes: 72 additions & 52 deletions lib/actors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,24 @@ defmodule Actors do

@spec get_state(String.t(), String.t()) :: {:ok, term()} | {:error, term()}
def get_state(system_name, actor_name) do
retry with: exponential_backoff() |> randomize |> expiry(10_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection],
retry with: exponential_backoff() |> randomize |> expiry(30_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
rescue_only: [ErlangError] do
do_lookup_action(
system_name,
{false, system_name, actor_name, actor_name},
nil,
fn actor_ref, _actor_ref_id ->
ActorEntity.get_state(actor_ref)
end
)
try do
do_lookup_action(
system_name,
{false, system_name, actor_name, actor_name},
nil,
fn actor_ref, _actor_ref_id ->
ActorEntity.get_state(actor_ref)
end
)
rescue
e ->
Logger.error("Failure to make a call to actor #{inspect(actor_name)} #{inspect(e)}")

raise ErlangError
end
after
result -> result
else
Expand Down Expand Up @@ -186,43 +193,52 @@ defmodule Actors do
Tracer.with_span opts[:span_ctx], "client invoke", kind: :client do
Tracer.set_attributes(metadata_attributes)

retry with: exponential_backoff() |> randomize |> expiry(10_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection],
retry with: exponential_backoff() |> randomize |> expiry(60_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
rescue_only: [ErlangError] do
Tracer.add_event("lookup", [{"target", actor.id.name}])

actor_fqdn =
unless pooled? do
{pooled?, system.name, actor.id.name, actor.id.name}
else
case ActorRegistry.get_hosts_by_actor(system.name, actor.id.name) do
{:ok, actor_hosts} ->
host = Enum.random(actor_hosts)
{pooled?, system.name, host.actor.id.parent, actor.id.name}

_ ->
{pooled?, system.name, "#{actor.id.name}-1", actor.id.name}
try do
Tracer.add_event("lookup", [{"target", actor.id.name}])

actor_fqdn =
unless pooled? do
{pooled?, system.name, actor.id.name, actor.id.name}
else
case ActorRegistry.get_hosts_by_actor(system.name, actor.id.name) do
{:ok, actor_hosts} ->
host = Enum.random(actor_hosts)
{pooled?, system.name, host.actor.id.parent, actor.id.name}

_ ->
{pooled?, system.name, "#{actor.id.name}-1", actor.id.name}
end
end
end

do_lookup_action(system.name, actor_fqdn, system, fn actor_ref, actor_ref_id ->
%InvocationRequest{
actor: %Actor{} = actor
} = request
do_lookup_action(system.name, actor_fqdn, system, fn actor_ref, actor_ref_id ->
%InvocationRequest{
actor: %Actor{} = actor
} = request

request_params = %InvocationRequest{
request
| actor: %Actor{actor | id: actor_ref_id}
}
request_params = %InvocationRequest{
request
| actor: %Actor{actor | id: actor_ref_id}
}

if is_nil(request.scheduled_to) || request.scheduled_to == 0 do
maybe_invoke_async(async?, actor_ref, request_params, opts)
else
InvocationScheduler.schedule_invoke(request_params)
if is_nil(request.scheduled_to) || request.scheduled_to == 0 do
maybe_invoke_async(async?, actor_ref, request_params, opts)
else
InvocationScheduler.schedule_invoke(request_params)

{:ok, :async}
end
end)
{:ok, :async}
end
end)
rescue
e ->
Logger.error(
"Failure to make a call to actor #{inspect(actor.id.name)} #{inspect(e)}"
)

raise ErlangError
end
after
result -> result
else
Expand Down Expand Up @@ -357,25 +373,29 @@ defmodule Actors do
ActorRegistry.lookup(system_name, actor_name,
filter_by_parent: pooled,
parent: parent
),
{:ok, actor_ref} =
:erpc.call(
) do
case :erpc.call(
node,
__MODULE__,
:try_reactivate_actor,
[system, actor, opts],
@erpc_timeout
) do
Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}])
{:ok, actor_ref} ->
Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}])

Tracer.add_event("try-reactivate-actor", [
{"reactivation-on-node", "#{inspect(node)}"}
])
Tracer.add_event("try-reactivate-actor", [
{"reactivation-on-node", "#{inspect(node)}"}
])

if pooled,
# Ensures that the name change will not affect the host function call
do: action_fun.(actor_ref, %ActorId{actor.id | name: actor_name}),
else: action_fun.(actor_ref, actor.id)
if pooled,
# Ensures that the name change will not affect the host function call
do: action_fun.(actor_ref, %ActorId{actor.id | name: actor_name}),
else: action_fun.(actor_ref, actor.id)

_ ->
raise ErlangError
end
else
{:not_found, _} ->
Logger.error("Actor #{actor_name} not found on ActorSystem #{system_name}")
Expand Down
19 changes: 13 additions & 6 deletions lib/actors/actor/entity/entity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Actors.Actor.Entity do
ActorState
}

@default_call_timeout :infinity
@fullsweep_after 10

@impl true
Expand Down Expand Up @@ -137,6 +138,7 @@ defmodule Actors.Actor.Entity do
Logger.warning(
"A conflict has been detected for ActorId #{inspect(id)}. Possible Actor Rebalance or NetSplit!
Trace Data: [
self: #{inspect(self())},
from: #{inspect(from)},
key: #{inspect(key)},
value: #{inspect(value)},
Expand All @@ -145,7 +147,7 @@ defmodule Actors.Actor.Entity do
] "
)

{:stop, :shutdown, state}
{:stop, :conflict, state}
end

defp do_handle_info(
Expand Down Expand Up @@ -181,29 +183,32 @@ defmodule Actors.Actor.Entity do
Lifecycle.terminate(action, state)
end

## Client APIs
def start_link(%EntityState{actor: %Actor{id: %ActorId{name: name} = _id}} = state) do
GenServer.start(__MODULE__, state,
GenServer.start_link(__MODULE__, state,
name: via(name),
spawn_opt: [fullsweep_after: @fullsweep_after]
)
end

@spec get_state(any) :: {:error, term()} | {:ok, term()}
def get_state(ref) when is_pid(ref) do
GenServer.call(ref, :get_state, 20_000)
GenServer.call(ref, :get_state, 30_000)
end

def get_state(ref) do
GenServer.call(via(ref), :get_state, 20_000)
GenServer.call(via(ref), :get_state, 30_000)
end

@spec invoke(any, any, any) :: any
def invoke(ref, request, opts) when is_pid(ref) do
GenServer.call(ref, {:invocation_request, request, opts}, 30_000)
timeout = Keyword.get(opts, :timeout, @default_call_timeout)
GenServer.call(ref, {:invocation_request, request, opts}, timeout)
end

def invoke(ref, request, opts) do
GenServer.call(via(ref), {:invocation_request, request, opts}, 30_000)
timeout = Keyword.get(opts, :timeout, @default_call_timeout)
GenServer.call(via(ref), {:invocation_request, request, opts}, timeout)
end

@spec invoke_async(any, any, any) :: :ok
Expand All @@ -215,6 +220,8 @@ defmodule Actors.Actor.Entity do
GenServer.cast(via(ref), {:invocation_request, request, opts})
end

## Private Functions

defp parse_packed_response(response) do
case response do
{:reply, response, state} -> {:reply, response, EntityState.pack(state)}
Expand Down
18 changes: 10 additions & 8 deletions lib/actors/actor/entity/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Actors.Actor.Entity.Supervisor do
def child_spec() do
{
PartitionSupervisor,
child_spec: DynamicSupervisor, name: __MODULE__
child_spec: DynamicSupervisor, name: __MODULE__, max_restarts: 100
}
end

Expand Down Expand Up @@ -41,17 +41,17 @@ defmodule Actors.Actor.Entity.Supervisor do
restart: :transient
}

case DynamicSupervisor.start_child(via(), child_spec) do
case DynamicSupervisor.start_child(via(child_spec), child_spec) do
{:error, {:already_started, pid}} ->
{:ok, pid}
pid

{:ok, pid} ->
{:ok, pid}

{:error, {:name_conflict, {{Actors.Actor.Entity, name}, _f}, _registry, pid}} ->
Logger.warning("Name conflict on start Actor #{name} from PID #{inspect(pid)}.")

if Process.alive?(pid), do: {:ok, pid}, else: {:error, :name_conflict}
:ignore
end
end

Expand All @@ -68,19 +68,21 @@ defmodule Actors.Actor.Entity.Supervisor do
restart: :transient
}

case DynamicSupervisor.start_child(via(), child_spec) do
case DynamicSupervisor.start_child(via(child_spec), child_spec) do
{:error, {:already_started, pid}} ->
{:ok, pid}
pid

{:ok, pid} ->
{:ok, pid}

{:error, {:name_conflict, {{Actors.Actor.Entity, name}, _f}, _registry, pid}} ->
Logger.warning("Name conflict on start Actor #{name} from PID #{inspect(pid)}.")

if Process.alive?(pid), do: {:ok, pid}, else: {:error, :name_conflict}
:ignore
end
end

defp via(), do: {:via, PartitionSupervisor, {__MODULE__, self()}}
defp get_key(spec), do: :erlang.phash2(Map.drop(spec, [:id]))

defp via(spec), do: {:via, PartitionSupervisor, {__MODULE__, get_key(spec)}}
end
4 changes: 3 additions & 1 deletion spawn_sdk/spawn_sdk_example/benchmark.exs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ Benchee.run(
# #Process.sleep(10000)
# Benchee.run(%{
# "Parallel Stateful Singleton Actor - Get State " => fn -> invok_get_state() end,
# "Parallel Stateful Abstract Spawn and Invoke Actor - Update State" => fn -> spawn_and_invoke() end,
# "Parallel Stateful Singleton Actor - Update State" => fn -> invoke_update_state() end,
# #"Parallel Stateful Abstract Spawn and Invoke Actor - Update State" => fn -> spawn_and_invoke() end,
# "Async Non Parallel Stateful Singleton Actor - Update State" => fn -> async_invoke_update_state() end,
# "Parallel Stateless Pooled Actor - Call Action " => fn -> spawn_invoke_pooled_actors() end
# },
# warmup: 10,
# parallel: 10,
# after_scenario: fn _ctx -> Process.sleep(5000) end,
# formatters: [
# {
# Benchee.Formatters.HTML,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule SpawnSdkExample.Actors.AbstractActor do
use SpawnSdk.Actor,
name: "abs_actor",
kind: :abstract,
deactivate_timeout: 60_000,
state_type: Io.Eigr.Spawn.Example.MyState

require Logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule SpawnSdkExample.Actors.JoeActor do
use SpawnSdk.Actor,
name: "joe",
state_type: Io.Eigr.Spawn.Example.MyState,
deactivate_timeout: 240_000,
deactivate_timeout: 60_000,
snapshot_timeout: 2_000

require Logger
Expand Down
Binary file not shown.

Large diffs are not rendered by default.

Loading

0 comments on commit 3637248

Please sign in to comment.