Skip to content

Commit

Permalink
Monitor watcher processes to cleanup stopped ones.
Browse files Browse the repository at this point in the history
  • Loading branch information
hairyhum committed Feb 14, 2018
1 parent e606666 commit 4f4ac3c
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions src/aten_detector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
interval = ?POLL_INTERVAL_MS :: non_neg_integer(),
threshold = ?DEFAULT_THRESHOLD :: float(),
node_states = #{} :: #{node() => float()}, % last threshold
watchers = #{} :: #{node() => [pid()]}}).
watchers = #{} :: #{node() => #{pid() => reference()}}}).

%%%===================================================================
%%% API functions
Expand Down Expand Up @@ -57,14 +57,18 @@ handle_call(_Request, _From, State) ->
{reply, Reply, State}.

handle_cast({register, Node, Pid}, #state{watchers = Watchers0} = State) ->
Watchers = maps:update_with(Node, fun (Pids) -> [Pid | Pids] end,
[Pid], Watchers0),
Pids0 = maps:get(Node, Watchers0, #{}),
Pids = case Pids0 of
#{Pid := _Mon} -> Pids0;
#{} -> Pids0#{Pid => erlang:monitor(process, Pid)}
end,
Watchers = maps:put(Node, Pids, Watchers0),
ok = try_connect(Node),
{noreply, State#state{watchers = Watchers}};
handle_cast({unregister, Node, Pid}, #state{watchers = Watchers0} = State) ->
Watchers = case Watchers0 of
#{Node := Pids} ->
Watchers0#{Node => lists:delete(Pid, Pids)};
Watchers0#{Node => maps:remove(Pid, Pids)};
_ ->
Watchers0
end,
Expand All @@ -78,7 +82,14 @@ handle_info(poll, #state{threshold = Th,
{Up, Down} = analyse(Probs, Prev, Th),
ok = notify(Watchers, Down, down),
ok = notify(Watchers, Up, up),
{noreply, State#state{node_states = Probs}}.
{noreply, State#state{node_states = Probs}};
handle_info({'DOWN', _Mon, process, Pid, _R},
#state{watchers = Watchers0} = State) ->
Watchers = maps:map(fun(_Node, Pids) ->
maps:remove(Pid, Pids)
end,
Watchers0),
{noreply, State#state{watchers = Watchers}}.

terminate(_Reason, _State) ->
ok.
Expand All @@ -95,8 +106,7 @@ notify(_Watchers, [], _Evt) ->
notify(Watchers, [Node | Nodes], Evt) ->
case Watchers of
#{Node := Pids} ->
[Pid ! {node_event, Node, Evt}
|| Pid <- Pids];
maps:map(fun(Pid, _) -> Pid ! {node_event, Node, Evt} end, Pids);
_ ->
ok
end,
Expand Down

0 comments on commit 4f4ac3c

Please sign in to comment.