This is an automated email from the ASF dual-hosted git repository.

iilyak pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit fb8a5b1c4f70df9de2bd1f9108505a29d0f46055
Author: ILYA Khlopotov <[email protected]>
AuthorDate: Thu Jul 10 00:38:48 2025 -0700

    Update tests
---
 src/couch_stats/test/eunit/csrt_httpd_tests.erl |  81 ++-
 src/couch_stats/test/eunit/csrt_query_tests.erl | 693 ++++++++++++++++--------
 2 files changed, 526 insertions(+), 248 deletions(-)

diff --git a/src/couch_stats/test/eunit/csrt_httpd_tests.erl 
b/src/couch_stats/test/eunit/csrt_httpd_tests.erl
index 5b32f564f..e2db8cdfe 100644
--- a/src/couch_stats/test/eunit/csrt_httpd_tests.erl
+++ b/src/couch_stats/test/eunit/csrt_httpd_tests.erl
@@ -89,8 +89,13 @@ active_resources_group_by(MatcherName, Url, AggregationKeys, 
CounterKey) ->
 t_query_group_by_multiple_keys(#{rctxs := Rctxs, url := Url}) ->
     Aggregated = aggregate([username, dbname], ioq_calls, Rctxs),
     Grouped = group(Aggregated),
-    {RC, Result} = active_resources_group_by(Url, [<<"username">>, 
<<"dbname">>], <<"ioq_calls">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_group_by(Url, [<<"username">>, 
<<"dbname">>], <<"ioq_calls">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -118,8 +123,13 @@ t_query_group_by_multiple_keys(#{rctxs := Rctxs, url := 
Url}) ->
 t_query_group_by_single_key(#{rctxs := Rctxs, url := Url}) ->
     Aggregated = aggregate([username], ioq_calls, Rctxs),
     Grouped = group(Aggregated),
-    {RC, Result} = active_resources_group_by(Url, [<<"username">>], 
<<"ioq_calls">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_group_by(Url, [<<"username">>], 
<<"ioq_calls">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -141,8 +151,13 @@ t_query_group_by_single_key(#{rctxs := Rctxs, url := Url}) 
->
 t_query_group_by_binary_key(#{rctxs := Rctxs, url := Url}) ->
     Aggregated = aggregate([username], ioq_calls, Rctxs),
     Grouped = group(Aggregated),
-    {RC, Result} = active_resources_group_by(Url, <<"username">>, 
<<"ioq_calls">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_group_by(Url, <<"username">>, 
<<"ioq_calls">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -206,8 +221,13 @@ active_resources_count_by(MatcherName, Url, 
AggregationKeys) ->
 t_query_count_by_multiple_keys(#{rctxs := Rctxs, url := Url}) ->
     Aggregated = aggregate([username, dbname], ioq_calls, Rctxs),
     Grouped = count(Aggregated),
-    {RC, Result} = active_resources_count_by(Url, [<<"username">>, 
<<"dbname">>]),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_count_by(Url, [<<"username">>, 
<<"dbname">>]),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -235,8 +255,13 @@ t_query_count_by_multiple_keys(#{rctxs := Rctxs, url := 
Url}) ->
 t_query_count_by_single_key(#{rctxs := Rctxs, url := Url}) ->
     Aggregated = aggregate([username], ioq_calls, Rctxs),
     Grouped = count(Aggregated),
-    {RC, Result} = active_resources_count_by(Url, [<<"username">>]),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_count_by(Url, [<<"username">>]),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -258,8 +283,13 @@ t_query_count_by_single_key(#{rctxs := Rctxs, url := Url}) 
->
 t_query_count_by_binary_key(#{rctxs := Rctxs, url := Url}) ->
     Aggregated = aggregate([username], ioq_calls, Rctxs),
     Grouped = count(Aggregated),
-    {RC, Result} = active_resources_count_by(Url, <<"username">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_count_by(Url, <<"username">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -318,8 +348,13 @@ t_query_sort_by_multiple_keys(#{rctxs := Rctxs, url := 
Url}) ->
     Aggregated = aggregate([username, dbname], ioq_calls, Rctxs),
     Grouped = group(Aggregated),
     Ordered = order_by_value(Grouped),
-    {RC, Result} = active_resources_sort_by(Url, [<<"username">>, 
<<"dbname">>], <<"ioq_calls">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_sort_by(Url, [<<"username">>, 
<<"dbname">>], <<"ioq_calls">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -349,8 +384,13 @@ t_query_sort_by_single_key(#{rctxs := Rctxs, url := Url}) 
->
     Aggregated = aggregate([username], ioq_calls, Rctxs),
     Grouped = group(Aggregated),
     Ordered = order_by_value(Grouped),
-    {RC, Result} = active_resources_sort_by(Url, [<<"username">>], 
<<"ioq_calls">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_sort_by(Url, [<<"username">>], 
<<"ioq_calls">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
@@ -374,8 +414,13 @@ t_query_sort_by_binary_key(#{rctxs := Rctxs, url := Url}) 
->
     Aggregated = aggregate([username], ioq_calls, Rctxs),
     Grouped = group(Aggregated),
     Ordered = order_by_value(Grouped),
-    {RC, Result} = active_resources_sort_by(Url, <<"username">>, 
<<"ioq_calls">>),
-    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Result])),
+    {RC, Results} = active_resources_sort_by(Url, <<"username">>, 
<<"ioq_calls">>),
+    ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n  
~p~n", [RC, Results])),
+    [#{
+        <<"errors">> := [],
+        <<"node">> := _,
+        <<"result">> := Result
+    }] = Results,
     ?assert(is_list(Result), format("Expected list of entries, got ~p~n", 
[Result])),
     ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n  
~p~n", [length(Result), Result])),
     ?assertMatch([
diff --git a/src/couch_stats/test/eunit/csrt_query_tests.erl 
b/src/couch_stats/test/eunit/csrt_query_tests.erl
index f85df771d..681af5a2b 100644
--- a/src/couch_stats/test/eunit/csrt_query_tests.erl
+++ b/src/couch_stats/test/eunit/csrt_query_tests.erl
@@ -12,317 +12,550 @@
 
 -module(csrt_query_tests).
 
--include_lib("stdlib/include/ms_transform.hrl").
-
 -import(
-    csrt_test_helper,
+    csrt_query,
     [
-        rctx_gen/0,
-        rctx_gen/1,
-        rctxs/0,
-        jrctx/1
+        query/1,
+        from/1,
+        group_by/1,
+        group_by/2,
+        sort_by/1,
+        sort_by/2,
+        count_by/1,
+        options/1,
+        unlimited/0,
+        with_limit/1,
+
+        run/1,
+        unsafe_run/1
     ]
 ).
 
--include_lib("couch/include/couch_db.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
--include("../../src/couch_stats_resource_tracker.hrl").
-
-%% Use different values than default configs to ensure they're picked up
--define(THRESHOLD_DBNAME_IO, 91).
--define(THRESHOLD_DOCS_READ, 123).
--define(THRESHOLD_DOCS_WRITTEN, 12).
--define(THRESHOLD_IOQ_CALLS, 439).
--define(THRESHOLD_ROWS_READ, 43).
--define(THRESHOLD_CHANGES, 79).
--define(THRESHOLD_LONG_REQS, 432).
 
--define(TEST_QUERY_LIMIT, 98).
+-include_lib("stdlib/include/ms_transform.hrl").
+-include("../../src/couch_stats_resource_tracker.hrl").
 
+-define(MATCHERS_THRESHOLD, 1000).
 csrt_query_test_() ->
     {
         foreach,
         fun setup/0,
         fun teardown/1,
         [
-            ?TDEF_FE(t_query_group_by),
-            ?TDEF_FE(t_query_count_by),
-            ?TDEF_FE(t_query_sort_by)
+            ?TDEF_FE(t_group_by_multiple_keys),
+            ?TDEF_FE(t_group_by_single_key),
+            ?TDEF_FE(t_group_by_binary_key),
+            ?TDEF_FE(t_group_by_detect_unsafe_query),
+            ?TDEF_FE(t_group_by_run_unsafe_query),
+            ?TDEF_FE(t_group_by_run_unsafe_correctness),
+            ?TDEF_FE(t_group_by_bad_request),
+            ?TDEF_FE(t_count_by_multiple_keys),
+            ?TDEF_FE(t_count_by_single_key),
+            ?TDEF_FE(t_count_by_binary_key),
+            ?TDEF_FE(t_count_by_bad_request),
+            ?TDEF_FE(t_sort_by_multiple_keys),
+            ?TDEF_FE(t_sort_by_single_key),
+            ?TDEF_FE(t_sort_by_binary_key),
+            ?TDEF_FE(t_sort_by_bad_request)
         ]
     }.
 
 setup() ->
-    Ctx = test_util:start_couch([fabric, couch_stats]),
-    config:set_boolean(?CSRT, "randomize_testing", false, false),
-    config:set_boolean(?CSRT, "enable_reporting", true, false),
-    config:set_boolean(?CSRT, "enable_rpc_reporting", true, false),
-
-    ok = meck:new(ioq, [passthrough]),
-    ok = meck:expect(ioq, bypass, fun(_, _) -> false end),
-    DbName = ?tempdb(),
-    ok = fabric:create_db(DbName, [{q, 8}, {n, 1}]),
-    Docs = make_docs(100),
-    Opts = [],
-    {ok, _} = fabric:update_docs(DbName, Docs, Opts),
-    Method = 'GET',
-    Path = "/" ++ ?b2l(DbName) ++ "/_all_docs",
-    Nonce = couch_util:to_hex(crypto:strong_rand_bytes(5)),
-    Req = #httpd{method = Method, nonce = Nonce},
-    {_, _} = PidRef = csrt:create_coordinator_context(Req, Path),
-    csrt:set_context_username(<<"user_foo">>),
-    csrt:set_context_dbname(DbName),
-    MArgs = #mrargs{include_docs = false},
-    _Res = fabric:all_docs(DbName, [?ADMIN_CTX], fun view_cb/2, [], MArgs),
-    Rctx = load_rctx(PidRef),
-    ok = config:set(
-        "csrt_logger.matchers_threshold", "docs_read", 
integer_to_list(?THRESHOLD_DOCS_READ), false
-    ),
-    ok = config:set(
-        "csrt_logger.matchers_threshold",
-        "docs_written",
-        integer_to_list(?THRESHOLD_DOCS_WRITTEN),
-        false
-    ),
-    ok = config:set(
-        "csrt_logger.matchers_threshold", "ioq_calls", 
integer_to_list(?THRESHOLD_IOQ_CALLS), false
-    ),
-    ok = config:set(
-        "csrt_logger.matchers_threshold", "rows_read", 
integer_to_list(?THRESHOLD_ROWS_READ), false
-    ),
-    ok = config:set(
-        "csrt_logger.matchers_threshold",
-        "changes_processed",
-        integer_to_list(?THRESHOLD_CHANGES),
-        false
-    ),
-    ok = config:set(
-        "csrt_logger.matchers_threshold", "long_reqs", 
integer_to_list(?THRESHOLD_LONG_REQS), false
-    ),
-    ok = config:set("csrt_logger.dbnames_io", "foo", 
integer_to_list(?THRESHOLD_DBNAME_IO), false),
-    ok = config:set("csrt_logger.dbnames_io", "bar", 
integer_to_list(?THRESHOLD_DBNAME_IO), false),
-    ok = config:set(
-        "csrt_logger.dbnames_io", "foo/bar", 
integer_to_list(?THRESHOLD_DBNAME_IO), false
-    ),
-    config:set(?CSRT, "query_limit", integer_to_list(?TEST_QUERY_LIMIT)),
-    csrt_logger:reload_matchers(),
-    Rctxs = rctxs(),
-    #{ctx => Ctx, dbname => DbName, rctx => Rctx, rctxs => Rctxs}.
+    Rctxs = [
+        rctx(#{dbname => <<"db1">>, ioq_calls => 123, username => 
<<"user_foo">>}),
+        rctx(#{dbname => <<"db1">>, ioq_calls => 321, username => 
<<"user_foo">>}),
+        rctx(#{dbname => <<"db2">>, ioq_calls => 345, username => 
<<"user_bar">>}),
+        rctx(#{dbname => <<"db2">>, ioq_calls => 543, username => 
<<"user_bar">>}),
+        rctx(#{dbname => <<"db1">>, ioq_calls => 678, username => 
<<"user_bar">>}),
+        rctx(#{dbname => <<"db2">>, ioq_calls => 987, username => 
<<"user_foo">>})
+    ],
+    ets:new(?CSRT_ETS, [
+        named_table,
+        public,
+        {keypos, #rctx.pid_ref}
+    ]),
+    ets:insert(?CSRT_ETS, Rctxs),
+    add_matcher("docs_read", 
csrt_logger:matcher_on_docs_read(?MATCHERS_THRESHOLD)),
+    #{rctxs => Rctxs}.
 
-teardown(#{ctx := Ctx, dbname := DbName}) ->
-    ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
-    ok = meck:unload(ioq),
-    test_util:stop_couch(Ctx).
+teardown(_) ->
+    ets:delete(?CSRT_ETS).
 
-load_rctx(PidRef) ->
-    %% Add slight delay to accumulate RPC response deltas
-    timer:sleep(50),
-    csrt:get_resource(PidRef).
+rctx(Opts) ->
+    % Default `docs_read` to ?MATCHERS_THRESHOLD + 1 so that the standard
+    % `{docs_read, fun matcher_on_docs_read/1, 1000}` matcher matches this rctx.
+    BaseOpts = #{docs_read => ?MATCHERS_THRESHOLD + 1, username => 
<<"user_foo">>},
+    csrt_test_helper:rctx_gen(maps:merge(BaseOpts, Opts)).
 
-make_docs(Count) ->
-    lists:map(
-        fun(I) ->
-            #doc{
-                id = ?l2b("foo_" ++ integer_to_list(I)),
-                body = {[{<<"value">>, I}]}
-            }
-        end,
-        lists:seq(1, Count)
-    ).
+dummy_key_fun(#rctx{username = Username}) ->
+    Username.
 
-view_cb({row, Row}, Acc) ->
-    {ok, [Row | Acc]};
-view_cb(_Msg, Acc) ->
-    {ok, Acc}.
+dummy_value_fun(#rctx{ioq_calls = IoqCalls}) ->
+    IoqCalls.
 
-t_query_group_by(#{rctx := Rctx, dbname := DbName}) ->
-    IoqCalls = Rctx#rctx.ioq_calls,
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>, DbName} := IoqCalls}},
-        csrt_query:query_group_by("rows_read", [username, dbname], 
<<"ioq_calls">>, #{}),
-        "Should handle 'AggregationKeys :: [atom(), ...]'"
-    ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>} := IoqCalls}},
-        csrt_query:query_group_by("rows_read", [username], <<"ioq_calls">>, 
#{}),
-        "Should handle 'AggregationKeys :: [atom()]'"
-    ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>} := IoqCalls}},
-        csrt_query:query_group_by("rows_read", ["username"], <<"ioq_calls">>, 
#{}),
-        "Should handle 'AggregationKeys :: [string()]'"
-    ),
-    ?assertMatch(
-        {ok, #{<<"user_foo">> := IoqCalls}},
-        csrt_query:query_group_by("rows_read", username, <<"ioq_calls">>, #{}),
-        "Should handle 'AggregationKeys :: atom()'"
+t_group_by_multiple_keys(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username, dbname], ioq_calls, Rctxs),
+    Grouped = group(Aggregated),
+    V1 = maps:get({<<"user_bar">>, <<"db1">>}, Grouped),
+    V2 = maps:get({<<"user_bar">>, <<"db2">>}, Grouped),
+    V3 = maps:get({<<"user_foo">>, <<"db1">>}, Grouped),
+    V4 = maps:get({<<"user_foo">>, <<"db2">>}, Grouped),
+    Q = query([
+        from("docs_read"),
+        group_by([<<"username">>, <<"dbname">>], <<"ioq_calls">>)
+    ]),
+    ?assertMatch(
+        {ok, #{
+            {<<"user_bar">>, <<"db1">>} := V1,
+            {<<"user_bar">>, <<"db2">>} := V2,
+            {<<"user_foo">>, <<"db1">>} := V3,
+            {<<"user_foo">>, <<"db2">>} := V4
+        }},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, #{<<"user_foo">> := IoqCalls}},
-        csrt_query:query_group_by("rows_read", "username", <<"ioq_calls">>, 
#{}),
-        "Should handle 'AggregationKeys :: string()'"
-    ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>, DbName} := IoqCalls}},
-        csrt_query:query_group_by("rows_read", [username, dbname], 
"ioq_calls", #{}),
-        "Should handle 'ValueKey :: string()'"
+    ok.
+
+t_group_by_single_key(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username], ioq_calls, Rctxs),
+    Grouped = group(Aggregated),
+    V1 = maps:get({<<"user_bar">>}, Grouped),
+    V2 = maps:get({<<"user_foo">>}, Grouped),
+    Q = query([
+        from("docs_read"),
+        group_by([<<"username">>], <<"ioq_calls">>)
+    ]),
+    ?assertMatch(
+        {ok, #{
+            {<<"user_bar">>} := V1,
+            {<<"user_foo">>} := V2
+        }},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>, DbName} := IoqCalls}},
-        csrt_query:query_group_by("rows_read", [username, dbname], ioq_calls, 
#{}),
-        "Should handle 'ValueKey :: atom()'"
+    ok.
+
+t_group_by_binary_key(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username], ioq_calls, Rctxs),
+    Grouped = group(Aggregated),
+    V1 = maps:get({<<"user_bar">>}, Grouped),
+    V2 = maps:get({<<"user_foo">>}, Grouped),
+    Q = query([
+        from("docs_read"),
+        group_by(<<"username">>, <<"ioq_calls">>)
+    ]),
+    ?assertMatch(
+        {ok, #{
+            <<"user_bar">> := V1,
+            <<"user_foo">> := V2
+        }},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>, DbName} := IoqCalls}},
-        csrt_query:query_group_by("rows_read", [username, dbname], ioq_calls, 
#{limit => ?TEST_QUERY_LIMIT - 1}),
-        "Should handle 'limit' option"
+    ok.
+
+t_group_by_detect_unsafe_query(_) ->
+    ?assertMatch({error, {unsafe_query, _}}, run(query([
+        from(all),
+        group_by(<<"username">>, <<"ioq_calls">>)
+    ])), "Should detect `unsafe` when `all` matcher is used"),
+    ?assertMatch({error, {unsafe_query, _}}, run(query([
+        from("docs_read"),
+        group_by(fun dummy_key_fun/1, <<"ioq_calls">>)
+    ])), "Should detect `unsafe` when `AggregationKey` is a function()"),
+    ?assertMatch({error, {unsafe_query, _}}, run(query([
+        from("docs_read"),
+        group_by(<<"username">>, fun dummy_value_fun/1)
+    ])), "Should detect `unsafe` when `ValueKey` is a function()"),
+    ?assertMatch({error, {unsafe_query, _}}, run(query([
+        from("docs_read"),
+        group_by(<<"username">>, <<"ioq_calls">>),
+        options([
+            unlimited()
+        ])
+    ])), "Should detect `unsafe` when `unlimited()` is used"),
+    ok.
+
+t_group_by_run_unsafe_query(_) ->
+    ?assertMatch({ok, _}, unsafe_run(query([
+        from(all),
+        group_by(<<"username">>, <<"ioq_calls">>)
+    ])), "Should be able to use `unsafe_run` when `all` matcher is used"),
+    ?assertMatch({ok, _}, unsafe_run(query([
+        from("docs_read"),
+        group_by(fun dummy_key_fun/1, <<"ioq_calls">>)
+    ])), "Should be able to use `unsafe_run`  when `AggregationKey` is a 
function()"),
+    ?assertMatch({ok, _}, unsafe_run(query([
+        from("docs_read"),
+        group_by(<<"username">>, fun dummy_value_fun/1)
+    ])), "Should be able to use `unsafe_run`  when `ValueKey` is a 
function()"),
+    ?assertMatch({ok, _}, unsafe_run(query([
+        from("docs_read"),
+        group_by(<<"username">>, <<"ioq_calls">>),
+        options([
+            unlimited()
+        ])
+    ])), "Should be able to use `unsafe_run` when `unlimited()` is used"),
+    ok.
+
+t_group_by_run_unsafe_correctness(_) ->
+    % Check that each unsafe query returns the same result as its safe equivalent.
+    ?assertEqual(
+        run(query([
+            from("docs_read"),
+            group_by(<<"username">>, <<"ioq_calls">>)
+        ])),
+        unsafe_run(query([
+            from(all),
+            group_by(<<"username">>, <<"ioq_calls">>)
+        ])),
+        "Should get correct result from `unsafe_run` when `all` matcher is 
used"
+    ),
+    ?assertEqual(
+        run(query([
+            from("docs_read"),
+            group_by(<<"username">>, <<"ioq_calls">>)
+        ])),
+        unsafe_run(query([
+            from("docs_read"),
+            group_by(fun dummy_key_fun/1, <<"ioq_calls">>)
+        ])),
+        "Should get correct result from `unsafe_run`  when `AggregationKey` is 
a function()"
+    ),
+    ?assertEqual(
+        run(query([
+            from("docs_read"),
+            group_by(<<"username">>, ioq_calls)
+        ])),
+        unsafe_run(query([
+            from("docs_read"),
+            group_by(<<"username">>, fun dummy_value_fun/1)
+        ])),
+        "Should get correct result from `unsafe_run`  when `ValueKey` is a 
function()"
+    ),
+    ?assertEqual(
+        run(query([
+            from("docs_read"),
+            group_by(<<"username">>, <<"ioq_calls">>)
+        ])),
+        unsafe_run(query([
+            from("docs_read"),
+            group_by(<<"username">>, <<"ioq_calls">>),
+            options([
+            unlimited()
+            ])
+        ])),
+        "Should get correct result from `unsafe_run` when `unlimited()` is 
used"
     ),
+    ok.
+
+t_group_by_bad_request(_) ->
     ?assertMatch(
-        {error,{unknown_matcher,"unknown_matcher"}},
-        csrt_query:query_group_by("unknown_matcher", [username, dbname], 
ioq_calls, #{}),
+        {error, [{unknown_matcher, "unknown_matcher"}]},
+        query([
+            from("unknown_matcher"),
+            group_by(<<"username">>, <<"ioq_calls">>)
+        ]),
         "Should return error if 'matcher' is unknown"
     ),
     ?assertMatch(
-        {error,{unknown_matcher, rows_read}},
-        csrt_query:query_group_by(rows_read, [username, dbname], ioq_calls, 
#{}),
+        {error, [{unknown_matcher, rows_read}]},
+        query([
+            from(rows_read),
+            group_by([username, dbname], ioq_calls)
+        ]),
         "Should return error if 'matcher' is not a string()"
     ),
     ?assertMatch(
-        {error, {invalid_key, "unknown_field"}},
-        csrt_query:query_group_by("rows_read", "unknown_field", ioq_calls, 
#{}),
+        {error, [{invalid_key, "unknown_field"}]},
+        query([
+            from("docs_read"),
+            group_by("unknown_field", ioq_calls)
+        ]),
         "Should return error if 'AggregationKeys' contain unknown field"
     ),
     ?assertMatch(
-        {error, {invalid_key, "unknown_field"}},
-        csrt_query:query_group_by("rows_read", "username", "unknown_field", 
#{}),
+        {error, [{invalid_key, "unknown_field"}]},
+        query([
+            from("docs_read"),
+            group_by("username", "unknown_field")
+        ]),
         "Should return error if 'ValueKey' contain unknown field"
     ),
     ?assertMatch(
-        {error, {beyond_limit, ?TEST_QUERY_LIMIT}},
-        csrt_query:query_group_by("rows_read", [username, dbname], ioq_calls, 
#{limit => ?TEST_QUERY_LIMIT + 1}),
+        {error, [{beyond_limit, ?QUERY_LIMIT + 1}]},
+        query([
+            from("docs_read"),
+            group_by("username", ioq_calls),
+            options([
+                with_limit(?QUERY_LIMIT + 1)
+            ])
+        ]),
         "Should return error when 'limit' is greater than configured"
     ),
     ok.
 
-t_query_count_by(#{dbname := DbName}) ->
-    IoqCount = 1,
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>, DbName} := IoqCount}},
-        csrt_query:query_count_by("rows_read", [username, dbname], #{}),
-        "Should handle 'AggregationKeys :: [atom(), ...]'"
+t_count_by_multiple_keys(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username, dbname], ioq_calls, Rctxs),
+    Grouped = count(Aggregated),
+    V1 = maps:get({<<"user_bar">>, <<"db1">>}, Grouped),
+    V2 = maps:get({<<"user_bar">>, <<"db2">>}, Grouped),
+    V3 = maps:get({<<"user_foo">>, <<"db1">>}, Grouped),
+    V4 = maps:get({<<"user_foo">>, <<"db2">>}, Grouped),
+    Q = query([
+        from("docs_read"),
+        count_by([<<"username">>, <<"dbname">>])
+    ]),
+    ?assertMatch(
+        {ok, #{
+            {<<"user_bar">>, <<"db1">>} := V1,
+            {<<"user_bar">>, <<"db2">>} := V2,
+            {<<"user_foo">>, <<"db1">>} := V3,
+            {<<"user_foo">>, <<"db2">>} := V4
+        }},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>} := IoqCount}},
-        csrt_query:query_count_by("rows_read", [username], #{}),
-        "Should handle 'AggregationKeys :: [atom()]'"
-    ),
-    ?assertMatch(
-        {ok, #{{<<"user_foo">>} := IoqCount}},
-        csrt_query:query_count_by("rows_read", ["username"], #{}),
-        "Should handle 'AggregationKeys :: [string()]'"
-    ),
-    ?assertMatch(
-        {ok, #{<<"user_foo">> := IoqCount}},
-        csrt_query:query_count_by("rows_read", username, #{}),
-        "Should handle 'AggregationKeys :: atom()'"
-    ),
-    ?assertMatch(
-        {ok, #{<<"user_foo">> := IoqCount}},
-        csrt_query:query_count_by("rows_read", "username", #{}),
-        "Should handle 'AggregationKeys :: string()'"
+    ok.
+
+t_count_by_single_key(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username], ioq_calls, Rctxs),
+    Grouped = count(Aggregated),
+    V1 = maps:get({<<"user_bar">>}, Grouped),
+    V2 = maps:get({<<"user_foo">>}, Grouped),
+    Q = query([
+        from("docs_read"),
+        count_by([<<"username">>])
+    ]),
+    ?assertMatch(
+        {ok, #{
+            {<<"user_bar">>} := V1,
+            {<<"user_foo">>} := V2
+        }},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, #{<<"user_foo">> := IoqCount}},
-        csrt_query:query_count_by("rows_read", "username", #{limit => 
?TEST_QUERY_LIMIT - 1}),
-        "Should handle 'limit' option"
+    ok.
+
+t_count_by_binary_key(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username], ioq_calls, Rctxs),
+    Grouped = count(Aggregated),
+    V1 = maps:get({<<"user_bar">>}, Grouped),
+    V2 = maps:get({<<"user_foo">>}, Grouped),
+    Q = query([
+        from("docs_read"),
+        count_by(<<"username">>)
+    ]),
+    ?assertMatch(
+        {ok, #{
+            <<"user_bar">> := V1,
+            <<"user_foo">> := V2
+        }},
+        run(Q)
     ),
+    ok.
+
+t_count_by_bad_request(_) ->
     ?assertMatch(
-        {error,{unknown_matcher,"unknown_matcher"}},
-        csrt_query:query_count_by("unknown_matcher", [username, dbname], #{}),
+        {error, [{unknown_matcher, "unknown_matcher"}]},
+        query([
+            from("unknown_matcher"),
+            count_by(<<"username">>)
+        ]),
         "Should return error if 'matcher' is unknown"
     ),
     ?assertMatch(
-        {error,{unknown_matcher, rows_read}},
-        csrt_query:query_count_by(rows_read, [username, dbname], #{}),
+        {error, [{unknown_matcher, rows_read}]},
+        query([
+            from(rows_read),
+            count_by([username, dbname])
+        ]),
         "Should return error if 'matcher' is not a string()"
     ),
     ?assertMatch(
-        {error, {invalid_key, "unknown_field"}},
-        csrt_query:query_count_by("rows_read", "unknown_field", #{}),
+        {error, [{invalid_key, "unknown_field"}]},
+        query([
+            from("docs_read"),
+            count_by("unknown_field")
+        ]),
         "Should return error if 'AggregationKeys' contain unknown field"
     ),
     ?assertMatch(
-        {error, {beyond_limit, ?TEST_QUERY_LIMIT}},
-        csrt_query:query_count_by("rows_read", [username, dbname], #{limit => 
?TEST_QUERY_LIMIT + 1}),
+        {error, [{beyond_limit, ?QUERY_LIMIT + 1}]},
+        query([
+            from("docs_read"),
+            count_by("username"),
+            options([
+                with_limit(?QUERY_LIMIT + 1)
+            ])
+        ]),
         "Should return error when 'limit' is greater than configured"
     ),
     ok.
 
-t_query_sort_by(#{rctx := Rctx, dbname := DbName}) ->
-    IoqCalls = Rctx#rctx.ioq_calls,
-    ?assertMatch(
-        {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", [username, dbname], 
<<"ioq_calls">>, #{}),
-        "Should handle 'AggregationKeys :: [atom(), ...]'"
-    ),
-    ?assertMatch(
-        {ok, [{{<<"user_foo">>}, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", [username], <<"ioq_calls">>, 
#{}),
-        "Should handle 'AggregationKeys :: [atom()]'"
-    ),
-    ?assertMatch(
-        {ok, [{{<<"user_foo">>}, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", ["username"], <<"ioq_calls">>, 
#{}),
-        "Should handle 'AggregationKeys :: [string()]'"
+t_sort_by_multiple_keys(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username, dbname], ioq_calls, Rctxs),
+    Grouped = group(Aggregated),
+    Ordered = order_by_value(Grouped),
+    [
+        {{<<"user_foo">>, <<"db2">>}, V1},
+        {{<<"user_bar">>, <<"db2">>}, V2},
+        {{<<"user_bar">>, <<"db1">>}, V3},
+        {{<<"user_foo">>, <<"db1">>}, V4}
+    ] = Ordered,
+    Q = query([
+        from("docs_read"),
+        sort_by([<<"username">>, <<"dbname">>], <<"ioq_calls">>)
+    ]),
+    ?assertMatch(
+        {ok, [
+            {{<<"user_foo">>, <<"db2">>}, V1},
+            {{<<"user_bar">>, <<"db2">>}, V2},
+            {{<<"user_bar">>, <<"db1">>}, V3},
+            {{<<"user_foo">>, <<"db1">>}, V4}
+        ]},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, [{<<"user_foo">>, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", username, <<"ioq_calls">>, #{}),
-        "Should handle 'AggregationKeys :: atom()'"
+    ok.
+
+t_sort_by_single_key(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username], ioq_calls, Rctxs),
+    Grouped = group(Aggregated),
+    Ordered = order_by_value(Grouped),
+    [
+        {{<<"user_bar">>}, V1},
+        {{<<"user_foo">>}, V2}
+    ] = Ordered,
+    Q = query([
+        from("docs_read"),
+        sort_by([<<"username">>], <<"ioq_calls">>)
+    ]),
+    ?assertMatch(
+        {ok, [
+            {{<<"user_bar">>}, V1},
+            {{<<"user_foo">>}, V2}
+        ]},
+        run(Q)
     ),
-    ?assertMatch(
-        {ok, [{<<"user_foo">>, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", "username", <<"ioq_calls">>, 
#{}),
-        "Should handle 'AggregationKeys :: string()'"
+    ok.
+
+t_sort_by_binary_key(#{rctxs := Rctxs}) ->
+    Aggregated = aggregate([username], ioq_calls, Rctxs),
+    Grouped = group(Aggregated),
+    Ordered = order_by_value(Grouped),
+    [
+        {{<<"user_bar">>}, V1},
+        {{<<"user_foo">>}, V2}
+    ] = Ordered,
+    Q = query([
+        from("docs_read"),
+        sort_by(<<"username">>, <<"ioq_calls">>)
+    ]),
+    ?assertMatch(
+        {ok, [
+            {<<"user_bar">>, V1},
+            {<<"user_foo">>, V2}
+        ]},
+        run(Q)
     ),
+    ok.
+
+t_sort_by_bad_request(_) ->
     ?assertMatch(
-        {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", [username, dbname], "ioq_calls", 
#{}),
-        "Should handle 'ValueKey :: string()'"
+        {error, [{unknown_matcher, "unknown_matcher"}]},
+        query([
+            from("unknown_matcher"),
+            sort_by(<<"username">>, <<"ioq_calls">>)
+        ]),
+        "Should return error if 'matcher' is unknown"
     ),
     ?assertMatch(
-        {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", [username, dbname], ioq_calls, 
#{}),
-        "Should handle 'ValueKey :: atom()'"
+        {error, [{unknown_matcher, "unknown_matcher"}]},
+        query([
+            from("unknown_matcher"),
+            sort_by(<<"username">>)
+        ]),
+        "Should return error if 'matcher' is unknown"
     ),
     ?assertMatch(
-        {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]},
-        csrt_query:query_sort_by("rows_read", [username, dbname], ioq_calls, 
#{limit => ?TEST_QUERY_LIMIT - 1}),
-        "Should handle 'limit' option"
+        {error, [{unknown_matcher, rows_read}]},
+        query([
+            from(rows_read),
+            sort_by([username, dbname], ioq_calls)
+        ]),
+        "Should return error if 'matcher' is not a string()"
     ),
     ?assertMatch(
-        {error,{unknown_matcher,"unknown_matcher"}},
-        csrt_query:query_sort_by("unknown_matcher", [username, dbname], 
ioq_calls, #{}),
-        "Should return error if 'matcher' is unknown"
+        {error, [{unknown_matcher, rows_read}]},
+        query([
+            from(rows_read),
+            sort_by([username, dbname])
+        ]),
+        "Should return error if 'matcher' is not a string()"
     ),
     ?assertMatch(
-        {error,{unknown_matcher, rows_read}},
-        csrt_query:query_sort_by(rows_read, [username, dbname], ioq_calls, 
#{}),
-        "Should return error if 'matcher' is not a string()"
+        {error, [{invalid_key, "unknown_field"}]},
+        query([
+            from("docs_read"),
+            sort_by("unknown_field", ioq_calls)
+        ]),
+        "Should return error if 'AggregationKeys' contain unknown field"
     ),
     ?assertMatch(
-        {error, {invalid_key, "unknown_field"}},
-        csrt_query:query_sort_by("rows_read", "unknown_field", ioq_calls, #{}),
+        {error, [{invalid_key, "unknown_field"}]},
+        query([
+            from("docs_read"),
+            sort_by("unknown_field")
+        ]),
         "Should return error if 'AggregationKeys' contain unknown field"
     ),
     ?assertMatch(
-        {error, {invalid_key, "unknown_field"}},
-        csrt_query:query_sort_by("rows_read", "username", "unknown_field", 
#{}),
+        {error, [{invalid_key, "unknown_field"}]},
+        query([
+            from("docs_read"),
+            sort_by("username", "unknown_field")
+        ]),
         "Should return error if 'ValueKey' contain unknown field"
     ),
     ?assertMatch(
-        {error, {beyond_limit, ?TEST_QUERY_LIMIT}},
-        csrt_query:query_sort_by("rows_read", [username, dbname], ioq_calls, 
#{limit => ?TEST_QUERY_LIMIT + 1}),
+        {error, [{beyond_limit, ?QUERY_LIMIT + 1}]},
+        query([
+            from("docs_read"),
+            sort_by("username", ioq_calls),
+            options([
+                with_limit(?QUERY_LIMIT + 1)
+            ])
+        ]),
+        "Should return error when 'limit' is greater than configured"
+    ),
+    ?assertMatch(
+        {error, [{beyond_limit, ?QUERY_LIMIT + 1}]},
+        query([
+            from("docs_read"),
+            sort_by("username"),
+            options([
+                with_limit(?QUERY_LIMIT + 1)
+            ])
+        ]),
         "Should return error when 'limit' is greater than configured"
     ),
     ok.
+
+add_matcher(Name, MSpec) ->
+    persistent_term:put({csrt_logger, all_csrt_matchers}, #{Name => {MSpec, 
ets:match_spec_compile(MSpec)}}).
+
+aggregate(AggregationKeys, ValField, Records) ->
+    lists:foldl(fun(Rctx, Acc) ->
+        Key = list_to_tuple([csrt_entry:value(Field, Rctx) || Field <- 
AggregationKeys]),
+        CurrVal = maps:get(Key, Acc, []),
+        maps:put(Key, [csrt_entry:value(ValField, Rctx) | CurrVal], Acc)
+    end, #{}, Records).
+
+group(Aggregated) ->
+    maps:fold(fun(Key, Val, Acc) ->
+        maps:put(Key, lists:foldl(fun erlang:'+'/2, 0, Val), Acc)
+    end, #{}, Aggregated).
+
+count(Aggregated) ->
+    maps:fold(fun(Key, Val, Acc) ->
+        maps:put(Key, lists:foldl(fun(_, A) -> A + 1 end, 0, Val), Acc)
+    end, #{}, Aggregated).
+
+order_by_value(Grouped) ->
+    lists:reverse(lists:keysort(2, maps:to_list(Grouped))).


Reply via email to