This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new 35e35193a push some calculation to the worker side
35e35193a is described below
commit 35e35193a85c355e617362e4d48eca40ac69c5c0
Author: Robert Newson <[email protected]>
AuthorDate: Fri May 16 14:44:52 2025 +0100
push some calculation to the worker side
---
src/fabric/src/fabric_drop_seq.erl | 224 ++++++++++++++++++++-----------------
1 file changed, 119 insertions(+), 105 deletions(-)
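[Editor's note, not part of the commit] The diff below moves the local-doc scanning from the coordinator to the shards: instead of two coordinator-side _all_docs passes (peer-checkpoint docs and shard-sync docs), each worker now folds its own _local docs and replies with a ready-made map that the coordinator merges. The sketch that follows is illustrative only; it distills the coordinator/worker pattern that the new gather_drop_seq_info/1 and gather_drop_seq_info_rpc/1 adopt, reusing the fabric/rexi helpers already visible in the patch (fabric_util:submit_jobs/4, rexi_utils:recv/6, rexi:reply/1). The module and function names drop_seq_sketch, sketch_rpc/1 and handle_reply/3 are invented for the example and do not appear in fabric_drop_seq.erl.

    -module(drop_seq_sketch).

    -include_lib("mem3/include/mem3.hrl").

    -export([gather/1, sketch_rpc/1]).

    %% coordinator side: one rexi job per shard copy, merged as replies arrive
    gather(Shards) ->
        Workers = fabric_util:submit_jobs(Shards, ?MODULE, sketch_rpc, []),
        RexiMon = fabric_util:create_monitors(Workers),
        try
            rexi_utils:recv(
                Workers,
                #shard.ref,
                fun handle_reply/3,
                {#{}, length(Workers) - 1},
                fabric_util:request_timeout(),
                infinity
            )
        after
            rexi_monitor:stop(RexiMon),
            fabric_streams:cleanup(Workers)
        end.

    %% worker side: open the local shard, compute the per-shard result, reply once
    sketch_rpc(ShardName) ->
        case couch_db:open_int(ShardName, []) of
            {ok, Db} ->
                try
                    rexi:reply({ok, #{ShardName => couch_db:get_uuid(Db)}})
                after
                    couch_db:close(Db)
                end;
            Else ->
                rexi:reply(Else)
        end.

    %% drop failed workers, merge successful replies, stop when all are accounted for
    handle_reply({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
        {ok, {Acc, Count - 1}};
    handle_reply({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
        {ok, {Acc, Count - 1}};
    handle_reply({ok, Info}, _Worker, {Acc, 0}) ->
        {stop, maps:merge(Acc, Info)};
    handle_reply({ok, Info}, _Worker, {Acc, Count}) ->
        {ok, {maps:merge(Acc, Info), Count - 1}}.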
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 1811fbb7d..5d0ea970c 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -3,7 +3,6 @@
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
--include_lib("fabric/include/fabric.hrl").
-export([go/1]).
@@ -15,6 +14,9 @@
peer_id_from_sig/2
]).
+%% rpc
+-export([gather_drop_seq_info_rpc/1]).
+
-type range() :: [non_neg_integer()].
-type uuid() :: binary().
@@ -35,8 +37,9 @@
go(DbName) ->
Shards0 = mem3:shards(DbName),
- {ok, PeerCheckpoints} = get_peer_checkpoint_docs(DbName),
- {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards0),
+ #{peer_checkpoints := PeerCheckpoints, shard_sync_history := ShardSyncHistory} = gather_drop_seq_info(
+ Shards0
+ ),
{Shards1, DropSeqs} = go_int(
Shards0, fun uuid_fetcher_rpc/1, PeerCheckpoints, ShardSyncHistory
),
@@ -198,56 +201,96 @@ handle_get_uuid_reply({rexi_EXIT, _Reason}, _Worker, _Acc) ->
handle_get_uuid_reply(Uuid, _, _Acc) when is_binary(Uuid) ->
{stop, {ok, Uuid}}.
--spec get_all_shard_sync_docs(Shards :: [#shard{}]) -> shard_sync_history().
-get_all_shard_sync_docs(Shards) ->
+%% return only the shards that have synced with every other replica
+fully_replicated_shards_only(Shards, ShardSyncHistory) ->
+ lists:filter(
+ fun(#shard{range = Range, node = Node}) ->
+ ExpectedPeers = [
+ S#shard.node
+ || S <- Shards, S#shard.range == Range, S#shard.node /= Node
+ ],
+ ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
+ lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
+ end,
+ Shards
+ ).
+
+-spec gather_drop_seq_info(Shards :: [#shard{}]) -> {peer_checkpoints(), shard_sync_history()}.
+gather_drop_seq_info([#shard{} | _] = Shards) ->
Workers = fabric_util:submit_jobs(
- Shards, fabric_rpc, all_docs, [[], shard_sync_docs_mrargs()]
+ Shards, ?MODULE, gather_drop_seq_info_rpc, []
),
- Acc0 = {#{}, length(Workers) - 1},
RexiMon = fabric_util:create_monitors(Workers),
+ Acc0 = #{},
try
- rexi_utils:recv(
- Workers,
- #shard.ref,
- fun handle_shard_sync_docs_reply/3,
- Acc0,
- fabric_util:request_timeout(),
- infinity
- )
+ case
+ rexi_utils:recv(
+ Workers,
+ #shard.ref,
+ fun gather_drop_seq_info_cb/3,
+ {Acc0, length(Workers) - 1},
+ fabric_util:request_timeout(),
+ infinity
+ )
+ of
+ {ok, Result} ->
+ Result;
+ {timeout, _State} ->
+ {error, timeout};
+ {error, Reason} ->
+ {error, Reason}
+ end
after
rexi_monitor:stop(RexiMon),
fabric_streams:cleanup(Workers)
end.
-%% consult every copy of every range for shard sync information but ignore failures (otherwise
-%% this only works when all nodes are up). We'll only update drop seq for a shard if we have
-%% seen all other copies have synced to it.
-handle_shard_sync_docs_reply({rexi_DOWN, _, _, _}, _Worker, {ShardSyncHistory, Count}) ->
- {ok, {ShardSyncHistory, Count - 1}};
-handle_shard_sync_docs_reply({rexi_EXIT, _Reason}, _Worker, {ShardSyncHistory, Count}) ->
- {ok, {ShardSyncHistory, Count - 1}};
-handle_shard_sync_docs_reply(rexi_STREAM_INIT, {_Worker, From}, Acc) ->
- rexi:stream_start(From),
- {ok, Acc};
-handle_shard_sync_docs_reply({meta, _Meta}, {_Worker, From}, Acc) ->
- rexi:stream_ack(From),
- {ok, Acc};
-handle_shard_sync_docs_reply(#view_row{} = Row, {_Worker, From}, {ShardSyncHistory, Count}) ->
- Doc = couch_doc:from_json_obj(Row#view_row.doc),
- Result = parse_shard_sync_doc(Doc, ShardSyncHistory),
- rexi:stream_ack(From),
- {ok, {Result, Count}};
-handle_shard_sync_docs_reply(complete, _Worker, {ShardSyncHistory, 0}) ->
- {stop, ShardSyncHistory};
-handle_shard_sync_docs_reply(complete, _Worker, {ShardSyncHistory, Count}) ->
- {ok, {ShardSyncHistory, Count - 1}}.
-
-parse_shard_sync_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) ->
+gather_drop_seq_info_rpc(DbName) ->
+ case couch_db:open_int(DbName, []) of
+ {ok, Db} ->
+ try
+ Uuid = couch_db:get_uuid(Db),
+ Acc0 = {#{}, #{}},
+ {ok, {PeerCheckpoints, ShardSyncHistory}} = couch_db:fold_local_docs(
+ Db, fun gather_drop_seq_info_fun/2, Acc0, []
+ ),
+ rexi:reply(
+ {ok, #{
+ uuid => Uuid,
+ peer_checkpoints => PeerCheckpoints,
+ shard_sync_history => ShardSyncHistory
+ }}
+ )
+ after
+ couch_db:close(Db)
+ end;
+ Else ->
+ rexi:reply(Else)
+ end.
+
+gather_drop_seq_info_fun(
+ #doc{id = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", _/binary>>} = Doc,
+ {PeerCheckpoints0, ShardSyncHistory} = Acc
+) ->
+ {Props} = Doc#doc.body,
+ case couch_util:get_value(<<"update_seq">>, Props) of
+ undefined ->
+ {ok, Acc};
+ UpdateSeq ->
+ PeerCheckpoints1 = maps:merge_with(
+ fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
+ ),
+ {ok, {PeerCheckpoints1, ShardSyncHistory}}
+ end;
+gather_drop_seq_info_fun(
+ #doc{id = <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>>} = Doc,
+ {PeerCheckpoints, ShardSyncHistory0} = Acc
+) ->
{Props} = Doc#doc.body,
case couch_util:get_value(<<"dbname">>, Props) of
undefined ->
%% not yet upgraded with new property
- Acc;
+ {ok, Acc};
DbName ->
Range = mem3:range(DbName),
{[{_SrcNode, History}]} = couch_util:get_value(<<"history">>, Props),
@@ -263,52 +306,45 @@ parse_shard_sync_doc(#doc{id = <<"_local/shard-sync-", _/binary>>} = Doc, Acc) -
couch_util:get_value(<<"target_seq">>, Item)
}
end,
- maps:merge(
- maps:groups_from_list(KeyFun, ValueFun, History), Acc
- )
- end.
-
-%% return only the shards that have synced with every other replica
-fully_replicated_shards_only(Shards, ShardSyncHistory) ->
- lists:filter(
- fun(#shard{range = Range, node = Node}) ->
- ExpectedPeers = [
- S#shard.node
- || S <- Shards, S#shard.range == Range, S#shard.node /= Node
- ],
- ExpectedKeys = [{Range, Peer, Node} || Peer <- ExpectedPeers],
- lists:all(fun(Key) -> maps:is_key(Key, ShardSyncHistory) end, ExpectedKeys)
- end,
- Shards
- ).
-
--spec get_peer_checkpoint_docs(DbName :: binary()) -> peer_checkpoints().
-get_peer_checkpoint_docs(DbName) ->
- fabric:all_docs(
- DbName, fun parse_peer_checkpoint_docs_cb/2, #{}, peer_checkpoint_docs_mrargs()
- ).
-
-parse_peer_checkpoint_docs_cb({row, Row}, PeerCheckpoints0) ->
- case lists:keyfind(doc, 1, Row) of
- false ->
- {ok, PeerCheckpoints0};
- {doc, Doc0} ->
- #doc{id = <<"_local/peer-checkpoint-", _/binary>>} =
- Doc1 = couch_doc:from_json_obj(Doc0),
- {Props} = Doc1#doc.body,
- case couch_util:get_value(<<"update_seq">>, Props) of
- undefined ->
- {ok, PeerCheckpoints0};
- UpdateSeq ->
- {ok,
- maps:merge_with(
- fun merge_peers/3, decode_seq(UpdateSeq), PeerCheckpoints0
- )}
- end
+ ShardSyncHistory1 = maps:merge(
+ maps:groups_from_list(KeyFun, ValueFun, History), ShardSyncHistory0
+ ),
+ {ok, {PeerCheckpoints, ShardSyncHistory1}}
end;
-parse_peer_checkpoint_docs_cb(_Else, Acc) ->
+gather_drop_seq_info_fun(#doc{}, Acc) ->
+ %% ignored
{ok, Acc}.
+gather_drop_seq_info_cb({rexi_DOWN, _, _, _}, _Worker, {Acc, Count}) ->
+ {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({rexi_EXIT, _Reason}, _Worker, {Acc, Count}) ->
+ {ok, {Acc, Count - 1}};
+gather_drop_seq_info_cb({ok, Info}, Worker, {Acc, Count}) ->
+ MergedInfo = merge_info(Worker, Info, Acc),
+ if
+ Count == 0 ->
+ {stop, MergedInfo};
+ true ->
+ {ok, {MergedInfo, Count - 1}}
+ end;
+gather_drop_seq_info_cb(_Error, _Worker, {Acc, Count}) ->
+ {ok, {Acc, Count - 1}}.
+
+merge_info(#shard{} = _Shard, InfoA, InfoB) ->
+ PeerCheckpointsA = maps:get(peer_checkpoints, InfoA, #{}),
+ PeerCheckpointsB = maps:get(peer_checkpoints, InfoB, #{}),
+ MergedPeerCheckpoints = maps:merge_with(
+ fun merge_peers/3, PeerCheckpointsA, PeerCheckpointsB
+ ),
+ ShardSyncHistoryA = maps:get(shard_sync_history, InfoA, #{}),
+ ShardSyncHistoryB = maps:get(shard_sync_history, InfoB, #{}),
+ MergedShardSyncHistory = maps:merge(
+ ShardSyncHistoryA, ShardSyncHistoryB
+ ),
+ #{
+ peer_checkpoints => MergedPeerCheckpoints, shard_sync_history => MergedShardSyncHistory
+ }.
+
merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
->
@@ -341,28 +377,6 @@ decode_seq(OpaqueSeq) ->
Decoded
).
-all_docs_mrargs() ->
- #mrargs{
- view_type = map,
- include_docs = true,
- extra = [
- {include_system, true},
- {namespace, <<"_local">>}
- ]
- }.
-
-peer_checkpoint_docs_mrargs() ->
- (all_docs_mrargs())#mrargs{
- start_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-">>,
- end_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint.">>
- }.
-
-shard_sync_docs_mrargs() ->
- (all_docs_mrargs())#mrargs{
- start_key = <<?LOCAL_DOC_PREFIX, "shard-sync-">>,
- end_key = <<?LOCAL_DOC_PREFIX, "shard-sync.">>
- }.
-
latest_shard_sync_checkpoints(ShardSyncHistory) ->
maps:fold(
fun({R, SN, _TN}, History, Acc) ->