This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 0dc4008b3 Improve index cleanup
0dc4008b3 is described below
commit 0dc4008b3a84c2239c97639baf43096d4631a291
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Oct 23 02:43:02 2025 -0400
Improve index cleanup
Fix these issues:
* Index cleanup triggered by smoosh only cleaned view indexes and purge
checkpoints; make sure to also clean nouveau and search indexes and
checkpoints.
* `/_search_cleanup` endpoint only cleaned indexes on the coordinator
node,
despite being a clustered fabric endpoint. Fix it so it cleans indexes on
other nodes as well, as most users would expect.
* Nouveau cleanup didn't clean purge checkpoints, so make it do so. Use
the
mrview purge checkpoint strategy for all indexes. This improves dreyfus
logic
to avoid traversing internal disk paths from clouseau.
* Make `_view_cleanup` clean all the index types. This is a bit dirty,
but it
may be better than adding another cleanup `_index_cleanup` API. Left
`_search_cleanup` and `_nouveau_cleanup` APIs as is for now.
Some optimizations:
* For each index clean request fetch ddocs once. Calculate signatures and
then call the remote node cleanup logic with them. This avoids fetching
design documents multiple times or sending all of them to the worker
nodes.
This is what Nouveau is doing so stick with that nice pattern.
* Use erpc for remote calls. Our Erlang version is high enough (25+) to
use
the multiple requests pattern from erpc. This is more compact than rexi.
The
absolute timeout pattern makes it simpler to have a global timeout for the
whole request.
Cleanups:
* Make purge checkpoint fetching and cleanup more uniform. Use common
utility
logic in `couch_index_util` for all indexes.
* Make index cleanup similar between all three indexes. Use the same erpc
pattern for all of them.
* Move index cleanup functions from fabric.erl to its own module --
fabric_index_cleanup.erl
* Rename fabric functions more uniformly and clearly indicate whether they
clean
indexes on all nodes or just the current node.
* Add the db name to dreyfus `#index{}` record when it initializes so it
matches Nouveau.
---
src/couch_index/src/couch_index_util.erl | 47 ++++++++++
src/couch_mrview/src/couch_mrview_cleanup.erl | 51 +++++-----
src/couch_mrview/src/couch_mrview_util.erl | 38 ++++----
src/dreyfus/src/clouseau_rpc.erl | 11 ++-
src/dreyfus/src/dreyfus_fabric_cleanup.erl | 129 ++++++++------------------
src/dreyfus/src/dreyfus_index.erl | 11 ++-
src/dreyfus/src/dreyfus_rpc.erl | 6 +-
src/dreyfus/src/dreyfus_util.erl | 32 ++++++-
src/dreyfus/test/eunit/dreyfus_purge_test.erl | 6 +-
src/fabric/src/fabric.erl | 58 +++---------
src/fabric/src/fabric_index_cleanup.erl | 81 ++++++++++++++++
src/fabric/src/fabric_util.erl | 24 +++++
src/fabric/test/eunit/fabric_tests.erl | 21 +++--
src/ken/src/ken_server.erl | 2 +-
src/mem3/src/mem3_reshard_index.erl | 2 +-
src/mem3/test/eunit/mem3_reshard_test.erl | 2 +-
src/nouveau/src/nouveau_fabric_cleanup.erl | 80 +++++++++-------
src/nouveau/src/nouveau_rpc.erl | 7 +-
src/nouveau/src/nouveau_util.erl | 13 +++
src/smoosh/src/smoosh_channel.erl | 2 +-
src/smoosh/test/smoosh_tests.erl | 2 +-
21 files changed, 369 insertions(+), 256 deletions(-)
diff --git a/src/couch_index/src/couch_index_util.erl
b/src/couch_index/src/couch_index_util.erl
index db8aad470..9a16d06d6 100644
--- a/src/couch_index/src/couch_index_util.erl
+++ b/src/couch_index/src/couch_index_util.erl
@@ -14,6 +14,7 @@
-export([root_dir/0, index_dir/2, index_file/3]).
-export([load_doc/3, sort_lib/1, hexsig/1]).
+-export([get_purge_checkpoints/2, cleanup_purges/3]).
-include_lib("couch/include/couch_db.hrl").
@@ -72,3 +73,49 @@ sort_lib([{LName, LCode} | Rest], LAcc) ->
hexsig(Sig) ->
couch_util:to_hex(Sig).
+
+% Helper function for indexes to get their purge checkpoints as signatures.
+%
+get_purge_checkpoints(DbName, Type) when is_binary(DbName), is_binary(Type) ->
+ couch_util:with_db(DbName, fun(Db) -> get_purge_checkpoints(Db, Type) end);
+get_purge_checkpoints(Db, Type) when is_binary(Type) ->
+ Prefix = <<?LOCAL_DOC_PREFIX, "purge-", Type:(byte_size(Type))/binary,
"-">>,
+ PrefixSize = byte_size(Prefix),
+ FoldFun = fun(#doc{id = Id}, Acc) ->
+ case Id of
+ <<Prefix:PrefixSize/binary, Sig/binary>> -> {ok, Acc#{Sig => Id}};
+ _ -> {stop, Acc}
+ end
+ end,
+ Opts = [{start_key, Prefix}],
+ {ok, Signatures = #{}} = couch_db:fold_local_docs(Db, FoldFun, #{}, Opts),
+ Signatures.
+
+% Helper functions for indexes to clean their purge checkpoints.
+%
+cleanup_purges(DbName, #{} = Sigs, #{} = Checkpoints) when is_binary(DbName) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ cleanup_purges(Db, Sigs, Checkpoints)
+ end);
+cleanup_purges(Db, #{} = Sigs, #{} = CheckpointsMap) ->
+ InactiveMap = maps:without(maps:keys(Sigs), CheckpointsMap),
+ InactiveCheckpoints = maps:values(InactiveMap),
+ DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end,
+ lists:foreach(DeleteFun, InactiveCheckpoints).
+
+delete_checkpoint(Db, DocId) ->
+ DbName = couch_db:name(Db),
+ LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
+ couch_log:debug(LogMsg, [?MODULE, DbName, DocId]),
+ try couch_db:open_doc(Db, DocId, []) of
+ {ok, Doc = #doc{}} ->
+ Deleted = Doc#doc{deleted = true, body = {[]}},
+ couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]);
+ {not_found, _} ->
+ ok
+ catch
+ Tag:Error ->
+ ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p",
+ couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]),
+ ok
+ end.
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl
b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 5b5afbdce..e8a2833a7 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -14,12 +14,9 @@
-export([
run/1,
- cleanup_purges/3,
- cleanup_indices/2
+ cleanup/2
]).
--include_lib("couch/include/couch_db.hrl").
-
run(Db) ->
Indices = couch_mrview_util:get_index_files(Db),
Checkpoints = couch_mrview_util:get_purge_checkpoints(Db),
@@ -28,15 +25,26 @@ run(Db) ->
ok = cleanup_purges(Db1, Sigs, Checkpoints),
ok = cleanup_indices(Sigs, Indices).
-cleanup_purges(DbName, Sigs, Checkpoints) when is_binary(DbName) ->
- couch_util:with_db(DbName, fun(Db) ->
- cleanup_purges(Db, Sigs, Checkpoints)
- end);
-cleanup_purges(Db, #{} = Sigs, #{} = CheckpointsMap) ->
- InactiveMap = maps:without(maps:keys(Sigs), CheckpointsMap),
- InactiveCheckpoints = maps:values(InactiveMap),
- DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end,
- lists:foreach(DeleteFun, InactiveCheckpoints).
+% erpc endpoint for fabric_index_cleanup:cleanup_indexes/2
+%
+cleanup(Dbs, #{} = Sigs) ->
+ try
+ lists:foreach(
+ fun(Db) ->
+ Indices = couch_mrview_util:get_index_files(Db),
+ Checkpoints = couch_mrview_util:get_purge_checkpoints(Db),
+ ok = cleanup_purges(Db, Sigs, Checkpoints),
+ ok = cleanup_indices(Sigs, Indices)
+ end,
+ Dbs
+ )
+ catch
+ error:database_does_not_exist ->
+ ok
+ end.
+
+cleanup_purges(Db, Sigs, Checkpoints) ->
+ couch_index_util:cleanup_purges(Db, Sigs, Checkpoints).
cleanup_indices(#{} = Sigs, #{} = IndexMap) ->
Fun = fun(_, Files) -> lists:foreach(fun delete_file/1, Files) end,
@@ -54,20 +62,3 @@ delete_file(File) ->
couch_log:error(ErrLog, [?MODULE, File, Tag, Error]),
ok
end.
-
-delete_checkpoint(Db, DocId) ->
- DbName = couch_db:name(Db),
- LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
- couch_log:debug(LogMsg, [?MODULE, DbName, DocId]),
- try couch_db:open_doc(Db, DocId, []) of
- {ok, Doc = #doc{}} ->
- Deleted = Doc#doc{deleted = true, body = {[]}},
- couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]);
- {not_found, _} ->
- ok
- catch
- Tag:Error ->
- ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p",
- couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]),
- ok
- end.
diff --git a/src/couch_mrview/src/couch_mrview_util.erl
b/src/couch_mrview/src/couch_mrview_util.erl
index 7149fc1a5..5405e8db8 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -16,6 +16,7 @@
-export([get_local_purge_doc_id/1, get_value_from_options/2]).
-export([verify_view_filename/1, get_signature_from_filename/1]).
-export([get_signatures/1, get_purge_checkpoints/1, get_index_files/1]).
+-export([get_signatures_from_ddocs/2]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
-export([index_file/2, compaction_file/2, open_file/1]).
@@ -94,40 +95,35 @@ get_signatures(DbName) when is_binary(DbName) ->
couch_util:with_db(DbName, fun get_signatures/1);
get_signatures(Db) ->
DbName = couch_db:name(Db),
- % get_design_docs/1 returns ejson for clustered shards, and
- % #full_doc_info{}'s for other cases.
{ok, DDocs} = couch_db:get_design_docs(Db),
+ % get_design_docs/1 returns ejson for clustered shards, and
+ % #full_doc_info{}'s for other cases. Both are transformed to #doc{}
records
FoldFun = fun
({[_ | _]} = EJsonDoc, Acc) ->
Doc = couch_doc:from_json_obj(EJsonDoc),
- {ok, Mrst} = ddoc_to_mrst(DbName, Doc),
- Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
- Acc#{Sig => true};
+ [Doc | Acc];
(#full_doc_info{} = FDI, Acc) ->
{ok, Doc} = couch_db:open_doc_int(Db, FDI, [ejson_body]),
- {ok, Mrst} = ddoc_to_mrst(DbName, Doc),
- Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
- Acc#{Sig => true}
+ [Doc | Acc]
+ end,
+ DDocs1 = lists:foldl(FoldFun, [], DDocs),
+ get_signatures_from_ddocs(DbName, DDocs1).
+
+% From a list of design #doc{} records returns signatures map: #{Sig => true}
+%
+get_signatures_from_ddocs(DbName, DDocs) when is_list(DDocs) ->
+ FoldFun = fun(#doc{} = Doc, Acc) ->
+ {ok, Mrst} = ddoc_to_mrst(DbName, Doc),
+ Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
+ Acc#{Sig => true}
end,
lists:foldl(FoldFun, #{}, DDocs).
% Returns a map of `Sig => DocId` elements for all the purge view
% checkpoint docs. Sig is a hex-encoded binary.
%
-get_purge_checkpoints(DbName) when is_binary(DbName) ->
- couch_util:with_db(DbName, fun get_purge_checkpoints/1);
get_purge_checkpoints(Db) ->
- FoldFun = fun(#doc{id = Id}, Acc) ->
- case Id of
- <<?LOCAL_DOC_PREFIX, "purge-mrview-", Sig/binary>> ->
- {ok, Acc#{Sig => Id}};
- _ ->
- {stop, Acc}
- end
- end,
- Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mrview-">>}],
- {ok, Signatures = #{}} = couch_db:fold_local_docs(Db, FoldFun, #{}, Opts),
- Signatures.
+ couch_index_util:get_purge_checkpoints(Db, <<"mrview">>).
% Returns a map of `Sig => [FilePath, ...]` elements. Sig is a hex-encoded
% binary and FilePaths are lists as they intended to be passed to couch_file
diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl
index 921746a7a..c036a5a9a 100644
--- a/src/dreyfus/src/clouseau_rpc.erl
+++ b/src/dreyfus/src/clouseau_rpc.erl
@@ -262,10 +262,17 @@ rename(DbName) ->
%% and an analyzer represented in a Javascript function in a design document.
%% `Sig` is used to check if an index description is changed,
%% and the index needs to be reconstructed.
--spec cleanup(DbName :: string_as_binary(_), ActiveSigs :: [sig()]) ->
+-spec cleanup(DbName :: string_as_binary(_), SigList :: list() | SigMap ::
#{sig() => true}) ->
ok.
-cleanup(DbName, ActiveSigs) ->
+% Compatibility clause to help when running search index cleanup during
+% a mixed cluster state. Remove after version 3.6
+%
+cleanup(DbName, SigList) when is_list(SigList) ->
+ SigMap = #{Sig => true || Sig <- SigList},
+ cleanup(DbName, SigMap);
+cleanup(DbName, #{} = SigMap) ->
+ ActiveSigs = maps:keys(SigMap),
gen_server:cast({cleanup, clouseau()}, {cleanup, DbName, ActiveSigs}).
%% a binary with value <<"tokens">>
diff --git a/src/dreyfus/src/dreyfus_fabric_cleanup.erl
b/src/dreyfus/src/dreyfus_fabric_cleanup.erl
index 86960812d..0488211be 100644
--- a/src/dreyfus/src/dreyfus_fabric_cleanup.erl
+++ b/src/dreyfus/src/dreyfus_fabric_cleanup.erl
@@ -14,99 +14,50 @@
-module(dreyfus_fabric_cleanup).
--include("dreyfus.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--export([go/1]).
+-export([go/1, go_local/3]).
go(DbName) ->
- DesignDocs =
- case fabric:design_docs(DbName) of
- {ok, DDocs} when is_list(DDocs) ->
- DDocs;
- Else ->
- couch_log:debug("Invalid design docs: ~p~n", [Else]),
- []
- end,
- ActiveSigs = lists:usort(
- lists:flatmap(
- fun active_sigs/1,
- [couch_doc:from_json_obj(DD) || DD <- DesignDocs]
- )
- ),
- cleanup_local_purge_doc(DbName, ActiveSigs),
- clouseau_rpc:cleanup(DbName, ActiveSigs),
- ok.
+ case fabric_util:get_design_doc_records(DbName) of
+ {ok, DDocs} when is_list(DDocs) ->
+ Sigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs),
+ Shards = mem3:shards(DbName),
+ ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1,
Shards),
+ Fun = fun(Node, Dbs, Acc) ->
+ erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs,
Sigs], Node, Acc)
+ end,
+ Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode),
+ recv(DbName, Reqs, fabric_util:abs_request_timeout());
+ Error ->
+ couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE,
DbName, Error]),
+ Error
+ end.
-active_sigs(#doc{body = {Fields}} = Doc) ->
+% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2
+%
+go_local(DbName, Dbs, #{} = Sigs) ->
try
- {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
- {IndexNames, _} = lists:unzip(RawIndexes),
- [
- begin
- {ok, Index} = dreyfus_index:design_doc_to_index(Doc,
IndexName),
- Index#index.sig
- end
- || IndexName <- IndexNames
- ]
+ lists:foreach(
+ fun(Db) ->
+ Checkpoints = dreyfus_util:get_purge_checkpoints(Db),
+ ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints)
+ end,
+ Dbs
+ ),
+ clouseau_rpc:cleanup(DbName, Sigs),
+ ok
catch
- error:{badmatch, _Error} ->
- []
+ error:database_does_not_exist ->
+ ok
end.
-cleanup_local_purge_doc(DbName, ActiveSigs) ->
- {ok, BaseDir} = clouseau_rpc:get_root_dir(),
- DbNamePattern = <<DbName/binary, ".*">>,
- Pattern0 = filename:join([BaseDir, "shards", "*", DbNamePattern, "*"]),
- Pattern = binary_to_list(iolist_to_binary(Pattern0)),
- DirListStrs = filelib:wildcard(Pattern),
- DirList = [iolist_to_binary(DL) || DL <- DirListStrs],
- LocalShards = mem3:local_shards(DbName),
- ActiveDirs = lists:foldl(
- fun(LS, AccOuter) ->
- lists:foldl(
- fun(Sig, AccInner) ->
- DirName = filename:join([BaseDir, LS#shard.name, Sig]),
- [DirName | AccInner]
- end,
- AccOuter,
- ActiveSigs
- )
- end,
- [],
- LocalShards
- ),
-
- DeadDirs = DirList -- ActiveDirs,
- lists:foreach(
- fun(IdxDir) ->
- Sig = dreyfus_util:get_signature_from_idxdir(IdxDir),
- case Sig of
- undefined ->
- ok;
- _ ->
- DocId = dreyfus_util:get_local_purge_doc_id(Sig),
- LocalShards = mem3:local_shards(DbName),
- lists:foreach(
- fun(LS) ->
- ShardDbName = LS#shard.name,
- {ok, ShardDb} = couch_db:open_int(ShardDbName, []),
- case couch_db:open_doc(ShardDb, DocId, []) of
- {ok, LocalPurgeDoc} ->
- couch_db:update_doc(
- ShardDb,
- LocalPurgeDoc#doc{deleted = true},
- [?ADMIN_CTX]
- );
- {not_found, _} ->
- ok
- end,
- couch_db:close(ShardDb)
- end,
- LocalShards
- )
- end
- end,
- DeadDirs
- ).
+recv(DbName, Reqs, Timeout) ->
+ case erpc:receive_response(Reqs, Timeout, true) of
+ {ok, _Lable, Reqs1} ->
+ recv(DbName, Reqs1, Timeout);
+ {Error, Label, Reqs1} ->
+ ErrMsg = "~p : error cleaning dreyfus indexes db:~p req:~p
error:~p",
+ couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]),
+ recv(DbName, Reqs1, Timeout);
+ no_request ->
+ ok
+ end.
diff --git a/src/dreyfus/src/dreyfus_index.erl
b/src/dreyfus/src/dreyfus_index.erl
index c97a837d5..5295a0065 100644
--- a/src/dreyfus/src/dreyfus_index.erl
+++ b/src/dreyfus/src/dreyfus_index.erl
@@ -22,13 +22,13 @@
% public api.
-export([
start_link/2,
- design_doc_to_index/2,
+ design_doc_to_index/3,
await/2,
search/2,
info/1,
group1/2,
group2/2,
- design_doc_to_indexes/1
+ design_doc_to_indexes/2
]).
% gen_server api.
@@ -87,14 +87,14 @@ to_index_pid(Pid) ->
false -> Pid
end.
-design_doc_to_indexes(#doc{body = {Fields}} = Doc) ->
+design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) ->
RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}),
case RawIndexes of
{IndexList} when is_list(IndexList) ->
{IndexNames, _} = lists:unzip(IndexList),
lists:flatmap(
fun(IndexName) ->
- case (catch design_doc_to_index(Doc, IndexName)) of
+ case (catch design_doc_to_index(DbName, Doc, IndexName)) of
{ok, #index{} = Index} -> [Index];
_ -> []
end
@@ -301,7 +301,7 @@ open_index(DbName, #index{analyzer = Analyzer, sig = Sig})
->
Error
end.
-design_doc_to_index(#doc{id = Id, body = {Fields}}, IndexName) ->
+design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) ->
Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
InvalidDDocError =
@@ -323,6 +323,7 @@ design_doc_to_index(#doc{id = Id, body = {Fields}},
IndexName) ->
)
),
{ok, #index{
+ dbname = DbName,
analyzer = Analyzer,
ddoc_id = Id,
def = Def,
diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl
index ffa75fb61..3aed82d76 100644
--- a/src/dreyfus/src/dreyfus_rpc.erl
+++ b/src/dreyfus/src/dreyfus_rpc.erl
@@ -46,7 +46,7 @@ call(Fun, DbName, DDoc, IndexName, QueryArgs0) ->
stale = Stale
} = QueryArgs,
{_LastSeq, MinSeq} = calculate_seqs(Db, Stale),
- case dreyfus_index:design_doc_to_index(DDoc, IndexName) of
+ case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of
{ok, Index} ->
try
rexi:reply(index_call(Fun, DbName, Index, QueryArgs, MinSeq))
@@ -81,7 +81,7 @@ info(DbName, DDoc, IndexName) ->
info_int(DbName, DDoc, IndexName) ->
erlang:put(io_priority, {search, DbName}),
check_interactive_mode(),
- case dreyfus_index:design_doc_to_index(DDoc, IndexName) of
+ case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of
{ok, Index} ->
case dreyfus_index_manager:get_index(DbName, Index) of
{ok, Pid} ->
@@ -102,7 +102,7 @@ info_int(DbName, DDoc, IndexName) ->
disk_size(DbName, DDoc, IndexName) ->
erlang:put(io_priority, {search, DbName}),
check_interactive_mode(),
- case dreyfus_index:design_doc_to_index(DDoc, IndexName) of
+ case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of
{ok, Index} ->
Result = dreyfus_index_manager:get_disk_size(DbName, Index),
rexi:reply(Result);
diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl
index a8afc36b6..b8806c089 100644
--- a/src/dreyfus/src/dreyfus_util.erl
+++ b/src/dreyfus/src/dreyfus_util.erl
@@ -25,9 +25,11 @@
ensure_local_purge_docs/2,
get_value_from_options/2,
get_local_purge_doc_id/1,
+ get_purge_checkpoints/1,
get_local_purge_doc_body/4,
maybe_create_local_purge_doc/2,
maybe_create_local_purge_doc/3,
+ get_signatures_from_ddocs/2,
get_signature_from_idxdir/1,
verify_index_exists/2
]).
@@ -305,7 +307,7 @@ ensure_local_purge_docs(DbName, DDocs) ->
undefined ->
false;
_ ->
- try dreyfus_index:design_doc_to_indexes(DDoc) of
+ try dreyfus_index:design_doc_to_indexes(DbName, DDoc)
of
SIndexes -> ensure_local_purge_doc(Db, SIndexes)
catch
_:_ ->
@@ -360,6 +362,32 @@ maybe_create_local_purge_doc(Db, IndexPid, Index) ->
get_local_purge_doc_id(Sig) ->
?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ "dreyfus-" ++ Sig).
+% Returns a map of `Sig => DocId` elements for all the dreyfus purge
+% checkpoint docs. Sig is a hex-encoded binary.
+%
+get_purge_checkpoints(Db) ->
+ couch_index_util:get_purge_checkpoints(Db, <<"dreyfus">>).
+
+get_signatures_from_ddocs(DbName, DesignDocs) ->
+ SigList = lists:flatmap(fun(Doc) -> active_sigs(DbName, Doc) end,
DesignDocs),
+ #{Sig => true || Sig <- SigList}.
+
+active_sigs(DbName, #doc{body = {Fields}} = Doc) ->
+ try
+ {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
+ {IndexNames, _} = lists:unzip(RawIndexes),
+ [
+ begin
+ {ok, Index} = dreyfus_index:design_doc_to_index(DbName, Doc,
IndexName),
+ Index#index.sig
+ end
+ || IndexName <- IndexNames
+ ]
+ catch
+ error:{badmatch, _Error} ->
+ []
+ end.
+
get_signature_from_idxdir(IdxDir) ->
IdxDirList = filename:split(IdxDir),
Sig = lists:last(IdxDirList),
@@ -415,7 +443,7 @@ verify_index_exists(DbName, Props) ->
case couch_db:get_design_doc(Db, DDocId) of
{ok, #doc{} = DDoc} ->
{ok, IdxState} = dreyfus_index:design_doc_to_index(
- DDoc, IndexName
+ DbName, DDoc, IndexName
),
IdxState#index.sig == Sig;
{not_found, _} ->
diff --git a/src/dreyfus/test/eunit/dreyfus_purge_test.erl
b/src/dreyfus/test/eunit/dreyfus_purge_test.erl
index 17bd5cd89..a7c0068e0 100644
--- a/src/dreyfus/test/eunit/dreyfus_purge_test.erl
+++ b/src/dreyfus/test/eunit/dreyfus_purge_test.erl
@@ -1102,17 +1102,17 @@ get_sigs(DbName) ->
{ok, DesignDocs} = fabric:design_docs(DbName),
lists:usort(
lists:flatmap(
- fun active_sigs/1,
+ fun(Doc) -> active_sigs(DbName, Doc) end,
[couch_doc:from_json_obj(DD) || DD <- DesignDocs]
)
).
-active_sigs(#doc{body = {Fields}} = Doc) ->
+active_sigs(DbName, #doc{body = {Fields}} = Doc) ->
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
{IndexNames, _} = lists:unzip(RawIndexes),
[
begin
- {ok, Index} = dreyfus_index:design_doc_to_index(Doc, IndexName),
+ {ok, Index} = dreyfus_index:design_doc_to_index(DbName, Doc,
IndexName),
Index#index.sig
end
|| IndexName <- IndexNames
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 0d567ac47..3a6ce7ebd 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -66,9 +66,10 @@
-export([
design_docs/1,
reset_validation_funs/1,
- cleanup_index_files/0,
- cleanup_index_files/1,
+ cleanup_index_files_all_nodes/0,
cleanup_index_files_all_nodes/1,
+ cleanup_index_files_this_node/0,
+ cleanup_index_files_this_node/1,
dbname/1,
db_uuids/1
]).
@@ -634,54 +635,17 @@ reset_validation_funs(DbName) ->
|| #shard{node = Node, name = Name} <- mem3:shards(DbName)
].
-%% @doc clean up index files for all Dbs
--spec cleanup_index_files() -> [ok].
-cleanup_index_files() ->
- {ok, Dbs} = fabric:all_dbs(),
- [cleanup_index_files(Db) || Db <- Dbs].
+cleanup_index_files_this_node() ->
+ fabric_index_cleanup:cleanup_this_node().
-%% @doc clean up index files for a specific db
--spec cleanup_index_files(dbname()) -> ok.
-cleanup_index_files(DbName) ->
- try
- ShardNames = [mem3:name(S) || S <- mem3:local_shards(dbname(DbName))],
- cleanup_local_indices_and_purge_checkpoints(ShardNames)
- catch
- error:database_does_not_exist ->
- ok
- end.
+cleanup_index_files_this_node(Db) ->
+ fabric_index_cleanup:cleanup_this_node(dbname(Db)).
-cleanup_local_indices_and_purge_checkpoints([]) ->
- ok;
-cleanup_local_indices_and_purge_checkpoints([_ | _] = Dbs) ->
- AllIndices = lists:map(fun couch_mrview_util:get_index_files/1, Dbs),
- AllPurges = lists:map(fun couch_mrview_util:get_purge_checkpoints/1, Dbs),
- Sigs = couch_mrview_util:get_signatures(hd(Dbs)),
- ok = cleanup_purges(Sigs, AllPurges, Dbs),
- ok = cleanup_indices(Sigs, AllIndices).
-
-cleanup_purges(Sigs, AllPurges, Dbs) ->
- Fun = fun(DbPurges, Db) ->
- couch_mrview_cleanup:cleanup_purges(Db, Sigs, DbPurges)
- end,
- lists:zipwith(Fun, AllPurges, Dbs),
- ok.
+cleanup_index_files_all_nodes() ->
+ fabric_index_cleanup:cleanup_all_nodes().
-cleanup_indices(Sigs, AllIndices) ->
- Fun = fun(DbIndices) ->
- couch_mrview_cleanup:cleanup_indices(Sigs, DbIndices)
- end,
- lists:foreach(Fun, AllIndices).
-
-%% @doc clean up index files for a specific db on all nodes
--spec cleanup_index_files_all_nodes(dbname()) -> [reference()].
-cleanup_index_files_all_nodes(DbName) ->
- lists:foreach(
- fun(Node) ->
- rexi:cast(Node, {?MODULE, cleanup_index_files, [DbName]})
- end,
- mem3:nodes()
- ).
+cleanup_index_files_all_nodes(Db) ->
+ fabric_index_cleanup:cleanup_all_nodes(dbname(Db)).
%% some simple type validation and transcoding
dbname(DbName) when is_list(DbName) ->
diff --git a/src/fabric/src/fabric_index_cleanup.erl
b/src/fabric/src/fabric_index_cleanup.erl
new file mode 100644
index 000000000..13759ba1d
--- /dev/null
+++ b/src/fabric/src/fabric_index_cleanup.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_index_cleanup).
+
+-export([
+ cleanup_all_nodes/0,
+ cleanup_all_nodes/1,
+ cleanup_this_node/0,
+ cleanup_this_node/1
+]).
+
+cleanup_all_nodes() ->
+ Fun = fun(DbName, _) -> cleanup_all_nodes(DbName) end,
+ mem3:fold_dbs(Fun, nil),
+ ok.
+
+cleanup_all_nodes(DbName) ->
+ cleanup_indexes(DbName, mem3_util:live_nodes()).
+
+cleanup_this_node() ->
+ Fun = fun(DbName, _) ->
+ case mem3:local_shards(DbName) of
+ [_ | _] -> cleanup_this_node(DbName);
+ [] -> ok
+ end
+ end,
+ mem3:fold_dbs(Fun, nil),
+ ok.
+
+cleanup_this_node(DbName) ->
+ cleanup_indexes(DbName, [config:node_name()]).
+
+cleanup_indexes(DbName, Nodes) ->
+ try fabric_util:get_design_doc_records(DbName) of
+ {ok, DDocs} when is_list(DDocs) ->
+ VSigs = couch_mrview_util:get_signatures_from_ddocs(DbName, DDocs),
+ DSigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs),
+ NSigs = nouveau_util:get_signatures_from_ddocs(DbName, DDocs),
+ Shards = [S || S <- mem3:shards(DbName),
lists:member(mem3:node(S), Nodes)],
+ ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1,
Shards),
+ Fun = fun(Node, Dbs, Acc) ->
+ Acc1 = send(Node, couch_mrview_cleanup, cleanup, [Dbs, VSigs],
Acc),
+ Acc2 = send(Node, dreyfus_fabric_cleanup, go_local, [DbName,
Dbs, DSigs], Acc1),
+ Acc3 = send(Node, nouveau_fabric_cleanup, go_local, [DbName,
Dbs, NSigs], Acc2),
+ Acc3
+ end,
+ Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode),
+ recv(DbName, Reqs, fabric_util:abs_request_timeout());
+ Error ->
+ couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE,
DbName, Error]),
+ Error
+ catch
+ error:database_does_not_exist ->
+ ok
+ end.
+
+send(Node, M, F, A, Reqs) ->
+ Label = {Node, M, F},
+ erpc:send_request(Node, M, F, A, Label, Reqs).
+
+recv(DbName, Reqs, Timeout) ->
+ case erpc:receive_response(Reqs, Timeout, true) of
+ {ok, _Label, Reqs1} ->
+ recv(DbName, Reqs1, Timeout);
+ {Error, Label, Reqs1} ->
+ ErrMsg = "~p : error cleaning indexes db:~p req:~p error:~p",
+ couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]),
+ recv(DbName, Reqs1, Timeout);
+ no_request ->
+ ok
+ end.
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 88d9839df..b10f20a63 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -26,6 +26,7 @@
doc_id_and_rev/1
]).
-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0,
view_timeout/1, timeout/2]).
+-export([abs_request_timeout/0]).
-export([log_timeout/2, remove_done_workers/2]).
-export([is_users_db/1, is_replicator_db/1]).
-export([open_cluster_db/1, open_cluster_db/2]).
@@ -35,6 +36,7 @@
-export([worker_ranges/1]).
-export([get_uuid_prefix_len/0]).
-export([isolate/1, isolate/2]).
+-export([get_design_doc_records/1]).
-compile({inline, [{doc_id_and_rev, 1}]}).
@@ -107,6 +109,15 @@ log_timeout(Workers, EndPoint) ->
Workers
).
+% Return {abs, MonotonicMSec}. This is a format used by erpc to
+% provide an absolute time limit for a collection of requests.
+% See https://www.erlang.org/doc/apps/kernel/erpc.html#t:timeout_time/0
+%
+abs_request_timeout() ->
+ Timeout = fabric_util:request_timeout(),
+ NowMSec = erlang:monotonic_time(millisecond),
+ {abs, NowMSec + Timeout}.
+
remove_done_workers(Workers, WaitingIndicator) ->
[W || {W, WI} <- fabric_dict:to_list(Workers), WI == WaitingIndicator].
@@ -391,6 +402,19 @@ worker_ranges(Workers) ->
get_uuid_prefix_len() ->
config:get_integer("fabric", "uuid_prefix_len", 7).
+% Get design #doc{} records. Run in an isolated process. This is often used
+% when computing signatures of various indexes
+%
+get_design_doc_records(DbName) ->
+ fabric_util:isolate(fun() ->
+ case fabric:design_docs(DbName) of
+ {ok, DDocs} when is_list(DDocs) ->
+ Fun = fun({[_ | _]} = Doc) -> couch_doc:from_json_obj(Doc) end,
+ {ok, lists:map(Fun, DDocs)};
+ Else ->
+ Else
+ end
+ end).
% If we issue multiple fabric calls from the same process we have to isolate
% them so in case of error they don't pollute the processes dictionary or the
% mailbox
diff --git a/src/fabric/test/eunit/fabric_tests.erl
b/src/fabric/test/eunit/fabric_tests.erl
index 2788af19e..77327f445 100644
--- a/src/fabric/test/eunit/fabric_tests.erl
+++ b/src/fabric/test/eunit/fabric_tests.erl
@@ -47,17 +47,20 @@ teardown({Ctx, DbName}) ->
test_util:stop_couch(Ctx).
t_cleanup_index_files(_) ->
- CheckFun = fun(Res) -> Res =:= ok end,
- ?assert(lists:all(CheckFun, fabric:cleanup_index_files())).
+ ?assertEqual(ok, fabric:cleanup_index_files_this_node()),
+ ?assertEqual(ok, fabric:cleanup_index_files_all_nodes()).
t_cleanup_index_files_with_existing_db({_, DbName}) ->
- ?assertEqual(ok, fabric:cleanup_index_files(DbName)).
+ ?assertEqual(ok, fabric:cleanup_index_files_this_node(DbName)),
+ ?assertEqual(ok, fabric:cleanup_index_files_all_nodes(DbName)),
+ ?assertEqual(ok, fabric:cleanup_index_files_this_node(<<"non_existent">>)),
+ ?assertEqual(ok, fabric:cleanup_index_files_all_nodes(<<"non_existent">>)).
t_cleanup_index_files_with_view_data({_, DbName}) ->
Sigs = sigs(DbName),
Indices = indices(DbName),
Purges = purges(DbName),
- ok = fabric:cleanup_index_files(DbName),
+ ok = fabric:cleanup_index_files_all_nodes(DbName),
% We haven't inadvertently removed any active index bits
?assertEqual(Sigs, sigs(DbName)),
?assertEqual(Indices, indices(DbName)),
@@ -65,7 +68,7 @@ t_cleanup_index_files_with_view_data({_, DbName}) ->
t_cleanup_index_files_with_deleted_db(_) ->
SomeDb = ?tempdb(),
- ?assertEqual(ok, fabric:cleanup_index_files(SomeDb)).
+ ?assertEqual(ok, fabric:cleanup_index_files_all_nodes(SomeDb)).
t_cleanup_index_file_after_ddoc_update({_, DbName}) ->
?assertEqual(
@@ -84,7 +87,7 @@ t_cleanup_index_file_after_ddoc_update({_, DbName}) ->
),
update_ddoc(DbName, <<"_design/foo">>, <<"bar1">>),
- ok = fabric:cleanup_index_files(DbName),
+ ok = fabric:cleanup_index_files_all_nodes(DbName),
{ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar1">>),
% One 4bc stays, da8 should gone and 9e3 is added
@@ -120,7 +123,7 @@ t_cleanup_index_file_after_ddoc_delete({_, DbName}) ->
),
delete_ddoc(DbName, <<"_design/foo">>),
- ok = fabric:cleanup_index_files(DbName),
+ ok = fabric:cleanup_index_files_all_nodes(DbName),
% 4bc stays the same, da8 should be gone
?assertEqual(
@@ -137,13 +140,13 @@ t_cleanup_index_file_after_ddoc_delete({_, DbName}) ->
),
delete_ddoc(DbName, <<"_design/boo">>),
- ok = fabric:cleanup_index_files(DbName),
+ ok = fabric:cleanup_index_files_all_nodes(DbName),
?assertEqual([], indices(DbName)),
?assertEqual([], purges(DbName)),
% cleaning a db with all deleted indices should still work
- ok = fabric:cleanup_index_files(DbName),
+ ok = fabric:cleanup_index_files_all_nodes(DbName),
?assertEqual([], indices(DbName)),
?assertEqual([], purges(DbName)).
diff --git a/src/ken/src/ken_server.erl b/src/ken/src/ken_server.erl
index b73088e6c..acafb41c5 100644
--- a/src/ken/src/ken_server.erl
+++ b/src/ken/src/ken_server.erl
@@ -341,7 +341,7 @@ update_ddoc_indexes(Name, #doc{} = Doc, State) ->
search_updated(Name, Doc, Seq, State) ->
case should_update(Doc, <<"indexes">>) of
true ->
- try dreyfus_index:design_doc_to_indexes(Doc) of
+ try dreyfus_index:design_doc_to_indexes(Name, Doc) of
SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq,
State)
catch
_:_ ->
diff --git a/src/mem3/src/mem3_reshard_index.erl
b/src/mem3/src/mem3_reshard_index.erl
index 41e225d22..0cd6f2cf3 100644
--- a/src/mem3/src/mem3_reshard_index.erl
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -101,7 +101,7 @@ nouveau_indices(DbName, Doc) ->
dreyfus_indices(DbName, Doc) ->
try
- Indices = dreyfus_index:design_doc_to_indexes(Doc),
+ Indices = dreyfus_index:design_doc_to_indexes(DbName, Doc),
[{?DREYFUS, DbName, Index} || Index <- Indices]
catch
Tag:Err ->
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl
b/src/mem3/test/eunit/mem3_reshard_test.erl
index a72f07d6e..9fed4e295 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -371,7 +371,7 @@ indices_can_be_built_with_errors(#{db1 := Db}) ->
end)}.
mock_dreyfus_indices() ->
- meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
+ meck:expect(dreyfus_index, design_doc_to_indexes, fun(_, Doc) ->
#doc{body = {BodyProps}} = Doc,
case couch_util:get_value(<<"indexes">>, BodyProps) of
undefined ->
diff --git a/src/nouveau/src/nouveau_fabric_cleanup.erl
b/src/nouveau/src/nouveau_fabric_cleanup.erl
index 07167e8c7..75c2190b8 100644
--- a/src/nouveau/src/nouveau_fabric_cleanup.erl
+++ b/src/nouveau/src/nouveau_fabric_cleanup.erl
@@ -14,40 +14,52 @@
-module(nouveau_fabric_cleanup).
--include_lib("couch/include/couch_db.hrl").
-
--include("nouveau.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--export([go/1]).
+-export([go/1, go_local/3]).
go(DbName) ->
- DesignDocs =
- case fabric:design_docs(DbName) of
- {ok, DDocs} when is_list(DDocs) ->
- DDocs;
- Else ->
- couch_log:debug("Invalid design docs: ~p~n", [Else]),
- []
- end,
- ActiveSigs =
- lists:usort(
- lists:flatmap(
- fun(Doc) -> active_sigs(DbName, Doc) end,
- [couch_doc:from_json_obj(DD) || DD <- DesignDocs]
- )
- ),
- Shards = mem3:shards(DbName),
- lists:foreach(
- fun(Shard) ->
- Path =
- <<"shards/",
(mem3_util:range_to_hex(Shard#shard.range))/binary, "/", DbName/binary,
- ".*/*">>,
- rexi:cast(Shard#shard.node, {nouveau_rpc, cleanup, [Path,
ActiveSigs]})
- end,
- Shards
- ).
+ case fabric_util:get_design_doc_records(DbName) of
+ {ok, DDocs} when is_list(DDocs) ->
+ Sigs = nouveau_util:get_signatures_from_ddocs(DbName, DDocs),
+ Shards = mem3:shards(DbName),
+ ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1,
Shards),
+ Fun = fun(Node, Dbs, Acc) ->
+ erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs,
Sigs], Node, Acc)
+ end,
+ Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode),
+ recv(DbName, Reqs, fabric_util:abs_request_timeout());
+ Error ->
+ couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE,
DbName, Error]),
+ Error
+ end.
+
+% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2
+%
+go_local(DbName, Dbs, Sigs) ->
+ try
+ lists:foreach(
+ fun(Db) ->
+ Sz = byte_size(DbName),
+ <<"shards/", Range:17/binary, "/", DbName:Sz/binary, ".",
_/binary>> = Db,
+ Checkpoints = nouveau_util:get_purge_checkpoints(Db),
+ ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints),
+ Path = <<"shards/", Range/binary, "/", DbName/binary, ".*/*">>,
+ nouveau_api:delete_path(nouveau_util:index_name(Path),
maps:keys(Sigs))
+ end,
+ Dbs
+ )
+ catch
+ error:database_does_not_exist ->
+ ok
+ end.
-active_sigs(DbName, #doc{} = Doc) ->
- Indexes = nouveau_util:design_doc_to_indexes(DbName, Doc),
- lists:map(fun(Index) -> Index#index.sig end, Indexes).
+recv(DbName, Reqs, Timeout) ->
+ case erpc:receive_response(Reqs, Timeout, true) of
+ {ok, _Label, Reqs1} ->
+ recv(DbName, Reqs1, Timeout);
+ {Error, Label, Reqs1} ->
+ ErrMsg = "~p : error cleaning nouveau indexes db:~p node: ~p
error:~p",
+ couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]),
+ recv(DbName, Reqs1, Timeout);
+ no_request ->
+ ok
+ end.
diff --git a/src/nouveau/src/nouveau_rpc.erl b/src/nouveau/src/nouveau_rpc.erl
index f7ab5a433..2037c7e7e 100644
--- a/src/nouveau/src/nouveau_rpc.erl
+++ b/src/nouveau/src/nouveau_rpc.erl
@@ -17,8 +17,7 @@
-export([
search/3,
- info/2,
- cleanup/2
+ info/2
]).
-include("nouveau.hrl").
@@ -88,7 +87,3 @@ info(DbName, #index{} = Index0) ->
{error, Reason} ->
rexi:reply({error, Reason})
end.
-
-cleanup(Path, Exclusions) ->
- nouveau_api:delete_path(nouveau_util:index_name(Path), Exclusions),
- rexi:reply(ok).
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index 0dfcb1e1e..3df43f2ff 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -27,6 +27,8 @@
maybe_create_local_purge_doc/2,
get_local_purge_doc_id/1,
get_local_purge_doc_body/3,
+ get_purge_checkpoints/1,
+ get_signatures_from_ddocs/2,
nouveau_url/0
]).
@@ -174,6 +176,9 @@ maybe_create_local_purge_doc(Db, Index) ->
get_local_purge_doc_id(Sig) ->
iolist_to_binary([?LOCAL_DOC_PREFIX, "purge-", "nouveau-", Sig]).
+get_purge_checkpoints(Db) ->
+ couch_index_util:get_purge_checkpoints(Db, <<"nouveau">>).
+
get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
#index{
name = IdxName,
@@ -193,5 +198,13 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
]},
couch_doc:from_json_obj(JsonList).
+get_signatures_from_ddocs(DbName, DesignDocs) ->
+ SigList = lists:flatmap(fun(Doc) -> active_sigs(DbName, Doc) end,
DesignDocs),
+ #{Sig => true || Sig <- SigList}.
+
+active_sigs(DbName, #doc{} = Doc) ->
+ Indexes = nouveau_util:design_doc_to_indexes(DbName, Doc),
+ lists:map(fun(Index) -> Index#index.sig end, Indexes).
+
nouveau_url() ->
config:get("nouveau", "url", "http://127.0.0.1:5987").
diff --git a/src/smoosh/src/smoosh_channel.erl
b/src/smoosh/src/smoosh_channel.erl
index 74e625944..eabb751ad 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -451,7 +451,7 @@ re_enqueue(Obj) ->
cleanup_index_files(DbName) ->
case should_clean_up_indices() of
- true -> fabric:cleanup_index_files(DbName);
+ true -> fabric:cleanup_index_files_this_node(DbName);
false -> ok
end.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
index 96caa28f0..517024875 100644
--- a/src/smoosh/test/smoosh_tests.erl
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -155,7 +155,7 @@ t_index_cleanup_happens_by_default(DbName) ->
get_channel_pid("index_cleanup") ! unpause,
{ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
% View cleanup should have been invoked
- meck:wait(fabric, cleanup_index_files, [DbName], 4000),
+ meck:wait(fabric, cleanup_index_files_this_node, [DbName], 4000),
wait_view_compacted(DbName, <<"foo">>).
t_index_cleanup_can_be_disabled(DbName) ->