This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/auto-delete-3 by this push:
     new 494ea0b41 handle shard splits
494ea0b41 is described below

commit 494ea0b410a65fa8dea4ed5b8a2dc52d5d4446c1
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 9 10:45:48 2025 +0100

    handle shard splits
    
    UUID matching had to be made optional for the specific case where a
    peer checkpoint contains a pre-split range and the shard map is returned
    with a split. We don't know the UUIDs of the post-split shards, and looking
    them up just to substitute them here is pointless.
---
 src/couch/src/couch_bt_engine.erl  | 15 ++++++---
 src/fabric/src/fabric_drop_seq.erl | 64 +++++++++++++++++++++++++++++++++++---
 2 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl 
b/src/couch/src/couch_bt_engine.erl
index 3a365162d..df3d01d30 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -814,16 +814,21 @@ set_update_seq(#st{header = Header} = St, UpdateSeq) ->
         needs_commit = true
     }}.
 
-set_drop_seq(#st{header = Header} = St, ExpectedUuidPrefix, NewDropSeq) when
-    is_binary(ExpectedUuidPrefix), is_integer(NewDropSeq), NewDropSeq > 0
-->
-    CurrentDropSeq = get_drop_seq(St),
+set_drop_seq(#st{} = St, undefined, NewDropSeq) ->
+    set_drop_seq(St, NewDropSeq);
+set_drop_seq(#st{} = St, ExpectedUuidPrefix, NewDropSeq) when 
is_binary(ExpectedUuidPrefix) ->
     Uuid = get_uuid(St),
     ActualUuidPrefix = binary:part(Uuid, 0, byte_size(ExpectedUuidPrefix)),
-
     if
         ExpectedUuidPrefix /= ActualUuidPrefix ->
             {error, uuid_mismatch};
+        true ->
+            set_drop_seq(St, NewDropSeq)
+    end.
+
+set_drop_seq(#st{header = Header} = St, NewDropSeq) when 
is_integer(NewDropSeq), NewDropSeq > 0 ->
+    CurrentDropSeq = get_drop_seq(St),
+    if
         NewDropSeq < CurrentDropSeq ->
             {error, {drop_seq_cant_decrease, CurrentDropSeq, NewDropSeq}};
         true ->
diff --git a/src/fabric/src/fabric_drop_seq.erl 
b/src/fabric/src/fabric_drop_seq.erl
index 5dd65884d..76fb6ae3f 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -33,14 +33,13 @@
 
 go(DbName) ->
     Shards0 = mem3:shards(DbName),
-    {ok, PeerCheckpoints0} = get_peer_checkpoint_docs(DbName),
+    {ok, PeerCheckpoints} = get_peer_checkpoint_docs(DbName),
     {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
-    Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
-    PeerCheckpoints1 = calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory),
+    {Shards1, DropSeqs} = go_int(Shards0, PeerCheckpoints, ShardSyncHistory),
     Workers = lists:filtermap(
         fun(Shard) ->
             #shard{range = Range, node = Node, name = ShardName} = Shard,
-            case maps:find({Range, Node}, PeerCheckpoints1) of
+            case maps:find({Range, Node}, DropSeqs) of
                 {ok, {UuidPrefix, DropSeq}} ->
                     Ref = rexi:cast(
                         Node,
@@ -82,6 +81,12 @@ go(DbName) ->
             end
     end.
 
+go_int(Shards, PeerCheckpoints, ShardSyncHistory) ->
+    {
+        fully_replicated_shards_only(Shards, ShardSyncHistory),
+        calculate_drop_seqs(substitute_splits(Shards, PeerCheckpoints), 
ShardSyncHistory)
+    }.
+
 -spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) -> 
peer_checkpoints().
 calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory) ->
     ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
@@ -259,6 +264,10 @@ parse_peer_checkpoint_docs_cb({row, Row}, 
PeerCheckpoints0) ->
 parse_peer_checkpoint_docs_cb(_Else, Acc) ->
     {ok, Acc}.
 
+merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
+    Uuid1 == undefined orelse Uuid2 == undefined, is_integer(Val1), 
is_integer(Val2)
+->
+    {undefined, min(Val1, Val2)};
 merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when is_integer(Val1), 
is_integer(Val2) ->
     PrefixLen = min(byte_size(Uuid1), byte_size(Uuid2)),
     true = binary:longest_common_prefix([Uuid1, Uuid2]) == PrefixLen,
@@ -318,6 +327,25 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
         ShardSyncHistory
     ).
 
+%% A peer checkpoint might refer to a range that has been split since
+%% it last updated. Find these cases and split the peer checkpoints too.
+substitute_splits(Shards, PeerCheckpoints) ->
+    maps:fold(
+        fun({[B1, E1], Node}, {_Uuid, Seq}, Acc) ->
+            MatchingRanges = [
+                S#shard.range
+             || #shard{range = [B2, E2]} = S <- Shards,
+                Node == S#shard.node,
+                B2 >= B1 andalso E2 =< E1
+            ],
+            %% we don't know the uuids of the split shards
+            AsMap = maps:from_list([{{R, Node}, {undefined, Seq}} || R <- 
MatchingRanges]),
+            maps:merge_with(fun merge_peers/3, AsMap, Acc)
+        end,
+        #{},
+        PeerCheckpoints
+    ).
+
 create_peer_checkpoint_doc_if_missing(
     <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
 ) when
@@ -652,4 +680,32 @@ fully_replicated_shards_only_test_() ->
         )
     ].
 
+substitute_splits_test() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = 
Subrange2, node = Node1}],
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+
+    ?assertEqual(
+        #{{Subrange1, Node1} => {undefined, 12}, {Subrange2, Node1} => 
{undefined, 12}},
+        substitute_splits(Shards, PeerCheckpoints)
+    ).
+
+go_int_test() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = 
Subrange2, node = Node1}],
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+    ShardSyncHistory = #{},
+    ?assertEqual(
+        {Shards, #{
+            {Subrange1, Node1} => {undefined, 12}, {Subrange2, Node1} => 
{undefined, 12}
+        }},
+        go_int(Shards, PeerCheckpoints, ShardSyncHistory)
+    ).
+
 -endif.

Reply via email to