This is an automated email from the ASF dual-hosted git repository. iilyak pushed a commit to branch couch-stats-resource-tracker-v3-rebase-http-2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit fb8a5b1c4f70df9de2bd1f9108505a29d0f46055 Author: ILYA Khlopotov <[email protected]> AuthorDate: Thu Jul 10 00:38:48 2025 -0700 Update tests --- src/couch_stats/test/eunit/csrt_httpd_tests.erl | 81 ++- src/couch_stats/test/eunit/csrt_query_tests.erl | 693 ++++++++++++++++-------- 2 files changed, 526 insertions(+), 248 deletions(-) diff --git a/src/couch_stats/test/eunit/csrt_httpd_tests.erl b/src/couch_stats/test/eunit/csrt_httpd_tests.erl index 5b32f564f..e2db8cdfe 100644 --- a/src/couch_stats/test/eunit/csrt_httpd_tests.erl +++ b/src/couch_stats/test/eunit/csrt_httpd_tests.erl @@ -89,8 +89,13 @@ active_resources_group_by(MatcherName, Url, AggregationKeys, CounterKey) -> t_query_group_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), Grouped = group(Aggregated), - {RC, Result} = active_resources_group_by(Url, [<<"username">>, <<"dbname">>], <<"ioq_calls">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_group_by(Url, [<<"username">>, <<"dbname">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -118,8 +123,13 @@ t_query_group_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> t_query_group_by_single_key(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username], ioq_calls, Rctxs), Grouped = group(Aggregated), - {RC, Result} = active_resources_group_by(Url, [<<"username">>], <<"ioq_calls">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_group_by(Url, [<<"username">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -141,8 +151,13 @@ t_query_group_by_single_key(#{rctxs := Rctxs, url := Url}) -> t_query_group_by_binary_key(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username], ioq_calls, Rctxs), Grouped = group(Aggregated), - {RC, Result} = active_resources_group_by(Url, <<"username">>, <<"ioq_calls">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_group_by(Url, <<"username">>, <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -206,8 +221,13 @@ active_resources_count_by(MatcherName, Url, AggregationKeys) -> t_query_count_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), Grouped = count(Aggregated), - {RC, Result} = active_resources_count_by(Url, [<<"username">>, <<"dbname">>]), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_count_by(Url, [<<"username">>, <<"dbname">>]), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -235,8 +255,13 @@ t_query_count_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> t_query_count_by_single_key(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username], ioq_calls, Rctxs), Grouped = count(Aggregated), - {RC, Result} = active_resources_count_by(Url, [<<"username">>]), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_count_by(Url, [<<"username">>]), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -258,8 +283,13 @@ t_query_count_by_single_key(#{rctxs := Rctxs, url := Url}) -> t_query_count_by_binary_key(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username], ioq_calls, Rctxs), Grouped = count(Aggregated), - {RC, Result} = active_resources_count_by(Url, <<"username">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_count_by(Url, <<"username">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -318,8 +348,13 @@ t_query_sort_by_multiple_keys(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), Grouped = group(Aggregated), Ordered = order_by_value(Grouped), - {RC, Result} = active_resources_sort_by(Url, [<<"username">>, <<"dbname">>], <<"ioq_calls">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_sort_by(Url, [<<"username">>, <<"dbname">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(4, length(Result), format("Expected four entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -349,8 +384,13 @@ t_query_sort_by_single_key(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username], ioq_calls, Rctxs), Grouped = group(Aggregated), Ordered = order_by_value(Grouped), - {RC, Result} = active_resources_sort_by(Url, [<<"username">>], <<"ioq_calls">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_sort_by(Url, [<<"username">>], <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ @@ -374,8 +414,13 @@ t_query_sort_by_binary_key(#{rctxs := Rctxs, url := Url}) -> Aggregated = aggregate([username], ioq_calls, Rctxs), Grouped = group(Aggregated), Ordered = order_by_value(Grouped), - {RC, Result} = active_resources_sort_by(Url, <<"username">>, <<"ioq_calls">>), - ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Result])), + {RC, Results} = active_resources_sort_by(Url, <<"username">>, <<"ioq_calls">>), + ?assertEqual(200, RC, format("Should have '200' return code, got ~p~n ~p~n", [RC, Results])), + [#{ + <<"errors">> := [], + <<"node">> := _, + <<"result">> := Result + }] = Results, ?assert(is_list(Result), format("Expected list of entries, got ~p~n", [Result])), ?assertEqual(2, length(Result), format("Expected two entries, got ~p~n ~p~n", [length(Result), Result])), ?assertMatch([ diff --git a/src/couch_stats/test/eunit/csrt_query_tests.erl b/src/couch_stats/test/eunit/csrt_query_tests.erl index f85df771d..681af5a2b 100644 --- a/src/couch_stats/test/eunit/csrt_query_tests.erl +++ b/src/couch_stats/test/eunit/csrt_query_tests.erl @@ -12,317 +12,550 @@ -module(csrt_query_tests). --include_lib("stdlib/include/ms_transform.hrl"). - -import( - csrt_test_helper, + csrt_query, [ - rctx_gen/0, - rctx_gen/1, - rctxs/0, - jrctx/1 + query/1, + from/1, + group_by/1, + group_by/2, + sort_by/1, + sort_by/2, + count_by/1, + options/1, + unlimited/0, + with_limit/1, + + run/1, + unsafe_run/1 ] ). --include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). --include("../../src/couch_stats_resource_tracker.hrl"). - -%% Use different values than default configs to ensure they're picked up --define(THRESHOLD_DBNAME_IO, 91). --define(THRESHOLD_DOCS_READ, 123). --define(THRESHOLD_DOCS_WRITTEN, 12). --define(THRESHOLD_IOQ_CALLS, 439). --define(THRESHOLD_ROWS_READ, 43). --define(THRESHOLD_CHANGES, 79). --define(THRESHOLD_LONG_REQS, 432). --define(TEST_QUERY_LIMIT, 98). +-include_lib("stdlib/include/ms_transform.hrl"). +-include("../../src/couch_stats_resource_tracker.hrl"). +-define(MATCHERS_THRESHOLD, 1000). csrt_query_test_() -> { foreach, fun setup/0, fun teardown/1, [ - ?TDEF_FE(t_query_group_by), - ?TDEF_FE(t_query_count_by), - ?TDEF_FE(t_query_sort_by) + ?TDEF_FE(t_group_by_multiple_keys), + ?TDEF_FE(t_group_by_single_key), + ?TDEF_FE(t_group_by_binary_key), + ?TDEF_FE(t_group_by_detect_unsafe_query), + ?TDEF_FE(t_group_by_run_unsafe_query), + ?TDEF_FE(t_group_by_run_unsafe_correctness), + ?TDEF_FE(t_group_by_bad_request), + ?TDEF_FE(t_count_by_multiple_keys), + ?TDEF_FE(t_count_by_single_key), + ?TDEF_FE(t_count_by_binary_key), + ?TDEF_FE(t_count_by_bad_request), + ?TDEF_FE(t_sort_by_multiple_keys), + ?TDEF_FE(t_sort_by_single_key), + ?TDEF_FE(t_sort_by_binary_key), + ?TDEF_FE(t_sort_by_bad_request) ] }. setup() -> - Ctx = test_util:start_couch([fabric, couch_stats]), - config:set_boolean(?CSRT, "randomize_testing", false, false), - config:set_boolean(?CSRT, "enable_reporting", true, false), - config:set_boolean(?CSRT, "enable_rpc_reporting", true, false), - - ok = meck:new(ioq, [passthrough]), - ok = meck:expect(ioq, bypass, fun(_, _) -> false end), - DbName = ?tempdb(), - ok = fabric:create_db(DbName, [{q, 8}, {n, 1}]), - Docs = make_docs(100), - Opts = [], - {ok, _} = fabric:update_docs(DbName, Docs, Opts), - Method = 'GET', - Path = "/" ++ ?b2l(DbName) ++ "/_all_docs", - Nonce = couch_util:to_hex(crypto:strong_rand_bytes(5)), - Req = #httpd{method = Method, nonce = Nonce}, - {_, _} = PidRef = csrt:create_coordinator_context(Req, Path), - csrt:set_context_username(<<"user_foo">>), - csrt:set_context_dbname(DbName), - MArgs = #mrargs{include_docs = false}, - _Res = fabric:all_docs(DbName, [?ADMIN_CTX], fun view_cb/2, [], MArgs), - Rctx = load_rctx(PidRef), - ok = config:set( - "csrt_logger.matchers_threshold", "docs_read", integer_to_list(?THRESHOLD_DOCS_READ), false - ), - ok = config:set( - "csrt_logger.matchers_threshold", - "docs_written", - integer_to_list(?THRESHOLD_DOCS_WRITTEN), - false - ), - ok = config:set( - "csrt_logger.matchers_threshold", "ioq_calls", integer_to_list(?THRESHOLD_IOQ_CALLS), false - ), - ok = config:set( - "csrt_logger.matchers_threshold", "rows_read", integer_to_list(?THRESHOLD_ROWS_READ), false - ), - ok = config:set( - "csrt_logger.matchers_threshold", - "changes_processed", - integer_to_list(?THRESHOLD_CHANGES), - false - ), - ok = config:set( - "csrt_logger.matchers_threshold", "long_reqs", integer_to_list(?THRESHOLD_LONG_REQS), false - ), - ok = config:set("csrt_logger.dbnames_io", "foo", integer_to_list(?THRESHOLD_DBNAME_IO), false), - ok = config:set("csrt_logger.dbnames_io", "bar", integer_to_list(?THRESHOLD_DBNAME_IO), false), - ok = config:set( - "csrt_logger.dbnames_io", "foo/bar", integer_to_list(?THRESHOLD_DBNAME_IO), false - ), - config:set(?CSRT, "query_limit", integer_to_list(?TEST_QUERY_LIMIT)), - csrt_logger:reload_matchers(), - Rctxs = rctxs(), - #{ctx => Ctx, dbname => DbName, rctx => Rctx, rctxs => Rctxs}. + Rctxs = [ + rctx(#{dbname => <<"db1">>, ioq_calls => 123, username => <<"user_foo">>}), + rctx(#{dbname => <<"db1">>, ioq_calls => 321, username => <<"user_foo">>}), + rctx(#{dbname => <<"db2">>, ioq_calls => 345, username => <<"user_bar">>}), + rctx(#{dbname => <<"db2">>, ioq_calls => 543, username => <<"user_bar">>}), + rctx(#{dbname => <<"db1">>, ioq_calls => 678, username => <<"user_bar">>}), + rctx(#{dbname => <<"db2">>, ioq_calls => 987, username => <<"user_foo">>}) + ], + ets:new(?CSRT_ETS, [ + named_table, + public, + {keypos, #rctx.pid_ref} + ]), + ets:insert(?CSRT_ETS, Rctxs), + add_matcher("docs_read", csrt_logger:matcher_on_docs_read(?MATCHERS_THRESHOLD)), + #{rctxs => Rctxs}. -teardown(#{ctx := Ctx, dbname := DbName}) -> - ok = fabric:delete_db(DbName, [?ADMIN_CTX]), - ok = meck:unload(ioq), - test_util:stop_couch(Ctx). +teardown(_) -> + ets:delete(?CSRT_ETS). -load_rctx(PidRef) -> - %% Add slight delay to accumulate RPC response deltas - timer:sleep(50), - csrt:get_resource(PidRef). +rctx(Opts) -> + % Update `docs_read` to make standard `{docs_read, fun matcher_on_docs_read/1, 1000}` + % matcher match. + BaseOpts = #{docs_read => ?MATCHERS_THRESHOLD + 1, username => <<"user_foo">>}, + csrt_test_helper:rctx_gen(maps:merge(BaseOpts, Opts)). -make_docs(Count) -> - lists:map( - fun(I) -> - #doc{ - id = ?l2b("foo_" ++ integer_to_list(I)), - body = {[{<<"value">>, I}]} - } - end, - lists:seq(1, Count) - ). +dummy_key_fun(#rctx{username = Username}) -> + Username. -view_cb({row, Row}, Acc) -> - {ok, [Row | Acc]}; -view_cb(_Msg, Acc) -> - {ok, Acc}. +dummy_value_fun(#rctx{ioq_calls = IoqCalls}) -> + IoqCalls. -t_query_group_by(#{rctx := Rctx, dbname := DbName}) -> - IoqCalls = Rctx#rctx.ioq_calls, - ?assertMatch( - {ok, #{{<<"user_foo">>, DbName} := IoqCalls}}, - csrt_query:query_group_by("rows_read", [username, dbname], <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: [atom(), ...]'" - ), - ?assertMatch( - {ok, #{{<<"user_foo">>} := IoqCalls}}, - csrt_query:query_group_by("rows_read", [username], <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: [atom()]'" - ), - ?assertMatch( - {ok, #{{<<"user_foo">>} := IoqCalls}}, - csrt_query:query_group_by("rows_read", ["username"], <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: [string()]'" - ), - ?assertMatch( - {ok, #{<<"user_foo">> := IoqCalls}}, - csrt_query:query_group_by("rows_read", username, <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: atom()'" +t_group_by_multiple_keys(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), + Grouped = group(Aggregated), + V1 = maps:get({<<"user_bar">>, <<"db1">>}, Grouped), + V2 = maps:get({<<"user_bar">>, <<"db2">>}, Grouped), + V3 = maps:get({<<"user_foo">>, <<"db1">>}, Grouped), + V4 = maps:get({<<"user_foo">>, <<"db2">>}, Grouped), + Q = query([ + from("docs_read"), + group_by([<<"username">>, <<"dbname">>], <<"ioq_calls">>) + ]), + ?assertMatch( + {ok, #{ + {<<"user_bar">>, <<"db1">>} := V1, + {<<"user_bar">>, <<"db2">>} := V2, + {<<"user_foo">>, <<"db1">>} := V3, + {<<"user_foo">>, <<"db2">>} := V4 + }}, + run(Q) ), - ?assertMatch( - {ok, #{<<"user_foo">> := IoqCalls}}, - csrt_query:query_group_by("rows_read", "username", <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: string()'" - ), - ?assertMatch( - {ok, #{{<<"user_foo">>, DbName} := IoqCalls}}, - csrt_query:query_group_by("rows_read", [username, dbname], "ioq_calls", #{}), - "Should handle 'ValueKey :: string()'" + ok. + +t_group_by_single_key(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), + Q = query([ + from("docs_read"), + group_by([<<"username">>], <<"ioq_calls">>) + ]), + ?assertMatch( + {ok, #{ + {<<"user_bar">>} := V1, + {<<"user_foo">>} := V2 + }}, + run(Q) ), - ?assertMatch( - {ok, #{{<<"user_foo">>, DbName} := IoqCalls}}, - csrt_query:query_group_by("rows_read", [username, dbname], ioq_calls, #{}), - "Should handle 'ValueKey :: atom()'" + ok. + +t_group_by_binary_key(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), + Q = query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>) + ]), + ?assertMatch( + {ok, #{ + <<"user_bar">> := V1, + <<"user_foo">> := V2 + }}, + run(Q) ), - ?assertMatch( - {ok, #{{<<"user_foo">>, DbName} := IoqCalls}}, - csrt_query:query_group_by("rows_read", [username, dbname], ioq_calls, #{limit => ?TEST_QUERY_LIMIT - 1}), - "Should handle 'limit' option" + ok. + +t_group_by_detect_unsafe_query(_) -> + ?assertMatch({error, {unsafe_query, _}}, run(query([ + from(all), + group_by(<<"username">>, <<"ioq_calls">>) + ])), "Should detect `unsafe` when `all` matcher is used"), + ?assertMatch({error, {unsafe_query, _}}, run(query([ + from("docs_read"), + group_by(fun dummy_key_fun/1, <<"ioq_calls">>) + ])), "Should detect `unsafe` when `AggregationKey` is a function()"), + ?assertMatch({error, {unsafe_query, _}}, run(query([ + from("docs_read"), + group_by(<<"username">>, fun dummy_value_fun/1) + ])), "Should detect `unsafe` when `ValueKey` is a function()"), + ?assertMatch({error, {unsafe_query, _}}, run(query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>), + options([ + unlimited() + ]) + ])), "Should detect `unsafe` when `unlimited()` is used"), + ok. + +t_group_by_run_unsafe_query(_) -> + ?assertMatch({ok, _}, unsafe_run(query([ + from(all), + group_by(<<"username">>, <<"ioq_calls">>) + ])), "Should be able to use `unsafe_run` when `all` matcher is used"), + ?assertMatch({ok, _}, unsafe_run(query([ + from("docs_read"), + group_by(fun dummy_key_fun/1, <<"ioq_calls">>) + ])), "Should be able to use `unsafe_run` when `AggregationKey` is a function()"), + ?assertMatch({ok, _}, unsafe_run(query([ + from("docs_read"), + group_by(<<"username">>, fun dummy_value_fun/1) + ])), "Should be able to use `unsafe_run` when `ValueKey` is a function()"), + ?assertMatch({ok, _}, unsafe_run(query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>), + options([ + unlimited() + ]) + ])), "Should be able to use `unsafe_run` when `unlimited()` is used"), + ok. + +t_group_by_run_unsafe_correctness(_) -> + % we are checking that safe analog of the query return same result + ?assertEqual( + run(query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>) + ])), + unsafe_run(query([ + from(all), + group_by(<<"username">>, <<"ioq_calls">>) + ])), + "Should get correct result from `unsafe_run` when `all` matcher is used" + ), + ?assertEqual( + run(query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>) + ])), + unsafe_run(query([ + from("docs_read"), + group_by(fun dummy_key_fun/1, <<"ioq_calls">>) + ])), + "Should get correct result from `unsafe_run` when `AggregationKey` is a function()" + ), + ?assertEqual( + run(query([ + from("docs_read"), + group_by(<<"username">>, ioq_calls) + ])), + unsafe_run(query([ + from("docs_read"), + group_by(<<"username">>, fun dummy_value_fun/1) + ])), + "Should get correct result from `unsafe_run` when `ValueKey` is a function()" + ), + ?assertEqual( + run(query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>) + ])), + unsafe_run(query([ + from("docs_read"), + group_by(<<"username">>, <<"ioq_calls">>), + options([ + unlimited() + ]) + ])), + "Should get correct result from `unsafe_run` when `unlimited()` is used" ), + ok. + +t_group_by_bad_request(_) -> ?assertMatch( - {error,{unknown_matcher,"unknown_matcher"}}, - csrt_query:query_group_by("unknown_matcher", [username, dbname], ioq_calls, #{}), + {error, [{unknown_matcher, "unknown_matcher"}]}, + query([ + from("unknown_matcher"), + group_by(<<"username">>, <<"ioq_calls">>) + ]), "Should return error if 'matcher' is unknown" ), ?assertMatch( - {error,{unknown_matcher, rows_read}}, - csrt_query:query_group_by(rows_read, [username, dbname], ioq_calls, #{}), + {error, [{unknown_matcher, rows_read}]}, + query([ + from(rows_read), + group_by([username, dbname], ioq_calls) + ]), "Should return error if 'matcher' is not a string()" ), ?assertMatch( - {error, {invalid_key, "unknown_field"}}, - csrt_query:query_group_by("rows_read", "unknown_field", ioq_calls, #{}), + {error, [{invalid_key, "unknown_field"}]}, + query([ + from("docs_read"), + group_by("unknown_field", ioq_calls) + ]), "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( - {error, {invalid_key, "unknown_field"}}, - csrt_query:query_group_by("rows_read", "username", "unknown_field", #{}), + {error, [{invalid_key, "unknown_field"}]}, + query([ + from("docs_read"), + group_by("username", "unknown_field") + ]), "Should return error if 'ValueKey' contain unknown field" ), ?assertMatch( - {error, {beyond_limit, ?TEST_QUERY_LIMIT}}, - csrt_query:query_group_by("rows_read", [username, dbname], ioq_calls, #{limit => ?TEST_QUERY_LIMIT + 1}), + {error, [{beyond_limit, ?QUERY_LIMIT + 1}]}, + query([ + from("docs_read"), + group_by("username", ioq_calls), + options([ + with_limit(?QUERY_LIMIT + 1) + ]) + ]), "Should return error when 'limit' is greater than configured" ), ok. -t_query_count_by(#{dbname := DbName}) -> - IoqCount = 1, - ?assertMatch( - {ok, #{{<<"user_foo">>, DbName} := IoqCount}}, - csrt_query:query_count_by("rows_read", [username, dbname], #{}), - "Should handle 'AggregationKeys :: [atom(), ...]'" +t_count_by_multiple_keys(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), + Grouped = count(Aggregated), + V1 = maps:get({<<"user_bar">>, <<"db1">>}, Grouped), + V2 = maps:get({<<"user_bar">>, <<"db2">>}, Grouped), + V3 = maps:get({<<"user_foo">>, <<"db1">>}, Grouped), + V4 = maps:get({<<"user_foo">>, <<"db2">>}, Grouped), + Q = query([ + from("docs_read"), + count_by([<<"username">>, <<"dbname">>]) + ]), + ?assertMatch( + {ok, #{ + {<<"user_bar">>, <<"db1">>} := V1, + {<<"user_bar">>, <<"db2">>} := V2, + {<<"user_foo">>, <<"db1">>} := V3, + {<<"user_foo">>, <<"db2">>} := V4 + }}, + run(Q) ), - ?assertMatch( - {ok, #{{<<"user_foo">>} := IoqCount}}, - csrt_query:query_count_by("rows_read", [username], #{}), - "Should handle 'AggregationKeys :: [atom()]'" - ), - ?assertMatch( - {ok, #{{<<"user_foo">>} := IoqCount}}, - csrt_query:query_count_by("rows_read", ["username"], #{}), - "Should handle 'AggregationKeys :: [string()]'" - ), - ?assertMatch( - {ok, #{<<"user_foo">> := IoqCount}}, - csrt_query:query_count_by("rows_read", username, #{}), - "Should handle 'AggregationKeys :: atom()'" - ), - ?assertMatch( - {ok, #{<<"user_foo">> := IoqCount}}, - csrt_query:query_count_by("rows_read", "username", #{}), - "Should handle 'AggregationKeys :: string()'" + ok. + +t_count_by_single_key(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = count(Aggregated), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), + Q = query([ + from("docs_read"), + count_by([<<"username">>]) + ]), + ?assertMatch( + {ok, #{ + {<<"user_bar">>} := V1, + {<<"user_foo">>} := V2 + }}, + run(Q) ), - ?assertMatch( - {ok, #{<<"user_foo">> := IoqCount}}, - csrt_query:query_count_by("rows_read", "username", #{limit => ?TEST_QUERY_LIMIT - 1}), - "Should handle 'limit' option" + ok. + +t_count_by_binary_key(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = count(Aggregated), + V1 = maps:get({<<"user_bar">>}, Grouped), + V2 = maps:get({<<"user_foo">>}, Grouped), + Q = query([ + from("docs_read"), + count_by(<<"username">>) + ]), + ?assertMatch( + {ok, #{ + <<"user_bar">> := V1, + <<"user_foo">> := V2 + }}, + run(Q) ), + ok. + +t_count_by_bad_request(_) -> ?assertMatch( - {error,{unknown_matcher,"unknown_matcher"}}, - csrt_query:query_count_by("unknown_matcher", [username, dbname], #{}), + {error, [{unknown_matcher, "unknown_matcher"}]}, + query([ + from("unknown_matcher"), + count_by(<<"username">>) + ]), "Should return error if 'matcher' is unknown" ), ?assertMatch( - {error,{unknown_matcher, rows_read}}, - csrt_query:query_count_by(rows_read, [username, dbname], #{}), + {error, [{unknown_matcher, rows_read}]}, + query([ + from(rows_read), + count_by([username, dbname]) + ]), "Should return error if 'matcher' is not a string()" ), ?assertMatch( - {error, {invalid_key, "unknown_field"}}, - csrt_query:query_count_by("rows_read", "unknown_field", #{}), + {error, [{invalid_key, "unknown_field"}]}, + query([ + from("docs_read"), + count_by("unknown_field") + ]), "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( - {error, {beyond_limit, ?TEST_QUERY_LIMIT}}, - csrt_query:query_count_by("rows_read", [username, dbname], #{limit => ?TEST_QUERY_LIMIT + 1}), + {error, [{beyond_limit, ?QUERY_LIMIT + 1}]}, + query([ + from("docs_read"), + count_by("username"), + options([ + with_limit(?QUERY_LIMIT + 1) + ]) + ]), "Should return error when 'limit' is greater than configured" ), ok. -t_query_sort_by(#{rctx := Rctx, dbname := DbName}) -> - IoqCalls = Rctx#rctx.ioq_calls, - ?assertMatch( - {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", [username, dbname], <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: [atom(), ...]'" - ), - ?assertMatch( - {ok, [{{<<"user_foo">>}, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", [username], <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: [atom()]'" - ), - ?assertMatch( - {ok, [{{<<"user_foo">>}, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", ["username"], <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: [string()]'" +t_sort_by_multiple_keys(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username, dbname], ioq_calls, Rctxs), + Grouped = group(Aggregated), + Ordered = order_by_value(Grouped), + [ + {{<<"user_foo">>, <<"db2">>}, V1}, + {{<<"user_bar">>, <<"db2">>}, V2}, + {{<<"user_bar">>, <<"db1">>}, V3}, + {{<<"user_foo">>, <<"db1">>}, V4} + ] = Ordered, + Q = query([ + from("docs_read"), + sort_by([<<"username">>, <<"dbname">>], <<"ioq_calls">>) + ]), + ?assertMatch( + {ok, [ + {{<<"user_foo">>, <<"db2">>}, V1}, + {{<<"user_bar">>, <<"db2">>}, V2}, + {{<<"user_bar">>, <<"db1">>}, V3}, + {{<<"user_foo">>, <<"db1">>}, V4} + ]}, + run(Q) ), - ?assertMatch( - {ok, [{<<"user_foo">>, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", username, <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: atom()'" + ok. + +t_sort_by_single_key(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + Ordered = order_by_value(Grouped), + [ + {{<<"user_bar">>}, V1}, + {{<<"user_foo">>}, V2} + ] = Ordered, + Q = query([ + from("docs_read"), + sort_by([<<"username">>], <<"ioq_calls">>) + ]), + ?assertMatch( + {ok, [ + {{<<"user_bar">>}, V1}, + {{<<"user_foo">>}, V2} + ]}, + run(Q) ), - ?assertMatch( - {ok, [{<<"user_foo">>, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", "username", <<"ioq_calls">>, #{}), - "Should handle 'AggregationKeys :: string()'" + ok. + +t_sort_by_binary_key(#{rctxs := Rctxs}) -> + Aggregated = aggregate([username], ioq_calls, Rctxs), + Grouped = group(Aggregated), + Ordered = order_by_value(Grouped), + [ + {{<<"user_bar">>}, V1}, + {{<<"user_foo">>}, V2} + ] = Ordered, + Q = query([ + from("docs_read"), + sort_by(<<"username">>, <<"ioq_calls">>) + ]), + ?assertMatch( + {ok, [ + {<<"user_bar">>, V1}, + {<<"user_foo">>, V2} + ]}, + run(Q) ), + ok. + +t_sort_by_bad_request(_) -> ?assertMatch( - {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", [username, dbname], "ioq_calls", #{}), - "Should handle 'ValueKey :: string()'" + {error, [{unknown_matcher, "unknown_matcher"}]}, + query([ + from("unknown_matcher"), + sort_by(<<"username">>, <<"ioq_calls">>) + ]), + "Should return error if 'matcher' is unknown" ), ?assertMatch( - {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", [username, dbname], ioq_calls, #{}), - "Should handle 'ValueKey :: atom()'" + {error, [{unknown_matcher, "unknown_matcher"}]}, + query([ + from("unknown_matcher"), + sort_by(<<"username">>) + ]), + "Should return error if 'matcher' is unknown" ), ?assertMatch( - {ok, [{{<<"user_foo">>, DbName}, IoqCalls}]}, - csrt_query:query_sort_by("rows_read", [username, dbname], ioq_calls, #{limit => ?TEST_QUERY_LIMIT - 1}), - "Should handle 'limit' option" + {error, [{unknown_matcher, rows_read}]}, + query([ + from(rows_read), + sort_by([username, dbname], ioq_calls) + ]), + "Should return error if 'matcher' is not a string()" ), ?assertMatch( - {error,{unknown_matcher,"unknown_matcher"}}, - csrt_query:query_sort_by("unknown_matcher", [username, dbname], ioq_calls, #{}), - "Should return error if 'matcher' is unknown" + {error, [{unknown_matcher, rows_read}]}, + query([ + from(rows_read), + sort_by([username, dbname]) + ]), + "Should return error if 'matcher' is not a string()" ), ?assertMatch( - {error,{unknown_matcher, rows_read}}, - csrt_query:query_sort_by(rows_read, [username, dbname], ioq_calls, #{}), - "Should return error if 'matcher' is not a string()" + {error, [{invalid_key, "unknown_field"}]}, + query([ + from("docs_read"), + sort_by("unknown_field", ioq_calls) + ]), + "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( - {error, {invalid_key, "unknown_field"}}, - csrt_query:query_sort_by("rows_read", "unknown_field", ioq_calls, #{}), + {error, [{invalid_key, "unknown_field"}]}, + query([ + from("docs_read"), + sort_by("unknown_field") + ]), "Should return error if 'AggregationKeys' contain unknown field" ), ?assertMatch( - {error, {invalid_key, "unknown_field"}}, - csrt_query:query_sort_by("rows_read", "username", "unknown_field", #{}), + {error, [{invalid_key, "unknown_field"}]}, + query([ + from("docs_read"), + sort_by("username", "unknown_field") + ]), "Should return error if 'ValueKey' contain unknown field" ), ?assertMatch( - {error, {beyond_limit, ?TEST_QUERY_LIMIT}}, - csrt_query:query_sort_by("rows_read", [username, dbname], ioq_calls, #{limit => ?TEST_QUERY_LIMIT + 1}), + {error, [{beyond_limit, ?QUERY_LIMIT + 1}]}, + query([ + from("docs_read"), + sort_by("username", ioq_calls), + options([ + with_limit(?QUERY_LIMIT + 1) + ]) + ]), + "Should return error when 'limit' is greater than configured" + ), + ?assertMatch( + {error, [{beyond_limit, ?QUERY_LIMIT + 1}]}, + query([ + from("docs_read"), + sort_by("username"), + options([ + with_limit(?QUERY_LIMIT + 1) + ]) + ]), "Should return error when 'limit' is greater than configured" ), ok. + +add_matcher(Name, MSpec) -> + persistent_term:put({csrt_logger, all_csrt_matchers}, #{Name => {MSpec, ets:match_spec_compile(MSpec)}}). + +aggregate(AggregationKeys, ValField, Records) -> + lists:foldl(fun(Rctx, Acc) -> + Key = list_to_tuple([csrt_entry:value(Field, Rctx) || Field <- AggregationKeys]), + CurrVal = maps:get(Key, Acc, []), + maps:put(Key, [csrt_entry:value(ValField, Rctx) | CurrVal], Acc) + end, #{}, Records). + +group(Aggregated) -> + maps:fold(fun(Key, Val, Acc) -> + maps:put(Key, lists:foldl(fun erlang:'+'/2, 0, Val), Acc) + end, #{}, Aggregated). + +count(Aggregated) -> + maps:fold(fun(Key, Val, Acc) -> + maps:put(Key, lists:foldl(fun(_, A) -> A + 1 end, 0, Val), Acc) + end, #{}, Aggregated). + +order_by_value(Grouped) -> + lists:reverse(lists:keysort(2, maps:to_list(Grouped))).
