This is an automated email from the ASF dual-hosted git repository. iilyak pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http-4 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 61676c2e772747b491bc0c3bf7103ffd57a4742f Author: ILYA Khlopotov <[email protected]> AuthorDate: Thu Jul 10 01:20:02 2025 -0700 Rewrite csrt_query to support declarative queries --- .../src/couch_stats_resource_tracker.hrl | 9 +- src/couch_stats/src/csrt.erl | 418 +++++++++++--- src/couch_stats/src/csrt_entry.erl | 6 +- src/couch_stats/src/csrt_httpd.erl | 235 ++++---- src/couch_stats/src/csrt_query.erl | 640 +++++++++++++++++---- 5 files changed, 1016 insertions(+), 292 deletions(-) diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl b/src/couch_stats/src/couch_stats_resource_tracker.hrl index bf03a88b4..00a43349a 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.hrl +++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl @@ -194,4 +194,11 @@ -type query_options() :: #{aggregation => group_by | sort_by | count_by, limit => pos_integer()}. -type aggregation_key() :: tuple_of_field_names(). -type aggregation_values() :: tuple_of_field_values(). --type query_result() :: #{aggregation_key() => non_neg_integer()}. + +-type field_value() :: any(). +-type aggregation_value() :: field_value(). +-type aggregation_result() :: #{aggregation_key() => non_neg_integer()}. +-type ordered_result() :: [{aggregation_key(), non_neg_integer()}]. +-type query_result() :: aggregation_result() | ordered_result(). + +-type json_spec(_Spec) :: term(). diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl index 08678d248..e1cd84d12 100644 --- a/src/couch_stats/src/csrt.erl +++ b/src/couch_stats/src/csrt.erl @@ -73,8 +73,8 @@ %% RPC api -export([ - rpc/2, - call/1 + rpc_run/1, + rpc_unsafe_run/1 ]). 
%% aggregate query api @@ -85,21 +85,12 @@ active_coordinators/1, active_workers/0, active_workers/1, - count_by/1, find_by_nonce/1, find_by_pid/1, find_by_pidref/1, find_workers_by_pidref/1, - group_by/2, - group_by/3, - query/1, - query/2, query_matcher/1, - query_matcher/2, - sorted/1, - sorted_by/1, - sorted_by/2, - sorted_by/3 + query_matcher/2 ]). %% Recon API Ports of https://github.com/ferd/recon/releases/tag/2.5.6 @@ -109,48 +100,96 @@ proc_window/3 ]). +-export([ + query/1, + from/1, + group_by/1, + group_by/2, + sort_by/1, + sort_by/2, + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1, + unsafe_run/1 +]). + +-export_type([ + query/0, + query_expression/0, + query_option/0 +]). + +-opaque query() :: csrt_query:query(). +-opaque query_expression() :: csrt_query:query_expression(). +-opaque query_option() :: csrt_query:query_option(). + %% %% RPC operations %% --spec rpc(FName :: atom(), Args :: [any()]) -> - {[{node(), Result :: any()}], Errors :: [{badrpc, Reason :: any()}], BadNodes :: [node()]}. -rpc(FName, Args) when is_atom(FName) andalso is_list(Args) -> - {Resp, BadNodes} = rpc:multicall(?MODULE, call, [{FName, Args}]), - {Results, Errors} = split_response(Resp), - {Results, lists:usort(Errors), BadNodes}. - -split_response(Resp) -> - lists:foldl(fun(Message, {Results, Errors}) -> - case Message of - {badrpc, _} = E -> - {Results, [E | Errors]}; - Result -> - {[Result | Results], Errors} - end - end, {[], []}, Resp). 
- -call({active, []}) -> {node(), active()}; -call({active, [json]}) -> {node(), active(json)}; -call({active_coordinators, []}) -> {node(), active_coordinators()}; -call({active_coordinators, [json]}) -> {node(), active_coordinators(json)}; -call({active_workers, []}) -> {node(), active_workers()}; -call({active_workers, [json]}) -> {node(), active_workers(json)}; -call({count_by, [Key]}) -> {node(), count_by(Key)}; -call({find_by_nonce, [Nonce]}) -> {node(), find_by_nonce(Nonce)}; -call({find_by_pid, [Pid]}) -> {node(), find_by_pid(Pid)}; -call({find_by_pidref, [PidRef]}) -> {node(), find_by_pidref(PidRef)}; -call({find_workers_by_pidref, [PidRef]}) -> {node(), find_workers_by_pidref(PidRef)}; -call({group_by, [Key, Val]}) -> {node(), group_by(Key, Val)}; -call({group_by, [Key, Val, Agg]}) -> {node(), group_by(Key, Val, Agg)}; -call({sorted, [Map]}) -> {node(), sorted(Map)}; -call({sorted_by, [Key]}) -> {node(), sorted_by(Key)}; -call({sorted_by, [Key, Val]}) -> {node(), sorted_by(Key, Val)}; -call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)}; -call({FunName, Args}) -> - FunNameBin = atom_to_binary(FunName), - ArityBin = integer_to_binary(length(Args)), - {error, <<"No such function '", FunNameBin/binary, "/", ArityBin/binary>>}. +-spec rpc_run(Query :: query()) -> + [ + #{ + node => node(), + result => [{aggregation_key(), pos_integer()}], + errors => [atom()] + } + ]. +rpc_run(Query) -> + Nodes = mem3:nodes(), + merge_results(Nodes, erpc:multicall(Nodes, ?MODULE, run, [Query])). + +-spec rpc_unsafe_run(Query :: query()) -> + [ + #{ + node => node(), + result => [{aggregation_key(), pos_integer()}], + errors => [atom()] + } + ]. +rpc_unsafe_run(Query) -> + Nodes = mem3:nodes(), + merge_results(Nodes, erpc:multicall(Nodes, ?MODULE, unsafe_run, [Query])). 
+ +merge_results(Nodes, Resp) -> + %% The result of erpc:multicall is returned as a list where the result from each + %% node is placed at the same position as the node name is placed in Nodes. + %% That is why we can use `lists:zip/2` here. + lists:map(fun format_response/1, lists:zip(Nodes, Resp)). + +format_response({Node, {ok, {ok, Result}}}) -> + #{ + node => Node, + result => Result, + errors => [] + }; +format_response({Node, {ok, {error, Reason}}}) -> + #{ + node => Node, + result => none, + errors => [Reason] + }; +format_response({Node, {ok, Result}}) -> + #{ + node => Node, + result => Result, + errors => [] + }; +format_response({Node, {error, {erpc, Reason}}}) -> + #{ + node => Node, + result => none, + errors => [Reason] + }; +format_response({Node, {Tag, _}}) -> + #{ + node => Node, + result => none, + errors => [Tag] + }. %% %% PidRef operations @@ -446,10 +485,6 @@ active_workers() -> active_workers(Type) -> csrt_query:active_workers(Type). --spec count_by(Key :: string()) -> map(). -count_by(Key) -> - csrt_query:count_by(Key). - find_by_nonce(Nonce) -> csrt_query:find_by_nonce(Nonce). @@ -462,12 +497,6 @@ find_by_pidref(PidRef) -> find_workers_by_pidref(PidRef) -> csrt_query:find_workers_by_pidref(PidRef). -group_by(Key, Val) -> - csrt_query:group_by(Key, Val). - -group_by(Key, Val, Agg) -> - csrt_query:group_by(Key, Val, Agg). - -spec pid_ref_matchspec(AttrName :: rctx_field()) -> term() | throw(any()). pid_ref_matchspec(AttrName) -> csrt_logger:pid_ref_matchspec(AttrName). @@ -485,37 +514,18 @@ pid_ref_attrs(AttrName) -> proc_window(AttrName, Num, Time) -> csrt_logger:proc_window(AttrName, Num, Time). --spec query_matcher(MatcherName :: matcher_name()) -> {ok, query_result()} +-spec query_matcher(MatcherName :: matcher_name()) -> + {ok, query_result()} | {error, any()}. query_matcher(MatcherName) -> csrt_query:query_matcher(MatcherName). 
--spec query_matcher(MatcherName :: matcher_name(), Limit :: pos_integer()) -> {ok, query_result()} +-spec query_matcher(MatcherName :: matcher_name(), Limit :: pos_integer()) -> + {ok, query_result()} | {error, any()}. query_matcher(MatcherName, Limit) -> csrt_query:query_matcher(MatcherName, Limit). --spec query(Keys :: string() | [string()], Options :: query_options()) -> {ok, query_result()} - | {error, any()}. -query(Keys) -> - csrt_query:query(Keys). - -%% #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2} -query(Keys, Options) -> - csrt_query:query(Keys, Options). - -sorted(Map) -> - csrt_query:sorted(Map). - -sorted_by(Key) -> - csrt_query:sorted_by(Key). - -sorted_by(Key, Val) -> - csrt_query:sorted_by(Key, Val). - -sorted_by(Key, Val, Agg) -> - csrt_query:sorted_by(Key, Val, Agg). - %% %% Delta API %% @@ -540,6 +550,248 @@ maybe_add_delta(T) -> maybe_add_delta(T, Delta) -> csrt_util:maybe_add_delta(T, Delta). +%% +%% Query API functions +%% +%% + +%% @doc Construct a query from the given expressions. +%% The following types of expressions are allowed in the query: +%% <li>group_by/1 @see group_by/1</li> +%% <li>group_by/2 @see group_by/2</li> +%% <li>sort_by/1 @see sort_by/1</li> +%% <li>count_by/1 @see count_by/1</li> +%% <li>options/1 @see options/1</li> +%% <li>from/1 @see from/1</li> +%% The order of expressions doesn't matter. +%% <code> +%% Q = query([ +%% from("docs_read"), +%% group_by([username, dbname], ioq_calls), +%% options([ +%% with_limit(10) +%% ]) +%% ]), +%% </code> +%% @end +-spec query(QueryExpression :: [query_expression()]) -> + query() | {error, any()}. +query(QueryExpression) -> + csrt_query:query(QueryExpression). + +%% @doc Specify the matcher to use for the query. +%% If the atom 'all' is used then all entries are in the scope of the query. +%% The use of 'all' also makes the query 'unsafe', because it scans through all entries +%% and can return many matching rows. +%% Unsafe queries can only be run using 'unsafe_run/1'. 
+%% <code> +%% Q = query([ +%% ... +%% from("docs_read") +%% ]), +%% </code> +%% @end +-spec from(MatcherNameOrAll :: string() | all) -> + query_expression() | {error, any()}. +from(MatcherNameOrAll) -> + csrt_query:from(MatcherNameOrAll). + +%% @doc Request 'group_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% group_by([username, dbname]) +%% ]), +%% </code> +%% @end +-spec group_by(AggregationKeys) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +group_by(AggregationKeys) -> + csrt_query:group_by(AggregationKeys). + +%% @doc Request 'group_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% group_by([username, dbname], ioq_calls) +%% ]), +%% </code> +%% @end +-spec group_by(AggregationKeys, ValueKey) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()], + ValueKey :: + binary() + | rctx_field(). +group_by(AggregationKeys, ValueKey) -> + csrt_query:group_by(AggregationKeys, ValueKey). + +%% @doc Request 'sort_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% sort_by([username, dbname]) +%% ]), +%% </code> +%% @end +-spec sort_by(AggregationKeys) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +sort_by(AggregationKeys) -> + csrt_query:sort_by(AggregationKeys). + +%% @doc Request 'sort_by' aggregation of results. +%% <code> +%% Q = query([ +%% ... +%% sort_by([username, dbname], ioq_calls) +%% ]), +%% </code> +%% @end +-spec sort_by(AggregationKeys, ValueKey) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()], + ValueKey :: + binary() + | rctx_field(). +sort_by(AggregationKeys, ValueKey) -> + csrt_query:sort_by(AggregationKeys, ValueKey). + +%% @doc Request 'count_by' aggregation of results. 
+%% <code> +%% Q = query([ +%% ... +%% count_by(username) +%% ]), +%% </code> +%% @end +-spec count_by(AggregationKeys) -> + query_expression() | {error, any()} +when + AggregationKeys :: + binary() + | rctx_field() + | [binary()] + | [rctx_field()]. +count_by(AggregationKeys) -> + csrt_query:count_by(AggregationKeys). + +%% @doc Construct 'options' query expression. +%% There are following types of expressions allowed in the query. +%% <li>unlimited/0 @see unlimited/0 (cannot be used with 'with_limit/1')</li> +%% <li>with_limit/1 @see with_limit/1 (cannot be used with 'unlimited/0')</li> +%% The order of expressions doesn't matter. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% ... +%% ]) +%% ]), +%% </code> +%% @end +-spec options([query_option()]) -> + query_expression() | {error, any()}. +options(OptionsExpression) -> + csrt_query:options(OptionsExpression). + +%% @doc Enable unlimited number of results from the query. +%% The use of 'unlimited' makes the query 'unsafe'. Because it can return many matching rows. +%% Unsafe queries can only be run using 'unsafe_run/1'. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% unlimited() +%% ]) +%% ]), +%% </code> +%% @end +-spec unlimited() -> + query_expression(). +unlimited() -> + csrt_query:unlimited(). + +%% @doc Set limit on number of results returned from the query. +%% <code> +%% Q = query([ +%% ... +%% options([ +%% with_limit(100) +%% ]) +%% ]), +%% </code> +%% @end +-spec with_limit(Limit :: pos_integer()) -> + query_expression() | {error, any()}. +with_limit(Limit) -> + csrt_query:with_limit(Limit). + +%% @doc Executes provided query. Only 'safe' queries can be executed using 'run'. 
+%% The query is considered 'unsafe' if any of the conditions below are met: +%% <li>Query uses 'unlimited/0'</li> +%% <li>Query uses 'from(all)'</li> +%% <code> +%% Q = query([ +%% from("docs_read"), +%% group_by([username, dbname], ioq_calls), +%% options([ +%% with_limit(10) +%% ]) +%% ]), +%% run(Q) +%% </code> +%% @end +-spec run(query()) -> + {ok, [{aggregation_key(), pos_integer()}]} + | {limit, [{aggregation_key(), pos_integer()}]}. +run(Query) -> + csrt_query:run(Query). + +%% @doc Executes provided query. This function is similar to 'run/1', +%% however it supports 'unsafe' queries. Be very careful using it. +%% Pay attention to cardinality of the result. +%% The query is considered 'unsafe' if any of the conditions below are met: +%% <li>Query uses 'unlimited/0'</li> +%% <li>Query uses 'from(all)'</li> +%% <code> +%% Q = query([ +%% from("docs_read"), +%% group_by([username, dbname], ioq_calls), +%% options([ +%% with_limit(10) +%% ]) +%% ]), +%% unsafe_run(Q) +%% </code> +%% @end +-spec unsafe_run(query()) -> + {ok, [{aggregation_key(), pos_integer()}]} + | {limit, [{aggregation_key(), pos_integer()}]}. +unsafe_run(Query) -> + csrt_query:unsafe_run(Query). + %% %% Tests %% diff --git a/src/couch_stats/src/csrt_entry.erl b/src/couch_stats/src/csrt_entry.erl index 094e46ca7..9e34911ba 100644 --- a/src/couch_stats/src/csrt_entry.erl +++ b/src/couch_stats/src/csrt_entry.erl @@ -52,8 +52,10 @@ value(get_kp_node, #rctx{get_kp_node = Val}) -> Val; value(started_at, #rctx{started_at = Val}) -> Val; value(updated_at, #rctx{updated_at = Val}) -> Val. --spec key(BinKey :: binary() | string() | atom()) -> Key :: rctx_field() - | throw({bad_request, Reason :: binary()}). +-spec key(BinKey :: binary() | string() | atom()) -> + Key :: + rctx_field() + | {error, Reason :: any()}. 
key(Key) when is_atom(Key) -> key_from_atom(Key); diff --git a/src/couch_stats/src/csrt_httpd.erl b/src/couch_stats/src/csrt_httpd.erl index 5c260d596..38588a0f0 100644 --- a/src/couch_stats/src/csrt_httpd.erl +++ b/src/couch_stats/src/csrt_httpd.erl @@ -24,123 +24,146 @@ ] ). -rpc_to_json({Resp, _Errors, _Nodes}) -> - #{<<"results">> => resp_to_json(Resp, #{}), <<"errors">> => [], <<"bad_nodes">> => []}. - -resp_to_json([{N, R} | Rest], Acc) -> - resp_to_json(Rest, maps:put(atom_to_binary(N), R, Acc)); -resp_to_json([], Acc) -> - Acc. +-import( + csrt, + [ + query/1, + from/1, + group_by/1, + group_by/2, + sort_by/1, + sort_by/2, + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1 + ] +). -% handle_resource_status_req(#httpd{method = 'GET', path_parts = [<<"_active_resources">>]} = Req) -> -% ok = chttpd:verify_is_server_admin(Req), -% %% TODO: incorporate Bad responses -% Resp = rpc_to_json(csrt:rpc(active, [json])), -% send_json(Req, Resp); -handle_resource_status_req(#httpd{method = 'POST', path_parts = [<<"_active_resources">>, <<"_match">>, MatcherName]} = Req) -> +handle_resource_status_req( + #httpd{method = 'POST', path_parts = [<<"_active_resources">>, <<"_match">>, MatcherNameBin]} = + Req +) -> chttpd:validate_ctype(Req, "application/json"), {JsonProps} = chttpd:json_body_obj(Req), GroupBy = couch_util:get_value(<<"group_by">>, JsonProps), SortBy = couch_util:get_value(<<"sort_by">>, JsonProps), CountBy = couch_util:get_value(<<"count_by">>, JsonProps), - - case {GroupBy, SortBy, CountBy} of - {undefined, undefined, {Query}} -> - handle_count_by(Req, MatcherName, Query); - {undefined, {Query}, undefined} -> - handle_sort_by(Req, MatcherName, Query); - {{Query}, undefined, undefined} -> - handle_group_by(Req, MatcherName, Query); - {_, _, _} -> - throw({bad_request, <<"Multiple aggregations are not supported">>}) + MatcherName = binary_to_list(MatcherNameBin), + {AggregationKeys, Query} = + case {GroupBy, SortBy, CountBy} of + {undefined, 
undefined, {Props}} -> + Keys = couch_util:get_value(<<"aggregate_keys">>, Props), + {Keys, csrt:query([from(MatcherName), count_by(Keys)])}; + {undefined, {Props}, undefined} -> + Keys = couch_util:get_value(<<"aggregate_keys">>, Props), + CounterKey = couch_util:get_value(<<"counter_key">>, Props), + {Keys, query([from(MatcherName), sort_by(Keys, CounterKey)])}; + {{Props}, undefined, undefined} -> + Keys = couch_util:get_value(<<"aggregate_keys">>, Props), + CounterKey = couch_util:get_value(<<"counter_key">>, Props), + {Keys, query([from(MatcherName), group_by(Keys, CounterKey)])}; + {_, _, _} -> + throw({bad_request, <<"Multiple aggregations are not supported">>}) + end, + case Query of + {error, Reason} -> + send_error(Req, Reason); + Q -> + JSON = to_json(AggregationKeys, csrt:rpc_run(Q)), + send_json(Req, JSON) end; handle_resource_status_req(#httpd{path_parts = [<<"_active_resources">>]} = Req) -> ok = chttpd:verify_is_server_admin(Req), send_method_not_allowed(Req, "GET,HEAD"); - handle_resource_status_req(Req) -> ok = chttpd:verify_is_server_admin(Req), send_method_not_allowed(Req, "GET,HEAD,POST"). -handle_count_by(Req, MatcherName, CountBy) -> - AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, CountBy), - AggregationKey = parse_key(AggregationKeys), - case csrt_query:count_by(matching(MatcherName), AggregationKey) of - {ok, Map} -> - send_json(Req, {aggregation_result_to_json(Map)}); - Else -> - %% TODO handle error - throw({bad_request, Else}) - end. - -handle_sort_by(Req, MatcherName, SortBy) -> - AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, SortBy), - CounterKey = couch_util:get_value(<<"counter_key">>, SortBy), - AggregationKey = parse_key(AggregationKeys), - ValueKey = parse_key(CounterKey), - case csrt_query:sort_by(matching(MatcherName), AggregationKey, ValueKey) of - {ok, Map} -> - send_json(Req, {aggregation_result_to_json(Map)}); - Else -> - %% TODO handle error - throw({bad_request, Else}) - end. 
- -handle_group_by(Req, MatcherName, GroupBy) -> - AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, GroupBy), - CounterKey = couch_util:get_value(<<"counter_key">>, GroupBy), - AggregationKey = parse_key(AggregationKeys), - ValueKey = parse_key(CounterKey), - case csrt_query:group_by(matching(MatcherName), AggregationKey, ValueKey) of - {ok, Map} -> - send_json(Req, {aggregation_result_to_json(Map)}); - Else -> - %% TODO handle error - throw({bad_request, Else}) - end. - -aggregation_result_to_json(Map) when is_map(Map) -> - maps:fold(fun(K, V, Acc) -> [{key_to_string(K), V} | Acc] end, [], Map). - -key_to_string(Key) when is_tuple(Key) -> - list_to_binary(string:join([atom_to_list(K) || K <- tuple_to_list(Key)], ",")); -key_to_string(Key) when is_atom(Key) -> - atom_to_binary(Key). - -matching(MatcherName) -> - case csrt_logger:get_matcher(binary_to_list(MatcherName)) of - undefined -> - throw({bad_request, <<"unknown matcher '", MatcherName/binary, "'">>}); - Matcher -> - Matcher - end. - -% extract one of the predefined matchers -% - docs_read -% - rows_read -% - docs_written -% - worker_changes_processed -% - ioq_calls -query_matcher(MatcherName, AggregationKey, CounterKey) -> - case csrt_logger:get_matcher(binary_to_list(MatcherName)) of - undefined -> - {error, <<"unknown matcher '", MatcherName/binary, "'">>}; - Matcher -> - csrt_query:query_matcher(Matcher, AggregationKey, CounterKey) - end. - --spec parse_key(Keys :: binary() | [binary()]) -> [rctx_field()] - | throw({bad_request, Reason :: binary()}). - -parse_key(Keys) when is_list(Keys) -> - parse_key(Keys, []); -parse_key(BinKey) when is_binary(BinKey) -> - csrt_entry:key(BinKey); -parse_key(undefined) -> - undefined. - -parse_key([BinKey | Rest], Keys) -> - parse_key(Rest, [csrt_entry:key(BinKey) | Keys]); -parse_key([], Keys) -> - lists:reverse(Keys). - +to_json(AggregationKeys, Results) -> + lists:map(fun(E) -> node_reply_to_json(AggregationKeys, E) end, Results). 
+ +node_reply_to_json(_AggregationKeys, #{node := Node, result := none, errors := Errors}) -> + #{ + node => atom_to_binary(Node), + result => none, + errors => lists:map(fun erlang:atom_to_list/1, Errors) + }; +node_reply_to_json(AggregationKeys, #{node := Node, result := Result, errors := Errors}) -> + #{ + node => atom_to_binary(Node), + result => aggregation_result_to_json(AggregationKeys, Result), + errors => lists:map(fun erlang:atom_to_list/1, Errors) + }. + +encode_key(AggregationKeys, Key) -> + maps:from_list(lists:zip(AggregationKeys, tuple_to_list(Key))). + +-spec aggregation_result_to_json(AggregationKeys :: binary() | [binary()], Map :: query_result()) -> + json_spec(#{ + value => non_neg_integer(), + key => #{ + username => string(), + dbname => string() + } + }). + +aggregation_result_to_json(AggregationKeys, Map) when + is_map(Map) andalso is_list(AggregationKeys) +-> + maps:fold( + fun(K, V, Acc) -> + [ + #{ + value => V, + key => encode_key(AggregationKeys, K) + } + | Acc + ] + end, + [], + Map + ); +aggregation_result_to_json(AggregationKey, Map) when + is_map(Map) andalso is_binary(AggregationKey) +-> + maps:fold( + fun(K, V, Acc) -> + [ + #{value => V, key => #{AggregationKey => K}} | Acc + ] + end, + [], + Map + ); +aggregation_result_to_json(AggregationKeys, Ordered) when + is_list(Ordered) andalso is_list(AggregationKeys) +-> + lists:map( + fun({K, V}) -> + #{ + value => V, + key => encode_key(AggregationKeys, K) + } + end, + Ordered + ); +aggregation_result_to_json(AggregationKey, Ordered) when + is_list(Ordered) andalso is_binary(AggregationKey) +-> + lists:map( + fun({K, V}) -> + #{value => V, key => #{AggregationKey => K}} + end, + Ordered + ). 
+ +send_error(Req, [{unknown_matcher, Matcher} | _]) -> + MatcherBin = list_to_binary(Matcher), + chttpd:send_error(Req, {bad_request, <<"Unknown matcher '", MatcherBin/binary, "'">>}); +send_error(Req, [{invalid_key, FieldName} | _]) -> + chttpd:send_error(Req, {bad_request, <<"Unknown field name '", FieldName/binary, "'">>}); +send_error(Req, [Reason | _]) -> + chttpd:send_error(Req, {error, Reason}). diff --git a/src/couch_stats/src/csrt_query.erl b/src/couch_stats/src/csrt_query.erl index b461fedcb..bc87b8311 100644 --- a/src/couch_stats/src/csrt_query.erl +++ b/src/couch_stats/src/csrt_query.erl @@ -12,6 +12,8 @@ -module(csrt_query). +-feature(maybe_expr, enable). + -include_lib("stdlib/include/ms_transform.hrl"). -include_lib("couch_stats_resource_tracker.hrl"). @@ -23,26 +25,97 @@ active_coordinators/1, active_workers/0, active_workers/1, + all/0, - count_by/1, - count_by/2, find_by_nonce/1, find_by_pid/1, find_by_pidref/1, find_workers_by_pidref/1, - group_by/3, - group_by/4, - query/1, - query/2, + query_matcher/1, query_matcher/2, query_matcher_rows/1, query_matcher_rows/2, + + query/1, + from/1, + group_by/1, + group_by/2, sort_by/1, sort_by/2, - sort_by/3 + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1, + unsafe_run/1 +]). + +-export_type([ + query/0, + query_expression/0, + query_option/0 ]). +-type aggregation_keys_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | aggregation_value()). +-type value_key_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | aggregation_value()). +-type count_key_fun() :: fun((A :: pos_integer(), B :: pos_integer()) -> pos_integer()). + +-record(selector, { + aggregation_keys = undefined :: + rctx_field() + | [rctx_field()] + | undefined, + value_key = undefined :: + rctx_field() + | undefined +}). 
+ +-record(unsafe_selector, { + aggregation_keys = undefined :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()] + | undefined, + value_key = undefined :: + value_key_fun() + | rctx_field() + | undefined +}). + +-record(query_options, { + limit = undefined :: pos_integer() | unlimited | undefined, + is_safe = undefined :: boolean() | undefined +}). + +-type aggregation() :: group_by | sort_by | count_by. + +-record(query, { + matcher = undefined :: matcher_name() | all | undefined, + selector = undefined :: #selector{} | #unsafe_selector{} | undefined, + limit = undefined :: pos_integer() | unlimited | undefined, + aggregation = undefined :: aggregation() | undefined, + is_safe = true :: boolean() +}). + +-record(from, { + matcher = undefined :: matcher_name() | all | undefined, + is_safe = undefined :: boolean() | undefined +}). + +-opaque query() :: #query{}. +-opaque query_expression() :: + #from{} + | #query_options{} + | #selector{} + | #unsafe_selector{} + | query_option() + | {aggregation(), #selector{}} + | {aggregation(), #unsafe_selector{}}. +-opaque query_option() :: + {limit, pos_integer() | unlimited | undefined}. + %% %% Aggregate query API %% @@ -95,42 +168,39 @@ find_workers_by_pidref(PidRef) -> curry_field(Field) -> fun(Ele) -> csrt_entry:value(Field, Ele) end. -count_by(KeyFun) -> - csrt_query:count_by(all(), KeyFun). - --spec count_by(Matcher :: matcher(), KeyFun) -> - query_result() -when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()). -count_by(Matcher, KeyFun) -> - group_by(Matcher, KeyFun, fun(_) -> 1 end). - --spec group_by(KeyFun, ValFun) -> - query_result() +-spec group_by(Matcher, KeyFun, ValFun) -> + {ok, aggregation_result()} | {limit, aggregation_result()} when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()), - ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()). -group_by(KeyFun, ValFun) -> - csrt_query:group_by(all(), KeyFun, ValFun). 
- --spec group_by(Matcher :: matcher(), KeyFun, ValFun) -> - query_result() -when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()), - ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()). + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(). group_by(Matcher, KeyFun, ValFun) -> - group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2). + AggFun = fun erlang:'+'/2, + group_by(Matcher, KeyFun, ValFun, AggFun). --spec group_by(Matcher :: matcher(), KeyFun, ValFun, AggFun) -> - query_result() +-spec group_by(Matcher, KeyFun, ValFun, AggFun) -> + {ok, aggregation_result()} | {limit, aggregation_result()} when - KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()), - ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()), - AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()). + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(), + AggFun :: + count_key_fun(). group_by(Matcher, KeyFun, ValFun, AggFun) -> group_by(Matcher, KeyFun, ValFun, AggFun, ?QUERY_CARDINALITY_LIMIT). --spec all() -> matcher(). +-spec all() -> + matcher(). all() -> Spec = ets:fun2ms(fun(#rctx{} = R) -> R end), @@ -142,6 +212,21 @@ all() -> %% eg: group_by(all(), [username, dbname, mfa], docs_read). %% eg: group_by(all(), [username, dbname, mfa], ioq_calls). %% eg: group_by(all(), [username, dbname, mfa], js_filters). +-spec group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> + {ok, aggregation_result()} | {limit, aggregation_result()} +when + Matcher :: matcher(), + KeyFun :: + aggregation_keys_fun() + | rctx_field() + | [rctx_field()], + ValFun :: + value_key_fun() + | rctx_field(), + AggFun :: + count_key_fun(), + Limit :: pos_integer(). 
+ group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) -> KeyFun = fun(Ele) -> list_to_tuple([csrt_entry:value(Key, Ele) || Key <- KeyL]) end, group_by(Matcher, KeyFun, ValFun, AggFun, Limit); @@ -153,7 +238,7 @@ group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> FoldFun = fun(Ele, Acc) -> case maps:size(Acc) =< Limit of true -> - case maybe_match(Ele, Matcher) of + case ets_match(Ele, Matcher) of true -> Key = KeyFun(Ele), Val = ValFun(Ele), @@ -169,17 +254,16 @@ group_by(Matcher, KeyFun, ValFun, AggFun, Limit) -> end; false -> throw({limit, Acc}) - end + end end, try {ok, ets:foldl(FoldFun, #{}, ?CSRT_ETS)} - catch throw:{limit, Acc} -> - {limit, Acc} + catch + throw:{limit, Acc} -> + {limit, Acc} end. -maybe_match(_Ele, undefined) -> - true; -maybe_match(Ele, {_, MS}) -> +ets_match(Ele, {_, MS}) -> ets:match_spec_run([Ele], MS) =/= []. %% @@ -188,10 +272,10 @@ maybe_match(Ele, {_, MS}) -> -record(topK, { % we store ordered elements in ascending order - seq = [] :: list(pos_integer()), + seq = [] :: list({aggregation_key(), pos_integer()}), % we rely on erlang sorting order where `number < atom` - min = infinite :: infinite | pos_integer(), - max = 0 :: pos_integer(), + min = infinite :: infinite | pos_integer(), + max = 0 :: non_neg_integer(), size = 0 :: non_neg_integer(), % capacity cannot be less than 1 capacity = 1 :: pos_integer() @@ -205,7 +289,9 @@ new_topK(K) when K >= 1 -> update_topK(_Key, Value, #topK{size = S, capacity = S, min = Min} = Top) when Value < Min -> Top#topK{min = Value}; % when we are at capacity evict smallest value -update_topK(Key, Value, #topK{size = S, capacity = S, max = Max, seq = Seq} = Top) when Value > Max -> +update_topK(Key, Value, #topK{size = S, capacity = S, max = Max, seq = Seq} = Top) when + Value > Max +-> % capacity cannot be less than 1, so we can avoid handling the case when Seq is empty [_ | Truncated] = Seq, Top#topK{max = Value, seq = lists:keysort(2, [{Key, Value} | Truncated])}; @@ -224,95 +310,449 
%% @doc Return the entries held by a `#topK{}' accumulator, largest value
%% first. `seq' is kept sorted ascending by value, hence the reverse.
get_topK(#topK{seq = S}) ->
    lists:reverse(S).

%% @doc Reduce an aggregation map to (at most) its `K' largest entries,
%% returned as a list of `{Key, Value}' ordered from largest to smallest.
-spec topK(aggregation_result(), pos_integer()) ->
    ordered_result().
topK(Results, K) ->
    TopK = maps:fold(fun update_topK/3, new_topK(K), Results),
    get_topK(TopK).

%%
%% Query API functions
%%

%% @doc Construct the 'from' query expression.
%% `all' selects every entry and marks the expression as unsafe; any other
%% name must be known to `csrt_logger:get_matcher/1', otherwise
%% `{error, {unknown_matcher, MatcherName}}' is returned.
%% @end
-spec from(MatcherName :: matcher_name() | all) ->
    #from{} | {error, any()}.
from(all) ->
    #from{matcher = all, is_safe = false};
from(MatcherName) ->
    case csrt_logger:get_matcher(MatcherName) of
        undefined ->
            {error, {unknown_matcher, MatcherName}};
        _ ->
            #from{matcher = MatcherName, is_safe = true}
    end.

%% @doc Construct 'options' query expression.
%% <code>
%% Q = query([
%%    ...
%%    options([
%%       ...
%%    ])
%% ]),
%% </code>
%% Each element is expected to come from `unlimited/0' or `with_limit/1'.
%% The first `{error, _}' element (as produced by `with_limit/1' for a bad
%% limit) aborts the fold and is returned as is.
%% @end
-spec options([query_option()]) ->
    #query_options{} | {error, any()}.
options(Options) ->
    lists:foldl(
        fun
            (_, {error, _} = Error) ->
                % an earlier option already failed, keep the first error
                Error;
            ({error, _} = Error, _Acc) ->
                Error;
            ({limit, unlimited}, #query_options{} = Acc) ->
                % lifting the limit makes the whole query unsafe
                Acc#query_options{limit = unlimited, is_safe = false};
            ({limit, Limit}, #query_options{} = Acc) when is_integer(Limit) ->
                % the accumulator is the #query_options{} record itself
                Acc#query_options{limit = Limit}
        end,
        #query_options{is_safe = true},
        Options
    ).

%% @doc Query option which removes the limit on the number of results.
-spec unlimited() -> {limit, unlimited}.
unlimited() ->
    {limit, unlimited}.

%% @doc Query option which caps the number of results at `Limit'.
%% `Limit' must be a positive integer (top-K needs a capacity of at least 1)
%% and must not exceed the configured `query_limit()'.
%% @end
-spec with_limit(Limit :: any()) ->
    {limit, pos_integer()} | {error, any()}.
with_limit(Limit) when is_integer(Limit), Limit >= 1 ->
    case Limit =< query_limit() of
        true ->
            {limit, Limit};
        false ->
            {error, {beyond_limit, Limit}}
    end;
with_limit(Limit) ->
    {error, {invalid_limit, Limit}}.

%% @doc Construct a 'count_by' query expression over the given aggregation
%% keys. Errors from key parsing are returned untagged.
%% @end
-spec count_by(AggregationKeys) ->
    {count_by, #selector{}} | {count_by, #unsafe_selector{}} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun()
        | binary()
        | rctx_field()
        | [binary()]
        | [rctx_field()].
count_by(AggregationKeys) ->
    with_tag(select(AggregationKeys), count_by).
%% @doc Build a 'sort_by' query expression from aggregation keys only.
%% A parsing error is returned as is, without the `sort_by' tag.
%% @end
-spec sort_by(AggregationKeys) ->
    {sort_by, #selector{}} | {sort_by, #unsafe_selector{}} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun()
        | binary()
        | rctx_field()
        | [binary()]
        | [rctx_field()].
sort_by(AggregationKeys) ->
    Selector = select(AggregationKeys),
    with_tag(Selector, sort_by).

%% @doc Build a 'sort_by' query expression from aggregation keys and the
%% key (or fun) providing the value to aggregate.
%% @end
-spec sort_by(AggregationKeys, ValueKey) ->
    {sort_by, #selector{}} | {sort_by, #unsafe_selector{}} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun()
        | binary()
        | rctx_field()
        | [binary()]
        | [rctx_field()],
    ValueKey ::
        value_key_fun()
        | binary()
        | rctx_field().
sort_by(AggregationKeys, ValueKey) ->
    Selector = select(AggregationKeys, ValueKey),
    with_tag(Selector, sort_by).

%% @doc Build a 'group_by' query expression from aggregation keys only.
%% A parsing error is returned as is, without the `group_by' tag.
%% @end
-spec group_by(AggregationKeys) ->
    {group_by, #selector{}} | {group_by, #unsafe_selector{}} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun()
        | binary()
        | rctx_field()
        | [binary()]
        | [rctx_field()].
group_by(AggregationKeys) ->
    Selector = select(AggregationKeys),
    with_tag(Selector, group_by).

%% @doc Build a 'group_by' query expression from aggregation keys and the
%% key (or fun) providing the value to aggregate.
%% @end
-spec group_by(AggregationKeys, ValueKey) ->
    {group_by, #selector{}} | {group_by, #unsafe_selector{}} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun()
        | binary()
        | rctx_field()
        | [binary()]
        | [rctx_field()],
    ValueKey ::
        value_key_fun()
        | binary()
        | rctx_field().
group_by(AggregationKeys, ValueKey) ->
    Selector = select(AggregationKeys, ValueKey),
    with_tag(Selector, group_by).
%% @doc Assemble a `#query{}' from a list of query expressions, i.e. the
%% values returned by `from/1', `options/1' and the `count_by', `sort_by'
%% and `group_by' constructors.
%%
%% Returns `{error, Reasons}' when
%% - any expression is itself an `{error, Reason}',
%% - the same kind of expression is given more than once, or
%% - the mandatory 'from' or selector expression is absent (both `run/1'
%%   and `unsafe_run/1' need a matcher and a selector to do anything).
%%
%% The query is considered safe until one of its expressions says otherwise.
%% @end
-spec query([Expression]) -> #query{} | {error, [Reason :: any()]} when
    Expression ::
        {count_by | sort_by | group_by, #selector{} | #unsafe_selector{}}
        | #query_options{}
        | #from{}
        | {error, any()}.
query(Query) ->
    % start assuming safe query and turn to unsafe when we detect issues
    Acc = #query{is_safe = true},
    {Errors, Q} = lists:foldr(fun add_query_expression/2, {[], Acc}, Query),
    finalize_query(Errors, Q).

%% Fold step of query/1: merge one expression into the query being built,
%% or record why it cannot be merged.
add_query_expression(
    {Aggregation, #unsafe_selector{} = Selector}, {E, #query{selector = undefined} = Q}
) ->
    {E, Q#query{selector = Selector, is_safe = false, aggregation = Aggregation}};
add_query_expression({Aggregation, #unsafe_selector{}}, {E, Q}) ->
    {[{more_than_once, {select, Aggregation}} | E], Q};
add_query_expression(
    {Aggregation, #selector{} = Selector}, {E, #query{selector = undefined} = Q}
) ->
    {E, Q#query{selector = Selector, aggregation = Aggregation}};
add_query_expression({Aggregation, #selector{}}, {E, Q}) ->
    {[{more_than_once, {select, Aggregation}} | E], Q};
add_query_expression(
    #query_options{is_safe = false, limit = Limit}, {E, #query{limit = undefined} = Q}
) ->
    {E, Q#query{limit = Limit, is_safe = false}};
add_query_expression(#query_options{limit = Limit}, {E, #query{limit = undefined} = Q}) ->
    {E, Q#query{limit = Limit}};
add_query_expression(#query_options{}, {E, Q}) ->
    {[{more_than_once, options} | E], Q};
add_query_expression(
    #from{matcher = Matcher, is_safe = false}, {E, #query{matcher = undefined} = Q}
) ->
    {E, Q#query{matcher = Matcher, is_safe = false}};
add_query_expression(#from{matcher = Matcher}, {E, #query{matcher = undefined} = Q}) ->
    {E, Q#query{matcher = Matcher}};
add_query_expression(#from{}, {E, Q}) ->
    {[{more_than_once, from} | E], Q};
add_query_expression({error, Reason}, {E, Q}) ->
    {[Reason | E], Q}.

%% Last step of query/1. Missing parts are only reported when no other error
%% was collected, so that e.g. an unknown matcher is not additionally
%% reported as a missing 'from'.
finalize_query([], #query{} = Q) ->
    case missing_query_parts(Q) of
        [] ->
            Q;
        Missing ->
            {error, Missing}
    end;
finalize_query(Errors, _Q) ->
    {error, Errors}.

%% List the mandatory query parts which were never provided.
missing_query_parts(#query{matcher = Matcher, selector = Selector}) ->
    [{missing, from} || Matcher =:= undefined] ++
        [{missing, select} || Selector =:= undefined].

%% @doc Run a safe query.
%% 'sort_by' queries produce an ordered list, 'count_by' and 'group_by'
%% queries produce a map. Queries which are not safe are rejected with
%% `{error, {unsafe_query, Message}}'.
%% @end
-spec run(#query{}) ->
    {ok, query_result()}
    | {limit, query_result()}
    | {error, {unsafe_query, string()}}.
run(#query{
    is_safe = true,
    matcher = MatcherName,
    selector = #selector{aggregation_keys = AKey, value_key = VKey},
    limit = Limit,
    aggregation = Aggregation
}) ->
    % we validated the presence of the matcher so this shouldn't fail
    {ok, Matcher} = get_matcher(MatcherName),
    % A selector without a value key counts entries (one per entry).
    % NOTE(review): the original applied this to count_by and group_by only
    % and handed `undefined' to group_by/3 for sort_by; group_by/3 is not
    % visible from here - confirm counting is the intended sort_by default.
    ValFun =
        case VKey of
            undefined -> fun(_) -> 1 end;
            _ -> VKey
        end,
    % A query built without options has `limit = undefined'. Atoms compare
    % greater than any number, so passing it to topK/2 would satisfy the
    % `K >= 1' guard and never cap the result. Fall back to the configured
    % limit instead.
    K =
        case Limit of
            undefined -> query_limit();
            _ -> Limit
        end,
    {Result, Acc} = group_by(Matcher, AKey, ValFun),
    Top = {Result, topK(Acc, K)},
    case Aggregation of
        sort_by -> Top;
        count_by -> to_map(Top);
        group_by -> to_map(Top)
    end;
run(#query{}) ->
    {error,
        {unsafe_query,
            "Please use 'unsafe_run(Query)' instead if you really know what you are doing."}}.

%% @doc Run a query without requiring it to be safe.
%% 'sort_by' queries return whatever `maybe_apply_limit/2' produces (an
%% ordered list, or the raw aggregation map when the limit is `unlimited');
%% 'count_by' and 'group_by' queries always return a map.
%% @end
-spec unsafe_run(#query{}) ->
    {ok, query_result()}
    | {limit, query_result()}.
unsafe_run(#query{selector = #unsafe_selector{} = Selector} = Query) ->
    %% Re-tag the record as #selector{} so the clause below handles it.
    %% This relies on both records having the same fields in the same order.
    unsafe_run(Query#query{selector = setelement(1, Selector, selector)});
unsafe_run(#query{
    matcher = MatcherName,
    selector = #selector{aggregation_keys = AKey, value_key = VKey},
    limit = Limit,
    aggregation = Aggregation
}) ->
    Matcher = choose_matcher(MatcherName),
    % A selector without a value key counts entries (one per entry).
    % NOTE(review): the original applied this to count_by and group_by only
    % and handed `undefined' to group_by/3 for sort_by; group_by/3 is not
    % visible from here - confirm counting is the intended sort_by default.
    ValFun =
        case VKey of
            undefined -> fun(_) -> 1 end;
            _ -> VKey
        end,
    Limited = maybe_apply_limit(group_by(Matcher, AKey, ValFun), Limit),
    case Aggregation of
        sort_by -> Limited;
        count_by -> to_map(Limited);
        group_by -> to_map(Limited)
    end.

%%
%% Query API auxiliary functions
%%

%% @doc Build a selector from aggregation keys only (no value key).
%% Keys given as a fun yield an `#unsafe_selector{}', anything else a
%% `#selector{}'. A key parsing/validation error is returned unchanged.
%% @end
-spec select(AggregationKeys) ->
    #selector{} | #unsafe_selector{} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun() | binary() | rctx_field() | [binary()] | [rctx_field()].

select(AggregationKeys) ->
    maybe
        {ok, AKey} ?= parse_aggregation_keys(AggregationKeys),
        case is_safe_key(AKey) of
            true ->
                #selector{aggregation_keys = AKey};
            false ->
                #unsafe_selector{aggregation_keys = AKey}
        end
    end.
%% @doc Build a selector from aggregation keys and a value key.
%% The result is an `#unsafe_selector{}' as soon as either argument is a
%% fun, and a `#selector{}' otherwise. The record is returned bare (not
%% wrapped in `{ok, _}'); a parsing/validation error is returned unchanged.
%% @end
-spec select(AggregationKeys, ValueKey) ->
    #selector{} | #unsafe_selector{} | {error, any()}
when
    AggregationKeys ::
        aggregation_keys_fun() | binary() | rctx_field() | [binary()] | [rctx_field()],
    ValueKey :: value_key_fun() | binary() | rctx_field().

select(AggregationKeys, ValueKey) ->
    maybe
        {ok, AKey} ?= parse_aggregation_keys(AggregationKeys),
        {ok, VKey} ?= parse_value_key(ValueKey),
        case is_safe_key(AKey) andalso is_safe_key(VKey) of
            true ->
                #selector{aggregation_keys = AKey, value_key = VKey};
            false ->
                #unsafe_selector{aggregation_keys = AKey, value_key = VKey}
        end
    end.

%% A key is safe unless it is a user supplied fun.
is_safe_key(Fun) when is_function(Fun) ->
    false;
is_safe_key(_) ->
    true.

%% Validate an aggregation key fun, or parse field name(s) into key(s).
%% Returns `{ok, KeyOrFun}' or `{error, Reason}'.
parse_aggregation_keys(Fun) when is_function(Fun) ->
    validate_fun(Fun, key_fun);
parse_aggregation_keys(Keys) ->
    with_ok(parse_key(Keys)).

%% Validate a value fun, or parse a single field name into a key.
%% Unlike aggregation keys, a list of keys is rejected here.
parse_value_key(Fun) when is_function(Fun) ->
    validate_fun(Fun, value_fun);
parse_value_key(Key) ->
    case parse_key(Key) of
        {error, _} = Error ->
            Error;
        Keys when is_list(Keys) ->
            {error, multiple_value_keys};
        K ->
            {ok, K}
    end.

%% Wrap a successful result as `{Tag, Result}'; errors pass through untagged.
with_tag({error, _} = Error, _) ->
    Error;
with_tag(Result, Tag) ->
    {Tag, Result}.

%% Wrap a successful result as `{ok, Result}'; errors pass through.
with_ok({error, _} = Error) ->
    Error;
with_ok(Result) ->
    {ok, Result}.

%% Accept only funs of arity 1 which survive a trial call on an empty
%% `#rctx{}'. Any exception raised by the trial call is reported as
%% `{error, {invalid_fun, Tag}}'; the value it returns is not inspected.
validate_fun(Fun, Tag) when is_function(Fun, 1) ->
    try Fun(#rctx{}) of
        _ ->
            {ok, Fun}
    catch
        _:_ ->
            {error, {invalid_fun, Tag}}
    end;
validate_fun(_Fun, Tag) ->
    {error, {invalid_fun, Tag}}.
%% Resolve the matcher to scan with: `all' uses the matcher returned by
%% all/0, any other name is looked up with get_matcher/1.
choose_matcher(all) ->
    all();
choose_matcher(MatcherName) ->
    % we validated the presence of the matcher so this shouldn't fail
    {ok, Matcher} = get_matcher(MatcherName),
    Matcher.

%% @doc Apply the query limit to an aggregation result.
%% - `unlimited' leaves the aggregation map untouched,
%% - `undefined' (no limit given) falls back to `query_limit()',
%% - an integer keeps that many top entries.
%% With a limit applied the map becomes an ordered list (see `topK/2').
%% Errors are passed through unchanged.
%% @end
-spec maybe_apply_limit(ResultsOrError, Limit) -> OrderedResultsOrError when
    ResultsOrError ::
        {ok, aggregation_result()}
        | {limit, aggregation_result()}
        | {error, any()},
    Limit :: unlimited | undefined | pos_integer(),
    OrderedResultsOrError ::
        {ok, ordered_result()}
        | {limit, ordered_result()}
        | {ok, aggregation_result()}
        | {limit, aggregation_result()}
        | {error, any()}.

maybe_apply_limit({error, _} = Error, _Limit) ->
    % the reason is not an aggregation map, never hand it to topK/2
    Error;
maybe_apply_limit({Result, Results}, unlimited) ->
    {Result, Results};
maybe_apply_limit({Result, Results}, undefined) ->
    {Result, topK(Results, query_limit())};
maybe_apply_limit({Result, Results}, Limit) when is_integer(Limit) ->
    {Result, topK(Results, Limit)}.

%% @doc Normalise a result to map form: an ordered list is converted with
%% `maps:from_list/1', a map is returned as is. Errors are passed through
%% unchanged.
%% @end
-spec to_map(ResultsOrError) -> MapResultsOrError when
    ResultsOrError ::
        {ok, ordered_result() | aggregation_result()}
        | {limit, ordered_result() | aggregation_result()}
        | {error, any()},
    MapResultsOrError ::
        {ok, aggregation_result()}
        | {limit, aggregation_result()}
        | {error, any()}.
to_map({error, _} = Error) ->
    Error;
to_map({Result, Results}) when is_list(Results) ->
    {Result, maps:from_list(Results)};
to_map({Result, Results}) when is_map(Results) ->
    {Result, Results}.

%% @doc Translate user supplied field name(s) using `csrt_entry:key/1'.
%% A string (list starting with an integer), a binary or an atom is a single
%% name; any other list is treated as a list of names and translated
%% element by element. `undefined' is kept as `undefined'.
%% @end
-spec parse_key(Keys :: string() | binary() | atom() | [binary()] | [atom()]) ->
    undefined
    | rctx_field()
    | [rctx_field()]
    | {error, Reason :: any()}.

parse_key([C | _] = Key) when is_integer(C) ->
    csrt_entry:key(Key);
parse_key(Keys) when is_list(Keys) ->
    parse_key(Keys, []);
parse_key(BinKey) when is_binary(BinKey) ->
    csrt_entry:key(BinKey);
parse_key(undefined) ->
    undefined;
parse_key(Key) when is_atom(Key) ->
    csrt_entry:key(Key).

%% Translate a list of names, preserving order; stop at the first name
%% for which csrt_entry:key/1 returns an error.
parse_key([BinKey | Rest], Keys) ->
    case csrt_entry:key(BinKey) of
        {error, _} = Error ->
            Error;
        Key ->
            parse_key(Rest, [Key | Keys])
    end;
parse_key([], Keys) ->
    lists:reverse(Keys).
%%
%% Scanning with matchers
%%

%% @doc Return the entries selected by the named matcher, using the
%% configured `query_limit()' as the row limit.
%% @end
-spec query_matcher(MatcherName :: string()) ->
    {ok, query_result()}
    | {error, any()}.
query_matcher(MatcherName) when is_list(MatcherName) ->
    DefaultLimit = query_limit(),
    query_matcher(MatcherName, DefaultLimit).

%% @doc Return at most `Limit' entries selected by the named matcher.
%% An unknown matcher name yields `{error, {unknown_matcher, MatcherName}}'.
%% @end
-spec query_matcher(MatcherName :: matcher_name(), Limit :: pos_integer()) ->
    {ok, query_result()}
    | {error, any()}.
query_matcher(MatcherName, Limit) when is_list(MatcherName), is_integer(Limit) ->
    maybe
        {ok, Matcher} ?= get_matcher(MatcherName),
        query_matcher_rows(Matcher, Limit)
    end.

%% @doc Same as `query_matcher_rows/2' with the configured `query_limit()'.
-spec query_matcher_rows(Matcher :: matcher()) ->
    {ok, query_result()}
    | {error, any()}.
query_matcher_rows(Matcher) ->
    DefaultLimit = query_limit(),
    query_matcher_rows(Matcher, DefaultLimit).

%% @doc Select up to `Limit' entries from the CSRT table with the given
%% matcher and convert them with `csrt_entry:to_json/1'. Any exception
%% raised while selecting or converting is returned as `{error, Reason}'.
%% @end
-spec query_matcher_rows(Matcher :: matcher(), Limit :: pos_integer()) ->
    {ok, query_result()}
    | {error, any()}.
query_matcher_rows({MSpec, _CompMSpec}, Limit) when
    is_list(MSpec), is_integer(Limit), Limit >= 1
->
    try
        %% ets:select/* wants the plain match_spec(), not the compiled one.
        %% ets:select/3 caps the number of rows but answers with a
        %% {Rows, Continuation} pair; only the first batch is used.
        Selected = ets:select(?CSRT_ETS, MSpec, Limit),
        Rows =
            case Selected of
                {Batch, _Continuation} ->
                    Batch;
                %% anything else, i.e. '$end_of_table'
                _ ->
                    []
            end,
        {ok, to_json_list(Rows)}
    catch
        _:Reason ->
            {error, Reason}
    end.

%% Look up a matcher by name, turning the `undefined' reply of
%% csrt_logger:get_matcher/1 into a tagged error.
get_matcher(MatcherName) ->
    Found = csrt_logger:get_matcher(MatcherName),
    case Found of
        undefined ->
            {error, {unknown_matcher, MatcherName}};
        _ ->
            {ok, Found}
    end.

%%
%% Auxiliary functions
%%

%% Row limit read from the "query_limit" config key, defaulting to
%% ?QUERY_LIMIT.
query_limit() ->
    config:get_integer(?CSRT, "query_limit", ?QUERY_LIMIT).

%% Convert every entry of the list with csrt_entry:to_json/1.
to_json_list(Entries) when is_list(Entries) ->
    lists:map(fun(Entry) -> csrt_entry:to_json(Entry) end, Entries).
