This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/auto-delete-3 by this push:
new 2aacb2236 fix streaming
2aacb2236 is described below
commit 2aacb2236cb54774185a312ab6a2dbed3a6e1765
Author: Robert Newson <[email protected]>
AuthorDate: Thu May 8 12:20:59 2025 +0100
fix streaming
---
src/fabric/src/fabric_drop_seq.erl | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/src/fabric/src/fabric_drop_seq.erl b/src/fabric/src/fabric_drop_seq.erl
index 33c9096d1..2d10960fa 100644
--- a/src/fabric/src/fabric_drop_seq.erl
+++ b/src/fabric/src/fabric_drop_seq.erl
@@ -32,10 +32,10 @@
}.
go(DbName) ->
+ Shards = mem3:shards(DbName),
{ok, PeerCheckpoints0} = get_peer_checkpoint_docs(DbName),
- {ok, ShardSyncHistory} = get_all_shard_sync_docs(DbName),
+ {ok, ShardSyncHistory} = get_all_shard_sync_docs(Shards),
PeerCheckpoints1 = calculate_drop_seqs(PeerCheckpoints0, ShardSyncHistory),
- Shards = mem3:live_shards(DbName, [node() | nodes()]),
Workers = lists:filtermap(
fun(Shard) ->
#shard{range = Range, node = Node, name = ShardName} = Shard,
@@ -147,9 +147,8 @@ crossref(PeerCheckpoints0, ShardSyncHistory) ->
crossref(PeerCheckpoints1, ShardSyncHistory)
end.
--spec get_all_shard_sync_docs(DbName :: binary()) -> shard_sync_history().
-get_all_shard_sync_docs(DbName) ->
- Shards = mem3:shards(DbName),
+-spec get_all_shard_sync_docs(Shards :: [#shard{}]) -> shard_sync_history().
+get_all_shard_sync_docs(Shards) ->
Workers = fabric_util:submit_jobs(
Shards, fabric_rpc, all_docs, [[], shard_sync_docs_mrargs()]
),
@@ -179,11 +178,14 @@ handle_shard_sync_docs_reply({rexi_EXIT, _Reason}, _Worker, {ShardSyncHistory, C
handle_shard_sync_docs_reply(rexi_STREAM_INIT, {_Worker, From}, Acc) ->
rexi:stream_start(From),
{ok, Acc};
-handle_shard_sync_docs_reply({meta, _Meta}, _Worker, Acc) ->
+handle_shard_sync_docs_reply({meta, _Meta}, {_Worker, From}, Acc) ->
+ rexi:stream_ack(From),
{ok, Acc};
-handle_shard_sync_docs_reply(#view_row{} = Row, _Worker, {ShardSyncHistory, Count}) ->
+handle_shard_sync_docs_reply(#view_row{} = Row, {_Worker, From}, {ShardSyncHistory, Count}) ->
Doc = couch_doc:from_json_obj(Row#view_row.doc),
- {ok, {parse_shard_sync_doc(Doc, ShardSyncHistory), Count}};
+ Result = parse_shard_sync_doc(Doc, ShardSyncHistory),
+ rexi:stream_ack(From),
+ {ok, {Result, Count}};
handle_shard_sync_docs_reply(complete, _Worker, {ShardSyncHistory, 0}) ->
{stop, ShardSyncHistory};
handle_shard_sync_docs_reply(complete, _Worker, {ShardSyncHistory, Count}) ->