This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/auto-delete-3 by this push:
     new e82f9299f handle shard splits
e82f9299f is described below

commit e82f9299f0accf461e4b025a66814e265012c81d
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 9 10:45:48 2025 +0100

    handle shard splits
---
 src/fabric/src/fabric_drop_seq.erl | 36 ++++++++++++++++++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/src/fabric/src/fabric_drop_seq.erl 
b/src/fabric/src/fabric_drop_seq.erl
index 5dd65884d..333031185 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -34,13 +34,14 @@
 go(DbName) ->
     Shards0 = mem3:shards(DbName),
     {ok, PeerCheckpoints0} = get_peer_checkpoint_docs(DbName),
+    PeerCheckpoints1 = substitute_splits(Shards0, PeerCheckpoints0),
     {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
     Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
-    PeerCheckpoints1 = calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory),
+    DropSeqs = calculate_drop_seqs(PeerCheckpoints1, ShardSyncHistory),
     Workers = lists:filtermap(
         fun(Shard) ->
             #shard{range = Range, node = Node, name = ShardName} = Shard,
-            case maps:find({Range, Node}, PeerCheckpoints1) of
+            case maps:find({Range, Node}, DropSeqs) of
                 {ok, {UuidPrefix, DropSeq}} ->
                     Ref = rexi:cast(
                         Node,
@@ -318,6 +319,24 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
         ShardSyncHistory
     ).
 
%% A peer checkpoint might refer to a range that has been split since
%% it last updated. Find these cases and split the peer checkpoints too.
%% Returns a new map keyed on {SubRange, Node} where each original
%% checkpoint value is duplicated onto every current shard range that
%% lies within the checkpoint's (possibly now-split) range on that node.
substitute_splits(Shards, PeerCheckpoints) ->
    Split = fun({[Begin, End], Node}, Checkpoint, Acc) ->
        %% Every current shard range on the same node that is contained
        %% within [Begin, End] inherits this checkpoint (an exact range
        %% match satisfies the containment test too).
        SubRanges = [
            Range
         || #shard{range = [B, E] = Range} = Shard <- Shards,
            Node == Shard#shard.node,
            B >= Begin andalso E =< End
        ],
        Replacements = maps:from_list([{{Range, Node}, Checkpoint} || Range <- SubRanges]),
        %% Collisions (two old ranges mapping onto one sub-range) are
        %% resolved by merge_peers/3, same as elsewhere in this module.
        maps:merge_with(fun merge_peers/3, Replacements, Acc)
    end,
    maps:fold(Split, #{}, PeerCheckpoints).
+
 create_peer_checkpoint_doc_if_missing(
     <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
 ) when
@@ -652,4 +671,17 @@ fully_replicated_shards_only_test_() ->
         )
     ].
 
%% A checkpoint recorded against the parent range must be carried over,
%% unchanged, onto both halves of the split on the same node.
substitute_splits_test() ->
    Node1 = '[email protected]',
    Range = [0, 10],
    Subrange1 = [0, 5],
    Subrange2 = [6, 10],
    Checkpoint = {<<"uuid1">>, 12},
    Shards = [
        #shard{range = Subrange1, node = Node1},
        #shard{range = Subrange2, node = Node1}
    ],
    Expected = #{
        {Subrange1, Node1} => Checkpoint,
        {Subrange2, Node1} => Checkpoint
    },
    ?assertEqual(Expected, substitute_splits(Shards, #{{Range, Node1} => Checkpoint})).
+
 -endif.

Reply via email to