This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new e82f9299f handle shard splits
e82f9299f is described below
commit e82f9299f0accf461e4b025a66814e265012c81d
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 9 10:45:48 2025 +0100
handle shard splits
---
src/fabric/src/fabric_drop_seq.erl | 36 ++++++++++++++++++++++++++++++++++--
1 file changed, 34 insertions(+), 2 deletions(-)
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 5dd65884d..333031185 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -34,13 +34,14 @@
go(DbName) ->
Shards0 = mem3:shards(DbName),
{ok, PeerCheckpoints0} = get_peer_checkpoint_docs(DbName),
+ PeerCheckpoints1 = substitute_splits(Shards0, PeerCheckpoints0),
{ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
- PeerCheckpoints1 = calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory),
+ DropSeqs = calculate_drop_seqs(PeerCheckpoints1, ShardSyncHistory),
Workers = lists:filtermap(
fun(Shard) ->
#shard{range = Range, node = Node, name = ShardName} = Shard,
- case maps:find({Range, Node}, PeerCheckpoints1) of
+ case maps:find({Range, Node}, DropSeqs) of
{ok, {UuidPrefix, DropSeq}} ->
Ref = rexi:cast(
Node,
@@ -318,6 +319,24 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
ShardSyncHistory
).
+%% A peer checkpoint might refer to a range that has been split since
+%% it was last updated. Find these cases and split the peer checkpoints too.
+substitute_splits(Shards, PeerCheckpoints) ->
+ maps:fold(
+ fun({[B1, E1], Node}, Value, Acc) ->
+ MatchingRanges = [
+ S#shard.range
+ || #shard{range = [B2, E2]} = S <- Shards,
+ Node == S#shard.node,
+ B2 >= B1 andalso E2 =< E1
+ ],
+ AsMap = maps:from_list([{{R, Node}, Value} || R <- MatchingRanges]),
+ maps:merge_with(fun merge_peers/3, AsMap, Acc)
+ end,
+ #{},
+ PeerCheckpoints
+ ).
+
create_peer_checkpoint_doc_if_missing(
<<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
) when
@@ -652,4 +671,17 @@ fully_replicated_shards_only_test_() ->
)
].
+substitute_splits_test() ->
+ Range = [0, 10],
+ Subrange1 = [0, 5],
+ Subrange2 = [6, 10],
+ Node1 = '[email protected]',
+ Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}],
+ PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+
+ ?assertEqual(
+ #{{Subrange1, Node1} => {<<"uuid1">>, 12}, {Subrange2, Node1} => {<<"uuid1">>, 12}},
+ substitute_splits(Shards, PeerCheckpoints)
+ ).
+
-endif.
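
Editor's note, not part of the commit: substitute_splits/2 calls maps:merge_with(fun merge_peers/3, ...) but merge_peers/3 is defined elsewhere in fabric_drop_seq.erl and is not shown in this diff. The sketch below is a minimal, assumed illustration of how such a merge could behave (keeping the checkpoint with the lower update sequence, so the eventual drop seq stays conservative when two peer checkpoints map onto the same shard range). The module name drop_seq_sketch, the node atom n1, and the merge rule itself are hypothetical.

%% Illustrative sketch only; merge_peers/3 here is an assumption, not the
%% function actually used by fabric_drop_seq.erl.
-module(drop_seq_sketch).
-export([merge_peers/3, demo/0]).

%% Hypothetical merge: when two checkpoints collide on the same
%% {Range, Node} key, keep the one with the smaller update sequence.
merge_peers(_Key, {UuidA, SeqA}, {_UuidB, SeqB}) when SeqA =< SeqB ->
    {UuidA, SeqA};
merge_peers(_Key, _A, B) ->
    B.

demo() ->
    A = #{{[0, 5], n1} => {<<"uuid1">>, 12}},
    B = #{{[0, 5], n1} => {<<"uuid2">>, 9}},
    %% maps:merge_with/3 applies merge_peers/3 only for keys present in
    %% both maps; here it keeps {<<"uuid2">>, 9}, the lower sequence.
    maps:merge_with(fun merge_peers/3, A, B).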