This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new 3ba534da7 only set_drop_seq on a shard if we've seen sync from all peers
3ba534da7 is described below
commit 3ba534da73455ebcb5c14a66b9b64010f5323666
Author: Robert Newson <[email protected]>
AuthorDate: Thu May 8 15:50:22 2025 +0100
only set_drop_seq on a shard if we've seen sync from all peers
---
src/fabric/src/fabric_drop_seq.erl | 69 +++++++++++++++++++++++++++++++++++---
1 file changed, 65 insertions(+), 4 deletions(-)
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 2d10960fa..5dd65884d 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -32,9 +32,10 @@
}.
go(DbName) ->
- Shards = mem3:shards(DbName),
+ Shards0 = mem3:shards(DbName),
{ok, PeerCheckpoints0} = get_peer_checkpoint_docs(DbName),
- {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards),
+ {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
+ Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
PeerCheckpoints1 = calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory),
Workers = lists:filtermap(
fun(Shard) ->
@@ -50,14 +51,14 @@ go(DbName) ->
false
end
end,
- Shards
+ Shards1
),
if
Workers == [] ->
%% nothing to do
{ok, #{}};
true ->
- RexiMon = fabric_util:create_monitors(Shards),
+ RexiMon = fabric_util:create_monitors(Shards1),
Acc0 = {#{}, length(Workers) - 1},
try
case fabric_util:recv(Workers, #shard.ref, fun handle_set_drop_seq_reply/3, Acc0) of
@@ -217,6 +218,20 @@ parse_shard_sync_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) -
)
end.
+%% return only the shards that have synced with every other replica
+fully_replicated_shards_only(Shards, ShardSyncHistory) ->
+ lists:filter(
+ fun(#shard{range = Range, node = Node}) ->
+ ExpectedPeers = [
+ S#shard.node
+ || S <- Shards, S#shard.range == Range, S#shard.node /= Node
+ ],
+ ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
+            lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
+ end,
+ Shards
+ ).
+
-spec get_peer_checkpoint_docs(DbName :: binary()) -> peer_checkpoints().
get_peer_checkpoint_docs(DbName) ->
fabric:all_docs(
@@ -591,4 +606,50 @@ search_history_for_latest_safe_crossover_test() ->
calculate_drop_seqs(PeerCheckpoints, ShardSyncHistory)
).
+fully_replicated_shards_only_test_() ->
+ Range1 = [0, 1],
+ Range2 = [1, 2],
+ Shards = [
+ #shard{node = node1, range = Range1},
+ #shard{node = node2, range = Range1},
+ #shard{node = node3, range = Range1},
+ #shard{node = node1, range = Range2},
+ #shard{node = node2, range = Range2}
+ ],
+ [
+ %% empty history means no fully replicated shards
+ ?_assertEqual([], fully_replicated_shards_only(Shards, #{})),
+ %% some but not all peers
+ ?_assertEqual(
+ [],
+ fully_replicated_shards_only(Shards, #{
+ {Range1, node2, node1} => {0, <<>>}
+ })
+ ),
+ %% all peers of one replica
+ ?_assertEqual(
+ [#shard{node = node1, range = Range1}],
+ fully_replicated_shards_only(Shards, #{
+ {Range1, node2, node1} => {0, <<>>},
+ {Range1, node3, node1} => {0, <<>>}
+ })
+ ),
+ %% all peers of one range
+ ?_assertEqual(
+ [
+ #shard{node = node1, range = Range1},
+ #shard{node = node2, range = Range1},
+ #shard{node = node3, range = Range1}
+ ],
+ fully_replicated_shards_only(Shards, #{
+ {Range1, node2, node1} => {0, <<>>},
+ {Range1, node3, node1} => {0, <<>>},
+ {Range1, node1, node2} => {0, <<>>},
+ {Range1, node3, node2} => {0, <<>>},
+ {Range1, node1, node3} => {0, <<>>},
+ {Range1, node2, node3} => {0, <<>>}
+ })
+ )
+ ].
+
-endif.