This is an automated email from the ASF dual-hosted git repository. iilyak pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http-2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b0af4ef8960dafabd531be5018f2cdde6dcdba48 Author: ILYA Khlopotov <[email protected]> AuthorDate: Thu Jul 10 00:37:36 2025 -0700 Rewrite csrt_query to support declarative queries --- .../src/couch_stats_resource_tracker.hrl | 2 + src/couch_stats/src/csrt.erl | 462 ++++++++++----- src/couch_stats/src/csrt_entry.erl | 5 +- src/couch_stats/src/csrt_httpd.erl | 113 ++-- src/couch_stats/src/csrt_query.erl | 640 +++++++++++++++------ 5 files changed, 867 insertions(+), 355 deletions(-) diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl b/src/couch_stats/src/couch_stats_resource_tracker.hrl index 9b52826c7..29ed684ac 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.hrl +++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl @@ -191,8 +191,10 @@ -type tuple_of_field_values() :: tuple(). -type tuple_of_field_names() :: tuple(). +-type field_value() :: any(). -type query_options() :: #{limit => pos_integer()}. -type aggregation_key() :: tuple_of_field_names(). +-type aggregation_value() :: field_value(). -type aggregation_values() :: tuple_of_field_values(). -type aggregation_result() :: #{aggregation_key() => non_neg_integer()}. -type ordered_result() :: [{aggregation_key(), non_neg_integer()}]. diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl index 1efdf5ec3..bcb1b229c 100644 --- a/src/couch_stats/src/csrt.erl +++ b/src/couch_stats/src/csrt.erl @@ -73,8 +73,8 @@ %% RPC api -export([ - rpc/2, - call/1 + rpc_run/1, + rpc_unsafe_run/1 ]). %% aggregate query api @@ -85,25 +85,12 @@ active_coordinators/1, active_workers/0, active_workers/1, - count_by/1, find_by_nonce/1, find_by_pid/1, find_by_pidref/1, find_workers_by_pidref/1, - group_by/2, - group_by/3, - query_group_by/3, - query_group_by/4, - query_sort_by/3, - query_sort_by/4, - query_count_by/2, - query_count_by/3, query_matcher/1, - query_matcher/2, - sorted/1, - sorted_by/1, - sorted_by/2, - sorted_by/3 + query_matcher/2 ]). 
%% Recon API Ports of https://github.com/ferd/recon/releases/tag/2.5.6 @@ -113,48 +100,92 @@ proc_window/3 ]). +-export([ + query/1, + from/1, + group_by/1, + group_by/2, + sort_by/1, + sort_by/2, + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1, + unsafe_run/1 +]). + +-export_type([ + query/0, + query_expression/0, + query_option/0 +]). + +-opaque query() :: csrt_query:query(). +-opaque query_expression() :: csrt_query:query_expression(). +-opaque query_option() :: csrt_query:query_option(). + %% %% RPC operations %% --spec rpc(FName :: atom(), Args :: [any()]) -> - {[{node(), Result :: any()}], Errors :: [{badrpc, Reason :: any()}], BadNodes :: [node()]}. -rpc(FName, Args) when is_atom(FName) andalso is_list(Args) -> - {Resp, BadNodes} = rpc:multicall(?MODULE, call, [{FName, Args}]), - {Results, Errors} = split_response(Resp), - {Results, lists:usort(Errors), BadNodes}. - -split_response(Resp) -> - lists:foldl(fun(Message, {Results, Errors}) -> - case Message of - {badrpc, _} = E -> - {Results, [E | Errors]}; - Result -> - {[Result | Results], Errors} - end - end, {[], []}, Resp). 
- -call({active, []}) -> {node(), active()}; -call({active, [json]}) -> {node(), active(json)}; -call({active_coordinators, []}) -> {node(), active_coordinators()}; -call({active_coordinators, [json]}) -> {node(), active_coordinators(json)}; -call({active_workers, []}) -> {node(), active_workers()}; -call({active_workers, [json]}) -> {node(), active_workers(json)}; -call({count_by, [Key]}) -> {node(), count_by(Key)}; -call({find_by_nonce, [Nonce]}) -> {node(), find_by_nonce(Nonce)}; -call({find_by_pid, [Pid]}) -> {node(), find_by_pid(Pid)}; -call({find_by_pidref, [PidRef]}) -> {node(), find_by_pidref(PidRef)}; -call({find_workers_by_pidref, [PidRef]}) -> {node(), find_workers_by_pidref(PidRef)}; -call({group_by, [Key, Val]}) -> {node(), group_by(Key, Val)}; -call({group_by, [Key, Val, Agg]}) -> {node(), group_by(Key, Val, Agg)}; -call({sorted, [Map]}) -> {node(), sorted(Map)}; -call({sorted_by, [Key]}) -> {node(), sorted_by(Key)}; -call({sorted_by, [Key, Val]}) -> {node(), sorted_by(Key, Val)}; -call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)}; -call({FunName, Args}) -> - FunNameBin = atom_to_binary(FunName), - ArityBin = integer_to_binary(length(Args)), - {error, <<"No such function '", FunNameBin/binary, "/", ArityBin/binary>>}. +-spec rpc_run(Query :: query()) -> + [#{ + node => node(), + result => [{aggregation_key(), pos_integer()}], + errors => [atom()] + }]. +rpc_run(Query) -> + Nodes = mem3:nodes(), + merge_results(Nodes, erpc:multicall(Nodes, ?MODULE, run, [Query])). + +-spec rpc_unsafe_run(Query :: query()) -> + [#{ + node => node(), + result => [{aggregation_key(), pos_integer()}], + errors => [atom()] + }]. +rpc_unsafe_run(Query) -> + Nodes = mem3:nodes(), + merge_results(Nodes, erpc:multicall(Nodes, ?MODULE, unsafe_run, [Query])). + +merge_results(Nodes, Resp) -> + %% The result of erpc:multicall is returned as a list where the result from each + %% node is placed at the same position as the node name is placed in Nodes. 
+ %% That is why we can use `lists:zip/2` here. + lists:map(fun format_response/1, lists:zip(Nodes, Resp)). + +format_response({Node, {ok, {ok, Result}}}) -> + #{ + node => Node, + result => Result, + errors => [] + }; +format_response({Node, {ok, {error, Reason}}}) -> + #{ + node => Node, + result => none, + errors => [Reason] + }; +format_response({Node, {ok, Result}}) -> + #{ + node => Node, + result => Result, + errors => [] + }; +format_response({Node, {error, {erpc, Reason}}}) -> + #{ + node => Node, + result => none, + errors => [Reason] + }; +format_response({Node, {Tag, _}}) -> + #{ + node => Node, + result => none, + errors => [Tag] + }. %% %% PidRef operations @@ -450,10 +481,6 @@ active_workers() -> active_workers(Type) -> csrt_query:active_workers(Type). --spec count_by(Key :: string()) -> map(). -count_by(Key) -> - csrt_query:count_by(Key). - find_by_nonce(Nonce) -> csrt_query:find_by_nonce(Nonce). @@ -466,12 +493,6 @@ find_by_pidref(PidRef) -> find_workers_by_pidref(PidRef) -> csrt_query:find_workers_by_pidref(PidRef). -group_by(Key, Val) -> - csrt_query:group_by(Key, Val). - -group_by(Key, Val, Agg) -> - csrt_query:group_by(Key, Val, Agg). - -spec pid_ref_matchspec(AttrName :: rctx_field()) -> term() | throw(any()). pid_ref_matchspec(AttrName) -> csrt_logger:pid_ref_matchspec(AttrName). @@ -499,76 +520,6 @@ query_matcher(MatcherName) -> query_matcher(MatcherName, Limit) -> csrt_query:query_matcher(MatcherName, Limit). --spec query_group_by(MatcherName, AggregationKeys, ValueKey) -> - {ok, query_result()} - | {error, any()} -when - MatcherName :: string(), - AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()], - ValueKey :: binary() | rctx_field(). -query_group_by(MatcherName, AggregationKeys, ValueKey) -> - csrt_query:query_group_by(MatcherName, AggregationKeys, ValueKey, #{}). 
- --spec query_group_by(MatcherName, AggregationKeys, ValueKey, Options :: query_options()) -> - {ok, query_result()} - | {error, any()} -when - MatcherName :: string(), - AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()], - ValueKey :: binary() | rctx_field(). -query_group_by(MatcherName, AggregationKeys, ValueKey, Options) -> - csrt_query:query_group_by(MatcherName, AggregationKeys, ValueKey, Options). - --spec query_sort_by(MatcherName, AggregationKeys, ValueKey) -> - {ok, query_result()} - | {error, any()} -when - MatcherName :: string(), - AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()], - ValueKey :: binary() | rctx_field(). -query_sort_by(MatcherName, AggregationKeys, ValueKey) -> - csrt_query:query_sort_by(MatcherName, AggregationKeys, ValueKey, #{}). - --spec query_sort_by(MatcherName, AggregationKeys, ValueKey, Options :: query_options()) -> - {ok, query_result()} - | {error, any()} -when - MatcherName :: string(), - AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()], - ValueKey :: binary() | rctx_field(). -query_sort_by(MatcherName, AggregationKeys, ValueKey, Options) -> - csrt_query:query_sort_by(MatcherName, AggregationKeys, ValueKey, Options). - --spec query_count_by(MatcherName, AggregationKeys) -> - {ok, query_result()} - | {error, any()} -when - MatcherName :: string(), - AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()]. -query_count_by(MatcherName, AggregationKeys) -> - csrt_query:query_count_by(MatcherName, AggregationKeys, #{}). - --spec query_count_by(MatcherName, AggregationKeys, Options :: query_options()) -> - {ok, query_result()} - | {error, any()} -when - MatcherName :: string(), - AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()]. -query_count_by(MatcherName, AggregationKeys, Options) -> - csrt_query:query_count_by(MatcherName, AggregationKeys, Options). - -sorted(Map) -> - csrt_query:sorted(Map). 
- -sorted_by(Key) -> - csrt_query:sorted_by(Key). - -sorted_by(Key, Val) -> - csrt_query:sorted_by(Key, Val). - -sorted_by(Key, Val, Agg) -> - csrt_query:sorted_by(Key, Val, Agg). - %% %% Delta API %% @@ -593,6 +544,249 @@ maybe_add_delta(T) -> maybe_add_delta(T, Delta) -> csrt_util:maybe_add_delta(T, Delta). +%% +%% Query API functions +%% +%% + +%% @doc Construct a query from the given expressions. +%% The following types of expressions are allowed in the query: +%% <li>group_by/1 @see group_by/1</li> +%% <li>group_by/2 @see group_by/2</li> +%% <li>sort_by/1 @see sort_by/1</li> +%% <li>count_by/1 @see count_by/1</li> +%% <li>options/1 @see options/1</li> +%% <li>from/1 @see from/1</li> +%% The order of expressions doesn't matter. +%% <code> +%% Q = query([ +%% from("docs_read"), +%% group_by([username, dbname], ioq_calls), +%% options([ +%% with_limit(10) +%% ]) +%% ]), +%% </code> +%% @end +-spec query(QueryExpression :: [query_expression()]) -> + query() | {error, any()}. +query(QueryExpression) -> + csrt_query:query(QueryExpression). + +%% @doc Specify the matcher to use for the query. +%% If the atom 'all' is used, all entries are in the scope of the query. +%% Using 'all' also makes the query 'unsafe', because it scans through all entries +%% and can return many matching rows. +%% Unsafe queries can only be run using 'unsafe_run/1'. +%% <code> +%% Q = query([ +%% ... +%% from("docs_read") +%% ]), +%% </code> +%% @end +-spec from(MatcherNameOrAll :: string() | all) -> + query_expression() | {error, any()}. +from(MatcherNameOrAll) -> + csrt_query:from(MatcherNameOrAll). + +%% @doc Request 'group_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% group_by([username, dbname]) +%% ]), +%% </code> +%% @end +-spec group_by(AggregationKeys) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +group_by(AggregationKeys) -> + csrt_query:group_by(AggregationKeys).
+ +%% @doc Request 'group_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% group_by([username, dbname], ioq_calls) +%% ]), +%% </code> +%% @end +-spec group_by(AggregationKeys, ValueKey) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()], + ValueKey :: + binary() + | rctx_field(). +group_by(AggregationKeys, ValueKey) -> + csrt_query:group_by(AggregationKeys, ValueKey). + +%% @doc Request 'sort_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% sort_by([username, dbname]) +%% ]), +%% </code> +%% @end +-spec sort_by(AggregationKeys) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +sort_by(AggregationKeys) -> + csrt_query:sort_by(AggregationKeys). + +%% @doc Request 'sort_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% sort_by([username, dbname], ioq_calls) +%% ]), +%% </code> +%% @end +-spec sort_by(AggregationKeys, ValueKey) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()], + ValueKey :: + binary() + | rctx_field(). +sort_by(AggregationKeys, ValueKey) -> + csrt_query:sort_by(AggregationKeys, ValueKey). + +%% @doc Request 'count_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% count_by(username) +%% ]), +%% </code> +%% @end +-spec count_by(AggregationKeys) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +count_by(AggregationKeys) -> + csrt_query:count_by(AggregationKeys). + +%% @doc Construct 'options' query expression. +%% There are following types of expressions allowed in the query. 
+%% <li>unlimited/0 @see unlimited/0 (cannot be used with 'with_limit/1')</li> +%% <li>with_limit/1 @see with_limit/1 (cannot be used with 'unlimited/0')</li> +%% The order of expressions doesn't matter. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% ... +%% ]) +%% ]), +%% </code> +%% @end +-spec options([query_option()]) -> + query_expression() | {error, any()}. +options(OptionsExpression) -> + csrt_query:options(OptionsExpression). + + +%% @doc Allow an unlimited number of results from the query. +%% Using 'unlimited' makes the query 'unsafe', because it can return many matching rows. +%% Unsafe queries can only be run using 'unsafe_run/1'. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% unlimited() +%% ]) +%% ]), +%% </code> +%% @end +-spec unlimited() -> + query_expression(). +unlimited() -> + csrt_query:unlimited(). + +%% @doc Set a limit on the number of results returned from the query. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% with_limit(100) +%% ]) +%% ]), +%% </code> +%% @end +-spec with_limit(Limit :: pos_integer()) -> + query_expression() | {error, any()}. +with_limit(Limit) -> + csrt_query:with_limit(Limit). + +%% @doc Executes the provided query. Only 'safe' queries can be executed using 'run/1'. +%% A query is considered 'unsafe' if any of the conditions below are met: +%% <li>Query uses 'unlimited/0'</li> +%% <li>Query uses 'from(all)'</li> +%% <code> +%% Q = query([ +%% from("docs_read"), +%% group_by([username, dbname], ioq_calls), +%% options([ +%% with_limit(10) +%% ]) +%% ]), +%% run(Q) +%% </code> +%% @end +-spec run(query()) -> + {ok, [{aggregation_key(), pos_integer()}]} + | {limit, [{aggregation_key(), pos_integer()}]}. +run(Query) -> + csrt_query:run(Query). + +%% @doc Executes the provided query. This function is similar to 'run/1', +%% but it also supports 'unsafe' queries. Be very careful using it. +%% Pay attention to the cardinality of the result.
+%% A query is considered 'unsafe' if any of the conditions below are met: +%% <li>Query uses 'unlimited/0'</li> +%% <li>Query uses 'from(all)'</li> +%% <code> +%% Q = query([ +%% from("docs_read"), +%% group_by([username, dbname], ioq_calls), +%% options([ +%% with_limit(10) +%% ]) +%% ]), +%% unsafe_run(Q) +%% </code> +%% @end +-spec unsafe_run(query()) -> + {ok, [{aggregation_key(), pos_integer()}]} + | {limit, [{aggregation_key(), pos_integer()}]}. +unsafe_run(Query) -> + csrt_query:unsafe_run(Query). + %% %% Tests %% diff --git a/src/couch_stats/src/csrt_entry.erl b/src/couch_stats/src/csrt_entry.erl index 094e46ca7..f50d9559d 100644 --- a/src/couch_stats/src/csrt_entry.erl +++ b/src/couch_stats/src/csrt_entry.erl @@ -52,8 +52,9 @@ value(get_kp_node, #rctx{get_kp_node = Val}) -> Val; value(started_at, #rctx{started_at = Val}) -> Val; value(updated_at, #rctx{updated_at = Val}) -> Val. --spec key(BinKey :: binary() | string() | atom()) -> Key :: rctx_field() - | throw({bad_request, Reason :: binary()}). +-spec key(BinKey :: binary() | string() | atom()) -> + Key :: rctx_field() + | {error, Reason :: any()}. key(Key) when is_atom(Key) -> key_from_atom(Key); diff --git a/src/couch_stats/src/csrt_httpd.erl b/src/couch_stats/src/csrt_httpd.erl index a14d219fa..01077cf9d 100644 --- a/src/couch_stats/src/csrt_httpd.erl +++ b/src/couch_stats/src/csrt_httpd.erl @@ -24,37 +24,54 @@ ] ). -rpc_to_json({Resp, _Errors, _Nodes}) -> - #{<<"results">> => resp_to_json(Resp, #{}), <<"errors">> => [], <<"bad_nodes">> => []}. - -resp_to_json([{N, R} | Rest], Acc) -> - resp_to_json(Rest, maps:put(atom_to_binary(N), R, Acc)); -resp_to_json([], Acc) -> - Acc. +-import( + csrt, + [ + query/1, + from/1, + group_by/1, + group_by/2, + sort_by/1, + sort_by/2, + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1 + ] +).
-% handle_resource_status_req(#httpd{method = 'GET', path_parts = [<<"_active_resources">>]} = Req) -> -% ok = chttpd:verify_is_server_admin(Req), -% %% TODO: incorporate Bad responses -% Resp = rpc_to_json(csrt:rpc(active, [json])), -% send_json(Req, Resp); handle_resource_status_req( - #httpd{method = 'POST', path_parts = [<<"_active_resources">>, <<"_match">>, MatcherName]} = Req + #httpd{method = 'POST', path_parts = [<<"_active_resources">>, <<"_match">>, MatcherNameBin]} = Req ) -> chttpd:validate_ctype(Req, "application/json"), {JsonProps} = chttpd:json_body_obj(Req), GroupBy = couch_util:get_value(<<"group_by">>, JsonProps), SortBy = couch_util:get_value(<<"sort_by">>, JsonProps), CountBy = couch_util:get_value(<<"count_by">>, JsonProps), - - case {GroupBy, SortBy, CountBy} of - {undefined, undefined, {Query}} -> - handle_count_by(Req, binary_to_list(MatcherName), Query); - {undefined, {Query}, undefined} -> - handle_sort_by(Req, binary_to_list(MatcherName), Query); - {{Query}, undefined, undefined} -> - handle_group_by(Req, binary_to_list(MatcherName), Query); + MatcherName = binary_to_list(MatcherNameBin), + {AggregationKeys, Query} = case {GroupBy, SortBy, CountBy} of + {undefined, undefined, {Props}} -> + Keys = couch_util:get_value(<<"aggregate_keys">>, Props), + {Keys, csrt:query([from(MatcherName), count_by(Keys)])}; + {undefined, {Props}, undefined} -> + Keys = couch_util:get_value(<<"aggregate_keys">>, Props), + CounterKey = couch_util:get_value(<<"counter_key">>, Props), + {Keys, query([from(MatcherName), sort_by(Keys, CounterKey)])}; + {{Props}, undefined, undefined} -> + Keys = couch_util:get_value(<<"aggregate_keys">>, Props), + CounterKey = couch_util:get_value(<<"counter_key">>, Props), + {Keys, query([from(MatcherName), group_by(Keys, CounterKey)])}; {_, _, _} -> throw({bad_request, <<"Multiple aggregations are not supported">>}) + end, + case Query of + {error, Reason} -> + send_error(Req, Reason); + Q -> + JSON = to_json(AggregationKeys, 
csrt:rpc_run(Q)), + send_json(Req, JSON) end; handle_resource_status_req(#httpd{path_parts = [<<"_active_resources">>]} = Req) -> ok = chttpd:verify_is_server_admin(Req), @@ -64,39 +81,21 @@ handle_resource_status_req(Req) -> ok = chttpd:verify_is_server_admin(Req), send_method_not_allowed(Req, "GET,HEAD,POST"). -handle_count_by(Req, MatcherName, CountBy) -> - AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, CountBy), - case csrt:query_count_by(MatcherName, AggregationKeys) of - {ok, Map} -> - send_json(Req, aggregation_result_to_json(AggregationKeys, Map)); - {error, Reason} -> - send_error(Req, Reason) - end. - -handle_sort_by(Req, MatcherName, SortBy) -> - AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, SortBy), - CounterKey = couch_util:get_value(<<"counter_key">>, SortBy), - case csrt:query_sort_by(MatcherName, AggregationKeys, CounterKey) of - {ok, Map} -> - send_json(Req, aggregation_result_to_json(AggregationKeys, Map)); - {error, Reason} -> - send_error(Req, Reason) - end. - -handle_group_by(Req, MatcherName, GroupBy) -> - AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, GroupBy), - CounterKey = couch_util:get_value(<<"counter_key">>, GroupBy), - case csrt:query_group_by(MatcherName, AggregationKeys, CounterKey) of - {ok, Map} -> - send_json(Req, aggregation_result_to_json(AggregationKeys, Map)); - {error, Reason} -> - send_error(Req, Reason) - end. - -%% [{{<<"user_foo">>,<<"eunit-test-db-3950c6fc68955a4b629cebbece5bdfac">>}, 10}] -% -type aggregation_result() :: #{aggregation_key() => non_neg_integer()}. -% -type ordered_result() :: [{aggregation_key(), non_neg_integer()}]. -% -type query_result() :: aggregation_result() | ordered_result(). +to_json(AggregationKeys, Results) -> + lists:map(fun(E) -> node_reply_to_json(AggregationKeys, E) end, Results). 
+ +node_reply_to_json(_AggregationKeys, #{node := Node, result := none, errors := Errors}) -> + #{ + node => atom_to_binary(Node), + result => none, + errors => lists:map(fun erlang:atom_to_list/1, Errors) + }; +node_reply_to_json(AggregationKeys, #{node := Node, result := Result, errors := Errors}) -> + #{ + node => atom_to_binary(Node), + result => aggregation_result_to_json(AggregationKeys, Result), + errors => lists:map(fun erlang:atom_to_list/1, Errors) + }. encode_key(AggregationKeys, Key) -> maps:from_list(lists:zip(AggregationKeys, tuple_to_list(Key))). @@ -160,10 +159,10 @@ aggregation_result_to_json(AggregationKey, Ordered) when Ordered ). -send_error(Req, {unknown_matcher, Matcher}) -> +send_error(Req, [{unknown_matcher, Matcher} | _]) -> MatcherBin = list_to_binary(Matcher), chttpd:send_error(Req, {bad_request, <<"Unknown matcher '", MatcherBin/binary, "'">>}); -send_error(Req, {invalid_key, FieldName}) -> +send_error(Req, [{invalid_key, FieldName} | _]) -> chttpd:send_error(Req, {bad_request, <<"Unknown field name '", FieldName/binary, "'">>}); -send_error(Req, Reason) -> +send_error(Req, [Reason | _]) -> chttpd:send_error(Req, {error, Reason}). \ No newline at end of file diff --git a/src/couch_stats/src/csrt_query.erl b/src/couch_stats/src/csrt_query.erl index be95a93a4..7ca871e2d 100644 --- a/src/couch_stats/src/csrt_query.erl +++ b/src/couch_stats/src/csrt_query.erl @@ -25,27 +25,97 @@ active_coordinators/1, active_workers/0, active_workers/1, + all/0, - count_by/1, - count_by/2, find_by_nonce/1, find_by_pid/1, find_by_pidref/1, find_workers_by_pidref/1, - group_by/3, - group_by/4, - query_group_by/4, - query_sort_by/4, - query_count_by/3, + query_matcher/1, query_matcher/2, query_matcher_rows/1, query_matcher_rows/2, + + query/1, + from/1, + group_by/1, + group_by/2, sort_by/1, sort_by/2, - sort_by/3 + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1, + unsafe_run/1 +]). 
+ +-export_type([ + query/0, + query_expression/0, + query_option/0 ]). +-type aggregation_keys_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | aggregation_value()). +-type value_key_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | aggregation_value()). +-type count_key_fun() :: fun((A :: pos_integer(), B :: pos_integer()) -> pos_integer()). + +-record(selector, { + aggregation_keys = undefined :: + rctx_field() + | [rctx_field()] + | undefined, + value_key = undefined :: + rctx_field() + | undefined +}). + +-record(unsafe_selector, { + aggregation_keys = undefined :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()] + | undefined, + value_key = undefined :: + value_key_fun() + | rctx_field() + | undefined +}). + +-record(query_options, { + limit = undefined :: pos_integer() | unlimited | undefined, + is_safe = undefined :: boolean() | undefined +}). + +-type aggregation() :: group_by | sort_by | count_by. + +-record(query, { + matcher = undefined :: matcher_name() | all | undefined, + selector = undefined :: #selector{} | #unsafe_selector{} | undefined, + limit = undefined :: pos_integer() | unlimited | undefined, + aggregation = undefined :: aggregation() | undefined, + is_safe = true :: boolean() +}). + +-record(from, { + matcher = undefined :: matcher_name() | all | undefined, + is_safe = undefined :: boolean() | undefined +}). + +-opaque query() :: #query{}. +-opaque query_expression() :: + #from{} + | #query_options{} + | #selector{} + | #unsafe_selector{} + | query_option() + | {aggregation(), #selector{}} + | {aggregation(), #unsafe_selector{}}. +-opaque query_option() :: + {limit, pos_integer() | unlimited | undefined}. + %% %% Aggregate query API %% @@ -98,42 +168,39 @@ find_workers_by_pidref(PidRef) -> curry_field(Field) -> fun(Ele) -> csrt_entry:value(Field, Ele) end. -count_by(KeyFun) -> - csrt_query:count_by(all(), KeyFun). 
- --spec count_by(Matcher :: matcher(), KeyFun) -> - query_result() +-spec group_by(Matcher, KeyFun, ValFun) -> + {ok, aggregation_result()} | {limit, aggregation_result()} when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()). -count_by(Matcher, KeyFun) -> - group_by(Matcher, KeyFun, fun(_) -> 1 end). - --spec group_by(KeyFun, ValFun) -> - query_result() -when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()), - ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()). -group_by(KeyFun, ValFun) -> - csrt_query:group_by(all(), KeyFun, ValFun). - --spec group_by(Matcher :: matcher(), KeyFun, ValFun) -> - query_result() -when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()), - ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()). + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(). group_by(Matcher, KeyFun, ValFun) -> - group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2). + AggFun = fun erlang:'+'/2, + group_by(Matcher, KeyFun, ValFun, AggFun). --spec group_by(Matcher :: matcher(), KeyFun, ValFun, AggFun) -> - query_result() +-spec group_by(Matcher, KeyFun, ValFun, AggFun) -> + {ok, aggregation_result()} | {limit, aggregation_result()} when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()), - ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()), - AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()). + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(), + AggFun :: + count_key_fun(). group_by(Matcher, KeyFun, ValFun, AggFun) -> group_by(Matcher, KeyFun, ValFun, AggFun, ?QUERY_CARDINALITY_LIMIT). --spec all() -> matcher(). +-spec all() -> + matcher(). all() -> Spec = ets:fun2ms(fun(#rctx{} = R) -> R end), @@ -145,6 +212,22 @@ all() -> %% eg: group_by(all(), [username, dbname, mfa], docs_read). 
%% eg: group_by(all(), [username, dbname, mfa], ioq_calls). %% eg: group_by(all(), [username, dbname, mfa], js_filters). +-spec group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> + {ok, aggregation_result()} | {limit, aggregation_result()} +when + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(), + AggFun :: + count_key_fun(), + Limit :: pos_integer(). + + group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) -> KeyFun = fun(Ele) -> list_to_tuple([csrt_entry:value(Key, Ele) || Key <- KeyL]) end, group_by(Matcher, KeyFun, ValFun, AggFun, Limit); @@ -156,7 +239,7 @@ group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> FoldFun = fun(Ele, Acc) -> case maps:size(Acc) =< Limit of true -> - case maybe_match(Ele, Matcher) of + case ets_match(Ele, Matcher) of true -> Key = KeyFun(Ele), Val = ValFun(Ele), @@ -181,9 +264,7 @@ group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> {limit, Acc} end. -maybe_match(_Ele, undefined) -> - true; -maybe_match(Ele, {_, MS}) -> +ets_match(Ele, {_, MS}) -> ets:match_spec_run([Ele], MS) =/= []. %% @@ -192,10 +273,10 @@ maybe_match(Ele, {_, MS}) -> -record(topK, { % we store ordered elements in ascending order - seq = [] :: list(pos_integer()), + seq = [] :: list({aggregation_key(), pos_integer()}), % we rely on erlang sorting order where `number < atom` min = infinite :: infinite | pos_integer(), - max = 0 :: pos_integer(), + max = 0 :: non_neg_integer(), size = 0 :: non_neg_integer(), % capacity cannot be less than 1 capacity = 1 :: pos_integer() @@ -230,44 +311,369 @@ update_topK(Key, Value, #topK{size = S, seq = Seq} = Top) -> get_topK(#topK{seq = S}) -> lists:reverse(S). +-spec topK(aggregation_result(), pos_integer()) -> + ordered_result(). topK(Results, K) -> TopK = maps:fold(fun update_topK/3, new_topK(K), Results), get_topK(TopK). 
--spec sort_by(KeyFun) -> - query_result() +%% +%% Query API functions +%% + +-spec from(MatcherName :: matcher_name() | all) -> + {ok, #{from => matcher_name() | all, is_unsafe => boolean()}} | {error, any()}. +from(all) -> + #from{matcher = all, is_safe = false}; +from(MatcherName) -> + case csrt_logger:get_matcher(MatcherName) of + undefined -> + {error, {unknown_matcher, MatcherName}}; + _ -> + #from{matcher = MatcherName, is_safe = true} + end. + +%% @doc Construct 'options' query expression. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% ... +%% ]) +%% ]), +%% </code> +%% @end +-spec options([query_option()]) -> + #query_options{} | {error, any()}. +options(Options) -> + lists:foldl(fun + (_, {error, _} = Error) -> + Error; + ({limit, unlimited}, Acc) -> + Acc#query_options{limit = unlimited, is_safe = false}; + ({limit, Limit}, {[], Acc}) when is_integer(Limit) -> + Acc#query_options{limit = Limit}; + ({error, _} = Error, _Acc) -> + Error + end, #query_options{is_safe = true}, Options). + +unlimited() -> + {limit, unlimited}. + +with_limit(Limit) when is_integer(Limit) -> + case Limit =< query_limit() of + true -> + {limit, Limit}; + false -> + {error, {beyond_limit, Limit}} + end; +with_limit(Limit) -> + {error, {invalid_limit, Limit}}. + +-spec count_by(AggregationKeys) -> + {count_by, #selector{}} | {count_by, #unsafe_selector{}} | {error, any()} +when + AggregationKeys :: + aggregation_keys_fun() + | binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +count_by(AggregationKeys) -> + with_tag(select(AggregationKeys), count_by). + +-spec sort_by(AggregationKeys) -> + {sort_by, #selector{}} | {sort_by, #unsafe_selector{}} | {error, any()} when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()). + AggregationKeys :: + aggregation_keys_fun() + | binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +sort_by(AggregationKeys) -> + with_tag(select(AggregationKeys), sort_by). 
+ +-spec sort_by(AggregationKeys, ValueKey) -> + {sort_by, #selector{}} | {sort_by, #unsafe_selector{}} | {error, any()} +when + AggregationKeys :: + aggregation_keys_fun() + | binary() + | rctx_field() + | [binary()] + | [rctx_field()], + ValueKey :: + value_key_fun() + | binary() + | rctx_field(). +sort_by(AggregationKeys, ValueKey) -> + with_tag(select(AggregationKeys, ValueKey), sort_by). + +-spec group_by(AggregationKeys) -> + {group_by, #selector{}} | {group_by, #unsafe_selector{}} | {error, any()} +when + AggregationKeys :: + aggregation_keys_fun() + | binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +group_by(AggregationKeys) -> + with_tag(select(AggregationKeys), group_by). + +-spec group_by(AggregationKeys, ValueKey) -> + {group_by, #selector{}} | {group_by, #unsafe_selector{}} | {error, any()} +when + AggregationKeys :: + aggregation_keys_fun() + | binary() + | rctx_field() + | [binary()] + | [rctx_field()], + ValueKey :: + value_key_fun() + | binary() + | rctx_field(). +group_by(AggregationKeys, ValueKey) -> + with_tag(select(AggregationKeys, ValueKey), group_by). 
+
+%% Combine query clauses into a single #query{} record. Accepted clauses are
+%% `{count_by | sort_by | group_by, Selector}`, `#from{}`, `#query_options{}`
+%% and `{error, Reason}`. Each clause kind may appear at most once; duplicates
+%% and embedded errors are collected and returned as `{error, Errors}`.
+%% Any unsafe clause flips `is_safe` to false, so only unsafe_run/1 will
+%% execute the resulting query.
+%% NOTE(review): a query lacking a selector or matcher is accepted here and
+%% only surfaces as an error (or crash) later in run/1 / unsafe_run/1.
+query(Query) ->
+    % start assuming safe query and turn to unsafe when we detect issues
+    Acc = #query{is_safe = true},
+    Result = lists:foldr(fun
+        ({Aggregation, #unsafe_selector{} = Selector}, {E, #query{selector = undefined} = Q}) ->
+            {E, Q#query{selector = Selector, is_safe = false, aggregation = Aggregation}};
+        ({Aggregation, #unsafe_selector{}}, {E, Q}) ->
+            {[{more_than_once, {select, Aggregation}} | E], Q};
+        ({Aggregation, #selector{} = Selector}, {E, #query{selector = undefined} = Q}) ->
+            {E, Q#query{selector = Selector, aggregation = Aggregation}};
+        ({Aggregation, #selector{}}, {E, Q}) ->
+            {[{more_than_once, {select, Aggregation}} | E], Q};
+        (#query_options{is_safe = false, limit = Limit}, {E, #query{limit = undefined} = Q}) ->
+            {E, Q#query{limit = Limit, is_safe = false}};
+        (#query_options{limit = Limit}, {E, #query{limit = undefined} = Q}) ->
+            {E, Q#query{limit = Limit}};
+        (#query_options{}, {E, Q}) ->
+            {[{more_than_once, options} | E], Q};
+        (#from{matcher = Matcher, is_safe = false}, {E, #query{matcher = undefined} = Q}) ->
+            {E, Q#query{matcher = Matcher, is_safe = false}};
+        (#from{matcher = Matcher}, {E, #query{matcher = undefined} = Q}) ->
+            {E, Q#query{matcher = Matcher}};
+        (#from{}, {E, Q}) ->
+            {[{more_than_once, from} | E], Q};
+        ({error, Reason}, {E, Q}) ->
+            {[Reason | E], Q}
+    end, {[], Acc}, Query),
+    case Result of
+        {[], #query{} = Q} ->
+            Q;
+        {Errors, _} ->
+            {error, Errors}
+    end.
-%% eg: sort_by([username, dbname, type], ioq_calls)
-%% eg: sort_by([dbname, type], doc_reads)
-sort_by(KeyFun) ->
-    topK(count_by(KeyFun), 10).
+%% Execute a query that query/1 marked safe. count_by and group_by results are
+%% returned as maps; sort_by results as a top-K list. The limit goes through
+%% maybe_apply_limit/2 (the same as unsafe_run/1), so a query built without
+%% options is capped at query_limit() instead of handing `undefined` to topK/2.
+%% NOTE(review): maybe_apply_limit/2 skips the cap for `unlimited`; this
+%% assumes options that request `unlimited` mark the query unsafe — confirm.
+%% Any other query is refused with `{error, {unsafe_query, _}}`.
+-spec run(#query{}) ->
+    {ok, ordered_result() | aggregation_result()}
+    | {limit, ordered_result() | aggregation_result()}
+    | {error, any()}.
+run(#query{is_safe = true, matcher = MatcherName, selector = #selector{} = Selector, limit = Limit, aggregation = Aggregation}) ->
+    % we validated the presence of the matcher so this shouldn't fail
+    {ok, Matcher} = get_matcher(MatcherName),
+    case {Aggregation, Selector} of
+        {count_by, #selector{aggregation_keys = AKey, value_key = undefined}} ->
+            % no value key: every matching entry counts as 1
+            ValFun = fun(_) -> 1 end,
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, ValFun), Limit));
+        {count_by, #selector{aggregation_keys = AKey, value_key = VKey}} ->
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, VKey), Limit));
+        {sort_by, #selector{aggregation_keys = AKey, value_key = VKey}} ->
+            % sort_by keeps the ordered top-K list instead of a map
+            maybe_apply_limit(group_by(Matcher, AKey, VKey), Limit);
+        {group_by, #selector{aggregation_keys = AKey, value_key = undefined}} ->
+            ValFun = fun(_) -> 1 end,
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, ValFun), Limit));
+        {group_by, #selector{aggregation_keys = AKey, value_key = VKey}} ->
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, VKey), Limit))
+    end;
+run(#query{}) ->
+    {error, {unsafe_query, "Please use 'unsafe(Query)' instead if you really know what you are doing."}}.
+
+%% Execute any query, including fun-based (#unsafe_selector{}) selectors and
+%% the `all` matcher; unlike run/1 it does not check `is_safe`.
+%% count_by/group_by results are maps, sort_by results a top-K list.
+-spec unsafe_run(#query{}) ->
+    {ok, ordered_result() | aggregation_result()}
+    | {limit, ordered_result() | aggregation_result()}
+    | {error, any()}.
+unsafe_run(#query{selector = #unsafe_selector{} = Selector} = Query) ->
+    %% re-tag the #unsafe_selector{} as a #selector{}; relies on both records
+    %% having identical fields in identical order
+    unsafe_run(Query#query{selector = setelement(1, Selector, selector)});
+unsafe_run(#query{matcher = MatcherName, selector = #selector{} = Selector, limit = Limit, aggregation = Aggregation}) ->
+    Matcher = choose_matcher(MatcherName),
+    case {Aggregation, Selector} of
+        {count_by, #selector{aggregation_keys = AKey, value_key = undefined}} ->
+            ValFun = fun(_) -> 1 end,
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, ValFun), Limit));
+        {count_by, #selector{aggregation_keys = AKey, value_key = VKey}} ->
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, VKey), Limit));
+        {sort_by, #selector{aggregation_keys = AKey, value_key = VKey}} ->
+            maybe_apply_limit(group_by(Matcher, AKey, VKey), Limit);
+        {group_by, #selector{aggregation_keys = AKey, value_key = undefined}} ->
+            ValFun = fun(_) -> 1 end,
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, ValFun), Limit));
+        {group_by, #selector{aggregation_keys = AKey, value_key = VKey}} ->
+            to_map(maybe_apply_limit(group_by(Matcher, AKey, VKey), Limit))
+    end.
+
+%%
+%% Query API auxiliary functions
+%%
--spec sort_by(ValFun, AggFun) ->
-    query_result()
+%% Build a selector from aggregation keys alone: `#unsafe_selector{}` when the
+%% keys are given as a fun, `#selector{}` otherwise, or `{error, Reason}` from
+%% parse_aggregation_keys/1.
+-spec select(AggregationKeys) ->
+    #selector{} | #unsafe_selector{} | {error, any()}
 when
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()),
-    AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()).
+    AggregationKeys :: aggregation_keys_fun() | binary() | rctx_field() | [binary()] | [rctx_field()].
-sort_by(KeyFun, ValFun) ->
-    {Result, Acc} = group_by(KeyFun, ValFun),
-    {Result, topK(Acc, 10)}.
+select(AggregationKeys) ->
+    maybe
+        {ok, AKey} ?= parse_aggregation_keys(AggregationKeys),
+        case is_safe_key(AKey) of
+            true ->
+                #selector{aggregation_keys = AKey};
+            false ->
+                #unsafe_selector{aggregation_keys = AKey}
+        end
+    end.
--spec sort_by(KeyFun, ValFun, AggFun) ->
-    query_result()
+%% Build a selector from aggregation keys and a single value key. The result
+%% is a bare record (not wrapped in `{ok, _}`): `#unsafe_selector{}` if either
+%% key is a fun, `#selector{}` otherwise, or the first `{error, Reason}` from
+%% key parsing/validation.
+-spec select(AggregationKeys, ValueKey) ->
+    #selector{} | #unsafe_selector{} | {error, any()}
 when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()),
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()),
-    AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()).
+    AggregationKeys :: aggregation_keys_fun() | binary() | rctx_field() | [binary()] | [rctx_field()],
+    ValueKey :: value_key_fun() | binary() | rctx_field().
-sort_by(KeyFun, ValFun, AggFun) ->
-    {Result, Acc} = group_by(KeyFun, ValFun, AggFun),
-    {Result, topK(Acc, 10)}.
+select(AggregationKeys, ValueKey) ->
+    maybe
+        {ok, AKey} ?= parse_aggregation_keys(AggregationKeys),
+        {ok, VKey} ?= parse_value_key(ValueKey),
+        case is_safe_key(AKey) andalso is_safe_key(VKey) of
+            true ->
+                #selector{aggregation_keys = AKey, value_key = VKey};
+            false ->
+                #unsafe_selector{aggregation_keys = AKey, value_key = VKey}
+        end
+    end.
-to_json_list(List) when is_list(List) ->
-    lists:map(fun csrt_entry:to_json/1, List).
+%% A key given as a fun makes the selector unsafe; field names are safe.
+is_safe_key(Fun) when is_function(Fun) ->
+    false;
+is_safe_key(_) ->
+    true.
+
+%% Returns `{ok, Keys}` (a validated fun, a field, or a list of fields) or
+%% `{error, Reason}`.
+parse_aggregation_keys(Fun) when is_function(Fun) ->
+    validate_fun(Fun, key_fun);
+parse_aggregation_keys(Keys) ->
+    with_ok(parse_key(Keys)).
+
+%% Like parse_aggregation_keys/1 but only a single field is allowed; a list of
+%% fields is rejected with `{error, multiple_value_keys}`.
+parse_value_key(Fun) when is_function(Fun) ->
+    validate_fun(Fun, value_fun);
+parse_value_key(Key) ->
+    case parse_key(Key) of
+        {error, _} = Error ->
+            Error;
+        Keys when is_list(Keys) ->
+            {error, multiple_value_keys};
+        K ->
+            {ok, K}
+    end.
+
+%% Tag a successful result as `{Tag, Result}`; errors pass through untagged.
+with_tag({error, _} = Error, _) ->
+    Error;
+with_tag(Result, Tag) ->
+    {Tag, Result}.
+
+%% Wrap a successful result as `{ok, Result}`; errors pass through unchanged.
+with_ok({error, _} = Error) ->
+    Error;
+with_ok(Result) ->
+    {ok, Result}.
+
+%% Accept only arity-1 funs that can be applied to a default #rctx{} without
+%% raising; anything else yields `{error, {invalid_fun, Tag}}`.
+%% NOTE(review): the probe calls the fun once with an empty #rctx{}, so a fun
+%% whose clauses only match populated fields is rejected — confirm intended.
+validate_fun(Fun, Tag) when is_function(Fun, 1) ->
+    try Fun(#rctx{}) of
+        _ ->
+            {ok, Fun}
+    catch
+        _:_ ->
+            {error, {invalid_fun, Tag}}
+    end;
+validate_fun(_Fun, Tag) ->
+    {error, {invalid_fun, Tag}}.
+
+%% Resolve a matcher name: `all` maps to all/0, any other name is looked up
+%% with get_matcher/1 (a missing matcher crashes with badmatch).
+choose_matcher(all) ->
+    all();
+choose_matcher(MatcherName) ->
+    % we validated the presence of the matcher so this shouldn't fail
+    {ok, Matcher} = get_matcher(MatcherName),
+    Matcher.
+
+%% Apply a query limit to `{Tag, Results}`: `unlimited` keeps everything,
+%% `undefined` falls back to query_limit(), an integer keeps the top Limit
+%% entries via topK/2. `{error, _}` is matched first and returned untouched;
+%% otherwise it would match `{Result, Results}` and its reason would be passed
+%% to topK/2.
+-spec maybe_apply_limit(ResultsOrError, Limit) -> OrderedResultsOrError
+when
+    ResultsOrError ::
+        {ok, aggregation_result()}
+        | {limit, aggregation_result()}
+        | {error, any()},
+    Limit :: unlimited | undefined | pos_integer(),
+    OrderedResultsOrError ::
+        {ok, ordered_result()}
+        | {limit, ordered_result()}
+        | {ok, aggregation_result()}
+        | {limit, aggregation_result()}
+        | {error, any()}.
+
+maybe_apply_limit({error, _} = Error, _Limit) ->
+    Error;
+maybe_apply_limit({Result, Results}, unlimited) ->
+    {Result, Results};
+maybe_apply_limit({Result, Results}, undefined) ->
+    {Result, topK(Results, query_limit())};
+maybe_apply_limit({Result, Results}, Limit) when is_integer(Limit) ->
+    {Result, topK(Results, Limit)}.
+
+%% Normalise `{Tag, Results}` to map form: ordered (list) results become a
+%% map, maps are returned as-is, and `{error, _}` passes through so callers
+%% piping maybe_apply_limit/2 into to_map/1 do not crash on errors.
+-spec to_map(ResultsOrError) -> MapResultsOrError
+when
+    ResultsOrError ::
+        {ok, ordered_result() | aggregation_result()}
+        | {limit, ordered_result() | aggregation_result()}
+        | {error, any()},
+    MapResultsOrError ::
+        {ok, aggregation_result()}
+        | {limit, aggregation_result()}
+        | {error, any()}.
+to_map({error, _} = Error) ->
+    Error;
+to_map({Result, Results}) when is_list(Results) ->
+    {Result, maps:from_list(Results)};
+to_map({Result, Results}) when is_map(Results) ->
+    {Result, Results}.
+
+%% Translate one key (binary, atom or string) or a list of keys into rctx
+%% fields via csrt_entry:key/1. `undefined` is passed through; the first
+%% `{error, _}` in a list aborts the whole parse.
+-spec parse_key(Keys :: binary() | atom() | string() | [binary()] | [atom()]) ->
+    rctx_field()
+    | [rctx_field()]
+    | undefined
+    | {error, Reason :: any()}.
+
+parse_key([C | _] = Key) when is_integer(C) ->
+    % a charlist is a single key, not a list of keys
+    csrt_entry:key(Key);
+parse_key(Keys) when is_list(Keys) ->
+    parse_key(Keys, []);
+parse_key(BinKey) when is_binary(BinKey) ->
+    csrt_entry:key(BinKey);
+parse_key(undefined) ->
+    undefined;
+parse_key(Key) when is_atom(Key) ->
+    csrt_entry:key(Key).
+
+parse_key([BinKey | Rest], Keys) ->
+    case csrt_entry:key(BinKey) of
+        {error, _} = Error ->
+            Error;
+        Key ->
+            parse_key(Rest, [Key | Keys])
+    end;
+parse_key([], Keys) ->
+    lists:reverse(Keys).
+
+%%
+%% Scanning with matchers
+%%
 -spec query_matcher(MatcherName :: string()) ->
     {ok, query_result()}
     | {error, any()}.
@@ -323,102 +729,12 @@ get_matcher(MatcherName) ->
             {ok, Matcher}
     end.
 
--spec query_group_by(MatcherName, AggregationKeys, ValueKey, Options :: query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-%% {ok, #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2}}
-query_group_by(MatcherName, AggregationKeys, ValueKey, Options) ->
-    AggregationKey = parse_key(AggregationKeys),
-    VKey = parse_key(ValueKey),
-    Limit = maps:get(limit, Options, query_limit()),
-    maybe
-        ok ?= validate_not_error(AggregationKey),
-        ok ?= validate_not_error(VKey),
-        ok ?= validate_limit(Limit),
-        {ok, Matcher} ?= get_matcher(MatcherName),
-        group_by(Matcher, AggregationKey, VKey)
-    end.
-
--spec query_sort_by(MatcherName, AggregationKeys, ValueKey, Options :: query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-%% {ok, #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2}}
-query_sort_by(MatcherName, AggregationKeys, ValueKey, Options) ->
-    AggregationKey = parse_key(AggregationKeys),
-    VKey = parse_key(ValueKey),
-    Limit = maps:get(limit, Options, query_limit()),
-    maybe
-        ok ?= validate_not_error(AggregationKey),
-        ok ?= validate_not_error(VKey),
-        ok ?= validate_limit(Limit),
-        {ok, Matcher} ?= get_matcher(MatcherName),
-        sort_by(Matcher, AggregationKey, VKey)
-    end.
-
--spec query_count_by(MatcherName, AggregationKeys, Options :: query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()].
-%% {ok, #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2}}
-query_count_by(MatcherName, AggregationKeys, Options) ->
-    AggregationKey = parse_key(AggregationKeys),
-    Limit = maps:get(limit, Options, query_limit()),
-    maybe
-        ok ?= validate_not_error(AggregationKey),
-        ok ?= validate_limit(Limit),
-        {ok, Matcher} ?= get_matcher(MatcherName),
-        count_by(Matcher, AggregationKey)
-    end.
-
-validate_not_error({error, _} = Error) ->
-    Error;
-validate_not_error(_) ->
-    ok.
-
+%%
+%% Auxiliary functions
+%%
 query_limit() ->
     config:get_integer(?CSRT, "query_limit", ?QUERY_LIMIT).
 
-validate_limit(Limit) when is_integer(Limit) ->
-    case Limit =< query_limit() of
-        true ->
-            ok;
-        false ->
-            {error, {beyond_limit, query_limit()}}
-    end;
-validate_limit(Limit) ->
-    {error, {invalid_limit, Limit}}.
-
--spec parse_key(Keys :: binary() | atom() | [binary()] | [atom()]) ->
-    [rctx_field()]
-    | throw({bad_request, Reason :: binary()}).
-
-parse_key([C | _] = Key) when is_integer(C)->
-    csrt_entry:key(Key);
-parse_key(Keys) when is_list(Keys) ->
-    parse_key(Keys, []);
-parse_key(BinKey) when is_binary(BinKey) ->
-    csrt_entry:key(BinKey);
-parse_key(undefined) ->
-    undefined;
-parse_key(Key) when is_atom(Key) ->
-    csrt_entry:key(Key).
+%% Render every entry of List with csrt_entry:to_json/1, preserving order.
+to_json_list(List) when is_list(List) ->
+    lists:map(fun csrt_entry:to_json/1, List).
 
-parse_key([BinKey | Rest], Keys) ->
-    case csrt_entry:key(BinKey) of
-        {error, _} = Error ->
-            Error;
-        Key ->
-            parse_key(Rest, [Key | Keys])
-    end;
-parse_key([], Keys) ->
-    lists:reverse(Keys).
