This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new 7f8d1b567 uuidmap instead
7f8d1b567 is described below
commit 7f8d1b567f61a094e90571b2253f94eb5c01e670
Author: Robert Newson <[email protected]>
AuthorDate: Mon May 19 12:21:57 2025 +0100
uuidmap instead
---
src/fabric/src/fabric_drop_seq.erl | 117 +++++++++++++------------------------
1 file changed, 39 insertions(+), 78 deletions(-)
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 5d0ea970c..928a1bdb3 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -23,7 +23,7 @@
-type seq() :: non_neg_integer().
--type uuid_fetcher() :: fun((#shard{}) -> {ok, uuid()} | {error, term()}).
+-type uuid_map() :: #{{Range :: range(), Node :: node()} => uuid()}.
-type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(), Seq :: seq()}}.
@@ -37,11 +37,15 @@
go(DbName) ->
Shards0 = mem3:shards(DbName),
- #{peer_checkpoints := PeerCheckpoints, shard_sync_history := ShardSyncHistory} = gather_drop_seq_info(
+ #{
+ uuid_map := UuidMap,
+ peer_checkpoints := PeerCheckpoints,
+ shard_sync_history := ShardSyncHistory
+ } = gather_drop_seq_info(
Shards0
),
{Shards1, DropSeqs} = go_int(
- Shards0, fun uuid_fetcher_rpc/1, PeerCheckpoints, ShardSyncHistory
+ Shards0, UuidMap, PeerCheckpoints, ShardSyncHistory
),
Workers = lists:filtermap(
fun(Shard) ->
@@ -165,42 +169,6 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
crossref(PeerCheckpoints1, ShardSyncHistory)
end.
--spec uuid_fetcher_rpc(#shard{}) -> uuid().
-uuid_fetcher_rpc(#shard{} = Shard) ->
- Workers = fabric_util:submit_jobs(
- [Shard], fabric_rpc, get_uuid, []
- ),
- RexiMon = fabric_util:create_monitors(Workers),
- try
- case
- rexi_utils:recv(
- Workers,
- #shard.ref,
- fun handle_get_uuid_reply/3,
- nil,
- fabric_util:request_timeout(),
- infinity
- )
- of
- {ok, Result} ->
- Result;
- {timeout, _State} ->
- {error, timeout};
- {error, Reason} ->
- {error, Reason}
- end
- after
- rexi_monitor:stop(RexiMon),
- fabric_streams:cleanup(Workers)
- end.
-
-handle_get_uuid_reply({rexi_DOWN, _, _, _}, _Worker, _Acc) ->
- {stop, {error, rexi_DOWN}};
-handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, _Acc) ->
- {stop, {error, rexi_EXIT}};
-handle_get_uuid_reply(Uuid, _, _Acc) when is_binary(Uuid) ->
- {stop, {ok, Uuid}}.
-
%% return only the shards that have synced with every other replica
fully_replicated_shards_only(Shards, ShardSyncHistory) ->
lists:filter(
@@ -221,7 +189,7 @@ gather_drop_seq_info([#shard{} | _] = Shards) ->
Shards, ?MODULE, gather_drop_seq_info_rpc, []
),
RexiMon = fabric_util:create_monitors(Workers),
- Acc0 = #{},
+ Acc0 = #{uuid_map => #{}, peer_checkpoints => #{}, shard_sync_history => #{}},
try
case
rexi_utils:recv(
@@ -330,19 +298,20 @@ gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) ->
gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
{ok, {Acc, Count - 1}}.
-merge_info(#shard{} = _Shard, InfoA, InfoB) ->
- PeerCheckpointsA = maps:get(peer_checkpoints, InfoA, #{}),
- PeerCheckpointsB = maps:get(peer_checkpoints, InfoB, #{}),
- MergedPeerCheckpoints = maps:merge_with(
- fun merge_peers/3, PeerCheckpointsA, PeerCheckpointsB
- ),
- ShardSyncHistoryA = maps:get(shard_sync_history, InfoA, #{}),
- ShardSyncHistoryB = maps:get(shard_sync_history, InfoB, #{}),
- MergedShardSyncHistory = maps:merge(
- ShardSyncHistoryA, ShardSyncHistoryB
- ),
+merge_info(#shard{} = Shard, Info, Acc) ->
#{
- peer_checkpoints => MergedPeerCheckpoints, shard_sync_history => MergedShardSyncHistory
+ uuid_map =>
+ maps:put(
+ {Shard#shard.range, Shard#shard.node}, maps:get(uuid, Info), maps:get(uuid_map, Acc)
+ ),
+ peer_checkpoints => maps:merge_with(
+ fun merge_peers/3,
+ maps:get(peer_checkpoints, Info),
+ maps:get(peer_checkpoints, Acc)
+ ),
+ shard_sync_history => maps:merge(
+ maps:get(shard_sync_history, Info), maps:get(shard_sync_history, Acc)
+ )
}.
merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
@@ -388,8 +357,8 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
).
%% A shard may have been split since a peer saw it.
--spec substitute_splits([#shard{}], uuid_fetcher(), peer_checkpoints()) -> peer_checkpoints().
-substitute_splits(Shards, UuidFetcher, PeerCheckpoints) ->
+-spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) -> peer_checkpoints().
+substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
maps:fold(
fun({[B1, E1], Node}, {Uuid, Seq}, Acc) ->
ShardsInRange = [
@@ -407,10 +376,10 @@ substitute_splits(Shards, UuidFetcher, PeerCheckpoints) ->
[B1, E1] == Shard#shard.range ->
{true, {Key, {Uuid, Seq}}};
true ->
- case UuidFetcher(Shard) of
+ case maps:find(Key, UuidMap) of
{ok, SubstUuid} ->
{true, {Key, {SubstUuid, Seq}}};
- {error, _Reason} ->
+ error ->
false
end
end
@@ -764,14 +733,15 @@ substitute_splits_test() ->
Subrange2 = [6, 10],
Node1 = '[email protected]',
Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}],
- UuidFetcher = uuid_fetcher_from_map(#{
- {Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>
- }),
+ UuidMap = #{
+ {Subrange1, Node1} => <<"uuid2">>,
+ {Subrange2, Node1} => <<"uuid3">>
+ },
PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
?assertEqual(
#{{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => {<<"uuid3">>, 12}},
- substitute_splits(Shards, UuidFetcher, PeerCheckpoints)
+ substitute_splits(Shards, UuidMap, PeerCheckpoints)
).
crossref_test_() ->
@@ -833,15 +803,16 @@ go_int_test_() ->
Subrange2 = [6, 10],
Node1 = '[email protected]',
Shards = [#shard{range = Subrange1, node = Node1}, #shard{range = Subrange2, node = Node1}],
- UuidFetcher = uuid_fetcher_from_map(#{
- {Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>
- }),
+ UuidMap = #{
+ {Subrange1, Node1} => <<"uuid2">>,
+ {Subrange2, Node1} => <<"uuid3">>
+ },
[
?_assertEqual(
{Shards, #{
{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} => {<<"uuid3">>, 12}
}},
- go_int(Shards, UuidFetcher, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{})
+ go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}}, #{})
),
?_assertEqual(
{Shards, #{
@@ -849,7 +820,7 @@ go_int_test_() ->
}},
go_int(
Shards,
- UuidFetcher,
+ UuidMap,
#{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} => {<<"uuid2">>, 10}},
#{}
)
@@ -868,12 +839,12 @@ go_int2_test_() ->
#shard{range = Subrange1, node = Node2},
#shard{range = Subrange2, node = Node2}
],
- UuidFetcher = uuid_fetcher_from_map(#{
+ UuidMap = #{
{Subrange1, Node1} => <<"s1n1">>,
{Subrange2, Node1} => <<"s2n1">>,
{Subrange1, Node2} => <<"s1n2">>,
{Subrange2, Node2} => <<"s2n2">>
- }),
+ },
ShardSyncHistory =
#{
{Subrange1, Node1, Node2} => [
@@ -913,7 +884,7 @@ go_int2_test_() ->
2,
go_int(
Shards,
- UuidFetcher,
+ UuidMap,
#{{Range, Node1} => {<<"ignored">>, 12}},
ShardSyncHistory
)
@@ -921,14 +892,4 @@ go_int2_test_() ->
)
].
-uuid_fetcher_from_map(UuidMap) ->
- fun(Shard) ->
- case maps:find({Shard#shard.range, Shard#shard.node}, UuidMap) of
- {ok, Value} ->
- {ok, Value};
- error ->
- {error, not_found}
- end
- end.
-
-endif.