Skip to content

Commit

Permalink
handle account processing timers (2600hz#2931)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo authored and k-anderson committed Dec 8, 2016
1 parent 2bb2d08 commit ea22275
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions applications/fax/src/fax_jobs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,22 @@ handle_cast(_Msg, State) ->
-spec handle_info(any(), state()) -> handle_info_ret_state(state()).
handle_info('timeout', #state{queue='undefined'}=State) ->
{'noreply', State, ?POLLING_INTERVAL};
handle_info('timeout', #state{stale= ?STALE_LIMIT}=State) ->
{'stop', 'normal', State};
handle_info('timeout', #state{jobs=#{distribute := []
,serialize := []
,pending := []
,running := []
}
,stale= Stale
handle_info('timeout', #state{account_id=AccountId
,stale=?STALE_LIMIT
}=State) ->
{'noreply', State#state{stale = Stale + 1}, ?POLLING_INTERVAL};
lager:debug("stale limit of ~B reached for account ~s. exiting", [?STALE_LIMIT, AccountId]),
{'stop', 'normal', State};
handle_info('timeout', #state{account_id=AccountId
,jobs=#{distribute := Distribute
,pending := Pending
,running := Running
}=Map0
,stale=Stale
}=State) ->
Map2 = #{running := Running} = maps:fold(fun check_pending/3, Map0, Pending),
Map1 = maps:fold(fun check_running/3, Map2, Running),
Map = Map1#{distribute => Distribute ++ get_account_jobs(AccountId)},
{'noreply', distribute_jobs(State#state{jobs=Map, stale = 0}), ?POLLING_INTERVAL};
{'noreply', distribute_jobs(State#state{jobs=Map, stale = Stale + 1}), ?POLLING_INTERVAL};
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
{'noreply', State, ?POLLING_INTERVAL}.
Expand Down Expand Up @@ -254,15 +250,23 @@ code_change(_OldVsn, State, _Extra) ->
-spec distribute_jobs(state()) -> state().
distribute_jobs(#state{jobs=#{distribute := []
,serialize := []
,pending := #{}
,running := #{}
}
}=State) -> State;
distribute_jobs(#state{jobs=#{distribute := []
,serialize := []
}
}=State) ->
State#state{stale=0};
distribute_jobs(#state{jobs=#{distribute := []
,serialize := Serialize
}=Jobs
}=State) ->
State#state{jobs=Jobs#{distribute => Serialize
,serialize => []
}};
}
,stale=0};
distribute_jobs(#state{account_id=AccountId
,limits= #{account := MaxAccount}
,jobs=#{pending := Pending
Expand All @@ -271,15 +275,15 @@ distribute_jobs(#state{account_id=AccountId
}=State)
when map_size(Pending) + map_size(Running) >= MaxAccount ->
lager:warning("fax outbound limits (~b) reached for account ~s", [MaxAccount, AccountId]),
State;
State#state{stale=0};
distribute_jobs(#state{jobs=#{distribute := [Job | Jobs]}=Map}=State) ->
maybe_distribute_job(Job, State#state{jobs=Map#{distribute => Jobs}}).

-spec maybe_distribute_job(kz_json:object(), state()) -> state().
maybe_distribute_job(Job, #state{account_id=AccountId}=State) ->
Number = number(Job),
case is_number_valid(Number, AccountId) of
'true' -> distribute_job(knm_converters:normalize(Number, AccountId), Job, State);
'true' -> distribute_job(knm_converters:normalize(Number, AccountId), Job, State#state{stale=0});
'false' -> invalidate_job(Job, State)
end.

Expand Down

0 comments on commit ea22275

Please sign in to comment.