This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new 494ea0b41 handle shard splits
494ea0b41 is described below
commit 494ea0b410a65fa8dea4ed5b8a2dc52d5d4446c1
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 9 10:45:48 2025 +0100
handle shard splits
Had to make uuid matching optional for the specific case where a
peer checkpoint contains a pre-split range but the shard map is returned
with that range already split. We don't know the uuids of the post-split
shards, and looking them up just to substitute them here is pointless.
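
For illustration, the two call shapes this enables (a sketch, not part of
the diff below; the {ok, St1} success shape and the module-qualified call
are assumptions here, mirroring the neighbouring set_update_seq/2):

    %% hypothetical caller; St0, UuidPrefix and DropSeq are placeholders
    demo(St0, UuidPrefix, DropSeq) ->
        %% uuid prefix known: shard identity is verified before applying
        {ok, St1} = couch_bt_engine:set_drop_seq(St0, UuidPrefix, DropSeq),
        %% post-split shard: uuid unknown, undefined skips the check
        {ok, St2} = couch_bt_engine:set_drop_seq(St1, undefined, DropSeq),
        St2.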
---
src/couch/src/couch_bt_engine.erl | 15 ++++++---
src/fabric/src/fabric_drop_seq.erl | 64 +++++++++++++++++++++++++++++++++++---
2 files changed, 70 insertions(+), 9 deletions(-)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 3a365162d..df3d01d30 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -814,16 +814,21 @@ set_update_seq(#st{header = Header} = St, UpdateSeq) ->
         needs_commit = true
     }}.
 
-set_drop_seq(#st{header = Header} = St, ExpectedUuidPrefix, NewDropSeq) when
-    is_binary(ExpectedUuidPrefix), is_integer(NewDropSeq), NewDropSeq > 0
-->
-    CurrentDropSeq = get_drop_seq(St),
+set_drop_seq(#st{} = St, undefined, NewDropSeq) ->
+    set_drop_seq(St, NewDropSeq);
+set_drop_seq(#st{} = St, ExpectedUuidPrefix, NewDropSeq) when is_binary(ExpectedUuidPrefix) ->
     Uuid = get_uuid(St),
     ActualUuidPrefix = binary:part(Uuid, 0, byte_size(ExpectedUuidPrefix)),
-
     if
         ExpectedUuidPrefix /= ActualUuidPrefix ->
            {error, uuid_mismatch};
+        true ->
+            set_drop_seq(St, NewDropSeq)
+    end.
+
+set_drop_seq(#st{header = Header} = St, NewDropSeq) when is_integer(NewDropSeq), NewDropSeq > 0 ->
+    CurrentDropSeq = get_drop_seq(St),
+    if
         NewDropSeq < CurrentDropSeq ->
             {error, {drop_seq_cant_decrease, CurrentDropSeq, NewDropSeq}};
         true ->
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 5dd65884d..76fb6ae3f 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -33,14 +33,13 @@
 
 go(DbName) ->
     Shards0 = mem3:shards(DbName),
-    {ok, PeerCheckpoints0} = get_peer_checkpoint_docs(DbName),
+    {ok, PeerCheckpoints} = get_peer_checkpoint_docs(DbName),
     {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
-    Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
-    PeerCheckpoints1 = calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory),
+    {Shards1, DropSeqs} = go_int(Shards0, PeerCheckpoints, ShardSyncHistory),
     Workers = lists:filtermap(
         fun(Shard) ->
             #shard{range = Range, node = Node, name = ShardName} = Shard,
-            case maps:find({Range, Node}, PeerCheckpoints1) of
+            case maps:find({Range, Node}, DropSeqs) of
                 {ok, {UuidPrefix, DropSeq}} ->
                     Ref = rexi:cast(
                         Node,
@@ -82,6 +81,12 @@ go(DbName) ->
     end
 end.
 
+go_int(Shards, PeerCheckpoints, ShardSyncHistory) ->
+    {
+        fully_replicated_shards_only(Shards, ShardSyncHistory),
+        calculate_drop_seqs(substitute_splits(Shards, PeerCheckpoints), ShardSyncHistory)
+    }.
+
 -spec calculate_drop_seqs(peer_checkpoints(), shard_sync_history()) -> peer_checkpoints().
 calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory) ->
     ShardSyncCheckpoints = latest_shard_sync_checkpoints(ShardSyncHistory),
@@ -259,6 +264,10 @@ parse_peer_checkpoint_docs_cb({row, Row}, PeerCheckpoints0) ->
 parse_peer_checkpoint_docs_cb(_Else, Acc) ->
     {ok, Acc}.
 
+merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
+    Uuid1 == undefined orelse Uuid2 == undefined, is_integer(Val1), is_integer(Val2)
+->
+    {undefined, min(Val1, Val2)};
 merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when is_integer(Val1), is_integer(Val2) ->
     PrefixLen = min(byte_size(Uuid1), byte_size(Uuid2)),
     true = binary:longest_common_prefix([Uuid1, Uuid2]) == PrefixLen,
@@ -318,6 +327,25 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
         ShardSyncHistory
     ).
 
+%% A peer checkpoint might refer to a range that has been split since
+%% it last updated. Find these cases and split the peer checkpoints too.
+substitute_splits(Shards, PeerCheckpoints) ->
+    maps:fold(
+        fun({[B1, E1], Node}, {_Uuid, Seq}, Acc) ->
+            MatchingRanges = [
+                S#shard.range
+             || #shard{range = [B2, E2]} = S <- Shards,
+                Node == S#shard.node,
+                B2 >= B1 andalso E2 =< E1
+            ],
+            %% we don't know the uuids of the split shards
+            AsMap = maps:from_list([{{R, Node}, {undefined, Seq}} || R <- MatchingRanges]),
+            maps:merge_with(fun merge_peers/3, AsMap, Acc)
+        end,
+        #{},
+        PeerCheckpoints
+    ).
+
 create_peer_checkpoint_doc_if_missing(
     <<"shards/", _/binary>> = DbName, Subtype, Source, PeerId, UpdateSeq
 ) when
@@ -652,4 +680,32 @@ fully_replicated_shards_only_test_() ->
         )
     ].
 
+substitute_splits_test() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}],
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+
+    ?assertEqual(
+        #{{Subrange1, Node1} => {undefined, 12}, {Subrange2, Node1} => {undefined, 12}},
+        substitute_splits(Shards, PeerCheckpoints)
+    ).
+
+go_int_test() ->
+    Range = [0, 10],
+    Subrange1 = [0, 5],
+    Subrange2 = [6, 10],
+    Node1 = '[email protected]',
+    Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}],
+    PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
+    ShardSyncHistory = #{},
+    ?assertEqual(
+        {Shards, #{
+            {Subrange1, Node1} => {undefined, 12}, {Subrange2, Node1} => {undefined, 12}
+        }},
+        go_int(Shards, PeerCheckpoints, ShardSyncHistory)
+    ).
+
 -endif.