From 83dd57b78250d2d61369371325b649f9eb2fbc9e Mon Sep 17 00:00:00 2001 From: argv0 Date: Tue, 5 Apr 2011 14:17:08 -0700 Subject: [PATCH] bz://716 : graceful riak_core_handoff_sender failure on crash of receiving node --- src/riak_core_handoff_sender.erl | 66 ++++++++++++++++++++++++-------- src/riak_core_vnode.erl | 20 ++++++++-- 2 files changed, 67 insertions(+), 19 deletions(-) diff --git a/src/riak_core_handoff_sender.erl b/src/riak_core_handoff_sender.erl index 40607ee8f..7ebeb06ff 100644 --- a/src/riak_core_handoff_sender.erl +++ b/src/riak_core_handoff_sender.erl @@ -58,35 +58,69 @@ start_fold(TargetNode, Module, Partition, ParentPid) -> M = <>, ok = gen_tcp:send(Socket, M), StartFoldTime = now(), - {Socket,ParentPid,Module,_Ack,SentCount} = + {Socket,ParentPid,Module,_Ack,SentCount,ErrStatus} = riak_core_vnode_master:sync_command({Partition, node()}, ?FOLD_REQ{ foldfun=fun visit_item/3, - acc0={Socket,ParentPid,Module,0,0}}, + acc0={Socket,ParentPid,Module,0,0,ok}}, VMaster, infinity), EndFoldTime = now(), - error_logger:info_msg("Handoff of partition ~p ~p to ~p completed: sent ~p objects in ~.2f seconds", - [Module, Partition, TargetNode, SentCount, - timer:now_diff(EndFoldTime, StartFoldTime) / 1000000]), - gen_fsm:send_event(ParentPid, handoff_complete) - %% Socket will be closed when this process exits + case ErrStatus of + ok -> + error_logger:info_msg("Handoff of partition ~p ~p to ~p " + "completed: sent ~p objects in ~.2f " + "seconds\n", + [Module, Partition, TargetNode, + SentCount, + timer:now_diff( + EndFoldTime, + StartFoldTime) / 1000000]), + gen_fsm:send_event(ParentPid, handoff_complete); + {error, ErrReason} -> + error_logger:error_msg("Handoff of partition ~p ~p to ~p " + "FAILED: (~p) after sending ~p objects " + "in ~.2f seconds\n", + [Module, Partition, TargetNode, + ErrReason, SentCount, + timer:now_diff( + EndFoldTime, + StartFoldTime) / 1000000]), + gen_fsm:send_event(ParentPid, {handoff_error, + fold_error, ErrReason}) + end catch Err:Reason -> error_logger:error_msg("Handoff sender ~p ~p failed ~p:~p\n", - [Module, Partition, Err,Reason]) + [Module, Partition, Err,Reason]), + gen_fsm:send_event(ParentPid, {handoff_error, Err, Reason}) end. -visit_item(K, V, {Socket, ParentPid, Module, ?ACK_COUNT, Total}) -> +%% When a tcp error occurs, the ErrStatus argument is set to {error, Reason}. +%% Since we can't abort the fold, this clause is just a no-op. +visit_item(_K, _V, {Socket, ParentPid, Module, Ack, Total, {error, Reason}}) -> + {Socket, ParentPid, Module, Ack, Total, {error, Reason}}; +visit_item(K, V, {Socket, ParentPid, Module, ?ACK_COUNT, Total, _Err}) -> M = <>, - ok = gen_tcp:send(Socket, M), - {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} = gen_tcp:recv(Socket, 0), - visit_item(K, V, {Socket, ParentPid, Module, 0, Total}); -visit_item(K, V, {Socket, ParentPid, Module, Ack, Total}) -> + case gen_tcp:send(Socket, M) of + ok -> + case gen_tcp:recv(Socket, 0) of + {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> + visit_item(K, V, {Socket, ParentPid, Module, 0, Total, ok}); + {error, Reason} -> + {Socket, ParentPid, Module, 0, Total, {error, Reason}} + end; + {error, Reason} -> + {Socket, ParentPid, Module, 0, Total, {error, Reason}} + end; +visit_item(K, V, {Socket, ParentPid, Module, Ack, Total, _ErrStatus}) -> BinObj = Module:encode_handoff_item(K, V), M = <>, - ok = gen_tcp:send(Socket, M), - {Socket, ParentPid, Module, Ack+1, Total+1}. - + case gen_tcp:send(Socket, M) of + ok -> + {Socket, ParentPid, Module, Ack+1, Total+1, ok}; + {error, Reason} -> + {Socket, ParentPid, Module, Ack, Total, {error, Reason}} + end. get_handoff_port(Node) when is_atom(Node) -> case catch(gen_server2:call({riak_core_handoff_listener, Node}, handoff_port, infinity)) of diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index da22ff07d..422031e10 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -77,6 +77,7 @@ behaviour_info(_Other) -> modstate :: term(), handoff_token :: non_neg_integer(), handoff_node=none :: none | node(), + handoff_pid :: pid(), inactivity_timeout}). start_link(Mod, Index) -> @@ -174,7 +175,18 @@ active(handoff_complete, State=#state{mod=Mod, Mod:handoff_finished(HN, ModState), {ok, NewModState} = Mod:delete(ModState), riak_core_handoff_manager:add_exclusion(Mod, Idx), - {stop, normal, State#state{modstate=NewModState, handoff_node=none}}. + {stop, normal, State#state{modstate=NewModState, + handoff_node=none, + handoff_pid=undefined}}; +active({handoff_error, _Err, _Reason}, State=#state{mod=Mod, + modstate=ModState, + index=Idx, + handoff_token=HT}) -> + riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT), + %% it would be nice to pass {Err, Reason} to the vnode but the + %% API doesn't currently allow for that. + Mod:handoff_cancelled(ModState), + continue(State#state{handoff_node=none}). active(_Event, _From, State) -> Reply = ok, @@ -198,6 +210,8 @@ handle_sync_event({handoff_data,BinObj}, _From, StateName, State#state.inactivity_timeout} end. +handle_info({'EXIT', Pid, _Reason}, _StateName, State=#state{handoff_pid=Pid}) -> + continue(State#state{handoff_pid=undefined}); handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod}) -> %% A linked processes has died so use the %% handle_exit callback to allow the vnode @@ -254,8 +268,8 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) - NewState = State#state{modstate=NewModState, handoff_token=HandoffToken, handoff_node=TargetNode}, - riak_core_handoff_sender:start_link(TargetNode, Mod, Idx), - continue(NewState) + {ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx), + continue(NewState#state{handoff_pid=HandoffPid}) end end.