Skip to content

Commit

Permalink
Merge pull request eigr#200 from eigr/feat/cross-border
Browse files Browse the repository at this point in the history
Feat. Cross border
  • Loading branch information
sleipnir authored May 17, 2023
2 parents c3cb7a0 + eea2f97 commit bba9883
Show file tree
Hide file tree
Showing 39 changed files with 601 additions and 163 deletions.
4 changes: 2 additions & 2 deletions examples/k8s/test/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ spec:
name: proxy-https
protocol: TCP
resources:
limits:
memory: 1Gi
requests:
cpu: 100m
memory: 80Mi
ephemeral-storage: "100Ki"
volumeMounts:
- mountPath: /app/certs
name: volume-certs
Expand Down
96 changes: 82 additions & 14 deletions lib/actors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Actors do

alias Actors.Registry.{ActorRegistry, HostActor}

alias Actors.Config.Vapor, as: Config

alias Eigr.Functions.Protocol.Actors.{
Actor,
ActorId,
Expand All @@ -26,6 +28,7 @@ defmodule Actors do
}

alias Eigr.Functions.Protocol.{
ActorInvocationResponse,
InvocationRequest,
ProxyInfo,
RegistrationRequest,
Expand All @@ -37,6 +40,7 @@ defmodule Actors do
}

alias Sidecar.Measurements
alias Spawn.Utils.Nats

import Spawn.Utils.Common, only: [to_existing_atom_or_new: 1]

Expand Down Expand Up @@ -176,24 +180,88 @@ defmodule Actors do
"""
@spec invoke(InvocationRequest.t()) :: {:ok, :async} | {:ok, term()} | {:error, term()}
def invoke(
%InvocationRequest{} = request,
%InvocationRequest{
system: %ActorSystem{name: system_name} = _system
} = request,
opts \\ []
) do
invoke_with_span(request, opts)
case Config.get(Actors, :actor_system_name) do
name when name === system_name ->
invoke_with_span(request, opts)

_ ->
invoke_with_nats(request, opts)
end
end

defp invoke_with_span(
%InvocationRequest{
actor: %Actor{} = actor,
system: %ActorSystem{} = system,
command_name: command_name,
async: async?,
metadata: metadata,
caller: caller,
pooled: pooled?
} = request,
opts
) do
@doc """
Makes a request to an actor using Nats broker.
* `request` - The InvocationRequest
* `opts` - The options to Invoke Actors
##
"""
@spec invoke(InvocationRequest.t()) :: {:ok, :async} | {:ok, term()} | {:error, term()}
def invoke_with_nats(
%InvocationRequest{
actor: actor,
system: %ActorSystem{name: system_name} = _system,
async: async?
} = request,
opts \\ []
) do
{_current, opts} =
Keyword.get_and_update(opts, :span_ctx, fn span_ctx ->
maybe_include_span(span_ctx)
end)

trace_context = :otel_propagator_text_map.inject_from(opts[:span_ctx], [])

opts =
Keyword.put(opts, :trace_context, trace_context)
|> Keyword.merge(async: async?)

case Nats.request(system_name, request, opts) do
{:ok, %{body: {:error, error}}} ->
{:error, error}

{:ok, %{body: :async}} ->
{:ok, :async}

{:ok, %{body: body}} when is_binary(body) ->
{:ok, ActorInvocationResponse.decode(body)}

{:ok, %{body: _body}} ->
{:error, :bad_response_type}

{:error, :no_responders} ->
Logger.error("Actor #{actor.id.name} not found on ActorSystem #{system_name}")
{:error, :not_found}

{:error, :timeout} ->
Logger.error(
"A timeout occurred while invoking the Actor #{actor.id.name} on ActorSystem #{system_name}"
)

{:error, :timeout}

{:error, error} ->
{:error, error}
end
end

def invoke_with_span(
%InvocationRequest{
actor: %Actor{} = actor,
system: %ActorSystem{} = system,
command_name: command_name,
async: async?,
metadata: metadata,
caller: caller,
pooled: pooled?
} = request,
opts
) do
{time, result} =
:timer.tc(fn ->
metadata_attributes =
Expand Down
4 changes: 2 additions & 2 deletions lib/actors/actor/entity/entity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ defmodule Actors.Actor.Entity do
defp do_handle_info(
message,
%EntityState{
actor: %Actor{id: %ActorId{name: name} = _id, state: actor_state}
actor: %Actor{id: %ActorId{name: name} = id, state: actor_state}
} = state
) do
Logger.warning(
"No handled internal message for actor #{name}. Message: #{inspect(message)}. Actor state: #{inspect(state)}"
)

if not is_nil(actor_state), do: StateManager.save(name, actor_state)
if not is_nil(actor_state), do: StateManager.save(id, actor_state)

{:noreply, state, :hibernate}
end
Expand Down
13 changes: 7 additions & 6 deletions lib/actors/actor/entity/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ defmodule Actors.Actor.Entity.Lifecycle do
def load_state(
%EntityState{
actor:
%Actor{settings: %ActorSettings{stateful: true}, id: %ActorId{name: name}} = actor
%Actor{settings: %ActorSettings{stateful: true}, id: %ActorId{name: name} = id} =
actor
} = state
) do
if is_nil(actor.state) or (!is_nil(actor.state) and is_nil(actor.state.state)) do
Expand All @@ -88,7 +89,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
end
|> Logger.debug()

case StateManager.load(name) do
case StateManager.load(id) do
{:ok, current_state} ->
{:noreply, %EntityState{state | actor: %Actor{actor | state: current_state}},
{:continue, :call_init_action}}
Expand All @@ -107,13 +108,13 @@ defmodule Actors.Actor.Entity.Lifecycle do

def terminate(reason, %EntityState{
actor: %Actor{
id: %ActorId{name: name} = _id,
id: %ActorId{name: name} = id,
settings: %ActorSettings{stateful: stateful},
state: actor_state
}
}) do
if stateful && !is_nil(actor_state) do
StateManager.save(name, actor_state)
StateManager.save(id, actor_state)
end

Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
Expand Down Expand Up @@ -148,7 +149,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
state_hash: old_hash,
actor:
%Actor{
id: %ActorId{name: name} = _id,
id: %ActorId{name: name} = id,
state: %ActorState{} = actor_state,
settings: %ActorSettings{
stateful: true,
Expand All @@ -167,7 +168,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
Logger.debug("Snapshotting actor #{name}")

# Execute with timeout equals timeout strategy - 1 to avoid mailbox congestions
case StateManager.save_async(name, actor_state, timeout - 1) do
case StateManager.save_async(id, actor_state, timeout - 1) do
{:ok, _, hash} ->
{:noreply, %{state | state_hash: hash}, :hibernate}

Expand Down
61 changes: 35 additions & 26 deletions lib/actors/actor/state_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
`StateManager` Implements behavior that allows an Actor's state to be saved
to persistent storage using database drivers.
"""

@behaviour Actors.Actor.StateManager.Behaviour

require Logger

alias Eigr.Functions.Protocol.Actors.ActorState
alias Eigr.Functions.Protocol.Actors.{ActorId, ActorState}
alias Google.Protobuf.Any
alias Statestores.Schemas.Event
alias Statestores.Manager.StateManager, as: StateStoreManager

@impl true
def is_new?(_old_hash, new_state) when is_nil(new_state), do: false

def is_new?(old_hash, new_state) do
Expand All @@ -30,10 +26,11 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
{:error, error}
end

@impl true
@spec load(String.t()) :: {:ok, any}
def load(name) do
case StateStoreManager.load(name) do
@spec load(ActorId.t()) :: {:ok, any}
def load(%ActorId{name: name, system: system} = _actor_id) do
key = generate_key(system, name)

case StateStoreManager.load(key) do
%Event{revision: _rev, tags: tags, data_type: type, data: data} = _event ->
{:ok, %ActorState{tags: tags, state: %Google.Protobuf.Any{type_url: type, value: data}}}

Expand All @@ -45,23 +42,28 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
{:error, error}
end

@impl true
@spec save(String.t(), Eigr.Functions.Protocol.Actors.ActorState.t()) ::
@spec save(ActorId.t(), Eigr.Functions.Protocol.Actors.ActorState.t()) ::
{:ok, Eigr.Functions.Protocol.Actors.ActorState.t()}
| {:error, any(), Eigr.Functions.Protocol.Actors.ActorState.t()}
def save(_name, nil), do: {:ok, nil}
def save(_actor_id, nil), do: {:ok, nil}

def save(_name, %ActorState{state: actor_state} = _state)
def save(_actor_id, %ActorState{state: actor_state} = _state)
when is_nil(actor_state) or actor_state == %{},
do: {:ok, actor_state}

def save(name, %ActorState{tags: tags, state: actor_state} = _state) do
def save(
%ActorId{name: name, system: system} = _actor_id,
%ActorState{tags: tags, state: actor_state} = _state
) do
Logger.debug("Saving state for actor #{name}")

with bytes_from_state <- Any.encode(actor_state),
hash <- :crypto.hash(:sha256, bytes_from_state) do
hash <- :crypto.hash(:sha256, bytes_from_state),
key <- generate_key(system, name) do
%Event{
id: key,
actor: name,
system: system,
revision: 0,
tags: tags,
data_type: actor_state.type_url,
Expand All @@ -84,26 +86,33 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
{:error, error, actor_state}
end

@impl true
@spec save_async(String.t(), Eigr.Functions.Protocol.Actors.ActorState.t()) ::
@spec save_async(ActorId.t(), Eigr.Functions.Protocol.Actors.ActorState.t()) ::
{:ok, Eigr.Functions.Protocol.Actors.ActorState.t()}
| {:error, any(), Eigr.Functions.Protocol.Actors.ActorState.t()}
def save_async(name, state, timeout \\ 5000)
def save_async(_name, nil, _timeout), do: {:ok, %{}}
def save_async(actor_id, state, timeout \\ 5000)

def save_async(_name, %ActorState{state: actor_state} = _state, _timeout)
def save_async(_actor_id, nil, _timeout), do: {:ok, %{}}

def save_async(_actor_id, %ActorState{state: actor_state} = _state, _timeout)
when is_nil(actor_state) or actor_state == %{},
do: {:ok, actor_state}

def save_async(name, %ActorState{tags: tags, state: actor_state} = _state, timeout) do
def save_async(
%ActorId{name: name, system: system} = _actor_id,
%ActorState{tags: tags, state: actor_state} = _state,
timeout
) do
parent = self()

persist_data_task =
Task.async(fn ->
Logger.debug("Saving state for actor #{name}")
key = generate_key(system, name)

%Event{
id: key,
actor: name,
system: system,
revision: 0,
tags: tags,
data_type: actor_state.type_url,
Expand Down Expand Up @@ -139,6 +148,8 @@ if Code.ensure_loaded?(Statestores.Supervisor) do
end
end

defp generate_key(system, name), do: Base.encode16("#{system}:#{name}")

defp inserted_successfully?(ref, pid) do
receive do
{^ref, :ok} -> true
Expand All @@ -151,16 +162,14 @@ else
defmodule Actors.Actor.StateManager do
@moduledoc false

@behaviour Actors.Actor.StateManager.Behaviour

@not_loaded_message """
Statestores not loaded properly
If you are creating actors with flag `persistent: true` consider adding :spawn_statestores to your deps list
"""

def is_new?(_old_hash, _new_state), do: raise(@not_loaded_message)
def load(_key), do: raise(@not_loaded_message)
def save(_name, _state), do: raise(@not_loaded_message)
def save_async(_name, _state, _timeout), do: raise(@not_loaded_message)
def load(_actor_id), do: raise(@not_loaded_message)
def save(_actor_id, _state), do: raise(@not_loaded_message)
def save_async(_actor_id, _state, _timeout), do: raise(@not_loaded_message)
end
end
Loading

0 comments on commit bba9883

Please sign in to comment.