This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch optimize-purge
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit fb4ca210e8155f12d6786387b1e907b40ae293e5
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Sun Oct 12 02:24:19 2025 -0400

    Optimize purge
    
    Optimizations consist in 3 improvements:
    
     * In fabric_doc_purge use maps instead dicts. During setup avoid traversing
     the requests and uuids too many times. Instead, generate more in the same
     iteration: when creating requests, generate the response structure; when
     generating worker uuids, generate uuids counts.
    
     * Reduce purge UUID sizes by half by using binary values instead hex 
encoded
     values. These UUIDs are generated internally and are never returned or
     accepted through the API so we don't have to hex-encode them.
    
     * Use UUID v7 for purge UUIDs. Generate them directly instead of going 
through
     a single gen_server bottleneck like before. They are standard, and yet
     preserve the same nice seqential prefix property which help when these are
     used for B-tree IDs (which they are in this case).
    
    Expand the test suite a bit, add at least one end-to-end test and a few more
    test functions. The coverage increases from about 80% to almost 100%.
    
    Add a few comments with the shape of various data structures as a reminder 
for
    the future.
    
    Purging 100k conflicted docs on a q=1,n=1 db improved from 106 sec to 87 sec
    which is about 18%
---
 src/fabric/src/fabric_doc_purge.erl | 513 ++++++++++++++++++++++++------------
 1 file changed, 339 insertions(+), 174 deletions(-)

diff --git a/src/fabric/src/fabric_doc_purge.erl 
b/src/fabric/src/fabric_doc_purge.erl
index 3719132ab..bc96430eb 100644
--- a/src/fabric/src/fabric_doc_purge.erl
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -28,42 +28,39 @@
 go(_, [], _) ->
     {ok, []};
 go(DbName, IdsRevs, Options) ->
-    % Generate our purge requests of {UUID, DocId, Revs}
-    {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
-
-    % Fire off rexi workers for each shard.
-    {Workers, WorkerUUIDs} = dict:fold(
-        fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+    % Generate our purge requests of {UUID, DocId, Revs}. Return:
+    %  * Reqs : [{UUID, DocId, Revs}]
+    %  * UUIDs : [UUID] in the same order as Reqs
+    %  * Responses : #{UUID => []} initial response accumulator
+    %
+    {UUIDs, Reqs, Responses} = create_requests_and_responses(IdsRevs),
+
+    % Fire off rexi workers for each shard. Return:
+    %  * Workers : [#shard{ref = Ref}]
+    %  * WorkerUUIDs : #{Worker => [UUID]}
+    %  * UUIDCounts : #{UUID => Counter}
+    %
+    {Workers, WorkerUUIDs, UUIDCounts} = maps:fold(
+        fun(Shard, ShardReqs, {WorkersAcc, WorkersUUIDsAcc, CountsAcc}) ->
             #shard{name = ShardDbName, node = Node} = Shard,
             Args = [ShardDbName, ShardReqs, Options],
             Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
             Worker = Shard#shard{ref = Ref},
             ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
-            {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+            Fun = fun(UUID, Acc) -> update_counter(UUID, Acc) end,
+            CountsAcc1 = lists:foldl(Fun, CountsAcc, ShardUUIDs),
+            WorkersUUIDAcc1 = WorkersUUIDsAcc#{Worker => ShardUUIDs},
+            {[Worker | WorkersAcc], WorkersUUIDAcc1, CountsAcc1}
         end,
-        {[], []},
+        {[], #{}, #{}},
         group_reqs_by_shard(DbName, Reqs)
     ),
 
-    UUIDCounts = lists:foldl(
-        fun({_Worker, WUUIDs}, CountAcc) ->
-            lists:foldl(
-                fun(UUID, InnerCountAcc) ->
-                    dict:update_counter(UUID, 1, InnerCountAcc)
-                end,
-                CountAcc,
-                WUUIDs
-            )
-        end,
-        dict:new(),
-        WorkerUUIDs
-    ),
-
     RexiMon = fabric_util:create_monitors(Workers),
     Timeout = fabric_util:request_timeout(),
     Acc0 = #acc{
         worker_uuids = WorkerUUIDs,
-        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        resps = Responses,
         uuid_counts = UUIDCounts,
         w = w(DbName, Options)
     },
@@ -85,8 +82,9 @@ handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
         worker_uuids = WorkerUUIDs,
         resps = Resps
     } = Acc,
-    Pred = fun({#shard{node = N}, _}) -> N == Node end,
-    {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+    Pred = fun(#shard{node = N}, _) -> N == Node end,
+    Failed = maps:filter(Pred, WorkerUUIDs),
+    Rest = maps:without(maps:keys(Failed), WorkerUUIDs),
     NewResps = append_errors(internal_server_error, Failed, Resps),
     maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
 handle_message({rexi_EXIT, _}, Worker, Acc) ->
@@ -94,48 +92,41 @@ handle_message({rexi_EXIT, _}, Worker, Acc) ->
         worker_uuids = WorkerUUIDs,
         resps = Resps
     } = Acc,
-    {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
-    NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
-    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+    {FailedUUIDs, WorkerUUIDs1} = maps:take(Worker, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, #{Worker => FailedUUIDs}, 
Resps),
+    maybe_stop(Acc#acc{worker_uuids = WorkerUUIDs1, resps = NewResps});
 handle_message({ok, Replies}, Worker, Acc) ->
     #acc{
         worker_uuids = WorkerUUIDs,
         resps = Resps
     } = Acc,
-    {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    {UUIDs, WorkerUUIDs1} = maps:take(Worker, WorkerUUIDs),
     NewResps = append_resps(UUIDs, Replies, Resps),
-    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+    maybe_stop(Acc#acc{worker_uuids = WorkerUUIDs1, resps = NewResps});
 handle_message({bad_request, Msg}, _, _) ->
     throw({bad_request, Msg}).
 
 handle_timeout(#acc{worker_uuids = DefunctWorkerUUIDs, resps = Resps} = Acc) ->
-    DefunctWorkers = [Worker || {Worker, _} <- DefunctWorkerUUIDs],
+    DefunctWorkers = maps:keys(DefunctWorkerUUIDs),
     fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
     NewResps = append_errors(timeout, DefunctWorkerUUIDs, Resps),
-    Acc#acc{worker_uuids = [], resps = NewResps}.
+    Acc#acc{worker_uuids = #{}, resps = NewResps}.
 
-create_reqs([], UUIDs, Reqs) ->
-    {lists:reverse(UUIDs), lists:reverse(Reqs)};
-create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
-    UUID = couch_uuids:new(),
-    NewUUIDs = [UUID | UUIDs],
-    NewReqs = [{UUID, Id, lists:usort(Revs)} | Reqs],
-    create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
+create_requests_and_responses(IdsRevs) ->
+    Fun = fun({Id, Revs}, {UUIDsAcc, RespAcc}) ->
+        UUID = couch_uuids:v7_bin(),
+        {{UUID, Id, lists:usort(Revs)}, {[UUID | UUIDsAcc], RespAcc#{UUID => 
[]}}}
+    end,
+    {IdRevs1, {UUIDs, Resps}} = lists:mapfoldl(Fun, {[], #{}}, IdsRevs),
+    {lists:reverse(UUIDs), IdRevs1, Resps}.
 
 group_reqs_by_shard(DbName, Reqs) ->
-    lists:foldl(
-        fun({_UUID, Id, _Revs} = Req, D0) ->
-            lists:foldl(
-                fun(Shard, D1) ->
-                    dict:append(Shard, Req, D1)
-                end,
-                D0,
-                mem3:shards(DbName, Id)
-            )
+    ReqFoldFun =
+        fun({_UUID, Id, _Revs} = Req, #{} = Map0) ->
+            AppendFun = fun(Shard, Map1) -> map_append(Shard, Req, Map1) end,
+            lists:foldl(AppendFun, Map0, mem3:shards(DbName, Id))
         end,
-        dict:new(),
-        Reqs
-    ).
+    lists:foldl(ReqFoldFun, #{}, Reqs).
 
 w(DbName, Options) ->
     try
@@ -145,9 +136,11 @@ w(DbName, Options) ->
             mem3:quorum(DbName)
     end.
 
-append_errors(Type, WorkerUUIDs, Resps) ->
-    lists:foldl(
-        fun({_Worker, UUIDs}, RespAcc) ->
+% Failed WorkerUUIDs = #{#shard{} => [UUIDs, ...]}
+% Resps = #{UUID => [{ok, ...} | {error, ...}]
+append_errors(Type, #{} = WorkerUUIDs, #{} = Resps) ->
+    maps:fold(
+        fun(_Worker, UUIDs, RespAcc) ->
             Errors = [{error, Type} || _UUID <- UUIDs],
             append_resps(UUIDs, Errors, RespAcc)
         end,
@@ -155,59 +148,54 @@ append_errors(Type, WorkerUUIDs, Resps) ->
         WorkerUUIDs
     ).
 
-append_resps([], [], Resps) ->
+append_resps([], [], #{} = Resps) ->
     Resps;
-append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
-    NewResps = dict:append(UUID, Reply, Resps),
+append_resps([UUID | RestUUIDs], [Reply | RestReplies], #{} = Resps) ->
+    NewResps = map_append(UUID, Reply, Resps),
     append_resps(RestUUIDs, RestReplies, NewResps).
 
-maybe_stop(#acc{worker_uuids = []} = Acc) ->
+maybe_stop(#acc{worker_uuids = #{} = Map} = Acc) when map_size(Map) == 0 ->
     {stop, Acc};
-maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
+maybe_stop(#acc{resps = #{} = Resps, uuid_counts = #{} = Counts, w = W} = Acc) 
->
     try
-        dict:fold(
-            fun(UUID, UUIDResps, _) ->
-                UUIDCount = dict:fetch(UUID, Counts),
+        Fun =
+            fun(UUID, UUIDResps) ->
+                #{UUID := UUIDCount} = Counts,
                 case has_quorum(UUIDResps, UUIDCount, W) of
                     true -> ok;
                     false -> throw(keep_going)
                 end
             end,
-            nil,
-            Resps
-        ),
+        maps:foreach(Fun, Resps),
         {stop, Acc}
     catch
         throw:keep_going ->
             {ok, Acc}
     end.
 
-format_resps(UUIDs, #acc{} = Acc) ->
-    #acc{
-        resps = Resps,
-        w = W
-    } = Acc,
-    FoldFun = fun(UUID, Replies, ReplyAcc) ->
+format_resps(UUIDs, #acc{resps = Resps, w = W}) ->
+    Fun = fun(_UUID, Replies) ->
         OkReplies = [Reply || {ok, Reply} <- Replies],
         case OkReplies of
             [] ->
                 [Error | _] = lists:usort(Replies),
-                [{UUID, Error} | ReplyAcc];
-            _ ->
+                Error;
+            [_ | _] ->
                 AllRevs = lists:usort(lists:flatten(OkReplies)),
-                IsOk =
-                    length(OkReplies) >= W andalso
-                        length(lists:usort(OkReplies)) == 1,
+                IsOk = length(OkReplies) >= W andalso 
length(lists:usort(OkReplies)) == 1,
                 Health =
                     if
                         IsOk -> ok;
                         true -> accepted
                     end,
-                [{UUID, {Health, AllRevs}} | ReplyAcc]
+                {Health, AllRevs}
         end
     end,
-    FinalReplies = dict:fold(FoldFun, [], Resps),
-    couch_util:reorder_results(UUIDs, FinalReplies);
+    FinalReplies = maps:map(Fun, Resps),
+    % Reorder results in the same order as the the initial IdRevs
+    % this also implicitly asserts that the all UUIDs should have
+    % a matching reply
+    [map_get(UUID, FinalReplies) || UUID <- UUIDs];
 format_resps(_UUIDs, Else) ->
     Else.
 
@@ -225,22 +213,45 @@ resp_health(Resps) ->
 
 has_quorum(Resps, Count, W) ->
     OkResps = [R || {ok, _} = R <- Resps],
-    OkCounts = lists:foldl(
-        fun(R, Acc) ->
-            orddict:update_counter(R, 1, Acc)
-        end,
-        orddict:new(),
-        OkResps
-    ),
-    MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]),
+    OkCounts = lists:foldl(fun(R, Acc) -> update_counter(R, Acc) end, #{}, 
OkResps),
+    MaxOk = lists:max([0 | maps:values(OkCounts)]),
     if
         MaxOk >= W -> true;
         length(Resps) >= Count -> true;
         true -> false
     end.
 
+map_append(Key, Val, #{} = Map) ->
+    maps:update_with(Key, fun(V) -> [Val | V] end, [Val], Map).
+
+update_counter(Key, #{} = Map) ->
+    maps:update_with(Key, fun(V) -> V + 1 end, 1, Map).
+
 -ifdef(TEST).
+
 -include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+response_health_test() ->
+    ?assertEqual(error, resp_health([])),
+    ?assertEqual(error, resp_health([{potato, x}])),
+    ?assertEqual(ok, resp_health([{ok, x}, {ok, y}])),
+    ?assertEqual(accepted, resp_health([{accepted, x}])),
+    ?assertEqual(accepted, resp_health([{ok, x}, {accepted, y}])),
+    ?assertEqual(error, resp_health([{error, x}])),
+    ?assertEqual(error, resp_health([{ok, x}, {error, y}])),
+    ?assertEqual(error, resp_health([{error, x}, {accepted, y}, {ok, z}])).
+
+has_quorum_test() ->
+    ?assertEqual(true, has_quorum([], 0, 0)),
+    ?assertEqual(true, has_quorum([], 1, 0)),
+    ?assertEqual(true, has_quorum([], 0, 1)),
+    ?assertEqual(false, has_quorum([], 1, 1)),
+    ?assertEqual(true, has_quorum([{ok, x}], 1, 1)),
+    ?assertEqual(true, has_quorum([{accepted, x}], 1, 1)),
+    ?assertEqual(false, has_quorum([{accepted, x}], 2, 1)),
+    ?assertEqual(false, has_quorum([{accepted, x}, {ok, y}], 3, 2)),
+    ?assertEqual(true, has_quorum([{accepted, x}, {ok, y}], 2, 2)).
 
 purge_test_() ->
     {
@@ -248,6 +259,8 @@ purge_test_() ->
         fun setup/0,
         fun teardown/1,
         with([
+            ?TDEF(t_create_reqs),
+
             ?TDEF(t_w2_ok),
             ?TDEF(t_w3_ok),
 
@@ -262,29 +275,53 @@ purge_test_() ->
 
             ?TDEF(t_mixed_ok_accepted),
             ?TDEF(t_mixed_errors),
+            ?TDEF(t_rexi_down_error),
             ?TDEF(t_timeout)
         ])
     }.
 
 setup() ->
-    meck:new(couch_log),
-    meck:expect(couch_log, warning, fun(_, _) -> ok end),
-    meck:expect(couch_log, notice, fun(_, _) -> ok end),
-    meck:expect(couch_log, error, fun(_, _) -> ok end).
+    test_util:start_couch().
 
-teardown(_) ->
-    meck:unload().
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+t_create_reqs(_) ->
+    ?assertEqual({[], [], #{}}, create_requests_and_responses([])),
+    IdRevs = [
+        {<<"3">>, []},
+        {<<"1">>, [<<"2-b">>, <<"1-a">>]},
+        {<<"2">>, [<<"3-c">>, <<"1-d">>, <<"3-c">>]}
+    ],
+    Res = create_requests_and_responses(IdRevs),
+    ?assertMatch({[<<_/binary>> | _], [{<<_/binary>>, _, _} | _], #{}}, Res),
+    {UUIDs, IdRevs1, Resps} = Res,
+    ?assertEqual(3, length(UUIDs)),
+    ?assertEqual(3, length(IdRevs1)),
+    ?assertEqual(3, map_size(Resps)),
+    ?assertEqual(lists:sort(UUIDs), lists:sort(maps:keys(Resps))),
+    {IdRevsUUIDs, DocIds, Revs} = lists:unzip3(IdRevs1),
+    ?assertEqual(UUIDs, IdRevsUUIDs),
+    ?assertEqual([<<"3">>, <<"1">>, <<"2">>], DocIds),
+    ?assertEqual(
+        [
+            [],
+            [<<"1-a">>, <<"2-b">>],
+            [<<"1-d">>, <<"3-c">>]
+        ],
+        Revs
+    ).
 
 t_w2_ok(_) ->
     Acc0 = create_init_acc(2),
     Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
 
     {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, true),
 
     Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
@@ -300,11 +337,11 @@ t_w3_ok(_) ->
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
@@ -318,15 +355,15 @@ t_w2_mixed_accepted(_) ->
     Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
 
     {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [
@@ -343,15 +380,15 @@ t_w3_mixed_accepted(_) ->
     Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
 
     {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [
@@ -368,15 +405,15 @@ t_w2_exit1_ok(_) ->
     ExitMsg = {rexi_EXIT, blargh},
 
     {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
@@ -390,15 +427,15 @@ t_w2_exit2_accepted(_) ->
     ExitMsg = {rexi_EXIT, blargh},
 
     {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
@@ -411,15 +448,15 @@ t_w2_exit3_error(_) ->
     ExitMsg = {rexi_EXIT, blargh},
 
     {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [
@@ -439,15 +476,15 @@ t_w4_accepted(_) ->
     Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
 
     {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
-    ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+    ?assertEqual(2, map_size(Acc1#acc.worker_uuids)),
     check_quorum(Acc1, false),
 
     {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
-    ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+    ?assertEqual(1, map_size(Acc2#acc.worker_uuids)),
     check_quorum(Acc2, false),
 
     {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
-    ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+    ?assertEqual(0, map_size(Acc3#acc.worker_uuids)),
     check_quorum(Acc3, true),
 
     Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
@@ -456,20 +493,20 @@ t_w4_accepted(_) ->
     ?assertEqual(accepted, resp_health(Resps)).
 
 t_mixed_ok_accepted(_) ->
-    WorkerUUIDs = [
-        {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
-        {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
-        {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
-
-        {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
-        {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
-        {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
-    ],
+    WorkerUUIDs = #{
+        #shard{node = a, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = b, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = c, range = [1, 2]} => [<<"uuid1">>],
+
+        #shard{node = a, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = b, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = c, range = [3, 4]} => [<<"uuid2">>]
+    },
 
     Acc0 = #acc{
         worker_uuids = WorkerUUIDs,
-        resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
-        uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+        resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+        uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
         w = 2
     },
 
@@ -477,11 +514,11 @@ t_mixed_ok_accepted(_) ->
     Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
     ExitMsg = {rexi_EXIT, blargh},
 
-    {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
-    {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
-    {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
-    {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
-    {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
+    {ok, Acc1} = handle_message(Msg1, worker(a, [1, 2], Acc0), Acc0),
+    {ok, Acc2} = handle_message(Msg1, worker(b, [1, 2], Acc0), Acc1),
+    {ok, Acc3} = handle_message(ExitMsg, worker(a, [3, 4], Acc0), Acc2),
+    {ok, Acc4} = handle_message(ExitMsg, worker(b, [3, 4], Acc0), Acc3),
+    {stop, Acc5} = handle_message(Msg2, worker(c, [3, 4], Acc0), Acc4),
 
     Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
     Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
@@ -489,59 +526,94 @@ t_mixed_ok_accepted(_) ->
     ?assertEqual(accepted, resp_health(Resps)).
 
 t_mixed_errors(_) ->
-    WorkerUUIDs = [
-        {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
-        {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
-        {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
-
-        {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
-        {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
-        {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
-    ],
+    WorkerUUIDs = #{
+        #shard{node = a, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = b, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = c, range = [1, 2]} => [<<"uuid1">>],
+
+        #shard{node = a, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = b, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = c, range = [3, 4]} => [<<"uuid2">>]
+    },
 
     Acc0 = #acc{
         worker_uuids = WorkerUUIDs,
-        resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
-        uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+        resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+        uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
         w = 2
     },
 
     Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
     ExitMsg = {rexi_EXIT, blargh},
 
-    {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
-    {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
-    {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
-    {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
-    {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
+    {ok, Acc1} = handle_message(Msg, worker(a, [1, 2], Acc0), Acc0),
+    {ok, Acc2} = handle_message(Msg, worker(b, [1, 2], Acc0), Acc1),
+    {ok, Acc3} = handle_message(ExitMsg, worker(a, [3, 4], Acc0), Acc2),
+    {ok, Acc4} = handle_message(ExitMsg, worker(b, [3, 4], Acc0), Acc3),
+    {stop, Acc5} = handle_message(ExitMsg, worker(c, [3, 4], Acc0), Acc4),
 
     Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
     Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
     ?assertEqual(Expect, Resps),
     ?assertEqual(error, resp_health(Resps)).
 
-t_timeout(_) ->
-    WorkerUUIDs = [
-        {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
-        {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
-        {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
-
-        {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
-        {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
-        {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+t_rexi_down_error(_) ->
+    WorkerUUIDs = #{
+        #shard{node = a, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = b, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = c, range = [1, 2]} => [<<"uuid1">>],
+
+        #shard{node = a, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = b, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = c, range = [3, 4]} => [<<"uuid2">>]
+    },
+
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+        uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+        w = 2
+    },
+
+    Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
+    {ok, Acc1} = handle_message(Msg, worker(a, [1, 2], Acc0), Acc0),
+
+    DownMsgB = {rexi_DOWN, nodedown, {nil, b}, nil},
+    {ok, Acc2} = handle_message(DownMsgB, worker(b, [1, 2], Acc0), Acc1),
+
+    DownMsgC = {rexi_DOWN, nodedown, {nil, c}, nil},
+    {ok, Acc3} = handle_message(DownMsgC, worker(c, [3, 4], Acc0), Acc2),
+
+    Expect = [
+        {accepted, [{1, <<"foo">>}]},
+        {error, internal_server_error}
     ],
+    Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+    ?assertEqual(Expect, Resps),
+    ?assertEqual(error, resp_health(Resps)).
+
+t_timeout(_) ->
+    WorkerUUIDs = #{
+        #shard{node = a, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = b, range = [1, 2]} => [<<"uuid1">>],
+        #shard{node = c, range = [1, 2]} => [<<"uuid1">>],
+
+        #shard{node = a, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = b, range = [3, 4]} => [<<"uuid2">>],
+        #shard{node = c, range = [3, 4]} => [<<"uuid2">>]
+    },
 
     Acc0 = #acc{
         worker_uuids = WorkerUUIDs,
-        resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
-        uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+        resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+        uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
         w = 2
     },
 
     Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
-    {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
-    {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
-    {ok, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+    {ok, Acc1} = handle_message(Msg, worker(a, [1, 2], Acc0), Acc0),
+    {ok, Acc2} = handle_message(Msg, worker(b, [1, 2], Acc0), Acc1),
+    {ok, Acc3} = handle_message(Msg, worker(c, [1, 2], Acc0), Acc2),
     Acc4 = handle_timeout(Acc3),
     Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc4),
     ?assertEqual([{ok, [{1, <<"foo">>}]}, {error, timeout}], Resps).
@@ -556,31 +628,124 @@ create_init_acc(W) ->
     % Create our worker_uuids. We're relying on the fact that
     % we're using a fake Q=1 db so we don't have to worry
     % about any hashing here.
-    WorkerUUIDs = lists:map(
-        fun(Shard) ->
-            {Shard#shard{ref = make_ref()}, [UUID1, UUID2]}
-        end,
-        Shards
-    ),
+    UUIDs = [UUID1, UUID2],
+    Workers = [{S#shard{ref = make_ref()}, UUIDs} || S <- Shards],
+    WorkerUUIDs = maps:from_list(Workers),
 
     #acc{
         worker_uuids = WorkerUUIDs,
-        resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
-        uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
+        resps = #{UUID1 => [], UUID2 => []},
+        uuid_counts = #{UUID1 => 3, UUID2 => 3},
         w = W
     }.
 
+worker(Node, Range, #acc{worker_uuids = WorkerUUIDs}) ->
+    Workers = maps:keys(WorkerUUIDs),
+    Pred = fun(#shard{node = N, range = R}) ->
+        Node =:= N andalso Range =:= R
+    end,
+    case lists:filter(Pred, Workers) of
+        [W] -> W;
+        _ -> error(not_found)
+    end.
+
 worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
-    {Worker, _} = lists:nth(N, WorkerUUIDs),
-    Worker.
+    Workers = maps:keys(WorkerUUIDs),
+    lists:nth(N, lists:sort(Workers)).
 
 check_quorum(Acc, Expect) ->
-    dict:fold(
-        fun(_Shard, Resps, _) ->
+    maps:map(
+        fun(_Shard, Resps) ->
             ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
         end,
-        nil,
         Acc#acc.resps
     ).
 
+purge_end_to_end_test_() ->
+    {
+        setup,
+        fun() ->
+            Ctx = test_util:start_couch([fabric]),
+            DbName = ?tempdb(),
+            ok = fabric:create_db(DbName, [{q, 2}, {n, 1}]),
+            {Ctx, DbName}
+        end,
+        fun({Ctx, DbName}) ->
+            fabric:delete_db(DbName),
+            test_util:stop_couch(Ctx),
+            meck:unload()
+        end,
+        with([
+            ?TDEF(t_purge),
+            ?TDEF(t_purge_missing_doc_id),
+            ?TDEF(t_purge_missing_rev)
+        ])
+    }.
+
+t_purge({_Ctx, DbName}) ->
+    Rev1 = update_doc(DbName, <<"1">>),
+    Rev2 = update_doc(DbName, <<"2">>),
+    Rev3 = update_doc(DbName, <<"3">>),
+    Res = fabric:purge_docs(
+        DbName,
+        [
+            {<<"3">>, [Rev3]},
+            {<<"1">>, [Rev1]},
+            {<<"2">>, [Rev2]}
+        ],
+        []
+    ),
+    ?assertMatch({ok, [_, _, _]}, Res),
+    {ok, [Res3, Res1, Res2]} = Res,
+    ?assertMatch({ok, [Rev1]}, Res1),
+    ?assertMatch({ok, [Rev2]}, Res2),
+    ?assertMatch({ok, [Rev3]}, Res3).
+
+t_purge_missing_doc_id({_Ctx, DbName}) ->
+    ?assertMatch({ok, []}, fabric:purge_docs(DbName, [], [])),
+    Rev1 = update_doc(DbName, <<"1">>),
+    Rev2 = update_doc(DbName, <<"2">>),
+    Res = fabric:purge_docs(
+        DbName,
+        [
+            {<<"3">>, [Rev1]},
+            {<<"1">>, [Rev1]},
+            {<<"2">>, [Rev2]}
+        ],
+        []
+    ),
+    ?assertMatch({ok, [_, _, _]}, Res),
+    {ok, [Res3, Res1, Res2]} = Res,
+    ?assertMatch({ok, [Rev1]}, Res1),
+    ?assertMatch({ok, [Rev2]}, Res2),
+    ?assertMatch({ok, []}, Res3).
+
+t_purge_missing_rev({_Ctx, DbName}) ->
+    Rev1 = update_doc(DbName, <<"1">>),
+    Rev2 = update_doc(DbName, <<"2">>),
+    update_doc(DbName, <<"3">>),
+    Res = fabric:purge_docs(
+        DbName,
+        [
+            {<<"1">>, [Rev2, Rev1]},
+            {<<"2">>, [Rev1]},
+            {<<"3">>, []}
+        ],
+        []
+    ),
+    ?assertMatch({ok, [_, _, _]}, Res),
+    {ok, [Res1, Res2, Res3]} = Res,
+    ?assertMatch({ok, [Rev1]}, Res1),
+    ?assertMatch({ok, []}, Res2),
+    ?assertMatch({ok, []}, Res3).
+
+update_doc(DbName, Id) ->
+    fabric_util:isolate(fun() ->
+        Data = binary:encode_hex(crypto:strong_rand_bytes(10)),
+        Doc = #doc{id = Id, body = {[{<<"foo">>, Data}]}},
+        case fabric:update_doc(DbName, Doc, []) of
+            {ok, Res} -> Res
+        end
+    end).
+
 -endif.

Reply via email to