This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a9bf98bead92de198f5175991442f51d22c7546c
Author: ILYA Khlopotov <[email protected]>
AuthorDate: Tue Jun 10 12:05:16 2025 -0700

    Update HTTP API
    
    Replace comparison with pattern match.
    
    Add QUERY_CARDINALITY_LIMIT to group_by
    
    Use sliding topK
    
    Pass matcher to group_by and friends
    
    Add RPC
    
    Add HTTP
---
 src/chttpd/src/chttpd_httpd_handlers.erl           |   2 +-
 .../src/couch_stats_resource_tracker.hrl           |   1 +
 src/couch_stats/src/csrt.erl                       |  50 +++++++
 src/couch_stats/src/csrt_httpd.erl                 | 165 +++++++++++++++++++++
 src/couch_stats/src/csrt_query.erl                 | 145 +++++++++++++-----
 5 files changed, 328 insertions(+), 35 deletions(-)

diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl 
b/src/chttpd/src/chttpd_httpd_handlers.erl
index e1b260222..9256f08b1 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,7 +20,7 @@ url_handler(<<"_utils">>) -> fun 
chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
 url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
-url_handler(<<"_active_resources">>) -> fun 
chttpd_misc:handle_resource_status_req/1;
+url_handler(<<"_active_resources">>) -> fun 
csrt_httpd:handle_resource_status_req/1;
 url_handler(<<"_scheduler">>) -> fun 
couch_replicator_httpd:handle_scheduler_req/1;
 url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun 
chttpd_misc:handle_reload_query_servers_req/1;
diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl 
b/src/couch_stats/src/couch_stats_resource_tracker.hrl
index aec21115e..350438135 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.hrl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl
@@ -41,6 +41,7 @@
 -define(CONF_MATCHERS_ENABLED, "csrt_logger.matchers_enabled").
 -define(CONF_MATCHERS_THRESHOLD, "csrt_logger.matchers_threshold").
 -define(CONF_MATCHERS_DBNAMES, "csrt_logger.dbnames_io").
+-define(QUERY_CARDINALITY_LIMIT, 10_000).
 
 %% Mapping of couch_stat metric names to #rctx{} field names.
 %% These are used for fields that we inc a counter on.
diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl
index 603e444e9..123a3c0de 100644
--- a/src/couch_stats/src/csrt.erl
+++ b/src/couch_stats/src/csrt.erl
@@ -71,6 +71,12 @@
     should_track_init_p/1
 ]).
 
+%% RPC api
+-export([
+    rpc/2,
+    call/1
+]).
+
 %% aggregate query api
 -export([
     active/0,
@@ -99,6 +105,50 @@
     proc_window/3
 ]).
 
+%%
+%% RPC operations
+%%
+
+-spec rpc(FName :: atom(), Args :: [any()]) ->
+    {[{node(), Result :: any()}], Errors :: [{badrpc, Reason :: any()}], 
BadNodes :: [node()]}.
+rpc(FName, Args) when is_atom(FName) andalso is_list(Args) ->
+    {Resp, BadNodes} = rpc:multicall(?MODULE, call, [{FName, Args}]),
+    {Results, Errors} = split_response(Resp),
+    {Results, lists:usort(Errors), BadNodes}.
+
+split_response(Resp) ->
+    lists:foldl(fun(Message, {Results, Errors}) ->
+        case Message of
+            {badrpc, _} = E ->
+                {Results, [E | Errors]};
+            Result ->
+                {[Result | Results], Errors}
+        end
+    end, {[], []}, Resp).
+
+call({active, []}) -> {node(), active()};
+call({active, [json]}) -> {node(), active(json)};
+call({active_coordinators, []}) -> {node(), active_coordinators()};
+call({active_coordinators, [json]}) -> {node(), active_coordinators(json)};
+call({active_workers, []}) -> {node(), active_workers()};
+call({active_workers, [json]}) -> {node(), active_workers(json)};
+call({count_by, [Key]}) -> {node(), count_by(Key)};
+call({find_by_nonce, [Nonce]}) -> {node(), find_by_nonce(Nonce)};
+call({find_by_pid, [Pid]}) -> {node(), find_by_pid(Pid)};
+call({find_by_pidref, [PidRef]}) -> {node(), find_by_pidref(PidRef)};
+call({find_workers_by_pidref, [PidRef]}) -> {node(), 
find_workers_by_pidref(PidRef)};
+call({group_by, [Key, Val]}) -> {node(), group_by(Key, Val)};
+call({group_by, [Key, Val, Agg]}) -> {node(), group_by(Key, Val, Agg)};
+call({sorted, [Map]}) -> {node(), sorted(Map)};
+call({sorted_by, [Key]}) -> {node(), sorted_by(Key)};
+call({sorted_by, [Key, Val]}) -> {node(), sorted_by(Key, Val)};
+call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)};
+call({FunName, Args}) ->
+    FunNameBin = atom_to_binary(FunName),
+    ArityBin = integer_to_binary(length(Args)),
+    {error, <<"No such function '"/binary, FunNameBin/binary, "/"/binary, 
ArityBin/binary>>}.
+
+
 %%
 %% PidRef operations
 %%
diff --git a/src/couch_stats/src/csrt_httpd.erl 
b/src/couch_stats/src/csrt_httpd.erl
new file mode 100644
index 000000000..c02e6295c
--- /dev/null
+++ b/src/couch_stats/src/csrt_httpd.erl
@@ -0,0 +1,165 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_httpd).
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_stats_resource_tracker.hrl").
+
+-export([handle_resource_status_req/1]).
+
+-import(
+    chttpd,
+    [
+        send_json/2, send_json/3,
+        send_method_not_allowed/2
+    ]
+).
+
+rpc_to_json({Resp, _Errors, _Nodes}) ->
+    #{<<"results">> => resp_to_json(Resp, #{}), <<"errors">> => [], 
<<"bad_nodes">> => []}.
+
+resp_to_json([{N, R} | Rest], Acc) ->
+    resp_to_json(Rest, maps:put(atom_to_binary(N), R, Acc));
+resp_to_json([], Acc) ->
+    Acc.
+
+% handle_resource_status_req(#httpd{method = 'GET', path_parts = 
[<<"_active_resources">>]} = Req) ->
+%     ok = chttpd:verify_is_server_admin(Req),
+%     %% TODO: incorporate Bad responses
+%     Resp = rpc_to_json(csrt:rpc(active, [json])),
+%     send_json(Req, Resp);
+handle_resource_status_req(#httpd{method = 'POST', path_parts = 
[<<"_active_resources">>, <<"_match">>, MatcherName]} = Req) ->
+    chttpd:validate_ctype(Req, "application/json"),
+    {JsonProps} = chttpd:json_body_obj(Req),
+    GroupBy = couch_util:get_value(<<"group_by">>, JsonProps),
+    SortBy = couch_util:get_value(<<"sort_by">>, JsonProps),
+    CountBy = couch_util:get_value(<<"count_by">>, JsonProps),
+
+    case {GroupBy, SortBy, CountBy} of
+        {undefined, undefined, {Query}} ->
+            handle_count_by(Req, MatcherName, Query);
+        {undefined, {Query}, undefined} ->
+            handle_sort_by(Req, MatcherName, Query);
+        {{Query}, undefined, undefined} ->
+            handle_group_by(Req, MatcherName, Query);
+        {_, _, _} ->
+            throw({bad_request, <<"Multiple aggregations are not supported">>})
+    end;
+handle_resource_status_req(#httpd{path_parts = [<<"_active_resources">>]} = 
Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    send_method_not_allowed(Req, "GET,HEAD");
+
+handle_resource_status_req(Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    send_method_not_allowed(Req, "GET,HEAD,POST").
+
+handle_count_by(Req, MatcherName, CountBy) ->
+    AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, CountBy),
+    AggregationKey = parse_key(AggregationKeys),
+    case csrt_query:count_by(matching(MatcherName), AggregationKey) of
+        {ok, Map} ->
+            send_json(Req, {aggregation_result_to_json(Map)});
+        Else ->
+            %% TODO handle error
+            throw({bad_request, Else})
+    end.
+
+handle_sort_by(Req, MatcherName, SortBy) ->
+    AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, SortBy),
+    CounterKey = couch_util:get_value(<<"counter_key">>, SortBy),
+    AggregationKey = parse_key(AggregationKeys),
+    ValueKey = parse_key(CounterKey),
+    case csrt_query:sort_by(matching(MatcherName), AggregationKey, ValueKey) of
+        {ok, Map} ->
+            send_json(Req, {aggregation_result_to_json(Map)});
+        Else ->
+            %% TODO handle error
+            throw({bad_request, Else})
+    end.
+
+handle_group_by(Req, MatcherName, GroupBy) ->
+    AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, GroupBy),
+    CounterKey = couch_util:get_value(<<"counter_key">>, GroupBy),
+    AggregationKey = parse_key(AggregationKeys),
+    ValueKey = parse_key(CounterKey),
+    case csrt_query:group_by(matching(MatcherName), AggregationKey, ValueKey) 
of
+        {ok, Map} ->
+            send_json(Req, {aggregation_result_to_json(Map)});
+        Else ->
+            %% TODO handle error
+            throw({bad_request, Else})
+    end.
+
+aggregation_result_to_json(Map) when is_map(Map) ->
+    maps:fold(fun(K, V, Acc) -> [{key_to_string(K), V} | Acc] end, [], Map).
+
+key_to_string(Key) when is_tuple(Key) ->
+    list_to_binary(string:join([atom_to_list(K) || K <- tuple_to_list(Key)], 
","));
+key_to_string(Key) when is_atom(Key) ->
+    atom_to_binary(Key).
+
+matching(MatcherName) ->
+    case csrt_logger:get_matcher(binary_to_list(MatcherName)) of
+        undefined ->
+            throw({bad_request, <<"unknown matcher '"/binary, 
MatcherName/binary, "'"/binary>>});
+        Matcher ->
+            Matcher
+    end.
+
+% extract one of the predefined matchers
+%   - docs_read
+%   - rows_read
+%   - docs_written
+%   - worker_changes_processed
+%   - ioq_calls
+query_matcher(MatcherName, AggregationKey, CounterKey) ->
+    case csrt_logger:get_matcher(binary_to_list(MatcherName)) of
+        undefined ->
+            {error, <<"unknown matcher '"/binary, MatcherName/binary, 
"'"/binary>>};
+        Matcher ->
+            csrt_query:query_matcher(Matcher, AggregationKey, CounterKey)
+    end.
+
+-spec to_key(BinKey :: binary() | string()) -> Key :: rctx_field()
+    | throw({bad_request, Reason :: binary()}).
+
+to_key(<<"pid_ref">>) -> pid_ref;
+to_key(<<"nonce">>) -> nonce;
+to_key(<<"type">>) -> type;
+to_key(<<"dbname">>) -> dbname;
+to_key(<<"username">>) -> username;
+to_key(<<"db_open">>) -> db_open;
+to_key(<<"docs_read">>) -> docs_read;
+to_key(<<"rows_read">>) -> rows_read;
+to_key(<<"changes_returned">>) -> changes_returned;
+to_key(<<"ioq_calls">>) -> ioq_calls;
+to_key(<<"js_filter">>) -> js_filter;
+to_key(<<"js_filtered_docs">>) -> js_filtered_docs;
+to_key(<<"get_kv_node">>) -> get_kv_node;
+to_key(<<"get_kp_node">>) -> get_kp_node;
+to_key(Other) when is_binary(Other) -> throw({bad_request, <<"Invalid key 
'"/binary, Other/binary, "'"/binary>>}).
+
+-spec parse_key(Keys :: binary() | [binary()]) -> [rctx_field()]
+    | throw({bad_request, Reason :: binary()}).
+
+parse_key(Keys) when is_list(Keys) ->
+    parse_key(Keys, []);
+parse_key(BinKey) when is_binary(BinKey) ->
+    to_key(BinKey);
+parse_key(undefined) ->
+    undefined.
+
+parse_key([BinKey | Rest], Keys) ->
+    parse_key(Rest, [to_key(BinKey) | Keys]);
+parse_key([], Keys) ->
+    lists:reverse(Keys).
+
diff --git a/src/couch_stats/src/csrt_query.erl 
b/src/couch_stats/src/csrt_query.erl
index 5c1f7cd7a..4edf5365c 100644
--- a/src/couch_stats/src/csrt_query.erl
+++ b/src/couch_stats/src/csrt_query.erl
@@ -23,17 +23,17 @@
     active_coordinators/1,
     active_workers/0,
     active_workers/1,
-    count_by/1,
+    all/0,
+    count_by/2,
     find_by_nonce/1,
     find_by_pid/1,
     find_by_pidref/1,
     find_workers_by_pidref/1,
-    group_by/2,
     group_by/3,
-    sorted/1,
-    sorted_by/1,
+    group_by/4,
     sorted_by/2,
-    sorted_by/3
+    sorted_by/3,
+    sorted_by/4
 ]).
 
 %%
@@ -112,45 +112,122 @@ field(#rctx{updated_at = Val}, updated_at) -> Val.
 curry_field(Field) ->
     fun(Ele) -> field(Ele, Field) end.
 
-count_by(KeyFun) ->
-    group_by(KeyFun, fun(_) -> 1 end).
+count_by(Matcher, KeyFun) ->
+    group_by(Matcher, KeyFun, fun(_) -> 1 end).
 
-group_by(KeyFun, ValFun) ->
-    group_by(KeyFun, ValFun, fun erlang:'+'/2).
+group_by(Matcher, KeyFun, ValFun) ->
+    group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2).
 
-group_by(KeyL, ValFun, AggFun) when is_list(KeyL) ->
+group_by(Matcher, KeyFun, ValFun, AggFun) ->
+    group_by(Matcher, KeyFun, ValFun, AggFun, ?QUERY_CARDINALITY_LIMIT).
+
+-spec all() -> matcher().
+
+all() ->
+    Spec = ets:fun2ms(fun(#rctx{} = R) -> R end),
+    {Spec, ets:match_spec_compile(Spec)}.
+
+%% eg: group_by(all(), mfa, docs_read).
+%% eg: group_by(all(), fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, 
ioq_calls).
+%% eg: ^^ or: group_by(all(), [mfa, docs_read], ioq_calls).
+%% eg: group_by(all(), [username, dbname, mfa], docs_read).
+%% eg: group_by(all(), [username, dbname, mfa], ioq_calls).
+%% eg: group_by(all(), [username, dbname, mfa], js_filters).
+group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) ->
     KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end,
-    group_by(KeyFun, ValFun, AggFun);
-group_by(Key, ValFun, AggFun) when is_atom(Key) ->
-    group_by(curry_field(Key), ValFun, AggFun);
-group_by(KeyFun, Val, AggFun) when is_atom(Val) ->
-    group_by(KeyFun, curry_field(Val), AggFun);
-group_by(KeyFun, ValFun, AggFun) ->
+    group_by(Matcher, KeyFun, ValFun, AggFun, Limit);
+group_by(Matcher, Key, ValFun, AggFun, Limit) when is_atom(Key) ->
+    group_by(Matcher, curry_field(Key), ValFun, AggFun, Limit);
+group_by(Matcher, KeyFun, Val, AggFun, Limit) when is_atom(Val) ->
+    group_by(Matcher, KeyFun, curry_field(Val), AggFun, Limit);
+group_by(Matcher, KeyFun, ValFun, AggFun, Limit) ->
     FoldFun = fun(Ele, Acc) ->
-        Key = KeyFun(Ele),
-        Val = ValFun(Ele),
-        CurrVal = maps:get(Key, Acc, 0),
-        NewVal = AggFun(CurrVal, Val),
-        %% TODO: should we skip here? how to make this optional?
-        case NewVal > 0 of
+        case maps:size(Acc) =< Limit of
             true ->
-                maps:put(Key, NewVal, Acc);
+                case maybe_match(Ele, Matcher) of
+                    true ->
+                        Key = KeyFun(Ele),
+                        Val = ValFun(Ele),
+                        CurrVal = maps:get(Key, Acc, 0),
+                        case AggFun(CurrVal, Val) of
+                            0 ->
+                                Acc;
+                            NewVal ->
+                                maps:put(Key, NewVal, Acc)
+                        end;
+                    false ->
+                        Acc
+                end;
             false ->
-                Acc
-        end
+                throw({limit, Acc})
+            end
     end,
-    ets:foldl(FoldFun, #{}, ?CSRT_ETS).
+    try
+        {ok, ets:foldl(FoldFun, #{}, ?CSRT_ETS)}
+    catch throw:{limit, Acc} ->
+        {limit, Acc}
+    end.
 
-%% Sorts largest first
-sorted(Map) when is_map(Map) ->
-    lists:sort(fun({_K1, A}, {_K2, B}) -> B < A end, maps:to_list(Map)).
+maybe_match(_Ele, undefined) ->
+    true;
+maybe_match(Ele, {_, MS}) ->
+    ets:match_spec_run([Ele], MS) =/= [].
 
-shortened(L) ->
-    lists:sublist(L, 10).
+%%
+%% Auxiliary functions to calculate topK
+%%
 
-sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))).
-sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))).
-sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, 
AggFun))).
+-record(topK, {
+    % we store elements sorted in ascending order
+    seq = [] :: list(pos_integer()),
+    % we rely on Erlang's term ordering, where `number < atom`
+    min = infinite  :: infinite | pos_integer(),
+    max = 0 :: pos_integer(),
+    size = 0 :: non_neg_integer(),
+    % capacity cannot be less than 1
+    capacity = 1 :: pos_integer()
+}).
+
+new_topK(K) when K >= 1 ->
+    #topK{capacity = K}.
+
+% when we are at capacity
+% don't bother adding the value since it is less than what we already saw
+update_topK(_Key, Value, #topK{size = S, capacity = S, min = Min} = Top) when 
Value < Min ->
+    Top#topK{min = Value};
+% when we are at capacity, evict the smallest value
+update_topK(Key, Value, #topK{size = S, capacity = S, max = Max, seq = Seq} = 
Top) when Value > Max ->
+    % capacity cannot be less than 1, so we can avoid handling the case when 
Seq is empty
+    [_ | Truncated] = Seq,
+    Top#topK{max = Value, seq = lists:keysort(2, [{Key, Value} | Truncated])};
+% when we are at capacity and the value is between min and max, evict the smallest value
+update_topK(Key, Value, #topK{size = S, capacity = S, seq = Seq} = Top) ->
+    % capacity cannot be less than 1, so we can avoid handling the case when 
Seq is empty
+    [_ | Truncated] = Seq,
+    Top#topK{seq = lists:keysort(2, [{Key, Value} | Truncated])};
+update_topK(Key, Value, #topK{size = S, min = Min, seq = Seq} = Top) when 
Value < Min ->
+    Top#topK{size = S + 1, min = Value, seq = lists:keysort(2, [{Key, Value} | 
Seq])};
+update_topK(Key, Value, #topK{size = S, max = Max, seq = Seq} = Top) when 
Value > Max ->
+    Top#topK{size = S + 1, max = Value, seq = lists:keysort(2, [{Key, Value} | 
Seq])};
+update_topK(Key, Value, #topK{size = S, seq = Seq} = Top) ->
+    Top#topK{size = S + 1, seq = lists:keysort(2, [{Key, Value} | Seq])}.
+
+get_topK(#topK{seq = S}) ->
+    lists:reverse(S).
+
+topK(Results, K) ->
+    TopK = maps:fold(fun update_topK/3, new_topK(K), Results),
+    get_topK(TopK).
+
+%% eg: sorted_by([username, dbname, mfa], ioq_calls)
+%% eg: sorted_by([dbname, mfa], doc_reads)
+sorted_by(KeyFun) -> topK(count_by(KeyFun), 10).
+sorted_by(KeyFun, ValFun) ->
+    {Result, Acc} = group_by(KeyFun, ValFun),
+    {Result, topK(Acc, 10)}.
+sorted_by(KeyFun, ValFun, AggFun) ->
+    {Result, Acc} = group_by(KeyFun, ValFun, AggFun),
+    {Result, topK(Acc, 10)}.
 
 to_json_list(List) when is_list(List) ->
     lists:map(fun csrt_util:to_json/1, List).

Reply via email to