This is an automated email from the ASF dual-hosted git repository. chewbranca pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a9bf98bead92de198f5175991442f51d22c7546c Author: ILYA Khlopotov <[email protected]> AuthorDate: Tue Jun 10 12:05:16 2025 -0700 Update HTTP API Replace comparison with pattern match. Add QUERY_CARDINALITY_LIMIT to group_by Use sliding topK Pass matcher to group_by and friends Add RPC Add HTTP --- src/chttpd/src/chttpd_httpd_handlers.erl | 2 +- .../src/couch_stats_resource_tracker.hrl | 1 + src/couch_stats/src/csrt.erl | 50 +++++++ src/couch_stats/src/csrt_httpd.erl | 165 +++++++++++++++++++++ src/couch_stats/src/csrt_query.erl | 145 +++++++++++++----- 5 files changed, 328 insertions(+), 35 deletions(-) diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl index e1b260222..9256f08b1 100644 --- a/src/chttpd/src/chttpd_httpd_handlers.erl +++ b/src/chttpd/src/chttpd_httpd_handlers.erl @@ -20,7 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1; url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1; url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1; url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1; -url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1; +url_handler(<<"_active_resources">>) -> fun csrt_httpd:handle_resource_status_req/1; url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1; url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1; url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1; diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl b/src/couch_stats/src/couch_stats_resource_tracker.hrl index aec21115e..350438135 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.hrl +++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl @@ -41,6 +41,7 @@ -define(CONF_MATCHERS_ENABLED, "csrt_logger.matchers_enabled"). -define(CONF_MATCHERS_THRESHOLD, "csrt_logger.matchers_threshold"). 
-define(CONF_MATCHERS_DBNAMES, "csrt_logger.dbnames_io"). +-define(QUERY_CARDINALITY_LIMIT, 10_000). %% Mapping of couch_stat metric names to #rctx{} field names. %% These are used for fields that we inc a counter on. diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl index 603e444e9..123a3c0de 100644 --- a/src/couch_stats/src/csrt.erl +++ b/src/couch_stats/src/csrt.erl @@ -71,6 +71,12 @@ should_track_init_p/1 ]). +%% RPC api +-export([ + rpc/2, + call/1 +]). + %% aggregate query api -export([ active/0, @@ -99,6 +105,50 @@ proc_window/3 ]). +%% +%% RPC operations +%% + +-spec rpc(FName :: atom(), Args :: [any()]) -> + {[{node(), Result :: any()}], Errors :: [{badrpc, Reason :: any()}], BadNodes :: [node()]}. +rpc(FName, Args) when is_atom(FName) andalso is_list(Args) -> + {Resp, BadNodes} = rpc:multicall(?MODULE, call, [{FName, Args}]), + {Results, Errors} = split_response(Resp), + {Results, lists:usort(Errors), BadNodes}. + +split_response(Resp) -> + lists:foldl(fun(Message, {Results, Errors}) -> + case Message of + {badrpc, _} = E -> + {Results, [E | Errors]}; + Result -> + {[Result | Results], Errors} + end + end, {[], []}, Resp). 
+ +call({active, []}) -> {node(), active()}; +call({active, [json]}) -> {node(), active(json)}; +call({active_coordinators, []}) -> {node(), active_coordinators()}; +call({active_coordinators, [json]}) -> {node(), active_coordinators(json)}; +call({active_workers, []}) -> {node(), active_workers()}; +call({active_workers, [json]}) -> {node(), active_workers(json)}; +call({count_by, [Key]}) -> {node(), count_by(Key)}; +call({find_by_nonce, [Nonce]}) -> {node(), find_by_nonce(Nonce)}; +call({find_by_pid, [Pid]}) -> {node(), find_by_pid(Pid)}; +call({find_by_pidref, [PidRef]}) -> {node(), find_by_pidref(PidRef)}; +call({find_workers_by_pidref, [PidRef]}) -> {node(), find_workers_by_pidref(PidRef)}; +call({group_by, [Key, Val]}) -> {node(), group_by(Key, Val)}; +call({group_by, [Key, Val, Agg]}) -> {node(), group_by(Key, Val, Agg)}; +call({sorted, [Map]}) -> {node(), sorted(Map)}; +call({sorted_by, [Key]}) -> {node(), sorted_by(Key)}; +call({sorted_by, [Key, Val]}) -> {node(), sorted_by(Key, Val)}; +call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)}; +call({FunName, Args}) -> + FunNameBin = atom_to_binary(FunName), + ArityBin = integer_to_binary(length(Args)), + {error, <<"No such function '"/binary, FunNameBin/binary, "/"/binary, ArityBin/binary>>}. + + %% %% PidRef operations %% diff --git a/src/couch_stats/src/csrt_httpd.erl b/src/couch_stats/src/csrt_httpd.erl new file mode 100644 index 000000000..c02e6295c --- /dev/null +++ b/src/couch_stats/src/csrt_httpd.erl @@ -0,0 +1,165 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(csrt_httpd). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_stats_resource_tracker.hrl"). + +-export([handle_resource_status_req/1]). + +-import( + chttpd, + [ + send_json/2, send_json/3, + send_method_not_allowed/2 + ] +). + +rpc_to_json({Resp, _Errors, _Nodes}) -> + #{<<"results">> => resp_to_json(Resp, #{}), <<"errors">> => [], <<"bad_nodes">> => []}. + +resp_to_json([{N, R} | Rest], Acc) -> + resp_to_json(Rest, maps:put(atom_to_binary(N), R, Acc)); +resp_to_json([], Acc) -> + Acc. + +% handle_resource_status_req(#httpd{method = 'GET', path_parts = [<<"_active_resources">>]} = Req) -> +% ok = chttpd:verify_is_server_admin(Req), +% %% TODO: incorporate Bad responses +% Resp = rpc_to_json(csrt:rpc(active, [json])), +% send_json(Req, Resp); +handle_resource_status_req(#httpd{method = 'POST', path_parts = [<<"_active_resources">>, <<"_match">>, MatcherName]} = Req) -> + chttpd:validate_ctype(Req, "application/json"), + {JsonProps} = chttpd:json_body_obj(Req), + GroupBy = couch_util:get_value(<<"group_by">>, JsonProps), + SortBy = couch_util:get_value(<<"sort_by">>, JsonProps), + CountBy = couch_util:get_value(<<"count_by">>, JsonProps), + + case {GroupBy, SortBy, CountBy} of + {undefined, undefined, {Query}} -> + handle_count_by(Req, MatcherName, Query); + {undefined, {Query}, undefined} -> + handle_sort_by(Req, MatcherName, Query); + {{Query}, undefined, undefined} -> + handle_group_by(Req, MatcherName, Query); + {_, _, _} -> + throw({bad_request, <<"Multiple aggregations are not supported">>}) + end; +handle_resource_status_req(#httpd{path_parts = [<<"_active_resources">>]} = Req) -> + ok = chttpd:verify_is_server_admin(Req), + send_method_not_allowed(Req, "GET,HEAD"); + +handle_resource_status_req(Req) -> + ok = chttpd:verify_is_server_admin(Req), + send_method_not_allowed(Req, "GET,HEAD,POST"). 
%% Handle a `count_by` aggregation: count the entries selected by the named
%% matcher, grouped by the fields listed under `aggregate_keys`.
%% Throws `{bad_request, _}` when the field is missing, a key is invalid or
%% the matcher is unknown.
handle_count_by(Req, MatcherName, CountBy) ->
    AggregationKey = parse_key(required_field(<<"aggregate_keys">>, CountBy)),
    Result = csrt_query:count_by(matching(MatcherName), AggregationKey),
    send_aggregation_result(Req, Result).

%% Handle a `sort_by` aggregation: aggregate `counter_key` per
%% `aggregate_keys` group and return the largest groups.
handle_sort_by(Req, MatcherName, SortBy) ->
    AggregationKey = parse_key(required_field(<<"aggregate_keys">>, SortBy)),
    ValueKey = parse_key(required_field(<<"counter_key">>, SortBy)),
    %% NOTE(review): this used to call csrt_query:sort_by/3, but csrt_query
    %% exports sorted_by/2,3,4 and no sort_by/3; confirm sorted_by/3 is the
    %% intended (Matcher, Key, Value) entry point.
    Result = csrt_query:sorted_by(matching(MatcherName), AggregationKey, ValueKey),
    send_aggregation_result(Req, Result).

%% Handle a `group_by` aggregation: aggregate `counter_key` per
%% `aggregate_keys` group.
handle_group_by(Req, MatcherName, GroupBy) ->
    AggregationKey = parse_key(required_field(<<"aggregate_keys">>, GroupBy)),
    ValueKey = parse_key(required_field(<<"counter_key">>, GroupBy)),
    Result = csrt_query:group_by(matching(MatcherName), AggregationKey, ValueKey),
    send_aggregation_result(Req, Result).

%% Fetch a mandatory field of the aggregation object. A missing field is
%% rejected here, at the HTTP boundary, instead of being passed on to the
%% query as `undefined`.
required_field(Name, Props) ->
    case couch_util:get_value(Name, Props) of
        undefined ->
            throw({bad_request, <<"Missing required field '", Name/binary, "'">>});
        Value ->
            Value
    end.

%% Send the outcome of a csrt_query aggregation as the JSON response.
send_aggregation_result(Req, {ok, Result}) ->
    send_json(Req, {aggregation_result_to_json(Result)});
send_aggregation_result(_Req, {limit, _Partial}) ->
    %% The query stopped at the cardinality limit. Report that as a readable
    %% reason instead of throwing the partial result map itself.
    throw({bad_request, <<"Query exceeded the cardinality limit">>});
send_aggregation_result(_Req, Else) ->
    %% TODO handle error
    throw({bad_request, Else}).

%% Convert an aggregation result into an EJSON property list. Accepts either
%% a map of Key => Value or a list of {Key, Value} pairs (whose order is kept).
aggregation_result_to_json(Map) when is_map(Map) ->
    maps:fold(fun(K, V, Acc) -> [{key_to_string(K), V} | Acc] end, [], Map);
aggregation_result_to_json(List) when is_list(List) ->
    [{key_to_string(K), V} || {K, V} <- List].

%% Render a group key as a binary usable as a JSON object key. The elements
%% of a tuple key are joined with ",".
key_to_string(Key) when is_tuple(Key) ->
    Parts = [key_part_to_binary(Part) || Part <- tuple_to_list(Key)],
    iolist_to_binary(lists:join(<<",">>, Parts));
key_to_string(Key) ->
    key_part_to_binary(Key).

%% Group keys are built from record field values, so besides atoms they can
%% be binaries or other terms; anything else is printed with ~p.
%% NOTE(review): confirm ~p is the wanted rendering for pids/refs/strings.
key_part_to_binary(Part) when is_atom(Part) ->
    atom_to_binary(Part);
key_part_to_binary(Part) when is_binary(Part) ->
    Part;
key_part_to_binary(Part) ->
    iolist_to_binary(io_lib:format("~p", [Part])).

%% Look up a matcher by the name taken from the request path.
%% Throws `{bad_request, _}` when no such matcher is registered.
matching(MatcherName) ->
    case csrt_logger:get_matcher(binary_to_list(MatcherName)) of
        undefined ->
            throw({bad_request, <<"unknown matcher '"/binary, MatcherName/binary, "'"/binary>>});
        Matcher ->
            Matcher
    end.
% Run a query against one of the predefined matchers, looked up by name:
% - docs_read
% - rows_read
% - docs_written
% - worker_changes_processed
% - ioq_calls
% Returns `{error, Reason :: binary()}` for an unknown matcher name instead
% of throwing.
% NOTE(review): nothing visible in this module calls this function; confirm
% it is still needed.
query_matcher(MatcherName, AggregationKey, CounterKey) ->
    case csrt_logger:get_matcher(binary_to_list(MatcherName)) of
        undefined ->
            {error, <<"unknown matcher '"/binary, MatcherName/binary, "'"/binary>>};
        Matcher ->
            csrt_query:query_matcher(Matcher, AggregationKey, CounterKey)
    end.

% Translate a key name from the request body into the corresponding field
% atom. Only the names listed here are accepted, so no atoms are created
% from user input. Anything else throws `{bad_request, Reason :: binary()}`.
-spec to_key(BinKey :: term()) -> Key :: rctx_field() | no_return().

to_key(<<"pid_ref">>) -> pid_ref;
to_key(<<"nonce">>) -> nonce;
to_key(<<"type">>) -> type;
to_key(<<"dbname">>) -> dbname;
to_key(<<"username">>) -> username;
to_key(<<"db_open">>) -> db_open;
to_key(<<"docs_read">>) -> docs_read;
to_key(<<"rows_read">>) -> rows_read;
to_key(<<"changes_returned">>) -> changes_returned;
to_key(<<"ioq_calls">>) -> ioq_calls;
to_key(<<"js_filter">>) -> js_filter;
to_key(<<"js_filtered_docs">>) -> js_filtered_docs;
to_key(<<"get_kv_node">>) -> get_kv_node;
to_key(<<"get_kp_node">>) -> get_kp_node;
to_key(Other) when is_binary(Other) ->
    throw({bad_request, <<"Invalid key '"/binary, Other/binary, "'"/binary>>});
to_key(_Other) ->
    % Non-string JSON values (numbers, objects, ...) are a client error too.
    throw({bad_request, <<"Invalid key: expected a string">>}).

% Parse the key(s) of an aggregation request: a single name yields a single
% field atom, a list of names yields a list of field atoms in the same order,
% and `undefined` (field absent) is passed through unchanged.
% Throws `{bad_request, Reason :: binary()}` for anything else.
-spec parse_key(Keys :: undefined | binary() | [binary()]) ->
    undefined | rctx_field() | [rctx_field()] | no_return().

parse_key(undefined) ->
    undefined;
parse_key(BinKey) when is_binary(BinKey) ->
    to_key(BinKey);
parse_key(Keys) when is_list(Keys) ->
    [to_key(BinKey) || BinKey <- Keys];
parse_key(_Other) ->
    throw({bad_request, <<"Invalid key: expected a string or a list of strings">>}).
+ diff --git a/src/couch_stats/src/csrt_query.erl b/src/couch_stats/src/csrt_query.erl index 5c1f7cd7a..4edf5365c 100644 --- a/src/couch_stats/src/csrt_query.erl +++ b/src/couch_stats/src/csrt_query.erl @@ -23,17 +23,17 @@ active_coordinators/1, active_workers/0, active_workers/1, - count_by/1, + all/0, + count_by/2, find_by_nonce/1, find_by_pid/1, find_by_pidref/1, find_workers_by_pidref/1, - group_by/2, group_by/3, - sorted/1, - sorted_by/1, + group_by/4, sorted_by/2, - sorted_by/3 + sorted_by/3, + sorted_by/4 ]). %% @@ -112,45 +112,122 @@ field(#rctx{updated_at = Val}, updated_at) -> Val. curry_field(Field) -> fun(Ele) -> field(Ele, Field) end. -count_by(KeyFun) -> - group_by(KeyFun, fun(_) -> 1 end). +count_by(Matcher, KeyFun) -> + group_by(Matcher, KeyFun, fun(_) -> 1 end). -group_by(KeyFun, ValFun) -> - group_by(KeyFun, ValFun, fun erlang:'+'/2). +group_by(Matcher, KeyFun, ValFun) -> + group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2). -group_by(KeyL, ValFun, AggFun) when is_list(KeyL) -> +group_by(Matcher, KeyFun, ValFun, AggFun) -> + group_by(Matcher, KeyFun, ValFun, AggFun, ?QUERY_CARDINALITY_LIMIT). + +-spec all() -> matcher(). + +all() -> + Spec = ets:fun2ms(fun(#rctx{} = R) -> R end), + {Spec, ets:match_spec_compile(Spec)}. + +%% eg: group_by(all(), mfa, docs_read). +%% eg: group_by(all(), fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls). +%% eg: ^^ or: group_by(all(), [mfa, docs_read], ioq_calls). +%% eg: group_by(all(), [username, dbname, mfa], docs_read). +%% eg: group_by(all(), [username, dbname, mfa], ioq_calls). +%% eg: group_by(all(), [username, dbname, mfa], js_filters). 
+group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) -> KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end, - group_by(KeyFun, ValFun, AggFun); -group_by(Key, ValFun, AggFun) when is_atom(Key) -> - group_by(curry_field(Key), ValFun, AggFun); -group_by(KeyFun, Val, AggFun) when is_atom(Val) -> - group_by(KeyFun, curry_field(Val), AggFun); -group_by(KeyFun, ValFun, AggFun) -> + group_by(Matcher, KeyFun, ValFun, AggFun, Limit); +group_by(Matcher, Key, ValFun, AggFun, Limit) when is_atom(Key) -> + group_by(Matcher, curry_field(Key), ValFun, AggFun, Limit); +group_by(Matcher, KeyFun, Val, AggFun, Limit) when is_atom(Val) -> + group_by(Matcher, KeyFun, curry_field(Val), AggFun, Limit); +group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> FoldFun = fun(Ele, Acc) -> - Key = KeyFun(Ele), - Val = ValFun(Ele), - CurrVal = maps:get(Key, Acc, 0), - NewVal = AggFun(CurrVal, Val), - %% TODO: should we skip here? how to make this optional? - case NewVal > 0 of + case maps:size(Acc) =< Limit of true -> - maps:put(Key, NewVal, Acc); + case maybe_match(Ele, Matcher) of + true -> + Key = KeyFun(Ele), + Val = ValFun(Ele), + CurrVal = maps:get(Key, Acc, 0), + case AggFun(CurrVal, Val) of + 0 -> + Acc; + NewVal -> + maps:put(Key, NewVal, Acc) + end; + false -> + Acc + end; false -> - Acc - end + throw({limit, Acc}) + end end, - ets:foldl(FoldFun, #{}, ?CSRT_ETS). + try + {ok, ets:foldl(FoldFun, #{}, ?CSRT_ETS)} + catch throw:{limit, Acc} -> + {limit, Acc} + end. -%% Sorts largest first -sorted(Map) when is_map(Map) -> - lists:sort(fun({_K1, A}, {_K2, B}) -> B < A end, maps:to_list(Map)). +maybe_match(_Ele, undefined) -> + true; +maybe_match(Ele, {_, MS}) -> + ets:match_spec_run([Ele], MS) =/= []. -shortened(L) -> - lists:sublist(L, 10). +%% +%% Auxiliary functions to calculate topK +%% -sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))). -sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))). 
%% Bounded accumulator holding the `capacity` largest {Key, Value} pairs
%% seen so far.
-record(topK, {
    % retained {Key, Value} pairs, ordered by Value in ascending order
    seq = [] :: [{term(), term()}],
    % smallest retained value; `infinite` while empty
    % (we rely on erlang sorting order where `number < atom`)
    min = infinite :: infinite | term(),
    % largest retained value; 0 while empty
    max = 0 :: term(),
    % number of pairs currently retained
    size = 0 :: non_neg_integer(),
    % capacity cannot be less than 1
    capacity = 1 :: pos_integer()
}).

%% Create an empty accumulator retaining at most K pairs.
new_topK(K) when is_integer(K), K >= 1 ->
    #topK{capacity = K}.

%% Offer one {Key, Value} pair to the accumulator.
%%
%% While below capacity the pair is always stored. At capacity it replaces
%% the smallest retained pair only when Value is strictly greater than that
%% pair's value; otherwise the accumulator is returned unchanged.
update_topK(Key, Value, #topK{size = S, capacity = S, seq = [{_, Smallest} | Rest]} = Top) ->
    % capacity cannot be less than 1, so `seq` is non-empty when we are full
    case Value > Smallest of
        true ->
            store_seq(Top, lists:keysort(2, [{Key, Value} | Rest]));
        false ->
            Top
    end;
update_topK(Key, Value, #topK{size = S, seq = Seq} = Top) ->
    store_seq(Top#topK{size = S + 1}, lists:keysort(2, [{Key, Value} | Seq])).

%% Store a non-empty ascending sequence and refresh `min`/`max` from its
%% ends, so both always describe what is actually retained.
store_seq(Top, [{_, Min} | _] = Seq) ->
    {_, Max} = lists:last(Seq),
    Top#topK{seq = Seq, min = Min, max = Max}.

%% Return the retained pairs, largest value first.
get_topK(#topK{seq = S}) ->
    lists:reverse(S).
%% Return the K entries of the Results map with the largest values, as a
%% list of {Key, Value} pairs ordered largest first.
topK(Results, K) ->
    TopK = maps:fold(fun update_topK/3, new_topK(K), Results),
    get_topK(TopK).

%% Top 10 groups of an aggregation, restricted to the entries selected by
%% Matcher. The arities follow the module's export list (sorted_by/2,3,4).
%% Returns {Status, TopList}, where Status is the tag returned by the
%% underlying group_by (`ok` or `limit`); on `limit` the list is computed
%% from the partial result.
%%
%% eg: sorted_by(all(), [username, dbname, mfa], ioq_calls)
%% eg: sorted_by(all(), [dbname, mfa], docs_read)
sorted_by(Matcher, KeyFun) ->
    top_results(count_by(Matcher, KeyFun)).

sorted_by(Matcher, KeyFun, ValFun) ->
    top_results(group_by(Matcher, KeyFun, ValFun)).

sorted_by(Matcher, KeyFun, ValFun, AggFun) ->
    top_results(group_by(Matcher, KeyFun, ValFun, AggFun)).

%% Reduce the map of a tagged aggregation result to its 10 largest entries,
%% keeping the tag.
top_results({Result, Acc}) ->
    {Result, topK(Acc, 10)}.

%% Convert every element of List with csrt_util:to_json/1.
to_json_list(List) when is_list(List) ->
    lists:map(fun csrt_util:to_json/1, List).
