This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch reduce-intra-cluster-conflicts-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 754f327bd62aab2a94344aa2622919147806b3f4
Author: Robert Newson <[email protected]>
AuthorDate: Wed Feb 18 14:51:36 2026 +0000

    start leaders first
---
 src/fabric/src/fabric_doc_update.erl | 74 +++++++++++++++++++++++++++++-------
 1 file changed, 61 insertions(+), 13 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 1fac344ef..5aba9d482 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -24,7 +24,9 @@
     grouped_docs,
     reply,
     dbname,
-    update_options
+    update_options,
+    leaders = [],
+    started = []
 }).
 
 go(_, [], _) ->
@@ -52,7 +54,7 @@ go(DbName, AllDocs0, Opts) ->
         update_options = Options
     },
     Timeout = fabric_util:request_timeout(),
-    Acc1 = start_workers(Acc0),
+    Acc1 = start_leaders(Acc0),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
         {ok, {Health, Results}} when
             Health =:= ok; Health =:= accepted; Health =:= error
@@ -76,22 +78,33 @@
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
     #acc{grouped_docs = GroupedDocs} = Acc0,
-    NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
+    {NewGrpDocs, DroppedGrpDocs} = lists:partition(
+        fun({#shard{node = N}, _}) -> N =/= NodeRef end, GroupedDocs
+    ),
+    DroppedRanges = lists:usort([S#shard.range || {S, _} <- DroppedGrpDocs]),
+    Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(DroppedRanges, Acc1),
+    skip_message(Acc2);
 handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers([Worker#shard.range], Acc1),
+    skip_message(Acc2);
 handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
     % treat it like rexi_EXIT, the hope at least one copy will return successfully
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers([Worker#shard.range], Acc1),
+    skip_message(Acc2);
 handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers([Worker#shard.range], Acc1),
+    skip_message(Acc2);
 handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
     {ok, Acc0};
 handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
@@ -102,6 +115,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
+    Acc1 = start_followers([Worker#shard.range], Acc0),
     {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
     case {WaitingCount, dict:size(DocReplyDict)} of
@@ -117,7 +131,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
             % we've got at least one reply for each document, let's take a look
            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                 continue ->
-                    {ok, Acc0#acc{
+                    {ok, Acc1#acc{
                         waiting_count = WaitingCount - 1,
                         grouped_docs = NewGrpDocs,
                         reply = DocReplyDict
@@ -126,7 +140,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
                     {stop, {ok, FinalReplies}}
             end;
         _ ->
-            {ok, Acc0#acc{
+            {ok, Acc1#acc{
                 waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
             }}
     end;
@@ -353,19 +367,53 @@ validate_atomic_update(_DbName, AllDocs, true) ->
     ),
     throw({aborted, PreCommitFailures}).
 
-start_workers(#acc{} = Acc) ->
+start_leaders(#acc{} = Acc0) ->
+    #acc{dbname = DbName, grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    LeaderRefs = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(DbName, Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.
+
+start_followers(Ranges, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, Docs}
+     || {Worker, Docs} <- Acc0#acc.grouped_docs,
+        lists:member(Worker#shard.range, Ranges),
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
     lists:foreach(
         fun({Worker, Docs}) ->
-            start_worker(Worker, Docs, Acc)
+            start_worker(Worker, Docs, Acc0)
        end,
-        Acc#acc.grouped_docs
+        Followers
     ),
-    Acc.
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(DbName, Worker, Workers) ->
+    Nodes0 = lists:sort([W#shard.node || W <- Workers, W#shard.range == Worker#shard.range]),
+    Nodes1 = mem3_util:rotate_list(DbName, Nodes0),
+    Worker#shard.node == hd(Nodes1).
 
 start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when is_reference(Ref) ->
     #shard{name = Name, node = Node} = Worker,
     #acc{update_options = UpdateOptions} = Acc,
     rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    % for unit tests below.
     ok.
 
 -ifdef(TEST).
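A note on the selection rule above: is_leader/3 sorts the nodes hosting a shard range, rotates the sorted list per database with mem3_util:rotate_list/2, and treats the head of the result as the leader, so every coordinator agrees on the same leader for the same database. Below is a minimal standalone sketch of that rule in plain Erlang; rotate_by/2 is a hypothetical stand-in for mem3_util:rotate_list/2 (assumed here to rotate by a hash of the key), not the mem3 implementation itself.

%% Standalone sketch of the is_leader/3 rule. rotate_by/2 is a
%% hypothetical stand-in for mem3_util:rotate_list/2, assumed to
%% rotate the sorted node list by a hash of the database name.
-module(leader_sketch).
-export([leader/2]).

%% Leader for one shard range: sort the nodes that host the range,
%% rotate the sorted list deterministically per database, take the head.
leader(DbName, [_ | _] = Nodes) ->
    hd(rotate_by(DbName, lists:sort(Nodes))).

rotate_by(Key, List) ->
    %% erlang:phash2/2 is deterministic across nodes, so every
    %% coordinator computes the same rotation for the same DbName.
    N = erlang:phash2(Key, length(List)),
    {Head, Tail} = lists:split(N, List),
    Tail ++ Head.

For example, leader_sketch:leader(<<"mydb">>, ['[email protected]', '[email protected]', '[email protected]']) picks the same node wherever it is evaluated, and the per-database rotation spreads leadership across the cluster instead of always loading the lowest-sorted node.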

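The design choice, as the branch name (reduce-intra-cluster-conflicts-3) suggests: when all replicas of a range are started at once, two racing coordinators can each land first on a different copy of the same document and mint conflicting revisions inside the cluster. Starting only the per-range leader, and releasing that range's followers once the leader has replied or its worker is known to be gone (rexi_DOWN, rexi_EXIT, all_dbs_active, internal_server_error), funnels concurrent updates through a single copy first, which should shrink that window. The started ref list is what keeps a follower from being started twice when several of those events arrive for the same range.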