This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new 3a552f20e fetch substitute uuid on demand only
3a552f20e is described below
commit 3a552f20e811ce0e155775698d731613cf33608d
Author: Robert Newson <[email protected]>
AuthorDate: Thu May 15 12:11:02 2025 +0100
fetch substitute uuid on demand only
---
src/fabric/src/fabric_drop_seq.erl | 128 ++++++++++++++++++++++---------------
1 file changed, 76 insertions(+), 52 deletions(-)
diff --git a/src/fabric/src/fabric_drop_seq.erl
b/src/fabric/src/fabric_drop_seq.erl
index 739356115..1811fbb7d 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -7,8 +7,6 @@
-export([go/1]).
--compile(export_all).
-
-export([
create_peer_checkpoint_doc_if_missing/5,
update_peer_checkpoint_doc/5,
@@ -23,7 +21,7 @@
-type seq() :: non_neg_integer().
--type uuid_map() :: #{{range(), Node :: node()} => Uuid :: uuid()}.
+-type uuid_fetcher() :: fun((#shard{}) -> {ok, uuid()} | {error, term()}).
-type peer_checkpoints() :: #{{range(), Node :: node()} => {Uuid :: uuid(),
Seq :: seq()}}.
@@ -37,10 +35,11 @@
go(DbName) ->
Shards0 = mem3:shards(DbName),
- {ok, UuidMap} = get_uuids(Shards0),
{ok, PeerCheckpoints} = get_peer_checkpoint_docs(DbName),
{ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
- {Shards1, DropSeqs} = go_int(Shards0, UuidMap, PeerCheckpoints,
ShardSyncHistory),
+ {Shards1, DropSeqs} = go_int(
+ Shards0, fun uuid_fetcher_rpc/1, PeerCheckpoints, ShardSyncHistory
+ ),
Workers = lists:filtermap(
fun(Shard) ->
#shard{range = Range, node = Node, name = ShardName} = Shard,
@@ -86,10 +85,10 @@ go(DbName) ->
end
end.
-go_int(Shards0, UuidMap, PeerCheckpoints0, ShardSyncHistory) ->
+go_int(Shards0, UuidFetcher, PeerCheckpoints0, ShardSyncHistory) ->
Shards1 = fully_replicated_shards_only(Shards0, ShardSyncHistory),
PeerCheckpoints1 = crossref(PeerCheckpoints0, ShardSyncHistory),
- PeerCheckpoints2 = substitute_splits(Shards1, UuidMap, PeerCheckpoints1),
+ PeerCheckpoints2 = substitute_splits(Shards1, UuidFetcher,
PeerCheckpoints1),
DropSeqs = calculate_drop_seqs(PeerCheckpoints2, ShardSyncHistory),
{Shards1, DropSeqs}.
@@ -163,39 +162,41 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
crossref(PeerCheckpoints1, ShardSyncHistory)
end.
--spec get_uuids(Shards :: [#shard{}]) -> uuid_map().
-get_uuids(Shards) ->
+-spec uuid_fetcher_rpc(#shard{}) -> {ok, uuid()} | {error, term()}.
+uuid_fetcher_rpc(#shard{} = Shard) ->
Workers = fabric_util:submit_jobs(
- Shards, fabric_rpc, get_uuid, []
+ [Shard], fabric_rpc, get_uuid, []
),
- Acc0 = {#{}, length(Workers) - 1},
RexiMon = fabric_util:create_monitors(Workers),
try
- rexi_utils:recv(
- Workers,
- #shard.ref,
- fun handle_get_uuid_reply/3,
- Acc0,
- fabric_util:request_timeout(),
- infinity
- )
+ case
+ rexi_utils:recv(
+ Workers,
+ #shard.ref,
+ fun handle_get_uuid_reply/3,
+ nil,
+ fabric_util:request_timeout(),
+ infinity
+ )
+ of
+ {ok, Result} ->
+ Result;
+ {timeout, _State} ->
+ {error, timeout};
+ {error, Reason} ->
+ {error, Reason}
+ end
after
rexi_monitor:stop(RexiMon),
fabric_streams:cleanup(Workers)
end.
-handle_get_uuid_reply({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
- {ok, {Acc, Count - 1}};
-handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
- {ok, {Acc, Count - 1}};
-handle_get_uuid_reply(Uuid, Worker, {Acc0, Count}) when is_binary(Uuid) ->
- Acc1 = Acc0#{{Worker#shard.range, Worker#shard.node} => Uuid},
- if
- Count == 0 ->
- {stop, Acc1};
- true ->
- {ok, {Acc1, Count - 1}}
- end.
+handle_get_uuid_reply({rexi_DOWN, _, _, _}, _Worker, _Acc) ->
+ {stop, {error, rexi_DOWN}};
+handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, _Acc) ->
+ {stop, {error, rexi_EXIT}};
+handle_get_uuid_reply(Uuid, _, _Acc) when is_binary(Uuid) ->
+ {stop, {ok, Uuid}}.
-spec get_all_shard_sync_docs(Shards :: [#shard{}]) -> shard_sync_history().
get_all_shard_sync_docs(Shards) ->
@@ -373,27 +374,36 @@ latest_shard_sync_checkpoints(ShardSyncHistory) ->
).
%% A shard may have been split since a peer saw it.
--spec substitute_splits([#shard{}], uuid_map(), peer_checkpoints()) ->
peer_checkpoints().
-substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
+-spec substitute_splits([#shard{}], uuid_fetcher(), peer_checkpoints()) ->
peer_checkpoints().
+substitute_splits(Shards, UuidFetcher, PeerCheckpoints) ->
maps:fold(
fun({[B1, E1], Node}, {Uuid, Seq}, Acc) ->
- MatchingRanges = [
- S#shard.range
+ ShardsInRange = [
+ S
|| #shard{range = [B2, E2]} = S <- Shards,
Node == S#shard.node,
B2 >= B1 andalso E2 =< E1
],
%% fetch uuid on demand if substituted
- AsMap = maps:from_list([
- {{R, Node}, {
- if
- [B1, E1] == R -> Uuid;
- true -> maps:get({R, Node}, UuidMap)
+ AsMap = maps:from_list(
+ lists:filtermap(
+ fun(#shard{} = Shard) ->
+ Key = {Shard#shard.range, Shard#shard.node},
+ if
+ [B1, E1] == Shard#shard.range ->
+ {true, {Key, {Uuid, Seq}}};
+ true ->
+ case UuidFetcher(Shard) of
+ {ok, SubstUuid} ->
+ {true, {Key, {SubstUuid, Seq}}};
+ {error, _Reason} ->
+ false
+ end
+ end
end,
- Seq
- }}
- || R <- MatchingRanges
- ]),
+ ShardsInRange
+ )
+ ),
maps:merge_with(fun merge_peers/3, AsMap, Acc)
end,
#{},
@@ -740,12 +750,14 @@ substitute_splits_test() ->
Subrange2 = [6, 10],
Node1 = '[email protected]',
Shards = [#shard{range = Subrange1, node = Node1}, #shard{range =
Subrange2, node = Node1}],
- UuidMap = #{{Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} =>
<<"uuid3">>},
+ UuidFetcher = uuid_fetcher_from_map(#{
+ {Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>
+ }),
PeerCheckpoints = #{{Range, Node1} => {<<"uuid1">>, 12}},
?assertEqual(
#{{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} =>
{<<"uuid3">>, 12}},
- substitute_splits(Shards, UuidMap, PeerCheckpoints)
+ substitute_splits(Shards, UuidFetcher, PeerCheckpoints)
).
crossref_test_() ->
@@ -807,13 +819,15 @@ go_int_test_() ->
Subrange2 = [6, 10],
Node1 = '[email protected]',
Shards = [#shard{range = Subrange1, node = Node1}, #shard{range =
Subrange2, node = Node1}],
- UuidMap = #{{Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} =>
<<"uuid3">>},
+ UuidFetcher = uuid_fetcher_from_map(#{
+ {Subrange1, Node1} => <<"uuid2">>, {Subrange2, Node1} => <<"uuid3">>
+ }),
[
?_assertEqual(
{Shards, #{
{Subrange1, Node1} => {<<"uuid2">>, 12}, {Subrange2, Node1} =>
{<<"uuid3">>, 12}
}},
- go_int(Shards, UuidMap, #{{Range, Node1} => {<<"uuid1">>, 12}},
#{})
+ go_int(Shards, UuidFetcher, #{{Range, Node1} => {<<"uuid1">>,
12}}, #{})
),
?_assertEqual(
{Shards, #{
@@ -821,7 +835,7 @@ go_int_test_() ->
}},
go_int(
Shards,
- UuidMap,
+ UuidFetcher,
#{{Range, Node1} => {<<"uuid1">>, 12}, {Subrange1, Node1} =>
{<<"uuid2">>, 10}},
#{}
)
@@ -840,12 +854,12 @@ go_int2_test_() ->
#shard{range = Subrange1, node = Node2},
#shard{range = Subrange2, node = Node2}
],
- UuidMap = #{
+ UuidFetcher = uuid_fetcher_from_map(#{
{Subrange1, Node1} => <<"s1n1">>,
{Subrange2, Node1} => <<"s2n1">>,
{Subrange1, Node2} => <<"s1n2">>,
{Subrange2, Node2} => <<"s2n2">>
- },
+ }),
ShardSyncHistory =
#{
{Subrange1, Node1, Node2} => [
@@ -885,7 +899,7 @@ go_int2_test_() ->
2,
go_int(
Shards,
- UuidMap,
+ UuidFetcher,
#{{Range, Node1} => {<<"ignored">>, 12}},
ShardSyncHistory
)
@@ -893,4 +907,14 @@ go_int2_test_() ->
)
].
+uuid_fetcher_from_map(UuidMap) ->
+ fun(Shard) ->
+ case maps:find({Shard#shard.range, Shard#shard.node}, UuidMap) of
+ {ok, Value} ->
+ {ok, Value};
+ error ->
+ {error, not_found}
+ end
+ end.
+
-endif.