Skip to content

Commit

Permalink
bz://716 : graceful riak_core_handoff_sender failure on crash of rece…
Browse files Browse the repository at this point in the history
…iving node
  • Loading branch information
argv0 authored and Vagabond committed Apr 6, 2011
1 parent 8ff2769 commit 83dd57b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 19 deletions.
66 changes: 50 additions & 16 deletions src/riak_core_handoff_sender.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,69 @@ start_fold(TargetNode, Module, Partition, ParentPid) ->
M = <<?PT_MSG_INIT:8,Partition:160/integer>>,
ok = gen_tcp:send(Socket, M),
StartFoldTime = now(),
{Socket,ParentPid,Module,_Ack,SentCount} =
{Socket,ParentPid,Module,_Ack,SentCount,ErrStatus} =
riak_core_vnode_master:sync_command({Partition, node()},
?FOLD_REQ{
foldfun=fun visit_item/3,
acc0={Socket,ParentPid,Module,0,0}},
acc0={Socket,ParentPid,Module,0,0,ok}},
VMaster, infinity),
EndFoldTime = now(),
error_logger:info_msg("Handoff of partition ~p ~p to ~p completed: sent ~p objects in ~.2f seconds",
[Module, Partition, TargetNode, SentCount,
timer:now_diff(EndFoldTime, StartFoldTime) / 1000000]),
gen_fsm:send_event(ParentPid, handoff_complete)
%% Socket will be closed when this process exits
case ErrStatus of
ok ->
error_logger:info_msg("Handoff of partition ~p ~p to ~p "
"completed: sent ~p objects in ~.2f "
"seconds\n",
[Module, Partition, TargetNode,
SentCount,
timer:now_diff(
EndFoldTime,
StartFoldTime) / 1000000]),
gen_fsm:send_event(ParentPid, handoff_complete);
{error, ErrReason} ->
error_logger:error_msg("Handoff of partition ~p ~p to ~p "
"FAILED: (~p) after sending ~p objects "
"in ~.2f seconds\n",
[Module, Partition, TargetNode,
ErrReason, SentCount,
timer:now_diff(
EndFoldTime,
StartFoldTime) / 1000000]),
gen_fsm:send_event(ParentPid, {handoff_error,
fold_error, ErrReason})
end
catch
Err:Reason ->
error_logger:error_msg("Handoff sender ~p ~p failed ~p:~p\n",
[Module, Partition, Err,Reason])
[Module, Partition, Err,Reason]),
gen_fsm:send_event(ParentPid, {handoff_error, Err, Reason})
end.

visit_item(K, V, {Socket, ParentPid, Module, ?ACK_COUNT, Total}) ->
%% When a tcp error occurs, the ErrStatus argument is set to {error, Reason}.
%% Since we can't abort the fold, this clause is just a no-op.
visit_item(_K, _V, {Socket, ParentPid, Module, Ack, Total, {error, Reason}}) ->
{Socket, ParentPid, Module, Ack, Total, {error, Reason}};
visit_item(K, V, {Socket, ParentPid, Module, ?ACK_COUNT, Total, _Err}) ->
M = <<?PT_MSG_OLDSYNC:8,"sync">>,
ok = gen_tcp:send(Socket, M),
{ok,[?PT_MSG_OLDSYNC|<<"sync">>]} = gen_tcp:recv(Socket, 0),
visit_item(K, V, {Socket, ParentPid, Module, 0, Total});
visit_item(K, V, {Socket, ParentPid, Module, Ack, Total}) ->
case gen_tcp:send(Socket, M) of
ok ->
case gen_tcp:recv(Socket, 0) of
{ok,[?PT_MSG_OLDSYNC|<<"sync">>]} ->
visit_item(K, V, {Socket, ParentPid, Module, 0, Total, ok});
{error, Reason} ->
{Socket, ParentPid, Module, 0, Total, {error, Reason}}
end;
{error, Reason} ->
{Socket, ParentPid, Module, 0, Total, {error, Reason}}
end;
visit_item(K, V, {Socket, ParentPid, Module, Ack, Total, _ErrStatus}) ->
BinObj = Module:encode_handoff_item(K, V),
M = <<?PT_MSG_OBJ:8,BinObj/binary>>,
ok = gen_tcp:send(Socket, M),
{Socket, ParentPid, Module, Ack+1, Total+1}.

case gen_tcp:send(Socket, M) of
ok ->
{Socket, ParentPid, Module, Ack+1, Total+1, ok};
{error, Reason} ->
{Socket, ParentPid, Module, Ack, Total, {error, Reason}}
end.

get_handoff_port(Node) when is_atom(Node) ->
case catch(gen_server2:call({riak_core_handoff_listener, Node}, handoff_port, infinity)) of
Expand Down
20 changes: 17 additions & 3 deletions src/riak_core_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ behaviour_info(_Other) ->
modstate :: term(),
handoff_token :: non_neg_integer(),
handoff_node=none :: none | node(),
handoff_pid :: pid(),
inactivity_timeout}).

start_link(Mod, Index) ->
Expand Down Expand Up @@ -174,7 +175,18 @@ active(handoff_complete, State=#state{mod=Mod,
Mod:handoff_finished(HN, ModState),
{ok, NewModState} = Mod:delete(ModState),
riak_core_handoff_manager:add_exclusion(Mod, Idx),
{stop, normal, State#state{modstate=NewModState, handoff_node=none}}.
{stop, normal, State#state{modstate=NewModState,
handoff_node=none,
handoff_pid=undefined}};
active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
modstate=ModState,
index=Idx,
handoff_token=HT}) ->
riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
%% it would be nice to pass {Err, Reason} to the vnode but the
%% API doesn't currently allow for that.
Mod:handoff_cancelled(ModState),
continue(State#state{handoff_node=none}).

active(_Event, _From, State) ->
Reply = ok,
Expand All @@ -198,6 +210,8 @@ handle_sync_event({handoff_data,BinObj}, _From, StateName,
State#state.inactivity_timeout}
end.

handle_info({'EXIT', Pid, _Reason}, _StateName, State=#state{handoff_pid=Pid}) ->
continue(State#state{handoff_pid=undefined});
handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod}) ->
%% A linked processes has died so use the
%% handle_exit callback to allow the vnode
Expand Down Expand Up @@ -254,8 +268,8 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) -
NewState = State#state{modstate=NewModState,
handoff_token=HandoffToken,
handoff_node=TargetNode},
riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
continue(NewState)
{ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
continue(NewState#state{handoff_pid=HandoffPid})
end
end.

Expand Down

0 comments on commit 83dd57b

Please sign in to comment.