Skip to content

Commit

Permalink
PISTON-976: acdc status stats query/storage performance optimization (2…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfinke authored and jamesaimonetti committed Dec 12, 2019
1 parent 9f697dc commit 213d405
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 50 deletions.
150 changes: 108 additions & 42 deletions applications/acdc/src/acdc_agent_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
%%% @copyright (C) 2014-2019, 2600Hz
%%% @doc Collector of stats for agents
%%% @author James Aimonetti
%%% @author Daniel Finke
%%%
%%% This Source Code Form is subject to the terms of the Mozilla Public
%%% License, v. 2.0. If a copy of the MPL was not distributed with this
Expand All @@ -24,6 +25,7 @@
,handle_status_stat/2
,handle_status_query/2

,status_stat_key/3
,status_stat_id/3

,status_table_id/0
Expand All @@ -36,15 +38,19 @@
-include("acdc.hrl").
-include("acdc_stats.hrl").

%%------------------------------------------------------------------------------
%% @doc Status stat table configuration
%% @end
%%------------------------------------------------------------------------------
%% Name under which the status stat ETS table is addressed by this module.
-spec status_table_id() -> atom().
status_table_id() ->
    'acdc_stats_status'.

%% Index of the #status_stat{} field that serves as the table key; it is handed
%% to the table options as the 'keypos' value by status_table_opts/0.
-spec status_key_pos() -> pos_integer().
status_key_pos() -> #status_stat.id.
status_key_pos() -> #status_stat.key.

%% Option list for the status stat table (presumably passed to ets:new/2 by the
%% table owner - confirm at the call site). The key position comes from
%% status_key_pos/0.
-spec status_table_opts() -> kz_term:proplist().
status_table_opts() ->
['protected', 'named_table'
['ordered_set', 'protected', 'named_table'
,{'keypos', status_key_pos()}
].

Expand Down Expand Up @@ -215,16 +221,15 @@ handle_status_stat(JObj, Props) ->
'false'
end,

AgentId = kz_json:get_value(<<"Agent-ID">>, JObj),
AccountId = kz_json:get_ne_binary_value(<<"Account-ID">>, JObj),
AgentId = kz_json:get_ne_binary_value(<<"Agent-ID">>, JObj),
Timestamp = kz_json:get_integer_value(<<"Timestamp">>, JObj),

gen_listener:cast(props:get_value('server', Props)
,{'create_status'
,#status_stat{id=status_stat_id(AgentId, Timestamp, EventName)
,agent_id=AgentId
,account_id=kz_json:get_value(<<"Account-ID">>, JObj)
,#status_stat{key=status_stat_key(AccountId, AgentId, Timestamp)
,id=status_stat_id(AgentId, Timestamp, EventName)
,status=EventName
,timestamp=Timestamp
,callid=kz_json:get_value(<<"Call-ID">>, JObj)
,wait_time=acdc_stats_util:wait_time(EventName, JObj)
,pause_time=acdc_stats_util:pause_time(EventName, JObj)
Expand All @@ -235,6 +240,18 @@ handle_status_stat(JObj, Props) ->
}
).

%%------------------------------------------------------------------------------
%% @doc Build the key of a status stat table entry from its account id, agent id
%% and timestamp. The key fields are laid out in that order so that a key which
%% is only partially bound can still narrow an `ordered_set' lookup.
%% @end
%%------------------------------------------------------------------------------
-spec status_stat_key(kz_term:ne_binary(), kz_term:ne_binary(), pos_integer()) -> status_stat_key().
status_stat_key(AccountId, AgentId, Timestamp) ->
    Key = #status_stat_key{account_id=AccountId, agent_id=AgentId, timestamp=Timestamp},
    Key.

%% Build a status stat id of the form `<AgentId>::<Timestamp>'. The event name
%% is accepted for the caller's convenience but does not take part in the id.
-spec status_stat_id(kz_term:ne_binary(), pos_integer(), any()) -> kz_term:ne_binary().
status_stat_id(AgentId, Timestamp, _EventName) ->
    TimestampBin = kz_term:to_binary(Timestamp),
    <<AgentId/binary, "::", TimestampBin/binary>>.
Expand All @@ -260,12 +277,19 @@ publish_query_errors(RespQ, MsgId, Errors) ->
lager:debug("responding with errors to req ~s: ~p", [MsgId, Errors]),
kapi_acdc_stats:publish_status_err(RespQ, API).

%%------------------------------------------------------------------------------
%% @doc Build a match spec for querying status stats
%% @end
%%------------------------------------------------------------------------------
-spec status_build_match_spec(kz_json:object()) ->
{'ok', ets:match_spec()} |
{'error', kz_json:object()}.
status_build_match_spec(JObj) ->
case kz_json:get_value(<<"Account-ID">>, JObj) of
'undefined' ->
{'error', kz_json:from_list([{<<"Account-ID">>, <<"missing but required">>}])};
AccountId ->
AcctMatch = {#status_stat{account_id='$1', _='_'}
AcctMatch = {#status_stat{key=#status_stat_key{account_id='$1'}, _='_'}
,[{'=:=', '$1', {'const', AccountId}}]
},
status_build_match_spec(JObj, AcctMatch)
Expand All @@ -282,7 +306,8 @@ status_build_match_spec(JObj, AcctMatch) ->

status_match_builder_fold(_, _, {'error', _Err}=E) -> E;
status_match_builder_fold(<<"Agent-ID">>, AgentId, {StatusStat, Contstraints}) ->
{StatusStat#status_stat{agent_id='$2'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{agent_id='$2'}}
,[{'=:=', '$2', {'const', AgentId}} | Contstraints]
};
status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints}) ->
Expand All @@ -302,7 +327,8 @@ status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints})
,{<<"Current-Timestamp">>, Now}
])};
N ->
{StatusStat#status_stat{timestamp='$3'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}}
,[{'>=', '$3', N} | Contstraints]
}
catch
Expand All @@ -324,7 +350,8 @@ status_match_builder_fold(<<"End-Range">>, End, {StatusStat, Contstraints}) ->
,{<<"Current-Timestamp">>, Now}
])};
N ->
{StatusStat#status_stat{timestamp='$3'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}}
,[{'=<', '$3', N} | Contstraints]
}
catch
Expand All @@ -337,51 +364,90 @@ status_match_builder_fold(<<"Status">>, Status, {StatusStat, Contstraints}) ->
};
status_match_builder_fold(_, _, Acc) -> Acc.

%%------------------------------------------------------------------------------
%% @doc Execute a status query
%% @end
%%------------------------------------------------------------------------------
%% Runs the match spec against the status stat table and publishes the matching
%% stats to RespQ, echoing MsgId back in the response.
-spec query_statuses(kz_term:ne_binary(), kz_term:ne_binary(), ets:match_spec(), pos_integer() | 'no_limit') -> 'ok'.
query_statuses(RespQ, MsgId, Match, Limit) ->
Stats = ets:select(status_table_id(), Match),
Stats = ets:select_reverse(status_table_id(), Match),

%% An empty result is only logged; a response is still published below
case Stats of
[] -> lager:debug("no stats found (requester: ~s)", [RespQ]);
_ -> 'ok'
end,

QueryResults = lists:foldl(fun query_status_fold/2, kz_json:new(), Stats),
TrimmedResults = kz_json:map(fun(A, B) ->
{A, trim_query_statuses(B, Limit)}
end, QueryResults),

%% The stats travel under the "Agents" key, alongside the default API headers
Resp = [{<<"Agents">>, TrimmedResults}
Resp = [{<<"Agents">>, query_statuses_group_by_agent(Stats, Limit)}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_resp(RespQ, Resp).

%% Order one agent's statuses by descending integer key and keep only the first
%% `Limit' of them; with `no_limit' every status is kept.
-spec trim_query_statuses(kz_json:object(), pos_integer() | 'no_limit') -> kz_json:object().
trim_query_statuses(Statuses, Limit) ->
    LargestKeyFirst = fun({KeyA, _}, {KeyB, _}) ->
                              kz_term:to_integer(KeyA) >= kz_term:to_integer(KeyB)
                      end,
    Ordered = lists:sort(LargestKeyFirst, kz_json:to_proplist(Statuses)),
    Kept = case Limit =:= 'no_limit' of
               'true' -> Ordered;
               'false' -> lists:sublist(Ordered, Limit)
           end,
    kz_json:from_list(Kept).

%% Fold step: store the public fields of the stat's document in the accumulator
%% under the path [AgentId, Timestamp], with the timestamp rendered as a binary.
-spec query_status_fold(status_stat(), kz_json:object()) -> kz_json:object().
query_status_fold(#status_stat{agent_id=AgentId, timestamp=Timestamp}=Stat, ByAgent) ->
    PublicDoc = kz_doc:public_fields(status_stat_to_doc(Stat)),
    Path = [AgentId, kz_term:to_binary(Timestamp)],
    kz_json:set_value(Path, PublicDoc, ByAgent).
%%------------------------------------------------------------------------------
%% @doc Group status stats by agent and return the grouping as a JObj keyed by
%% agent id. Stats are consumed in the order given, and each agent's group holds
%% at most `Limit' status stats (all of them when `Limit' is `no_limit').
%% @end
%%------------------------------------------------------------------------------
-spec query_statuses_group_by_agent([status_stat()], pos_integer() | 'no_limit') -> kz_json:object().
query_statuses_group_by_agent(Stats, Limit) ->
    query_statuses_fold(Stats, Limit, #{}).

%% Walk the stats in order, adding each one to its agent's group for as long as
%% that group holds fewer than `Limit' entries; later stats for a full group are
%% dropped. Entries inside a group are keyed by the timestamp as a binary. Once
%% the list is exhausted the accumulated map is converted to a JObj.
-spec query_statuses_fold([status_stat()], pos_integer() | 'no_limit', map()) -> kz_json:object().
query_statuses_fold([], _Limit, StatsByAgent) ->
    kz_json:from_map(StatsByAgent);
query_statuses_fold([#status_stat{key=#status_stat_key{agent_id=AgentId
                                                      ,timestamp=Timestamp
                                                      }
                                 }=Stat
                     | Stats
                    ], Limit, StatsByAgent) ->
    AgentStats = maps:get(AgentId, StatsByAgent, #{}),
    %% 'no_limit' is tested first so the size is never compared against an atom
    StatsByAgent1 = case Limit =:= 'no_limit'
                        orelse maps:size(AgentStats) < Limit
                    of
                        'true' ->
                            TimestampBin = kz_term:to_binary(Timestamp),
                            Stat1 = status_stat_to_map(Stat),
                            AgentStats1 = AgentStats#{TimestampBin => Stat1},
                            StatsByAgent#{AgentId => AgentStats1};
                        'false' -> StatsByAgent
                    end,
    query_statuses_fold(Stats, Limit, StatsByAgent1).

%%------------------------------------------------------------------------------
%% @doc Flatten a status stat record into a map with atom keys. The agent id and
%% timestamp are taken from the record's key; the account id is not included.
%% The record's `callid' field is exposed as `call_id'.
%% @end
%%------------------------------------------------------------------------------
-spec status_stat_to_map(status_stat()) -> map().
status_stat_to_map(#status_stat{key=#status_stat_key{}=Key}=Stat) ->
    #{'agent_id' => Key#status_stat_key.agent_id
     ,'timestamp' => Key#status_stat_key.timestamp
     ,'id' => Stat#status_stat.id
     ,'status' => Stat#status_stat.status
     ,'wait_time' => Stat#status_stat.wait_time
     ,'pause_time' => Stat#status_stat.pause_time
     ,'call_id' => Stat#status_stat.callid
     ,'caller_id_name' => Stat#status_stat.caller_id_name
     ,'caller_id_number' => Stat#status_stat.caller_id_number
     ,'queue_id' => Stat#status_stat.queue_id
     }.

-spec status_stat_to_doc(status_stat()) -> kz_json:object().
status_stat_to_doc(#status_stat{id=Id
,agent_id=AgentId
,account_id=AccountId
status_stat_to_doc(#status_stat{key=#status_stat_key{account_id=AccountId
,agent_id=AgentId
,timestamp=Timestamp
}
,id=Id
,status=Status
,timestamp=Timestamp
,wait_time=WT
,pause_time=PT
,callid=CallId
Expand Down Expand Up @@ -420,7 +486,7 @@ archive_status_data(Srv, 'true') ->
archive_status_data(Srv, 'false') ->
kz_log:put_callid(<<"acdc_stats.status_archiver">>),
Past = kz_time:now_s() - ?ARCHIVE_WINDOW,
Match = [{#status_stat{timestamp='$1'
Match = [{#status_stat{key=#status_stat_key{timestamp='$1'}
,is_archived='$2'
,_='_'
}
Expand All @@ -447,7 +513,7 @@ maybe_archive_status_data(Srv, Match) ->
end.

%% Accumulator step: convert the stat to a doc and prepend it to the list of
%% docs stored under the stat's account id, starting a new list if the account
%% has no entry in the dict yet.
-spec archive_status_fold(status_stat(), dict:dict()) -> dict:dict().
archive_status_fold(#status_stat{account_id=AccountId}=Stat, Acc) ->
archive_status_fold(#status_stat{key=#status_stat_key{account_id=AccountId}}=Stat, Acc) ->
Doc = status_stat_to_doc(Stat),
dict:update(AccountId, fun(L) -> [Doc | L] end, [Doc], Acc).

Expand Down
2 changes: 1 addition & 1 deletion applications/acdc/src/acdc_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ cleanup_data(Srv) ->
}],
gen_listener:cast(Srv, {'remove_call', CallMatch}),

StatusMatch = [{#status_stat{timestamp='$1', _='_'}
StatusMatch = [{#status_stat{key=#status_stat_key{timestamp='$1'}, _='_'}
,[{'=<', '$1', Past}]
,['$_']
}],
Expand Down
13 changes: 9 additions & 4 deletions applications/acdc/src/acdc_stats.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,16 @@
,<<"connecting">>, <<"connected">>
,<<"wrapup">>, <<"paused">>, <<"outbound">>
]).
-record(status_stat, {id :: kz_term:api_binary() | '_'
,agent_id :: kz_term:api_binary() | '$2' | '_'
,account_id :: kz_term:api_binary() | '$1' | '_'

%% This key optimizes lookups in the ordered_set ETS table
%% Every field defaults to '_' so that a key with only some fields given can be
%% used directly as a match pattern. The '$N' atoms allowed by the field types
%% are the match spec variables bound to those fields when building queries.
-record(status_stat_key, {account_id = '_' :: kz_term:ne_binary() | '$1' | '_'
,agent_id = '_' :: kz_term:ne_binary() | '$2' | '_'
,timestamp = '_' :: pos_integer() | '$1' | '$3' | '_'
}).
-type status_stat_key() :: #status_stat_key{}.
-record(status_stat, {key = '_' :: status_stat_key() | '_'
,id :: kz_term:api_binary() | '_'
,status :: kz_term:api_binary() | '$4' | '_'
,timestamp :: kz_term:api_pos_integer() | '$1' | '$3' | '$5' | '_'

,wait_time :: kz_term:api_integer() | '_'
,pause_time :: kz_term:api_integer() | '_'
Expand Down
11 changes: 8 additions & 3 deletions applications/acdc/src/cb_agents.erl
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ fetch_current_status(Context, AgentId, 'true') ->
Req = props:filter_undefined(
[{<<"Account-ID">>, cb_context:account_id(Context)}
,{<<"Agent-ID">>, AgentId}
,{<<"Limit">>, 1}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
]),
case kz_amqp_worker:call(Req
Expand All @@ -338,11 +339,15 @@ fetch_current_status(Context, AgentId, 'true') ->
,Context
);
{'ok', Resp} ->
Stats = kz_json:get_value([<<"Agents">>, AgentId], Resp),
{_, StatusJObj} = kz_json:foldl(fun acdc_agent_util:find_most_recent_fold/3, {0, kz_json:new()}, Stats),
crossbar_util:response(StatusJObj, Context)
Agents = kz_json:get_value(<<"Agents">>, Resp, kz_json:new()),
Agents1 = kz_json:map(fun(K, AgentStats) -> {K, remove_timestamps(AgentStats)} end, Agents),
crossbar_util:response(kz_json:get_json_value(AgentId, Agents1), Context)
end.

%% Unwrap one agent's stats object, which is keyed by timestamp, and return the
%% status stored under its first key. fetch_current_status/3 asks for a limit of
%% 1, so a single entry is expected. An object without any entries yields an
%% empty JObj instead of failing with a badmatch.
-spec remove_timestamps(kz_json:object()) -> kz_json:object() | 'undefined'.
remove_timestamps(AgentStats) ->
    case kz_json:get_keys(AgentStats) of
        [] -> kz_json:new();
        [Key|_] -> kz_json:get_json_value(Key, AgentStats)
    end.

-spec fetch_all_current_statuses(cb_context:context(), kz_term:api_binary(), kz_term:api_binary()) ->
cb_context:context().
fetch_all_current_statuses(Context, AgentId, Status) ->
Expand Down

0 comments on commit 213d405

Please sign in to comment.