This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch reduce-intra-cluster-conflicts-3 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1e6dbe67996144cacd3833bdc8353146fdeeaae6 Author: Robert Newson <[email protected]> AuthorDate: Wed Feb 18 14:04:33 2026 +0000 refactor to allow delayed worker startup --- src/fabric/src/fabric_doc_update.erl | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index a977180bc..1fac344ef 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -22,7 +22,9 @@ doc_count, w, grouped_docs, - reply + reply, + dbname, + update_options }). go(_, [], _) -> @@ -33,10 +35,8 @@ go(DbName, AllDocs0, Opts) -> validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), Options = lists:delete(all_or_nothing, Opts), GroupedDocs = lists:map( - fun({#shard{name = Name, node = Node} = Shard, Docs}) -> - Docs1 = untag_docs(Docs), - Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}), - {Shard#shard{ref = Ref}, Docs} + fun({#shard{} = Shard, Docs}) -> + {Shard#shard{ref = make_ref()}, Docs} end, group_docs_by_shard(DbName, AllDocs) ), @@ -47,10 +47,13 @@ go(DbName, AllDocs0, Opts) -> doc_count = length(AllDocs), w = fabric_util:w_from_opts(DbName, Options), grouped_docs = GroupedDocs, - reply = dict:new() + reply = dict:new(), + dbname = DbName, + update_options = Options }, Timeout = fabric_util:request_timeout(), - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of + Acc1 = start_workers(Acc0), + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of {ok, {Health, Results}} when Health =:= ok; Health =:= accepted; Health =:= error -> @@ -350,6 +353,21 @@ validate_atomic_update(_DbName, AllDocs, true) -> ), throw({aborted, PreCommitFailures}). +start_workers(#acc{} = Acc) -> + lists:foreach( + fun({Worker, Docs}) -> + start_worker(Worker, Docs, Acc) + end, + Acc#acc.grouped_docs + ), + Acc. + +start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when is_reference(Ref) -> + #shard{name = Name, node = Node} = Worker, + #acc{update_options = UpdateOptions} = Acc, + rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}), + ok. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl").
