Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gen_fsm replaced by gen_statem in vnode #80

Closed
wants to merge 17 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into 19_3_gen_statem
  • Loading branch information
WoelkiM authored Oct 13, 2020
commit abee5519691ad25a98195c2c07fd2518ce246210
63 changes: 42 additions & 21 deletions src/riak_core_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@
core_status/1,
send_command_after/2]).

-export([cast_finish_handoff/1, send_an_event/2,
send_req/2, send_all_proxy_req/2, cancel_handoff/1,
handoff_complete/1, resize_transfer_complete/2,
handoff_data/3, unregistered/1]).
-export([cast_finish_handoff/1,
send_an_event/2,
send_req/2,
send_all_proxy_req/2,
cancel_handoff/1,
handoff_complete/1,
resize_transfer_complete/2,
handoff_data/3,
unregistered/1]).

-ifdef(TEST).

Expand Down Expand Up @@ -281,14 +286,17 @@ trigger_handoff(VNode, TargetIdx, TargetNode) ->
trigger_handoff(VNode, TargetNode) ->
gen_statem:cast(VNode, {trigger_handoff, TargetNode}).


%% #8 cast
trigger_delete(VNode) ->
gen_statem:cast(VNode, trigger_delete).


%% #9 cast
core_status(VNode) ->
gen_statem:call(VNode, core_status).


%% #10 %TODO
%% Sends a command to the FSM that called it after Time
%% has passed.
Expand All @@ -301,6 +309,7 @@ send_command_after(Time, Request) ->
self(),
{'$gen_cast', #riak_vnode_req_v1{request = Request}}).


%% #11 - riak_core_vnode_manager - handle_vnode_event
cast_finish_handoff(VNode) ->
gen_statem:cast(VNode, finish_handoff).
Expand All @@ -309,6 +318,7 @@ cast_finish_handoff(VNode) ->
cancel_handoff(VNode) ->
gen_statem:cast(VNode, cancel_handoff).


%% #13 - riak_core_vnode_master - send_an_event
send_an_event(VNode, Event) ->
gen_statem:cast(VNode, Event).
Expand Down Expand Up @@ -337,6 +347,7 @@ handoff_data(VNode, MsgData, VNodeTimeout) ->
{handoff_data, MsgData},
VNodeTimeout).


%% #19 - riak_core_vnode_proxy - handle_cast
unregistered(VNode) ->
gen_statem:call(VNode, unregistered).
Expand Down Expand Up @@ -384,17 +395,18 @@ monitor(ignore) -> erlang:monitor(process, self()).
%% ========
%% ========================
-record(state,
{index :: partition(), mod :: module(),
modstate :: term(),
forward :: node() | [{integer(), node()}],
handoff_target = none :: none | {integer(), node()},
handoff_pid :: pid() | undefined,
handoff_type ::
riak_core_handoff_manager:ho_type() | undefined,
pool_pid :: pid() | undefined,
pool_config :: tuple() | undefined,
manager_event_timer :: reference() | undefined,
inactivity_timeout :: non_neg_integer()}).
{index :: partition(),
mod :: module(),
modstate :: term(),
forward :: node() | [{integer(), node()}],
handoff_target = none :: none | {integer(), node()},
handoff_pid :: pid() | undefined,
handoff_type ::
riak_core_handoff_manager:ho_type() | undefined,
pool_pid :: pid() | undefined,
pool_config :: tuple() | undefined,
manager_event_timer :: reference() | undefined,
inactivity_timeout :: non_neg_integer()}).

callback_mode() -> state_functions.

Expand Down Expand Up @@ -750,6 +762,7 @@ active(info, {'EXIT', Pid, Reason},
WorkerArgs, worker_props),
continue(State#state{pool_pid = NewPoolPid})
end;

%%
active(info, {'DOWN', _Ref, process, _Pid, normal},
State = #state{modstate = {deleted, _}}) ->
Expand All @@ -758,17 +771,21 @@ active(info, {'DOWN', _Ref, process, _Pid, normal},
%% only dustbin them in the deleted modstate, because pipe vnodes
%% need them in other states
continue(State);

%%
active({info, _F}, Info,
State = #state{mod = Module, modstate = {deleted, _},
index = Index}) ->

logger:info("~p ~p ignored handle_info ~p - vnode "
"unregistering\n",
[Index, Module, Info]),
continue(State);

%%
active({info, _F}, {'EXIT', Pid, Reason},
State = #state{mod = Module, modstate = ModState}) ->

%% A linked processes has died so use the
%% handle_exit callback to allow the vnode
%% process to take appropriate action.
Expand Down Expand Up @@ -815,6 +832,7 @@ active({_C, From}, _MSG, State) ->
State,
[State#state.inactivity_timeout, {reply, From, ok}]}.


%% ========================
%% ========
%% Internal Helper Functions
Expand Down Expand Up @@ -1370,7 +1388,7 @@ mod_set_forwarding(Forward,
%% ===================================================================
%% Test API
%% ===================================================================
-ifdef(TEST).


-type state() :: #state{}.

Expand All @@ -1380,8 +1398,9 @@ mod_set_forwarding(Forward,
get_modstate(Pid) ->
{_StateName, State} = gen_statem:call(Pid,
current_state),
{State#state.mod, State#state.modstate}.

{State#state.mod, State#state.modstate}.
-ifdef(TEST).
%% Start the garbage collection server
test_link(Mod, Index) ->
gen_statem:start_link(?MODULE,
Expand Down Expand Up @@ -1438,15 +1457,17 @@ wait_for_process_death(_Pid, false) -> ok.
wait_for_state_update(OriginalStateData, Pid) ->
{_, CurrentStateData} = (?MODULE):current_state(Pid),
wait_for_state_update(OriginalStateData,
CurrentStateData, Pid).
CurrentStateData,
Pid).

wait_for_state_update(OriginalStateData,
OriginalStateData, Pid) ->
OriginalStateData, Pid) ->
{_, CurrentStateData} = (?MODULE):current_state(Pid),
wait_for_state_update(OriginalStateData,
CurrentStateData, Pid);
CurrentStateData,
Pid);
wait_for_state_update(_OriginalState, _StateData,
_Pid) ->
_Pid) ->
ok.

-endif.
You are viewing a condensed version of this merge commit. You can view the full changes here.