This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch optimize-purge in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit fb4ca210e8155f12d6786387b1e907b40ae293e5 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Sun Oct 12 02:24:19 2025 -0400 Optimize purge Optimizations consist of 3 improvements: * In fabric_doc_purge use maps instead of dicts. During setup avoid traversing the requests and uuids too many times. Instead, generate more in the same iteration: when creating requests, generate the response structure; when generating worker uuids, generate UUID counts. * Reduce purge UUID sizes by half by using binary values instead of hex-encoded values. These UUIDs are generated internally and are never returned or accepted through the API so we don't have to hex-encode them. * Use UUID v7 for purge UUIDs. Generate them directly instead of going through a single gen_server bottleneck like before. They are standard, and yet preserve the same nice sequential prefix property which helps when these are used for B-tree IDs (which they are in this case). Expand the test suite a bit, add at least one end-to-end test and a few more test functions. The coverage increases from about 80% to almost 100%. Add a few comments with the shape of various data structures as a reminder for the future. Purging 100k conflicted docs on a q=1,n=1 db improved from 106 sec to 87 sec which is about 18% --- src/fabric/src/fabric_doc_purge.erl | 513 ++++++++++++++++++++++++------------ 1 file changed, 339 insertions(+), 174 deletions(-) diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl index 3719132ab..bc96430eb 100644 --- a/src/fabric/src/fabric_doc_purge.erl +++ b/src/fabric/src/fabric_doc_purge.erl @@ -28,42 +28,39 @@ go(_, [], _) -> {ok, []}; go(DbName, IdsRevs, Options) -> - % Generate our purge requests of {UUID, DocId, Revs} - {UUIDs, Reqs} = create_reqs(IdsRevs, [], []), - - % Fire off rexi workers for each shard. - {Workers, WorkerUUIDs} = dict:fold( - fun(Shard, ShardReqs, {Ws, WUUIDs}) -> + % Generate our purge requests of {UUID, DocId, Revs}. 
Return: + % * Reqs : [{UUID, DocId, Revs}] + % * UUIDs : [UUID] in the same order as Reqs + % * Responses : #{UUID => []} initial response accumulator + % + {UUIDs, Reqs, Responses} = create_requests_and_responses(IdsRevs), + + % Fire off rexi workers for each shard. Return: + % * Workers : [#shard{ref = Ref}] + % * WorkerUUIDs : #{Worker => [UUID]} + % * UUIDCounts : #{UUID => Counter} + % + {Workers, WorkerUUIDs, UUIDCounts} = maps:fold( + fun(Shard, ShardReqs, {WorkersAcc, WorkersUUIDsAcc, CountsAcc}) -> #shard{name = ShardDbName, node = Node} = Shard, Args = [ShardDbName, ShardReqs, Options], Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}), Worker = Shard#shard{ref = Ref}, ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs], - {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]} + Fun = fun(UUID, Acc) -> update_counter(UUID, Acc) end, + CountsAcc1 = lists:foldl(Fun, CountsAcc, ShardUUIDs), + WorkersUUIDAcc1 = WorkersUUIDsAcc#{Worker => ShardUUIDs}, + {[Worker | WorkersAcc], WorkersUUIDAcc1, CountsAcc1} end, - {[], []}, + {[], #{}, #{}}, group_reqs_by_shard(DbName, Reqs) ), - UUIDCounts = lists:foldl( - fun({_Worker, WUUIDs}, CountAcc) -> - lists:foldl( - fun(UUID, InnerCountAcc) -> - dict:update_counter(UUID, 1, InnerCountAcc) - end, - CountAcc, - WUUIDs - ) - end, - dict:new(), - WorkerUUIDs - ), - RexiMon = fabric_util:create_monitors(Workers), Timeout = fabric_util:request_timeout(), Acc0 = #acc{ worker_uuids = WorkerUUIDs, - resps = dict:from_list([{UUID, []} || UUID <- UUIDs]), + resps = Responses, uuid_counts = UUIDCounts, w = w(DbName, Options) }, @@ -85,8 +82,9 @@ handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) -> worker_uuids = WorkerUUIDs, resps = Resps } = Acc, - Pred = fun({#shard{node = N}, _}) -> N == Node end, - {Failed, Rest} = lists:partition(Pred, WorkerUUIDs), + Pred = fun(#shard{node = N}, _) -> N == Node end, + Failed = maps:filter(Pred, WorkerUUIDs), + Rest = maps:without(maps:keys(Failed), WorkerUUIDs), NewResps = 
append_errors(internal_server_error, Failed, Resps), maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); handle_message({rexi_EXIT, _}, Worker, Acc) -> @@ -94,48 +92,41 @@ handle_message({rexi_EXIT, _}, Worker, Acc) -> worker_uuids = WorkerUUIDs, resps = Resps } = Acc, - {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs), - NewResps = append_errors(internal_server_error, [WorkerPair], Resps), - maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); + {FailedUUIDs, WorkerUUIDs1} = maps:take(Worker, WorkerUUIDs), + NewResps = append_errors(internal_server_error, #{Worker => FailedUUIDs}, Resps), + maybe_stop(Acc#acc{worker_uuids = WorkerUUIDs1, resps = NewResps}); handle_message({ok, Replies}, Worker, Acc) -> #acc{ worker_uuids = WorkerUUIDs, resps = Resps } = Acc, - {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs), + {UUIDs, WorkerUUIDs1} = maps:take(Worker, WorkerUUIDs), NewResps = append_resps(UUIDs, Replies, Resps), - maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); + maybe_stop(Acc#acc{worker_uuids = WorkerUUIDs1, resps = NewResps}); handle_message({bad_request, Msg}, _, _) -> throw({bad_request, Msg}). handle_timeout(#acc{worker_uuids = DefunctWorkerUUIDs, resps = Resps} = Acc) -> - DefunctWorkers = [Worker || {Worker, _} <- DefunctWorkerUUIDs], + DefunctWorkers = maps:keys(DefunctWorkerUUIDs), fabric_util:log_timeout(DefunctWorkers, "purge_docs"), NewResps = append_errors(timeout, DefunctWorkerUUIDs, Resps), - Acc#acc{worker_uuids = [], resps = NewResps}. + Acc#acc{worker_uuids = #{}, resps = NewResps}. -create_reqs([], UUIDs, Reqs) -> - {lists:reverse(UUIDs), lists:reverse(Reqs)}; -create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) -> - UUID = couch_uuids:new(), - NewUUIDs = [UUID | UUIDs], - NewReqs = [{UUID, Id, lists:usort(Revs)} | Reqs], - create_reqs(RestIdsRevs, NewUUIDs, NewReqs). 
+create_requests_and_responses(IdsRevs) -> + Fun = fun({Id, Revs}, {UUIDsAcc, RespAcc}) -> + UUID = couch_uuids:v7_bin(), + {{UUID, Id, lists:usort(Revs)}, {[UUID | UUIDsAcc], RespAcc#{UUID => []}}} + end, + {IdRevs1, {UUIDs, Resps}} = lists:mapfoldl(Fun, {[], #{}}, IdsRevs), + {lists:reverse(UUIDs), IdRevs1, Resps}. group_reqs_by_shard(DbName, Reqs) -> - lists:foldl( - fun({_UUID, Id, _Revs} = Req, D0) -> - lists:foldl( - fun(Shard, D1) -> - dict:append(Shard, Req, D1) - end, - D0, - mem3:shards(DbName, Id) - ) + ReqFoldFun = + fun({_UUID, Id, _Revs} = Req, #{} = Map0) -> + AppendFun = fun(Shard, Map1) -> map_append(Shard, Req, Map1) end, + lists:foldl(AppendFun, Map0, mem3:shards(DbName, Id)) end, - dict:new(), - Reqs - ). + lists:foldl(ReqFoldFun, #{}, Reqs). w(DbName, Options) -> try @@ -145,9 +136,11 @@ w(DbName, Options) -> mem3:quorum(DbName) end. -append_errors(Type, WorkerUUIDs, Resps) -> - lists:foldl( - fun({_Worker, UUIDs}, RespAcc) -> +% Failed WorkerUUIDs = #{#shard{} => [UUIDs, ...]} +% Resps = #{UUID => [{ok, ...} | {error, ...}] +append_errors(Type, #{} = WorkerUUIDs, #{} = Resps) -> + maps:fold( + fun(_Worker, UUIDs, RespAcc) -> Errors = [{error, Type} || _UUID <- UUIDs], append_resps(UUIDs, Errors, RespAcc) end, @@ -155,59 +148,54 @@ append_errors(Type, WorkerUUIDs, Resps) -> WorkerUUIDs ). -append_resps([], [], Resps) -> +append_resps([], [], #{} = Resps) -> Resps; -append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) -> - NewResps = dict:append(UUID, Reply, Resps), +append_resps([UUID | RestUUIDs], [Reply | RestReplies], #{} = Resps) -> + NewResps = map_append(UUID, Reply, Resps), append_resps(RestUUIDs, RestReplies, NewResps). 
-maybe_stop(#acc{worker_uuids = []} = Acc) -> +maybe_stop(#acc{worker_uuids = #{} = Map} = Acc) when map_size(Map) == 0 -> {stop, Acc}; -maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) -> +maybe_stop(#acc{resps = #{} = Resps, uuid_counts = #{} = Counts, w = W} = Acc) -> try - dict:fold( - fun(UUID, UUIDResps, _) -> - UUIDCount = dict:fetch(UUID, Counts), + Fun = + fun(UUID, UUIDResps) -> + #{UUID := UUIDCount} = Counts, case has_quorum(UUIDResps, UUIDCount, W) of true -> ok; false -> throw(keep_going) end end, - nil, - Resps - ), + maps:foreach(Fun, Resps), {stop, Acc} catch throw:keep_going -> {ok, Acc} end. -format_resps(UUIDs, #acc{} = Acc) -> - #acc{ - resps = Resps, - w = W - } = Acc, - FoldFun = fun(UUID, Replies, ReplyAcc) -> +format_resps(UUIDs, #acc{resps = Resps, w = W}) -> + Fun = fun(_UUID, Replies) -> OkReplies = [Reply || {ok, Reply} <- Replies], case OkReplies of [] -> [Error | _] = lists:usort(Replies), - [{UUID, Error} | ReplyAcc]; - _ -> + Error; + [_ | _] -> AllRevs = lists:usort(lists:flatten(OkReplies)), - IsOk = - length(OkReplies) >= W andalso - length(lists:usort(OkReplies)) == 1, + IsOk = length(OkReplies) >= W andalso length(lists:usort(OkReplies)) == 1, Health = if IsOk -> ok; true -> accepted end, - [{UUID, {Health, AllRevs}} | ReplyAcc] + {Health, AllRevs} end end, - FinalReplies = dict:fold(FoldFun, [], Resps), - couch_util:reorder_results(UUIDs, FinalReplies); + FinalReplies = maps:map(Fun, Resps), + % Reorder results in the same order as the the initial IdRevs + % this also implicitly asserts that the all UUIDs should have + % a matching reply + [map_get(UUID, FinalReplies) || UUID <- UUIDs]; format_resps(_UUIDs, Else) -> Else. 
@@ -225,22 +213,45 @@ resp_health(Resps) -> has_quorum(Resps, Count, W) -> OkResps = [R || {ok, _} = R <- Resps], - OkCounts = lists:foldl( - fun(R, Acc) -> - orddict:update_counter(R, 1, Acc) - end, - orddict:new(), - OkResps - ), - MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]), + OkCounts = lists:foldl(fun(R, Acc) -> update_counter(R, Acc) end, #{}, OkResps), + MaxOk = lists:max([0 | maps:values(OkCounts)]), if MaxOk >= W -> true; length(Resps) >= Count -> true; true -> false end. +map_append(Key, Val, #{} = Map) -> + maps:update_with(Key, fun(V) -> [Val | V] end, [Val], Map). + +update_counter(Key, #{} = Map) -> + maps:update_with(Key, fun(V) -> V + 1 end, 1, Map). + -ifdef(TEST). + -include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +response_health_test() -> + ?assertEqual(error, resp_health([])), + ?assertEqual(error, resp_health([{potato, x}])), + ?assertEqual(ok, resp_health([{ok, x}, {ok, y}])), + ?assertEqual(accepted, resp_health([{accepted, x}])), + ?assertEqual(accepted, resp_health([{ok, x}, {accepted, y}])), + ?assertEqual(error, resp_health([{error, x}])), + ?assertEqual(error, resp_health([{ok, x}, {error, y}])), + ?assertEqual(error, resp_health([{error, x}, {accepted, y}, {ok, z}])). + +has_quorum_test() -> + ?assertEqual(true, has_quorum([], 0, 0)), + ?assertEqual(true, has_quorum([], 1, 0)), + ?assertEqual(true, has_quorum([], 0, 1)), + ?assertEqual(false, has_quorum([], 1, 1)), + ?assertEqual(true, has_quorum([{ok, x}], 1, 1)), + ?assertEqual(true, has_quorum([{accepted, x}], 1, 1)), + ?assertEqual(false, has_quorum([{accepted, x}], 2, 1)), + ?assertEqual(false, has_quorum([{accepted, x}, {ok, y}], 3, 2)), + ?assertEqual(true, has_quorum([{accepted, x}, {ok, y}], 2, 2)). 
purge_test_() -> { @@ -248,6 +259,8 @@ purge_test_() -> fun setup/0, fun teardown/1, with([ + ?TDEF(t_create_reqs), + ?TDEF(t_w2_ok), ?TDEF(t_w3_ok), @@ -262,29 +275,53 @@ purge_test_() -> ?TDEF(t_mixed_ok_accepted), ?TDEF(t_mixed_errors), + ?TDEF(t_rexi_down_error), ?TDEF(t_timeout) ]) }. setup() -> - meck:new(couch_log), - meck:expect(couch_log, warning, fun(_, _) -> ok end), - meck:expect(couch_log, notice, fun(_, _) -> ok end), - meck:expect(couch_log, error, fun(_, _) -> ok end). + test_util:start_couch(). -teardown(_) -> - meck:unload(). +teardown(Ctx) -> + test_util:stop_couch(Ctx). + +t_create_reqs(_) -> + ?assertEqual({[], [], #{}}, create_requests_and_responses([])), + IdRevs = [ + {<<"3">>, []}, + {<<"1">>, [<<"2-b">>, <<"1-a">>]}, + {<<"2">>, [<<"3-c">>, <<"1-d">>, <<"3-c">>]} + ], + Res = create_requests_and_responses(IdRevs), + ?assertMatch({[<<_/binary>> | _], [{<<_/binary>>, _, _} | _], #{}}, Res), + {UUIDs, IdRevs1, Resps} = Res, + ?assertEqual(3, length(UUIDs)), + ?assertEqual(3, length(IdRevs1)), + ?assertEqual(3, map_size(Resps)), + ?assertEqual(lists:sort(UUIDs), lists:sort(maps:keys(Resps))), + {IdRevsUUIDs, DocIds, Revs} = lists:unzip3(IdRevs1), + ?assertEqual(UUIDs, IdRevsUUIDs), + ?assertEqual([<<"3">>, <<"1">>, <<"2">>], DocIds), + ?assertEqual( + [ + [], + [<<"1-a">>, <<"2-b">>], + [<<"1-d">>, <<"3-c">>] + ], + Revs + ). 
t_w2_ok(_) -> Acc0 = create_init_acc(2), Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), check_quorum(Acc1, false), {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, true), Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], @@ -300,11 +337,11 @@ t_w3_ok(_) -> check_quorum(Acc1, false), {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], @@ -318,15 +355,15 @@ t_w2_mixed_accepted(_) -> Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), check_quorum(Acc1, false), {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [ @@ -343,15 +380,15 @@ t_w3_mixed_accepted(_) -> Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), 
check_quorum(Acc1, false), {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [ @@ -368,15 +405,15 @@ t_w2_exit1_ok(_) -> ExitMsg = {rexi_EXIT, blargh}, {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), check_quorum(Acc1, false), {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], @@ -390,15 +427,15 @@ t_w2_exit2_accepted(_) -> ExitMsg = {rexi_EXIT, blargh}, {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), check_quorum(Acc1, false), {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], @@ -411,15 +448,15 @@ t_w2_exit3_error(_) -> ExitMsg = {rexi_EXIT, blargh}, {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0), - ?assertEqual(2, 
length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), check_quorum(Acc1, false), {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [ @@ -439,15 +476,15 @@ t_w4_accepted(_) -> Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), + ?assertEqual(2, map_size(Acc1#acc.worker_uuids)), check_quorum(Acc1, false), {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), + ?assertEqual(1, map_size(Acc2#acc.worker_uuids)), check_quorum(Acc2, false), {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), + ?assertEqual(0, map_size(Acc3#acc.worker_uuids)), check_quorum(Acc3, true), Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], @@ -456,20 +493,20 @@ t_w4_accepted(_) -> ?assertEqual(accepted, resp_health(Resps)). 
t_mixed_ok_accepted(_) -> - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} - ], + WorkerUUIDs = #{ + #shard{node = a, range = [1, 2]} => [<<"uuid1">>], + #shard{node = b, range = [1, 2]} => [<<"uuid1">>], + #shard{node = c, range = [1, 2]} => [<<"uuid1">>], + + #shard{node = a, range = [3, 4]} => [<<"uuid2">>], + #shard{node = b, range = [3, 4]} => [<<"uuid2">>], + #shard{node = c, range = [3, 4]} => [<<"uuid2">>] + }, Acc0 = #acc{ worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), w = 2 }, @@ -477,11 +514,11 @@ t_mixed_ok_accepted(_) -> Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]}, ExitMsg = {rexi_EXIT, blargh}, - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), - {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), - {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4), + {ok, Acc1} = handle_message(Msg1, worker(a, [1, 2], Acc0), Acc0), + {ok, Acc2} = handle_message(Msg1, worker(b, [1, 2], Acc0), Acc1), + {ok, Acc3} = handle_message(ExitMsg, worker(a, [3, 4], Acc0), Acc2), + {ok, Acc4} = handle_message(ExitMsg, worker(b, [3, 4], Acc0), Acc3), + {stop, Acc5} = handle_message(Msg2, worker(c, [3, 4], Acc0), Acc4), Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), @@ -489,59 +526,94 @@ t_mixed_ok_accepted(_) -> ?assertEqual(accepted, 
resp_health(Resps)). t_mixed_errors(_) -> - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} - ], + WorkerUUIDs = #{ + #shard{node = a, range = [1, 2]} => [<<"uuid1">>], + #shard{node = b, range = [1, 2]} => [<<"uuid1">>], + #shard{node = c, range = [1, 2]} => [<<"uuid1">>], + + #shard{node = a, range = [3, 4]} => [<<"uuid2">>], + #shard{node = b, range = [3, 4]} => [<<"uuid2">>], + #shard{node = c, range = [3, 4]} => [<<"uuid2">>] + }, Acc0 = #acc{ worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), w = 2 }, Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, ExitMsg = {rexi_EXIT, blargh}, - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), - {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), - {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4), + {ok, Acc1} = handle_message(Msg, worker(a, [1, 2], Acc0), Acc0), + {ok, Acc2} = handle_message(Msg, worker(b, [1, 2], Acc0), Acc1), + {ok, Acc3} = handle_message(ExitMsg, worker(a, [3, 4], Acc0), Acc2), + {ok, Acc4} = handle_message(ExitMsg, worker(b, [3, 4], Acc0), Acc3), + {stop, Acc5} = handle_message(ExitMsg, worker(c, [3, 4], Acc0), Acc4), Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}], Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), ?assertEqual(Expect, Resps), ?assertEqual(error, resp_health(Resps)). 
-t_timeout(_) -> - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} +t_rexi_down_error(_) -> + WorkerUUIDs = #{ + #shard{node = a, range = [1, 2]} => [<<"uuid1">>], + #shard{node = b, range = [1, 2]} => [<<"uuid1">>], + #shard{node = c, range = [1, 2]} => [<<"uuid1">>], + + #shard{node = a, range = [3, 4]} => [<<"uuid2">>], + #shard{node = b, range = [3, 4]} => [<<"uuid2">>], + #shard{node = c, range = [3, 4]} => [<<"uuid2">>] + }, + + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + w = 2 + }, + + Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, + {ok, Acc1} = handle_message(Msg, worker(a, [1, 2], Acc0), Acc0), + + DownMsgB = {rexi_DOWN, nodedown, {nil, b}, nil}, + {ok, Acc2} = handle_message(DownMsgB, worker(b, [1, 2], Acc0), Acc1), + + DownMsgC = {rexi_DOWN, nodedown, {nil, c}, nil}, + {ok, Acc3} = handle_message(DownMsgC, worker(c, [3, 4], Acc0), Acc2), + + Expect = [ + {accepted, [{1, <<"foo">>}]}, + {error, internal_server_error} ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(error, resp_health(Resps)). 
+ +t_timeout(_) -> + WorkerUUIDs = #{ + #shard{node = a, range = [1, 2]} => [<<"uuid1">>], + #shard{node = b, range = [1, 2]} => [<<"uuid1">>], + #shard{node = c, range = [1, 2]} => [<<"uuid1">>], + + #shard{node = a, range = [3, 4]} => [<<"uuid2">>], + #shard{node = b, range = [3, 4]} => [<<"uuid2">>], + #shard{node = c, range = [3, 4]} => [<<"uuid2">>] + }, Acc0 = #acc{ worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + resps = maps:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = maps:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), w = 2 }, Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + {ok, Acc1} = handle_message(Msg, worker(a, [1, 2], Acc0), Acc0), + {ok, Acc2} = handle_message(Msg, worker(b, [1, 2], Acc0), Acc1), + {ok, Acc3} = handle_message(Msg, worker(c, [1, 2], Acc0), Acc2), Acc4 = handle_timeout(Acc3), Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc4), ?assertEqual([{ok, [{1, <<"foo">>}]}, {error, timeout}], Resps). @@ -556,31 +628,124 @@ create_init_acc(W) -> % Create our worker_uuids. We're relying on the fact that % we're using a fake Q=1 db so we don't have to worry % about any hashing here. - WorkerUUIDs = lists:map( - fun(Shard) -> - {Shard#shard{ref = make_ref()}, [UUID1, UUID2]} - end, - Shards - ), + UUIDs = [UUID1, UUID2], + Workers = [{S#shard{ref = make_ref()}, UUIDs} || S <- Shards], + WorkerUUIDs = maps:from_list(Workers), #acc{ worker_uuids = WorkerUUIDs, - resps = dict:from_list([{UUID1, []}, {UUID2, []}]), - uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]), + resps = #{UUID1 => [], UUID2 => []}, + uuid_counts = #{UUID1 => 3, UUID2 => 3}, w = W }. 
+worker(Node, Range, #acc{worker_uuids = WorkerUUIDs}) -> + Workers = maps:keys(WorkerUUIDs), + Pred = fun(#shard{node = N, range = R}) -> + Node =:= N andalso Range =:= R + end, + case lists:filter(Pred, Workers) of + [W] -> W; + _ -> error(not_found) + end. + worker(N, #acc{worker_uuids = WorkerUUIDs}) -> - {Worker, _} = lists:nth(N, WorkerUUIDs), - Worker. + Workers = maps:keys(WorkerUUIDs), + lists:nth(N, lists:sort(Workers)). check_quorum(Acc, Expect) -> - dict:fold( - fun(_Shard, Resps, _) -> + maps:map( + fun(_Shard, Resps) -> ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w)) end, - nil, Acc#acc.resps ). +purge_end_to_end_test_() -> + { + setup, + fun() -> + Ctx = test_util:start_couch([fabric]), + DbName = ?tempdb(), + ok = fabric:create_db(DbName, [{q, 2}, {n, 1}]), + {Ctx, DbName} + end, + fun({Ctx, DbName}) -> + fabric:delete_db(DbName), + test_util:stop_couch(Ctx), + meck:unload() + end, + with([ + ?TDEF(t_purge), + ?TDEF(t_purge_missing_doc_id), + ?TDEF(t_purge_missing_rev) + ]) + }. + +t_purge({_Ctx, DbName}) -> + Rev1 = update_doc(DbName, <<"1">>), + Rev2 = update_doc(DbName, <<"2">>), + Rev3 = update_doc(DbName, <<"3">>), + Res = fabric:purge_docs( + DbName, + [ + {<<"3">>, [Rev3]}, + {<<"1">>, [Rev1]}, + {<<"2">>, [Rev2]} + ], + [] + ), + ?assertMatch({ok, [_, _, _]}, Res), + {ok, [Res3, Res1, Res2]} = Res, + ?assertMatch({ok, [Rev1]}, Res1), + ?assertMatch({ok, [Rev2]}, Res2), + ?assertMatch({ok, [Rev3]}, Res3). + +t_purge_missing_doc_id({_Ctx, DbName}) -> + ?assertMatch({ok, []}, fabric:purge_docs(DbName, [], [])), + Rev1 = update_doc(DbName, <<"1">>), + Rev2 = update_doc(DbName, <<"2">>), + Res = fabric:purge_docs( + DbName, + [ + {<<"3">>, [Rev1]}, + {<<"1">>, [Rev1]}, + {<<"2">>, [Rev2]} + ], + [] + ), + ?assertMatch({ok, [_, _, _]}, Res), + {ok, [Res3, Res1, Res2]} = Res, + ?assertMatch({ok, [Rev1]}, Res1), + ?assertMatch({ok, [Rev2]}, Res2), + ?assertMatch({ok, []}, Res3). 
+ +t_purge_missing_rev({_Ctx, DbName}) -> + Rev1 = update_doc(DbName, <<"1">>), + Rev2 = update_doc(DbName, <<"2">>), + update_doc(DbName, <<"3">>), + Res = fabric:purge_docs( + DbName, + [ + {<<"1">>, [Rev2, Rev1]}, + {<<"2">>, [Rev1]}, + {<<"3">>, []} + ], + [] + ), + ?assertMatch({ok, [_, _, _]}, Res), + {ok, [Res1, Res2, Res3]} = Res, + ?assertMatch({ok, [Rev1]}, Res1), + ?assertMatch({ok, []}, Res2), + ?assertMatch({ok, []}, Res3). + +update_doc(DbName, Id) -> + fabric_util:isolate(fun() -> + Data = binary:encode_hex(crypto:strong_rand_bytes(10)), + Doc = #doc{id = Id, body = {[{<<"foo">>, Data}]}}, + case fabric:update_doc(DbName, Doc, []) of + {ok, Res} -> Res + end + end). + -endif.
