This is an automated email from the ASF dual-hosted git repository.

iilyak pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b0af4ef8960dafabd531be5018f2cdde6dcdba48
Author: ILYA Khlopotov <[email protected]>
AuthorDate: Thu Jul 10 00:37:36 2025 -0700

    Rewrite csrt_query to support declarative queries
---
 .../src/couch_stats_resource_tracker.hrl           |   2 +
 src/couch_stats/src/csrt.erl                       | 462 ++++++++++-----
 src/couch_stats/src/csrt_entry.erl                 |   5 +-
 src/couch_stats/src/csrt_httpd.erl                 | 113 ++--
 src/couch_stats/src/csrt_query.erl                 | 640 +++++++++++++++------
 5 files changed, 867 insertions(+), 355 deletions(-)

diff --git a/src/couch_stats/src/couch_stats_resource_tracker.hrl b/src/couch_stats/src/couch_stats_resource_tracker.hrl
index 9b52826c7..29ed684ac 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.hrl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.hrl
@@ -191,8 +191,10 @@
 -type tuple_of_field_values() :: tuple().
 -type tuple_of_field_names() :: tuple().
 
+-type field_value() :: any().
 -type query_options() :: #{limit => pos_integer()}.
 -type aggregation_key() :: tuple_of_field_names().
+-type aggregation_value() :: field_value().
 -type aggregation_values() :: tuple_of_field_values().
 -type aggregation_result() :: #{aggregation_key() => non_neg_integer()}.
 -type ordered_result() :: [{aggregation_key(), non_neg_integer()}].
diff --git a/src/couch_stats/src/csrt.erl b/src/couch_stats/src/csrt.erl
index 1efdf5ec3..bcb1b229c 100644
--- a/src/couch_stats/src/csrt.erl
+++ b/src/couch_stats/src/csrt.erl
@@ -73,8 +73,8 @@
 
 %% RPC api
 -export([
-    rpc/2,
-    call/1
+    rpc_run/1,
+    rpc_unsafe_run/1
 ]).
 
 %% aggregate query api
@@ -85,25 +85,12 @@
     active_coordinators/1,
     active_workers/0,
     active_workers/1,
-    count_by/1,
     find_by_nonce/1,
     find_by_pid/1,
     find_by_pidref/1,
     find_workers_by_pidref/1,
-    group_by/2,
-    group_by/3,
-    query_group_by/3,
-    query_group_by/4,
-    query_sort_by/3,
-    query_sort_by/4,
-    query_count_by/2,
-    query_count_by/3,
     query_matcher/1,
-    query_matcher/2,
-    sorted/1,
-    sorted_by/1,
-    sorted_by/2,
-    sorted_by/3
+    query_matcher/2
 ]).
 
 %% Recon API Ports of https://github.com/ferd/recon/releases/tag/2.5.6
@@ -113,48 +100,92 @@
     proc_window/3
 ]).
 
+-export([
+    query/1,
+    from/1,
+    group_by/1,
+    group_by/2,
+    sort_by/1,
+    sort_by/2,
+    count_by/1,
+    options/1,
+    unlimited/0,
+    with_limit/1,
+
+    run/1,
+    unsafe_run/1
+]).
+
+-export_type([
+    query/0,
+    query_expression/0,
+    query_option/0
+]).
+
+-opaque query() :: csrt_query:query().
+-opaque query_expression() :: csrt_query:query_expression().
+-opaque query_option() :: csrt_query:query_option().
+
 %%
 %% RPC operations
 %%
 
--spec rpc(FName :: atom(), Args :: [any()]) ->
-    {[{node(), Result :: any()}], Errors :: [{badrpc, Reason :: any()}], 
BadNodes :: [node()]}.
-rpc(FName, Args) when is_atom(FName) andalso is_list(Args) ->
-    {Resp, BadNodes} = rpc:multicall(?MODULE, call, [{FName, Args}]),
-    {Results, Errors} = split_response(Resp),
-    {Results, lists:usort(Errors), BadNodes}.
-
-split_response(Resp) ->
-    lists:foldl(fun(Message, {Results, Errors}) ->
-        case Message of
-            {badrpc, _} = E ->
-                {Results, [E | Errors]};
-            Result ->
-                {[Result | Results], Errors}
-        end
-    end, {[], []}, Resp).
-
-call({active, []}) -> {node(), active()};
-call({active, [json]}) -> {node(), active(json)};
-call({active_coordinators, []}) -> {node(), active_coordinators()};
-call({active_coordinators, [json]}) -> {node(), active_coordinators(json)};
-call({active_workers, []}) -> {node(), active_workers()};
-call({active_workers, [json]}) -> {node(), active_workers(json)};
-call({count_by, [Key]}) -> {node(), count_by(Key)};
-call({find_by_nonce, [Nonce]}) -> {node(), find_by_nonce(Nonce)};
-call({find_by_pid, [Pid]}) -> {node(), find_by_pid(Pid)};
-call({find_by_pidref, [PidRef]}) -> {node(), find_by_pidref(PidRef)};
-call({find_workers_by_pidref, [PidRef]}) -> {node(), 
find_workers_by_pidref(PidRef)};
-call({group_by, [Key, Val]}) -> {node(), group_by(Key, Val)};
-call({group_by, [Key, Val, Agg]}) -> {node(), group_by(Key, Val, Agg)};
-call({sorted, [Map]}) -> {node(), sorted(Map)};
-call({sorted_by, [Key]}) -> {node(), sorted_by(Key)};
-call({sorted_by, [Key, Val]}) -> {node(), sorted_by(Key, Val)};
-call({sorted_by, [Key, Val, Agg]}) -> {node(), sorted_by(Key, Val, Agg)};
-call({FunName, Args}) ->
-    FunNameBin = atom_to_binary(FunName),
-    ArityBin = integer_to_binary(length(Args)),
-    {error, <<"No such function '", FunNameBin/binary, "/", ArityBin/binary>>}.
+-spec rpc_run(Query :: query()) ->
+    [#{
+        node => node(),
+        result => [{aggregation_key(), pos_integer()}],
+        errors => [atom()]
+    }].
+rpc_run(Query) ->
+    Nodes = mem3:nodes(),
+    merge_results(Nodes, erpc:multicall(Nodes, ?MODULE, run, [Query])).
+
+-spec rpc_unsafe_run(Query :: query()) ->
+    [#{
+        node => node(),
+        result => [{aggregation_key(), pos_integer()}],
+        errors => [atom()]
+    }].
+rpc_unsafe_run(Query) ->
+    Nodes = mem3:nodes(),
+    merge_results(Nodes, erpc:multicall(Nodes, ?MODULE, unsafe_run, [Query])).
+
+merge_results(Nodes, Resp) ->
+    %% erpc:multicall returns a list in which the result from each node is placed
+    %% at the same position as that node's name in Nodes.
+    %% That is why we can use `lists:zip/2` here.
+    lists:map(fun format_response/1, lists:zip(Nodes, Resp)).
+
+format_response({Node, {ok, {ok, Result}}}) ->
+    #{
+        node => Node,
+        result => Result,
+        errors => []
+    };
+format_response({Node, {ok, {error, Reason}}}) ->
+    #{
+        node => Node,
+        result => none,
+        errors => [Reason]
+    };
+format_response({Node, {ok, Result}}) ->
+    #{
+        node => Node,
+        result => Result,
+        errors => []
+    };
+format_response({Node, {error, {erpc, Reason}}}) ->
+    #{
+        node => Node,
+        result => none,
+        errors => [Reason]
+    };
+format_response({Node, {Tag, _}}) ->
+    #{
+        node => Node,
+        result => none,
+        errors => [Tag]
+    }.
 
 %%
 %% PidRef operations
@@ -450,10 +481,6 @@ active_workers() ->
 active_workers(Type) ->
     csrt_query:active_workers(Type).
 
--spec count_by(Key :: string()) -> map().
-count_by(Key) ->
-    csrt_query:count_by(Key).
-
 find_by_nonce(Nonce) ->
     csrt_query:find_by_nonce(Nonce).
 
@@ -466,12 +493,6 @@ find_by_pidref(PidRef) ->
 find_workers_by_pidref(PidRef) ->
     csrt_query:find_workers_by_pidref(PidRef).
 
-group_by(Key, Val) ->
-    csrt_query:group_by(Key, Val).
-
-group_by(Key, Val, Agg) ->
-    csrt_query:group_by(Key, Val, Agg).
-
 -spec pid_ref_matchspec(AttrName :: rctx_field()) -> term() | throw(any()).
 pid_ref_matchspec(AttrName) ->
     csrt_logger:pid_ref_matchspec(AttrName).
@@ -499,76 +520,6 @@ query_matcher(MatcherName) ->
 query_matcher(MatcherName, Limit) ->
     csrt_query:query_matcher(MatcherName, Limit).
 
--spec query_group_by(MatcherName, AggregationKeys, ValueKey) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-query_group_by(MatcherName, AggregationKeys, ValueKey) ->
-    csrt_query:query_group_by(MatcherName, AggregationKeys, ValueKey, #{}).
-
--spec query_group_by(MatcherName, AggregationKeys, ValueKey, Options :: 
query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-query_group_by(MatcherName, AggregationKeys, ValueKey, Options) ->
-    csrt_query:query_group_by(MatcherName, AggregationKeys, ValueKey, Options).
-
--spec query_sort_by(MatcherName, AggregationKeys, ValueKey) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-query_sort_by(MatcherName, AggregationKeys, ValueKey) ->
-    csrt_query:query_sort_by(MatcherName, AggregationKeys, ValueKey, #{}).
-
--spec query_sort_by(MatcherName, AggregationKeys, ValueKey, Options :: 
query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-query_sort_by(MatcherName, AggregationKeys, ValueKey, Options) ->
-    csrt_query:query_sort_by(MatcherName, AggregationKeys, ValueKey, Options).
-
--spec query_count_by(MatcherName, AggregationKeys) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()].
-query_count_by(MatcherName, AggregationKeys) ->
-    csrt_query:query_count_by(MatcherName, AggregationKeys, #{}).
-
--spec query_count_by(MatcherName, AggregationKeys, Options :: query_options()) 
->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()].
-query_count_by(MatcherName, AggregationKeys, Options) ->
-    csrt_query:query_count_by(MatcherName, AggregationKeys, Options).
-
-sorted(Map) ->
-    csrt_query:sorted(Map).
-
-sorted_by(Key) ->
-    csrt_query:sorted_by(Key).
-
-sorted_by(Key, Val) ->
-    csrt_query:sorted_by(Key, Val).
-
-sorted_by(Key, Val, Agg) ->
-    csrt_query:sorted_by(Key, Val, Agg).
-
 %%
 %% Delta API
 %%
@@ -593,6 +544,249 @@ maybe_add_delta(T) ->
 maybe_add_delta(T, Delta) ->
     csrt_util:maybe_add_delta(T, Delta).
 
+%%
+%% Query API functions
+%%
+%%
+
+%% @doc Construct a query from the given expressions.
+%% The following types of expressions are allowed in the query:
+%%   <li>group_by/1 @see group_by/1</li>
+%%   <li>group_by/2 @see group_by/2</li>
+%%   <li>sort_by/1 @see sort_by/1</li>
+%%   <li>count_by/1 @see count_by/1</li>
+%%   <li>options/1 @see options/1</li>
+%%   <li>from/1 @see from/1</li>
+%% The order of expressions doesn't matter.
+%% <code>
+%% Q = query([
+%%    from("docs_read"),
+%%    group_by([username, dbname], ioq_calls),
+%%    options([
+%%       with_limit(10)
+%%    ])
+%% ]),
+%% </code>
+%% @end
+-spec query(QueryExpression :: [query_expression()]) ->
+    query() | {error, any()}.
+query(QueryExpression) ->
+    csrt_query:query(QueryExpression).
+
+%% @doc Specify the matcher to use for the query.
+%% If the atom 'all' is used, all entries are in the scope of the query.
+%% Using 'all' also makes the query 'unsafe', because it scans through all entries
+%% and can return many matching rows.
+%% Unsafe queries can only be run using 'unsafe_run/1'.
+%% <code>
+%% Q = query([
+%%    ...
+%%    from("docs_read")
+%% ]),
+%% </code>
+%% @end
+-spec from(MatcherNameOrAll :: string() | all) ->
+    query_expression() | {error, any()}.
+from(MatcherNameOrAll) ->
+    csrt_query:from(MatcherNameOrAll).
+
+%% @doc Request 'group_by' aggregation of results.
+%% <code>
+%% Q = query([
+%%    ...
+%%    group_by([username, dbname])
+%% ]),
+%% </code>
+%% @end
+-spec group_by(AggregationKeys) ->
+    query_expression() | {error, any()}
+when
+    AggregationKeys ::
+        binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()].
+group_by(AggregationKeys) ->
+    csrt_query:group_by(AggregationKeys).
+
+%% @doc Request 'group_by' aggregation of results.
+%% <code>
+%% Q = query([
+%%    ...
+%%    group_by([username, dbname], ioq_calls)
+%% ]),
+%% </code>
+%% @end
+-spec group_by(AggregationKeys, ValueKey) ->
+    query_expression() | {error, any()}
+when
+    AggregationKeys ::
+        binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()],
+    ValueKey ::
+        binary()
+        | rctx_field().
+group_by(AggregationKeys, ValueKey) ->
+    csrt_query:group_by(AggregationKeys, ValueKey).
+
+%% @doc Request 'sort_by' aggregation of results.
+%% <code>
+%% Q = query([
+%%    ...
+%%    sort_by([username, dbname])
+%% ]),
+%% </code>
+%% @end
+-spec sort_by(AggregationKeys) ->
+    query_expression() | {error, any()}
+when
+    AggregationKeys ::
+        binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()].
+sort_by(AggregationKeys) ->
+    csrt_query:sort_by(AggregationKeys).
+
+%% @doc Request 'sort_by' aggregation of results.
+%% <code>
+%% Q = query([
+%%    ...
+%%    sort_by([username, dbname], ioq_calls)
+%% ]),
+%% </code>
+%% @end
+-spec sort_by(AggregationKeys, ValueKey) ->
+    query_expression() | {error, any()}
+when
+    AggregationKeys ::
+        binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()],
+    ValueKey ::
+        binary()
+        | rctx_field().
+sort_by(AggregationKeys, ValueKey) ->
+    csrt_query:sort_by(AggregationKeys, ValueKey).
+
+%% @doc Request 'count_by' aggregation of results.
+%% <code>
+%% Q = query([
+%%    ...
+%%    count_by(username)
+%% ]),
+%% </code>
+%% @end
+-spec count_by(AggregationKeys) ->
+    query_expression() | {error, any()}
+when
+    AggregationKeys ::
+        binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()].
+count_by(AggregationKeys) ->
+    csrt_query:count_by(AggregationKeys).
+
+%% @doc Construct 'options' query expression.
+%% The following option expressions are allowed:
+%%   <li>unlimited/0 @see unlimited/0 (cannot be used with 'with_limit/1')</li>
+%%   <li>with_limit/1 @see with_limit/1 (cannot be used with 'unlimited/0')</li>
+%% The order of expressions doesn't matter.
+%% <code>
+%% Q = query([
+%%    ...
+%%    options([
+%%      ...
+%%    ])
+%% ]),
+%% </code>
+%% @end
+-spec options([query_option()]) ->
+    query_expression() | {error, any()}.
+options(OptionsExpression) ->
+    csrt_query:options(OptionsExpression).
+
+
+%% @doc Allow an unlimited number of results from the query.
+%% Using 'unlimited' makes the query 'unsafe', because it can return many matching rows.
+%% Unsafe queries can only be run using 'unsafe_run/1'.
+%% <code>
+%% Q = query([
+%%    ...
+%%    options([
+%%      unlimited()
+%%    ])
+%% ]),
+%% </code>
+%% @end
+-spec unlimited() ->
+    query_expression().
+unlimited() ->
+    csrt_query:unlimited().
+
+%% @doc Set limit on number of results returned from the query.
+%% <code>
+%% Q = query([
+%%    ...
+%%    options([
+%%      with_limit(100)
+%%    ])
+%% ]),
+%% </code>
+%% @end
+-spec with_limit(Limit :: pos_integer()) ->
+    query_expression() | {error, any()}.
+with_limit(Limit) ->
+    csrt_query:with_limit(Limit).
+
+%% @doc Executes the provided query. Only 'safe' queries can be executed using 'run'.
+%% A query is considered 'unsafe' if any of the conditions below are met:
+%%   <li>Query uses 'unlimited/0'</li>
+%%   <li>Query uses 'from(all)'</li>
+%% <code>
+%% Q = query([
+%%    from("docs_read"),
+%%    group_by([username, dbname], ioq_calls),
+%%    options([
+%%       with_limit(10)
+%%    ])
+%% ]),
+%% run(Q)
+%% </code>
+%% @end
+-spec run(query()) ->
+    {ok, [{aggregation_key(), pos_integer()}]}
+    | {limit, [{aggregation_key(), pos_integer()}]}.
+run(Query) ->
+    csrt_query:run(Query).
+
+%% @doc Executes the provided query. This function is similar to 'run/1',
+%% however it also supports 'unsafe' queries. Be very careful using it.
+%% Pay attention to the cardinality of the result.
+%% A query is considered 'unsafe' if any of the conditions below are met:
+%%   <li>Query uses 'unlimited/0'</li>
+%%   <li>Query uses 'from(all)'</li>
+%% <code>
+%% Q = query([
+%%    from("docs_read"),
+%%    group_by([username, dbname], ioq_calls),
+%%    options([
+%%       with_limit(10)
+%%    ])
+%% ]),
+%% unsafe_run(Q)
+%% </code>
+%% @end
+-spec unsafe_run(query()) ->
+    {ok, [{aggregation_key(), pos_integer()}]}
+    | {limit, [{aggregation_key(), pos_integer()}]}.
+unsafe_run(Query) ->
+    csrt_query:unsafe_run(Query).
+
 %%
 %% Tests
 %%
diff --git a/src/couch_stats/src/csrt_entry.erl b/src/couch_stats/src/csrt_entry.erl
index 094e46ca7..f50d9559d 100644
--- a/src/couch_stats/src/csrt_entry.erl
+++ b/src/couch_stats/src/csrt_entry.erl
@@ -52,8 +52,9 @@ value(get_kp_node, #rctx{get_kp_node = Val}) -> Val;
 value(started_at, #rctx{started_at = Val}) -> Val;
 value(updated_at, #rctx{updated_at = Val}) -> Val.
 
--spec key(BinKey :: binary() | string() | atom()) -> Key :: rctx_field()
-    | throw({bad_request, Reason :: binary()}).
+-spec key(BinKey :: binary() | string() | atom()) ->
+    Key :: rctx_field()
+    | {error, Reason :: any()}.
 
 key(Key) when is_atom(Key) ->
     key_from_atom(Key);
diff --git a/src/couch_stats/src/csrt_httpd.erl b/src/couch_stats/src/csrt_httpd.erl
index a14d219fa..01077cf9d 100644
--- a/src/couch_stats/src/csrt_httpd.erl
+++ b/src/couch_stats/src/csrt_httpd.erl
@@ -24,37 +24,54 @@
     ]
 ).
 
-rpc_to_json({Resp, _Errors, _Nodes}) ->
-    #{<<"results">> => resp_to_json(Resp, #{}), <<"errors">> => [], 
<<"bad_nodes">> => []}.
-
-resp_to_json([{N, R} | Rest], Acc) ->
-    resp_to_json(Rest, maps:put(atom_to_binary(N), R, Acc));
-resp_to_json([], Acc) ->
-    Acc.
+-import(
+    csrt,
+    [
+        query/1,
+        from/1,
+        group_by/1,
+        group_by/2,
+        sort_by/1,
+        sort_by/2,
+        count_by/1,
+        options/1,
+        unlimited/0,
+        with_limit/1,
+
+        run/1
+    ]
+).
 
-% handle_resource_status_req(#httpd{method = 'GET', path_parts = 
[<<"_active_resources">>]} = Req) ->
-%     ok = chttpd:verify_is_server_admin(Req),
-%     %% TODO: incorporate Bad responses
-%     Resp = rpc_to_json(csrt:rpc(active, [json])),
-%     send_json(Req, Resp);
 handle_resource_status_req(
-    #httpd{method = 'POST', path_parts = [<<"_active_resources">>, 
<<"_match">>, MatcherName]} = Req
+    #httpd{method = 'POST', path_parts = [<<"_active_resources">>, 
<<"_match">>, MatcherNameBin]} = Req
 ) ->
     chttpd:validate_ctype(Req, "application/json"),
     {JsonProps} = chttpd:json_body_obj(Req),
     GroupBy = couch_util:get_value(<<"group_by">>, JsonProps),
     SortBy = couch_util:get_value(<<"sort_by">>, JsonProps),
     CountBy = couch_util:get_value(<<"count_by">>, JsonProps),
-
-    case {GroupBy, SortBy, CountBy} of
-        {undefined, undefined, {Query}} ->
-            handle_count_by(Req, binary_to_list(MatcherName), Query);
-        {undefined, {Query}, undefined} ->
-            handle_sort_by(Req, binary_to_list(MatcherName), Query);
-        {{Query}, undefined, undefined} ->
-            handle_group_by(Req, binary_to_list(MatcherName), Query);
+    MatcherName = binary_to_list(MatcherNameBin),
+    {AggregationKeys, Query} = case {GroupBy, SortBy, CountBy} of
+        {undefined, undefined, {Props}} ->
+            Keys = couch_util:get_value(<<"aggregate_keys">>, Props),
+            {Keys, csrt:query([from(MatcherName), count_by(Keys)])};
+        {undefined, {Props}, undefined} ->
+            Keys = couch_util:get_value(<<"aggregate_keys">>, Props),
+            CounterKey = couch_util:get_value(<<"counter_key">>, Props),
+            {Keys, query([from(MatcherName), sort_by(Keys, CounterKey)])};
+        {{Props}, undefined, undefined} ->
+            Keys = couch_util:get_value(<<"aggregate_keys">>, Props),
+            CounterKey = couch_util:get_value(<<"counter_key">>, Props),
+            {Keys, query([from(MatcherName), group_by(Keys, CounterKey)])};
         {_, _, _} ->
             throw({bad_request, <<"Multiple aggregations are not supported">>})
+    end,
+    case Query of
+        {error, Reason} ->
+            send_error(Req, Reason);
+        Q ->
+            JSON = to_json(AggregationKeys, csrt:rpc_run(Q)),
+            send_json(Req, JSON)
     end;
 handle_resource_status_req(#httpd{path_parts = [<<"_active_resources">>]} = 
Req) ->
     ok = chttpd:verify_is_server_admin(Req),
@@ -64,39 +81,21 @@ handle_resource_status_req(Req) ->
     ok = chttpd:verify_is_server_admin(Req),
     send_method_not_allowed(Req, "GET,HEAD,POST").
 
-handle_count_by(Req, MatcherName, CountBy) ->
-    AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, CountBy),
-    case csrt:query_count_by(MatcherName, AggregationKeys) of
-        {ok, Map} ->
-            send_json(Req, aggregation_result_to_json(AggregationKeys, Map));
-        {error, Reason} ->
-            send_error(Req, Reason)
-    end.
-
-handle_sort_by(Req, MatcherName, SortBy) ->
-    AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, SortBy),
-    CounterKey = couch_util:get_value(<<"counter_key">>, SortBy),
-    case csrt:query_sort_by(MatcherName, AggregationKeys, CounterKey) of
-        {ok, Map} ->
-            send_json(Req, aggregation_result_to_json(AggregationKeys, Map));
-        {error, Reason} ->
-            send_error(Req, Reason)
-    end.
-
-handle_group_by(Req, MatcherName, GroupBy) ->
-    AggregationKeys = couch_util:get_value(<<"aggregate_keys">>, GroupBy),
-    CounterKey = couch_util:get_value(<<"counter_key">>, GroupBy),
-    case csrt:query_group_by(MatcherName, AggregationKeys, CounterKey) of
-        {ok, Map} ->
-            send_json(Req, aggregation_result_to_json(AggregationKeys, Map));
-        {error, Reason} ->
-            send_error(Req, Reason)
-    end.
-
-%% [{{<<"user_foo">>,<<"eunit-test-db-3950c6fc68955a4b629cebbece5bdfac">>}, 
10}]
-% -type aggregation_result() :: #{aggregation_key() => non_neg_integer()}.
-% -type ordered_result() :: [{aggregation_key(), non_neg_integer()}].
-% -type query_result() :: aggregation_result() | ordered_result().
+to_json(AggregationKeys, Results) ->
+    lists:map(fun(E) -> node_reply_to_json(AggregationKeys, E) end, Results).
+
+node_reply_to_json(_AggregationKeys, #{node := Node, result := none, errors := 
Errors}) ->
+    #{
+        node => atom_to_binary(Node),
+        result => none,
+        errors => lists:map(fun erlang:atom_to_list/1, Errors)
+    };
+node_reply_to_json(AggregationKeys, #{node := Node, result := Result, errors 
:= Errors}) ->
+    #{
+        node => atom_to_binary(Node),
+        result => aggregation_result_to_json(AggregationKeys, Result),
+        errors => lists:map(fun erlang:atom_to_list/1, Errors)
+    }.
 
 encode_key(AggregationKeys, Key) ->
     maps:from_list(lists:zip(AggregationKeys, tuple_to_list(Key))).
@@ -160,10 +159,10 @@ aggregation_result_to_json(AggregationKey, Ordered) when
         Ordered
     ).
 
-send_error(Req, {unknown_matcher, Matcher}) ->
+send_error(Req, [{unknown_matcher, Matcher} | _]) ->
     MatcherBin = list_to_binary(Matcher),
     chttpd:send_error(Req, {bad_request, <<"Unknown matcher '", 
MatcherBin/binary, "'">>});
-send_error(Req, {invalid_key, FieldName}) ->
+send_error(Req, [{invalid_key, FieldName} | _]) ->
     chttpd:send_error(Req, {bad_request, <<"Unknown field name '", 
FieldName/binary, "'">>});
-send_error(Req, Reason) ->
+send_error(Req, [Reason | _]) ->
     chttpd:send_error(Req, {error, Reason}).
\ No newline at end of file
diff --git a/src/couch_stats/src/csrt_query.erl b/src/couch_stats/src/csrt_query.erl
index be95a93a4..7ca871e2d 100644
--- a/src/couch_stats/src/csrt_query.erl
+++ b/src/couch_stats/src/csrt_query.erl
@@ -25,27 +25,97 @@
     active_coordinators/1,
     active_workers/0,
     active_workers/1,
+
     all/0,
-    count_by/1,
-    count_by/2,
     find_by_nonce/1,
     find_by_pid/1,
     find_by_pidref/1,
     find_workers_by_pidref/1,
-    group_by/3,
-    group_by/4,
-    query_group_by/4,
-    query_sort_by/4,
-    query_count_by/3,
+
     query_matcher/1,
     query_matcher/2,
     query_matcher_rows/1,
     query_matcher_rows/2,
+
+    query/1,
+    from/1,
+    group_by/1,
+    group_by/2,
     sort_by/1,
     sort_by/2,
-    sort_by/3
+    count_by/1,
+    options/1,
+    unlimited/0,
+    with_limit/1,
+
+    run/1,
+    unsafe_run/1
+]).
+
+-export_type([
+    query/0,
+    query_expression/0,
+    query_option/0
 ]).
 
+-type aggregation_keys_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | 
aggregation_value()).
+-type value_key_fun() :: fun((Ele :: #rctx{}) -> aggregation_values() | 
aggregation_value()).
+-type count_key_fun() :: fun((A :: pos_integer(), B :: pos_integer()) -> 
pos_integer()).
+
+-record(selector, {
+    aggregation_keys = undefined ::
+        rctx_field()
+        | [rctx_field()]
+        | undefined,
+    value_key = undefined ::
+        rctx_field()
+        | undefined
+}).
+
+-record(unsafe_selector, {
+    aggregation_keys = undefined ::
+        aggregation_keys_fun()
+        | rctx_field()
+        | [rctx_field()]
+        | undefined,
+    value_key = undefined ::
+        value_key_fun()
+        | rctx_field()
+        | undefined
+}).
+
+-record(query_options, {
+    limit = undefined :: pos_integer() | unlimited | undefined,
+    is_safe = undefined :: boolean() | undefined
+}).
+
+-type aggregation() :: group_by | sort_by | count_by.
+
+-record(query, {
+    matcher = undefined :: matcher_name() | all | undefined,
+    selector = undefined :: #selector{} | #unsafe_selector{} | undefined,
+    limit = undefined :: pos_integer() | unlimited | undefined,
+    aggregation = undefined :: aggregation() | undefined,
+    is_safe = true :: boolean()
+}).
+
+-record(from, {
+    matcher = undefined :: matcher_name() | all | undefined,
+    is_safe = undefined :: boolean() | undefined
+}).
+
+-opaque query() :: #query{}.
+-opaque query_expression() ::
+    #from{}
+    | #query_options{}
+    | #selector{}
+    | #unsafe_selector{}
+    | query_option()
+    | {aggregation(), #selector{}}
+    | {aggregation(), #unsafe_selector{}}.
+-opaque query_option() ::
+    {limit, pos_integer() | unlimited | undefined}.
+
 %%
 %% Aggregate query API
 %%
@@ -98,42 +168,39 @@ find_workers_by_pidref(PidRef) ->
 curry_field(Field) ->
     fun(Ele) -> csrt_entry:value(Field, Ele) end.
 
-count_by(KeyFun) ->
-    csrt_query:count_by(all(), KeyFun).
-
--spec count_by(Matcher :: matcher(), KeyFun) ->
-    query_result()
+-spec group_by(Matcher, KeyFun, ValFun) ->
+    {ok, aggregation_result()} | {limit, aggregation_result()}
 when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()).
-count_by(Matcher, KeyFun) ->
-    group_by(Matcher, KeyFun, fun(_) -> 1 end).
-
--spec group_by(KeyFun, ValFun) ->
-    query_result()
-when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()),
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()).
-group_by(KeyFun, ValFun) ->
-    csrt_query:group_by(all(), KeyFun, ValFun).
-
--spec group_by(Matcher :: matcher(), KeyFun, ValFun) ->
-    query_result()
-when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()),
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()).
+    Matcher :: matcher(),
+    KeyFun ::
+        aggregation_keys_fun()
+        | rctx_field()
+        | [rctx_field()],
+    ValFun ::
+        value_key_fun()
+        | rctx_field().
 group_by(Matcher, KeyFun, ValFun) ->
-    group_by(Matcher, KeyFun, ValFun, fun erlang:'+'/2).
+    AggFun = fun erlang:'+'/2,
+    group_by(Matcher, KeyFun, ValFun, AggFun).
 
--spec group_by(Matcher :: matcher(), KeyFun, ValFun, AggFun) ->
-    query_result()
+-spec group_by(Matcher, KeyFun, ValFun, AggFun) ->
+    {ok, aggregation_result()} | {limit, aggregation_result()}
 when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()),
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()),
-    AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()).
+    Matcher :: matcher(),
+    KeyFun ::
+        aggregation_keys_fun()
+        | rctx_field()
+        | [rctx_field()],
+    ValFun ::
+        value_key_fun()
+        | rctx_field(),
+    AggFun ::
+        count_key_fun().
 group_by(Matcher, KeyFun, ValFun, AggFun) ->
     group_by(Matcher, KeyFun, ValFun, AggFun, ?QUERY_CARDINALITY_LIMIT).
 
--spec all() -> matcher().
+-spec all() ->
+    matcher().
 
 all() ->
     Spec = ets:fun2ms(fun(#rctx{} = R) -> R end),
@@ -145,6 +212,22 @@ all() ->
 %% eg: group_by(all(), [username, dbname, mfa], docs_read).
 %% eg: group_by(all(), [username, dbname, mfa], ioq_calls).
 %% eg: group_by(all(), [username, dbname, mfa], js_filters).
+-spec group_by(Matcher, KeyFun, ValFun, AggFun, Limit) ->
+    {ok, aggregation_result()} | {limit, aggregation_result()}
+when
+    Matcher :: matcher(),
+    KeyFun ::
+        aggregation_keys_fun()
+        | rctx_field()
+        | [rctx_field()],
+    ValFun ::
+        value_key_fun()
+        | rctx_field(),
+    AggFun ::
+        count_key_fun(),
+    Limit :: pos_integer().
+
+
 group_by(Matcher, KeyL, ValFun, AggFun, Limit) when is_list(KeyL) ->
     KeyFun = fun(Ele) -> list_to_tuple([csrt_entry:value(Key, Ele) || Key <- 
KeyL]) end,
     group_by(Matcher, KeyFun, ValFun, AggFun, Limit);
@@ -156,7 +239,7 @@ group_by(Matcher, KeyFun, ValFun, AggFun, Limit) ->
     FoldFun = fun(Ele, Acc) ->
         case maps:size(Acc) =< Limit of
             true ->
-                case maybe_match(Ele, Matcher) of
+                case ets_match(Ele, Matcher) of
                     true ->
                         Key = KeyFun(Ele),
                         Val = ValFun(Ele),
@@ -181,9 +264,7 @@ group_by(Matcher, KeyFun, ValFun, AggFun, Limit) ->
             {limit, Acc}
     end.
 
-maybe_match(_Ele, undefined) ->
-    true;
-maybe_match(Ele, {_, MS}) ->
+ets_match(Ele, {_, MS}) ->
     ets:match_spec_run([Ele], MS) =/= [].
 
 %%
@@ -192,10 +273,10 @@ maybe_match(Ele, {_, MS}) ->
 
 -record(topK, {
     % we store ordered elements in ascending order
-    seq = [] :: list(pos_integer()),
+    seq = [] :: list({aggregation_key(), pos_integer()}),
     % we rely on erlang sorting order where `number < atom`
     min = infinite :: infinite | pos_integer(),
-    max = 0 :: pos_integer(),
+    max = 0 :: non_neg_integer(),
     size = 0 :: non_neg_integer(),
     % capacity cannot be less than 1
     capacity = 1 :: pos_integer()
@@ -230,44 +311,369 @@ update_topK(Key, Value, #topK{size = S, seq = Seq} = 
Top) ->
+%% Returns the collected entries largest first: the #topK{} record keeps
+%% `seq' in ascending order, hence the reverse.
 get_topK(#topK{seq = S}) ->
     lists:reverse(S).
 
+%% Folds an aggregation map into a #topK{} of capacity K and returns the
+%% retained entries as an ordered [{Key, Value}] list, largest first.
+%% (Selection of the retained entries happens in update_topK/3, which is
+%% not shown here.)
+-spec topK(aggregation_result(), pos_integer()) ->
+    ordered_result().
 topK(Results, K) ->
     TopK = maps:fold(fun update_topK/3, new_topK(K), Results),
     get_topK(TopK).
 
--spec sort_by(KeyFun) ->
-    query_result()
+%%
+%% Query API functions
+%%
+
+%% @doc Construct the 'from' query expression, naming the matcher whose
+%% entries the query scans.
+%%
+%% `all' is not backed by a matcher and marks the query as unsafe. Any
+%% other name must be known to csrt_logger:get_matcher/1; otherwise
+%% `{error, {unknown_matcher, MatcherName}}' is returned, which query/1
+%% collects as one of its errors.
+%% @end
+-spec from(MatcherName :: matcher_name() | all) ->
+    #from{} | {error, {unknown_matcher, matcher_name()}}.
+from(all) ->
+    #from{matcher = all, is_safe = false};
+from(MatcherName) ->
+    case csrt_logger:get_matcher(MatcherName) of
+        undefined ->
+            {error, {unknown_matcher, MatcherName}};
+        _ ->
+            #from{matcher = MatcherName, is_safe = true}
+    end.
+
+%% @doc Construct the 'options' query expression.
+%% <code>
+%% Q = query([
+%%    ...
+%%    options([
+%%      with_limit(10)
+%%    ])
+%% ]),
+%% </code>
+%% Accepted elements are the results of with_limit/1 and unlimited/0.
+%% `unlimited()' marks the options (and therefore the query) as unsafe.
+%% The first `{error, _}' element encountered is returned as-is.
+%% @end
+-spec options([query_option()]) ->
+    #query_options{} | {error, any()}.
+options(Options) ->
+    lists:foldl(fun
+        (_, {error, _} = Error) ->
+            %% an earlier option was invalid; keep reporting it
+            Error;
+        ({limit, unlimited}, #query_options{} = Acc) ->
+            Acc#query_options{limit = unlimited, is_safe = false};
+        ({limit, Limit}, #query_options{} = Acc) when is_integer(Limit) ->
+            %% the accumulator is the record itself, not a tagged tuple
+            Acc#query_options{limit = Limit};
+        ({error, _} = Error, _Acc) ->
+            Error
+    end, #query_options{is_safe = true}, Options).
+
+%% @doc Query option that lifts the result limit entirely. Using it marks
+%% the query as unsafe (see options/1).
+%% @end
+-spec unlimited() -> {limit, unlimited}.
+unlimited() ->
+    {limit, unlimited}.
+
+%% @doc Query option that restricts the number of returned entries to Limit.
+%%
+%% Limit must be a positive integer no greater than query_limit(). The
+%% limit is later handed to topK/2 as K, and the #topK{} capacity cannot
+%% be less than 1, so zero and negative values are rejected as invalid.
+%% @end
+-spec with_limit(Limit :: term()) ->
+    {limit, pos_integer()}
+    | {error, {beyond_limit, pos_integer()} | {invalid_limit, term()}}.
+with_limit(Limit) when is_integer(Limit), Limit > 0 ->
+    case Limit =< query_limit() of
+        true ->
+            {limit, Limit};
+        false ->
+            {error, {beyond_limit, Limit}}
+    end;
+with_limit(Limit) ->
+    {error, {invalid_limit, Limit}}.
+
+%% @doc Construct a 'count_by' selection: matching entries are grouped by
+%% AggregationKeys and each entry adds 1 to its group. A fun used as the
+%% key makes the selection unsafe.
+%% @end
+-spec count_by(AggregationKeys) ->
+    {count_by, #selector{}} | {count_by, #unsafe_selector{}} | {error, any()}
+when
+    AggregationKeys ::
+        aggregation_keys_fun()
+        | binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()].
+count_by(AggregationKeys) ->
+    Selection = select(AggregationKeys),
+    with_tag(Selection, count_by).
+
+%% @doc Construct a 'sort_by' selection over AggregationKeys without an
+%% explicit value key. Unlike count_by/group_by, the executed result is
+%% not converted back into a map. A fun key makes the selection unsafe.
+%% @end
+-spec sort_by(AggregationKeys) ->
+    {sort_by, #selector{}} | {sort_by, #unsafe_selector{}} | {error, any()}
 when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()).
+    AggregationKeys ::
+        aggregation_keys_fun()
+        | binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()].
+sort_by(AggregationKeys) ->
+    Selection = select(AggregationKeys),
+    with_tag(Selection, sort_by).
+
+%% @doc Construct a 'sort_by' selection: matching entries are grouped by
+%% AggregationKeys and ValueKey selects the per-entry value that is
+%% aggregated. When a limit is applied the result is a list ordered by
+%% value instead of a map. A fun in either argument makes it unsafe.
+%% @end
+-spec sort_by(AggregationKeys, ValueKey) ->
+    {sort_by, #selector{}} | {sort_by, #unsafe_selector{}} | {error, any()}
+when
+    AggregationKeys ::
+        aggregation_keys_fun()
+        | binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()],
+    ValueKey ::
+        value_key_fun()
+        | binary()
+        | rctx_field().
+sort_by(AggregationKeys, ValueKey) ->
+    Selection = select(AggregationKeys, ValueKey),
+    with_tag(Selection, sort_by).
+
+%% @doc Construct a 'group_by' selection without a value key: each
+%% matching entry adds 1 to the group of its AggregationKeys and the
+%% result is returned as a map.
+%% @end
+-spec group_by(AggregationKeys) ->
+    {group_by, #selector{}} | {group_by, #unsafe_selector{}} | {error, any()}
+when
+    AggregationKeys ::
+        aggregation_keys_fun()
+        | binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()].
+group_by(AggregationKeys) ->
+    Selection = select(AggregationKeys),
+    with_tag(Selection, group_by).
+
+%% @doc Construct a 'group_by' selection that aggregates ValueKey for each
+%% group of AggregationKeys; the result is returned as a map.
+%% @end
+-spec group_by(AggregationKeys, ValueKey) ->
+    {group_by, #selector{}} | {group_by, #unsafe_selector{}} | {error, any()}
+when
+    AggregationKeys ::
+        aggregation_keys_fun()
+        | binary()
+        | rctx_field()
+        | [binary()]
+        | [rctx_field()],
+    ValueKey ::
+        value_key_fun()
+        | binary()
+        | rctx_field().
+group_by(AggregationKeys, ValueKey) ->
+    Selection = select(AggregationKeys, ValueKey),
+    with_tag(Selection, group_by).
+
+%% @doc Assemble query expressions into an executable #query{} record.
+%%
+%% Query is a list with one from/1 expression, one selection (count_by/1,2,
+%% sort_by/1,2 or group_by/1,2) and optionally one options/1 expression.
+%% The query starts out safe and becomes unsafe as soon as any part is
+%% unsafe (the `all' matcher, a fun-based selector or an `unlimited'
+%% limit); run/1 refuses such queries. Invalid parts, parts given more
+%% than once and missing mandatory parts are reported together as
+%% `{error, Reasons}'.
+%% @end
+-spec query([QueryPart]) -> #query{} | {error, [any()]}
+when
+    QueryPart ::
+        #from{}
+        | #query_options{}
+        | {count_by | sort_by | group_by, #selector{} | #unsafe_selector{}}
+        | {error, any()}.
+query(Query) ->
+    % start assuming safe query and turn to unsafe when we detect issues
+    Acc = #query{is_safe = true},
+    Result = lists:foldr(fun
+        ({Aggregation, #unsafe_selector{} = Selector}, {E, #query{selector = undefined} = Q}) ->
+            {E, Q#query{selector = Selector, is_safe = false, aggregation = Aggregation}};
+        ({Aggregation, #unsafe_selector{}}, {E, Q}) ->
+            {[{more_than_once, {select, Aggregation}} | E], Q};
+        ({Aggregation, #selector{} = Selector}, {E, #query{selector = undefined} = Q}) ->
+            {E, Q#query{selector = Selector, aggregation = Aggregation}};
+        ({Aggregation, #selector{}}, {E, Q}) ->
+            {[{more_than_once, {select, Aggregation}} | E], Q};
+        (#query_options{is_safe = false, limit = Limit}, {E, #query{limit = undefined} = Q}) ->
+            {E, Q#query{limit = Limit, is_safe = false}};
+        (#query_options{limit = Limit}, {E, #query{limit = undefined} = Q}) ->
+            {E, Q#query{limit = Limit}};
+        (#query_options{}, {E, Q}) ->
+            {[{more_than_once, options} | E], Q};
+        (#from{matcher = Matcher, is_safe = false}, {E, #query{matcher = undefined} = Q}) ->
+            {E, Q#query{matcher = Matcher, is_safe = false}};
+        (#from{matcher = Matcher}, {E, #query{matcher = undefined} = Q}) ->
+            {E, Q#query{matcher = Matcher}};
+        (#from{}, {E, Q}) ->
+            {[{more_than_once, from} | E], Q};
+        ({error, Reason}, {E, Q}) ->
+            {[Reason | E], Q}
+    end, {[], Acc}, Query),
+    case Result of
+        {[], #query{} = Q} ->
+            %% every part parsed; make sure the mandatory ones were present
+            case missing_parts(Q) of
+                [] ->
+                    Q;
+                Missing ->
+                    {error, Missing}
+            end;
+        {Errors, _} ->
+            {error, Errors}
+    end.
+
+%% Returns `{missing, Part}' for each mandatory query part (the from/1
+%% matcher and the selection) that was not supplied.
+missing_parts(#query{matcher = Matcher, selector = Selector}) ->
+    [{missing, from} || Matcher =:= undefined] ++
+        [{missing, select} || Selector =:= undefined].
 
-%% eg: sort_by([username, dbname, type], ioq_calls)
-%% eg: sort_by([dbname, type], doc_reads)
-sort_by(KeyFun) ->
-    topK(count_by(KeyFun), 10).
+%% @doc Execute a safe query built by query/1.
+%%
+%% Queries that are not marked safe, or whose selector is not a plain
+%% #selector{}, are refused with `{error, {unsafe_query, _}}'. A safe
+%% query can use neither the `all' matcher nor an `unlimited' limit (both
+%% mark the query unsafe), so it is executed by unsafe_run/1. When no limit
+%% was given, the result is capped at query_limit() there.
+%% @end
+-spec run(#query{}) ->
+    {ok, ordered_result() | aggregation_result()}
+    | {limit, ordered_result() | aggregation_result()}
+    | {error, {unsafe_query, string()}}.
+run(#query{is_safe = true, selector = #selector{}} = Query) ->
+    unsafe_run(Query);
+run(#query{}) ->
+    {error, {unsafe_query, "Please use 'unsafe_run(Query)' instead if you really know what you are doing."}}.
+
+%% @doc Execute a query built by query/1 without the safety checks of
+%% run/1: the `all' matcher, fun-based selectors and `unlimited' limits
+%% are all allowed.
+%%
+%% Entries are grouped by the selector's aggregation keys. Without a value
+%% key every entry contributes 1, so the groups count entries; this also
+%% covers sort_by/1. The limit is applied by maybe_apply_limit/2.
+%% count_by and group_by results are returned as maps. sort_by results
+%% are left as produced by maybe_apply_limit/2: an ordered list when a
+%% limit applies, the raw map with `unlimited'.
+%% NOTE(review): unlimited sort_by is therefore unsorted -- confirm intent.
+%% @end
+-spec unsafe_run(#query{}) ->
+    {ok, ordered_result() | aggregation_result()}
+    | {limit, ordered_result() | aggregation_result()}.
+unsafe_run(#query{selector = #unsafe_selector{} = Selector} = Query) ->
+    %% #unsafe_selector{} has the same fields as #selector{}, so re-tagging
+    %% the record converts it
+    unsafe_run(Query#query{selector = setelement(1, Selector, selector)});
+unsafe_run(#query{
+    matcher = MatcherName,
+    selector = #selector{aggregation_keys = AKey, value_key = VKey},
+    limit = Limit,
+    aggregation = Aggregation
+}) ->
+    Matcher = choose_matcher(MatcherName),
+    Grouped = group_by(Matcher, AKey, value_or_count_fun(VKey)),
+    Limited = maybe_apply_limit(Grouped, Limit),
+    case Aggregation of
+        sort_by ->
+            Limited;
+        count_by ->
+            to_map(Limited);
+        group_by ->
+            to_map(Limited)
+    end.
+
+%% Without a value key each entry counts as 1; otherwise the value key is
+%% handed to group_by/3 unchanged.
+value_or_count_fun(undefined) ->
+    fun(_) -> 1 end;
+value_or_count_fun(VKey) ->
+    VKey.
+
+%%
+%% Query API auxiliary functions
+%%
 
--spec sort_by(ValFun, AggFun) ->
-    query_result()
+%% @doc Parse AggregationKeys into a selector. A fun key yields an
+%% #unsafe_selector{}; field names yield a #selector{}. Parse errors are
+%% returned unchanged.
+%% @end
+-spec select(AggregationKeys) ->
+    #selector{} | #unsafe_selector{} | {error, any()}
 when
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()),
-    AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()).
+    AggregationKeys :: aggregation_keys_fun() | binary() | rctx_field() | [binary()] | [rctx_field()].
 
-sort_by(KeyFun, ValFun) ->
-    {Result, Acc} = group_by(KeyFun, ValFun),
-    {Result, topK(Acc, 10)}.
+select(AggregationKeys) ->
+    case parse_aggregation_keys(AggregationKeys) of
+        {ok, AKey} ->
+            IsSafe = is_safe_key(AKey),
+            if
+                IsSafe -> #selector{aggregation_keys = AKey};
+                true -> #unsafe_selector{aggregation_keys = AKey}
+            end;
+        NotOk ->
+            NotOk
+    end.
 
--spec sort_by(KeyFun, ValFun, AggFun) ->
-    query_result()
+%% @doc Parse AggregationKeys and ValueKey into a selector. The result is
+%% an #unsafe_selector{} if either of them is a fun and a #selector{}
+%% otherwise. The record is returned bare, not wrapped in `{ok, _}';
+%% parse errors are returned unchanged.
+%% @end
+-spec select(AggregationKeys, ValueKey) ->
+    #selector{} | #unsafe_selector{} | {error, any()}
 when
-    KeyFun :: fun((Ele :: #rctx{}) -> aggregation_key()),
-    ValFun :: fun((Ele :: #rctx{}) -> aggregation_values()),
-    AggFun :: fun((FieldValue :: pos_integer()) -> pos_integer()).
+    AggregationKeys :: aggregation_keys_fun() | binary() | rctx_field() | [binary()] | [rctx_field()],
+    ValueKey :: value_key_fun() | binary() | rctx_field().
 
-sort_by(KeyFun, ValFun, AggFun) ->
-    {Result, Acc} = group_by(KeyFun, ValFun, AggFun),
-    {Result, topK(Acc, 10)}.
+select(AggregationKeys, ValueKey) ->
+    maybe
+        {ok, AKey} ?= parse_aggregation_keys(AggregationKeys),
+        {ok, VKey} ?= parse_value_key(ValueKey),
+        case is_safe_key(AKey) andalso is_safe_key(VKey) of
+            true ->
+                #selector{aggregation_keys = AKey, value_key = VKey};
+            false ->
+                #unsafe_selector{aggregation_keys = AKey, value_key = VKey}
+        end
+    end.
 
-to_json_list(List) when is_list(List) ->
-    lists:map(fun csrt_entry:to_json/1, List).
+%% A key is safe unless it is a fun; arbitrary funs make a query unsafe.
+is_safe_key(Key) ->
+    not is_function(Key).
+
+%% Normalises aggregation keys to `{ok, Keys}': funs are probed with
+%% validate_fun/2, field names are translated by parse_key/1.
+parse_aggregation_keys(Keys) when is_function(Keys) ->
+    validate_fun(Keys, key_fun);
+parse_aggregation_keys(Keys) ->
+    with_ok(parse_key(Keys)).
+
+%% Normalises a value key to `{ok, Key}'. Unlike aggregation keys, only a
+%% single field is accepted; a list of fields is rejected with
+%% `{error, multiple_value_keys}'.
+parse_value_key(Key) when is_function(Key) ->
+    validate_fun(Key, value_fun);
+parse_value_key(Key) ->
+    single_value_key(parse_key(Key)).
+
+%% Wraps a parsed value key in `{ok, _}', refusing lists of keys.
+single_value_key({error, _} = Error) ->
+    Error;
+single_value_key(Keys) when is_list(Keys) ->
+    {error, multiple_value_keys};
+single_value_key(Key) ->
+    {ok, Key}.
 
+%% Tags a successful result as `{Tag, Result}'; errors pass through.
+with_tag(Result, Tag) ->
+    case Result of
+        {error, _} -> Result;
+        _ -> {Tag, Result}
+    end.
+
+%% Wraps a successful result as `{ok, Result}'; errors pass through.
+with_ok(Result) ->
+    case Result of
+        {error, _} -> Result;
+        _ -> {ok, Result}
+    end.
+
+%% Checks that Fun is a 1-arity fun that does not raise when applied to a
+%% default #rctx{} record. Returns `{ok, Fun}' on success and
+%% `{error, {invalid_fun, Tag}}' otherwise.
+validate_fun(Fun, Tag) when not is_function(Fun, 1) ->
+    {error, {invalid_fun, Tag}};
+validate_fun(Fun, Tag) ->
+    try
+        _ = Fun(#rctx{}),
+        {ok, Fun}
+    catch
+        _:_ ->
+            %% any exception means the fun cannot handle an rctx entry
+            {error, {invalid_fun, Tag}}
+    end.
+
+%% Resolves the matcher name stored in a query: `all' is served by all/0,
+%% any other name is looked up with get_matcher/1 and must exist, since
+%% from/1 already validated it.
+choose_matcher(MatcherName) ->
+    case MatcherName of
+        all ->
+            all();
+        _ ->
+            {ok, Matcher} = get_matcher(MatcherName),
+            Matcher
+    end.
+
+%% Applies a query limit to aggregation results:
+%% - `unlimited' keeps the raw aggregation map;
+%% - `undefined' caps the result at query_limit() entries;
+%% - an integer caps it at that many entries.
+%% Capped results become ordered lists (largest first). `{error, _}' input
+%% is returned untouched, as the spec declares.
+-spec maybe_apply_limit(ResultsOrError, Limit) -> OrderedResultsOrError
+when
+    ResultsOrError ::
+        {ok, aggregation_result()}
+        | {limit, aggregation_result()}
+        | {error, any()},
+    Limit :: unlimited | undefined | pos_integer(),
+    OrderedResultsOrError ::
+        {ok, ordered_result()}
+        | {limit, ordered_result()}
+        | {ok, aggregation_result()}
+        | {limit, aggregation_result()}
+        | {error, any()}.
+
+maybe_apply_limit({error, _} = Error, _Limit) ->
+    Error;
+maybe_apply_limit({Result, Results}, unlimited) ->
+    {Result, Results};
+maybe_apply_limit({Result, Results}, undefined) ->
+    {Result, topK(Results, query_limit())};
+maybe_apply_limit({Result, Results}, Limit) when is_integer(Limit) ->
+    {Result, topK(Results, Limit)}.
+
+%% Normalises a tagged result to carry a map: ordered [{Key, Value}] lists
+%% are converted with maps:from_list/1, maps are returned as they are.
+-spec to_map(ResultsOrError) -> OrderedResultsOrError
+when
+    ResultsOrError ::
+        {ok, ordered_result() | aggregation_result()}
+        | {limit, ordered_result() | aggregation_result()},
+    OrderedResultsOrError ::
+        {ok, aggregation_result()}
+        | {limit, aggregation_result()}.
+to_map({Tag, Results}) when is_map(Results) ->
+    {Tag, Results};
+to_map({Tag, Results}) when is_list(Results) ->
+    {Tag, maps:from_list(Results)}.
+
+%% Translates field names (atoms, binaries or strings) into rctx_field()
+%% values via csrt_entry:key/1. A list of names yields a list of fields in
+%% the same order and stops at the first `{error, _}'. `undefined' is
+%% passed through unchanged.
+-spec parse_key(Keys) ->
+    rctx_field()
+    | [rctx_field()]
+    | undefined
+    | {error, Reason :: any()}
+when
+    Keys :: binary() | atom() | string() | [binary()] | [atom()] | undefined.
+
+parse_key([C | _] = Key) when is_integer(C) ->
+    %% a charlist is a single field name, not a list of names
+    csrt_entry:key(Key);
+parse_key(Keys) when is_list(Keys) ->
+    parse_key(Keys, []);
+parse_key(BinKey) when is_binary(BinKey) ->
+    csrt_entry:key(BinKey);
+parse_key(undefined) ->
+    %% must precede the is_atom/1 clause, `undefined' being an atom
+    undefined;
+parse_key(Key) when is_atom(Key) ->
+    csrt_entry:key(Key).
+
+parse_key([BinKey | Rest], Keys) ->
+    case csrt_entry:key(BinKey) of
+        {error, _} = Error ->
+            Error;
+        Key ->
+            parse_key(Rest, [Key | Keys])
+    end;
+parse_key([], Keys) ->
+    lists:reverse(Keys).
+
+%%
+%% Scanning with matchers
+%%
 -spec query_matcher(MatcherName :: string()) ->
     {ok, query_result()}
     | {error, any()}.
@@ -323,102 +729,12 @@ get_matcher(MatcherName) ->
             {ok, Matcher}
     end.
 
--spec query_group_by(MatcherName, AggregationKeys, ValueKey, Options :: 
query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-%% {ok, #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2}}
-query_group_by(MatcherName, AggregationKeys, ValueKey, Options) ->
-    AggregationKey = parse_key(AggregationKeys),
-    VKey = parse_key(ValueKey),
-    Limit = maps:get(limit, Options, query_limit()),
-    maybe
-        ok ?= validate_not_error(AggregationKey),
-        ok ?= validate_not_error(VKey),
-        ok ?= validate_limit(Limit),
-        {ok, Matcher} ?= get_matcher(MatcherName),
-        group_by(Matcher, AggregationKey, VKey)
-    end.
-
--spec query_sort_by(MatcherName, AggregationKeys, ValueKey, Options :: 
query_options()) ->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()],
-    ValueKey :: binary() | rctx_field().
-%% {ok, #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2}}
-query_sort_by(MatcherName, AggregationKeys, ValueKey, Options) ->
-    AggregationKey = parse_key(AggregationKeys),
-    VKey = parse_key(ValueKey),
-    Limit = maps:get(limit, Options, query_limit()),
-    maybe
-        ok ?= validate_not_error(AggregationKey),
-        ok ?= validate_not_error(VKey),
-        ok ?= validate_limit(Limit),
-        {ok, Matcher} ?= get_matcher(MatcherName),
-        sort_by(Matcher, AggregationKey, VKey)
-    end.
-
--spec query_count_by(MatcherName, AggregationKeys, Options :: query_options()) 
->
-    {ok, query_result()}
-    | {error, any()}
-when
-    MatcherName :: string(),
-    AggregationKeys :: binary() | rctx_field() | [binary()] | [rctx_field()].
-%% {ok, #{{<<"adm">>,<<"bench-yktbb3as46rzffea">>} => 2}}
-query_count_by(MatcherName, AggregationKeys, Options) ->
-    AggregationKey = parse_key(AggregationKeys),
-    Limit = maps:get(limit, Options, query_limit()),
-    maybe
-        ok ?= validate_not_error(AggregationKey),
-        ok ?= validate_limit(Limit),
-        {ok, Matcher} ?= get_matcher(MatcherName),
-        count_by(Matcher, AggregationKey)
-    end.
-
-validate_not_error({error, _} = Error) ->
-    Error;
-validate_not_error(_) ->
-    ok.
-
+%%
+%% Auxiliary functions
+%%
+%% Maximum number of entries a query may return by default: read from the
+%% "query_limit" key of the ?CSRT config section, falling back to
+%% ?QUERY_LIMIT. with_limit/1 rejects larger limits.
 query_limit() ->
     config:get_integer(?CSRT, "query_limit", ?QUERY_LIMIT).
 
-validate_limit(Limit) when is_integer(Limit) ->
-    case Limit =< query_limit() of
-        true ->
-            ok;
-        false ->
-            {error, {beyond_limit, query_limit()}}
-    end;
-validate_limit(Limit) ->
-    {error, {invalid_limit, Limit}}.
-
--spec parse_key(Keys :: binary() | atom() | [binary()] | [atom()]) ->
-    [rctx_field()]
-    | throw({bad_request, Reason :: binary()}).
-
-parse_key([C | _] = Key) when is_integer(C)->
-    csrt_entry:key(Key);
-parse_key(Keys) when is_list(Keys) ->
-    parse_key(Keys, []);
-parse_key(BinKey) when is_binary(BinKey) ->
-    csrt_entry:key(BinKey);
-parse_key(undefined) ->
-    undefined;
-parse_key(Key) when is_atom(Key) ->
-    csrt_entry:key(Key).
+%% Converts each entry of List with csrt_entry:to_json/1, preserving order.
+to_json_list(List) when is_list(List) ->
+    [csrt_entry:to_json(Entry) || Entry <- List].
 
-parse_key([BinKey | Rest], Keys) ->
-    case csrt_entry:key(BinKey) of
-        {error, _} = Error ->
-            Error;
-        Key ->
-            parse_key(Rest, [Key | Keys])
-    end;
-parse_key([], Keys) ->
-    lists:reverse(Keys).


Reply via email to