This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch reduce-intra-cluster-conflicts-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1e6dbe67996144cacd3833bdc8353146fdeeaae6
Author: Robert Newson <[email protected]>
AuthorDate: Wed Feb 18 14:04:33 2026 +0000

    refactor to allow delayed worker startup
---
 src/fabric/src/fabric_doc_update.erl | 32 +++++++++++++++++++++++++-------
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl 
b/src/fabric/src/fabric_doc_update.erl
index a977180bc..1fac344ef 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -22,7 +22,9 @@
     doc_count,
     w,
     grouped_docs,
-    reply
+    reply,
+    dbname,
+    update_options
 }).
 
 go(_, [], _) ->
@@ -33,10 +35,8 @@ go(DbName, AllDocs0, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, 
Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(
-        fun({#shard{name = Name, node = Node} = Shard, Docs}) ->
-            Docs1 = untag_docs(Docs),
-            Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, 
Options]}),
-            {Shard#shard{ref = Ref}, Docs}
+        fun({#shard{} = Shard, Docs}) ->
+            {Shard#shard{ref = make_ref()}, Docs}
         end,
         group_docs_by_shard(DbName, AllDocs)
     ),
@@ -47,10 +47,13 @@ go(DbName, AllDocs0, Opts) ->
         doc_count = length(AllDocs),
         w = fabric_util:w_from_opts(DbName, Options),
         grouped_docs = GroupedDocs,
-        reply = dict:new()
+        reply = dict:new(),
+        dbname = DbName,
+        update_options = Options
     },
     Timeout = fabric_util:request_timeout(),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, 
infinity, Timeout) of
+    Acc1 = start_workers(Acc0),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, 
infinity, Timeout) of
         {ok, {Health, Results}} when
             Health =:= ok; Health =:= accepted; Health =:= error
         ->
@@ -350,6 +353,21 @@ validate_atomic_update(_DbName, AllDocs, true) ->
     ),
     throw({aborted, PreCommitFailures}).
 
+start_workers(#acc{} = Acc) ->
+    lists:foreach(
+        fun({Worker, Docs}) ->
+            start_worker(Worker, Docs, Acc)
+        end,
+        Acc#acc.grouped_docs
+    ),
+    Acc.
+
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc) when 
is_reference(Ref) ->
+    #shard{name = Name, node = Node} = Worker,
+    #acc{update_options = UpdateOptions} = Acc,
+    rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, 
untag_docs(Docs), UpdateOptions]}),
+    ok.
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").

Reply via email to