This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch lucene-10-again in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 30f66fb604694469657ec563aaa247ed36d078ce Author: Robert Newson <[email protected]> AuthorDate: Mon Aug 18 17:04:11 2025 +0100 Allow ?upgrade=true on search to trigger upgrade TODO ugly code, don't like Upgrade arg plastered all over the place. --- .../org/apache/couchdb/nouveau/core/Index.java | 8 ++ .../apache/couchdb/nouveau/core/IndexManager.java | 103 ++++++++++++++------- .../couchdb/nouveau/health/IndexHealthCheck.java | 4 +- .../couchdb/nouveau/resources/IndexResource.java | 27 ++++-- .../couchdb/nouveau/lucene/LuceneIndexTest.java | 2 +- src/nouveau/src/nouveau_api.erl | 69 ++++++++------ src/nouveau/src/nouveau_fabric_info.erl | 14 +-- src/nouveau/src/nouveau_httpd.erl | 15 ++- src/nouveau/src/nouveau_index_manager.erl | 31 ++++--- src/nouveau/src/nouveau_index_updater.erl | 53 +++++------ src/nouveau/src/nouveau_rpc.erl | 9 +- 11 files changed, 207 insertions(+), 128 deletions(-) diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java index b8a03e1d1..2e88241e6 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -52,6 +52,14 @@ public abstract class Index implements Closeable { return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize, upgradeRequired); } + public synchronized long getUpdateSeq() { + return updateSeq; + } + + public synchronized long getPurgeSeq() { + return purgeSeq; + } + protected abstract int doNumDocs() throws IOException; protected abstract long doDiskSize() throws IOException; diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java index da185aa2a..b5acaf993 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java @@ -38,6 +38,7 @@ 
import org.apache.couchdb.nouveau.api.IndexDefinition; import org.apache.couchdb.nouveau.lucene.LuceneAnalyzerFactory; import org.apache.couchdb.nouveau.lucene.LuceneIndex; import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig.OpenMode; @@ -78,7 +79,8 @@ public final class IndexManager implements Managed { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private ScheduledFuture<?> commitFuture; private HolderState state = HolderState.NOT_LOADED; - private Index index; + private Index latestIndex; + private Index legacyIndex; } private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class); @@ -104,7 +106,7 @@ public final class IndexManager implements Managed { private StripedLock<String> createLock; - public <R> R with(final String name, final IndexFunction<Index, R> indexFun) + public <R> R with(final String name, final boolean upgrade, final IndexFunction<Index, R> indexFun) throws IOException, InterruptedException { evictIfOverCapacity(); @@ -123,21 +125,28 @@ public final class IndexManager implements Managed { // Load if not already loaded or remove if Lucene closed the index elsewhere. 
if (holder.state == HolderState.NOT_LOADED - || (holder.state == HolderState.LOADED && !holder.index.isOpen())) { + || (holder.state == HolderState.LOADED + && (!holder.latestIndex.isOpen() + || (holder.legacyIndex != null && !holder.legacyIndex.isOpen())))) { holder.lock.readLock().unlock(); holder.lock.writeLock().lock(); try { - if (holder.state == HolderState.LOADED && !holder.index.isOpen()) { + if (holder.state == HolderState.LOADED + && (holder.legacyIndex != null && !holder.legacyIndex.isOpen())) { LOGGER.info("removing closed index {}", name); holder.state = HolderState.UNLOADED; - holder.index = null; + if (holder.latestIndex.isOpen() || holder.legacyIndex.isOpen()) { + close(name, holder); + } + holder.latestIndex = null; + holder.legacyIndex = null; synchronized (cache) { cache.remove(name, holder); } continue retry; } if (holder.state == HolderState.NOT_LOADED) { - holder.index = load(name); + load(name, holder); holder.commitFuture = this.schedulerExecutorService.scheduleWithFixedDelay( commitFun(name, holder), commitIntervalSeconds, @@ -145,6 +154,12 @@ public final class IndexManager implements Managed { TimeUnit.SECONDS); holder.state = HolderState.LOADED; } + if (holder.state == HolderState.LOADED && upgradeComplete(holder)) { + LOGGER.info("Upgrade of {} complete, removing legacy index", name); + holder.legacyIndex.setDeleteOnClose(true); + holder.legacyIndex.close(); + holder.legacyIndex = null; + } holder.lock.readLock().lock(); } finally { holder.lock.writeLock().unlock(); @@ -159,7 +174,11 @@ public final class IndexManager implements Managed { Thread.sleep(1000); continue retry; case LOADED: - return indexFun.apply(holder.index); + // Use latest index if upgrading or legacy index if it exists if not. + var index = upgrade + ? holder.latestIndex + : holder.legacyIndex != null ? 
holder.legacyIndex : holder.latestIndex; + return indexFun.apply(index); } } finally { holder.lock.readLock().unlock(); } } } + private boolean upgradeComplete(final IndexHolder holder) throws IOException { + if (holder.legacyIndex == null) { + return false; + } + return holder.latestIndex.getUpdateSeq() >= holder.legacyIndex.getUpdateSeq() + && holder.latestIndex.getPurgeSeq() >= holder.legacyIndex.getPurgeSeq(); + } + private void evictIfOverCapacity() throws IOException, InterruptedException { while (true) { final String candidate; @@ -190,7 +217,7 @@ public final class IndexManager implements Managed { switch (holder.state) { case LOADED: if (forceDelete) { - holder.index.setDeleteOnClose(true); + holder.latestIndex.setDeleteOnClose(true); } LOGGER.info("closing {}", name); try { @@ -199,7 +226,8 @@ public final class IndexManager implements Managed { LOGGER.error("I/O exception when evicting {}", name, e); } holder.state = HolderState.UNLOADED; - holder.index = null; + holder.latestIndex = null; + holder.legacyIndex = null; break; case NOT_LOADED: case UNLOADED: @@ -331,8 +359,11 @@ public final class IndexManager implements Managed { return () -> { holder.lock.readLock().lock(); try { - if (holder.index.commit()) { - LOGGER.info("committed {}", name); + if (holder.latestIndex.commit()) { + LOGGER.info("committed {} (latest)", name); + } + if (holder.legacyIndex != null && holder.legacyIndex.commit()) { + LOGGER.info("committed {} (legacy)", name); } } catch (final IOException e) { LOGGER.warn("I/O exception while committing " + name, e); @@ -377,14 +408,31 @@ public final class IndexManager implements Managed { throw new WebApplicationException(name + " attempts to escape from index root directory", Status.BAD_REQUEST); } - private Index load(final String name) throws IOException { + private void load(final String name, final IndexHolder holder) throws IOException { LOGGER.info("opening
{}", name); final Path path = indexPath(name); final IndexDefinition indexDefinition = loadIndexDefinition(name); final Analyzer analyzer = LuceneAnalyzerFactory.fromDefinition(indexDefinition); - final int version = getIndexVersion(path); + holder.latestIndex = load(name, Version.LATEST.major, analyzer); + if (indexExists(path.resolve(Integer.toString(Version.LATEST.major - 1)))) { + LOGGER.warn("{} has a legacy index", name); + holder.legacyIndex = load(name, Version.LATEST.major - 1, analyzer); + } + } + + // Same logic as DirectoryReader.indexExists() but without needing a Directory. + private boolean indexExists(final Path path) throws IOException { + if (!Files.exists(path)) { + return false; + } + try (var stream = Files.walk(path, 1)) { + return stream.anyMatch(f -> f.getFileName().toString().startsWith(IndexFileNames.SEGMENTS + "_")); + } + } + + private LuceneIndex load(final String name, final int version, final Analyzer analyzer) throws IOException { + final Path indexPath = indexPath(name).resolve(Integer.toString(version)); final boolean upgradeRequired = version < Version.LATEST.major; - final Path indexPath = path.resolve(Integer.toString(version)); final Directory dir = new DirectIODirectory(FSDirectory.open(indexPath)); final IndexWriterConfig config = new IndexWriterConfig(analyzer); config.setOpenMode(upgradeRequired ? OpenMode.APPEND : OpenMode.CREATE_OR_APPEND); @@ -396,19 +444,6 @@ public final class IndexManager implements Managed { return new LuceneIndex(analyzer, writer, updateSeq, purgeSeq, upgradeRequired, searcherManager); } - /** - * Find highest version index on disk, or latest version if none. 
- */ - private int getIndexVersion(final Path path) throws IOException { - if (Files.exists(path.resolve(Integer.toString(Version.LATEST.major)))) { - return Version.LATEST.major; - } - if (Files.exists(path.resolve(Integer.toString(Version.LATEST.major - 1)))) { - return Version.LATEST.major - 1; - } - return Version.LATEST.major; - } - private long getSeq(final IndexWriter writer, final String key) throws IOException { final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData(); if (commitData == null) { @@ -429,15 +464,21 @@ public final class IndexManager implements Managed { holder.commitFuture.cancel(true); IOUtils.runAll( () -> { - if (holder.index.commit()) { - LOGGER.debug("committed {} before close", name); + if (holder.latestIndex.commit()) { + LOGGER.debug("committed {} (latest) before close", name); + } + if (holder.legacyIndex != null && holder.legacyIndex.commit()) { + LOGGER.debug("committed {} (legacy) before close", name); } }, () -> { - holder.index.close(); + holder.latestIndex.close(); + if (holder.legacyIndex != null) { + holder.legacyIndex.close(); + } }, () -> { - if (holder.index.isDeleteOnClose()) { + if (holder.latestIndex.isDeleteOnClose()) { IOUtils.rm(indexRootPath(name)); } }); diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java index f20efe0c9..64733eb20 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java @@ -43,13 +43,13 @@ public final class IndexHealthCheck extends HealthCheck { try { final DocumentUpdateRequest documentUpdateRequest = new DocumentUpdateRequest(0, 1, null, Collections.emptyList()); - indexResource.updateDoc(name, "foo", documentUpdateRequest); + indexResource.updateDoc(name, true, "foo", documentUpdateRequest); final SearchRequest searchRequest = new 
SearchRequest(); searchRequest.setQuery("_id:foo"); searchRequest.setMinUpdateSeq(1); - final SearchResults searchResults = indexResource.searchIndex(name, searchRequest); + final SearchResults searchResults = indexResource.searchIndex(name, true, searchRequest); if (searchResults.getTotalHits() == 1) { return Result.healthy(); } else { diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index a52e00da9..d66220496 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -26,6 +26,7 @@ import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; import java.io.IOException; import java.util.List; @@ -65,10 +66,11 @@ public final class IndexResource { @Path("/doc/{docId}") public Ok deleteDoc( @PathParam("name") String name, + @QueryParam("upgrade") boolean upgrade, @PathParam("docId") String docId, @NotNull @Valid DocumentDeleteRequest request) throws Exception { - return indexManager.with(name, (index) -> { + return indexManager.with(name, upgrade, (index) -> { index.delete(docId, request); return Ok.INSTANCE; }); @@ -81,15 +83,20 @@ public final class IndexResource { } @GET - public IndexInfo getIndexInfo(@PathParam("name") String name) throws Exception { - return indexManager.with(name, (index) -> { + public IndexInfo getIndexInfo(@PathParam("name") String name, @QueryParam("upgrade") boolean upgrade) + throws Exception { + return indexManager.with(name, upgrade, (index) -> { return index.info(); }); } @POST - public Ok setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request) throws Exception { - return indexManager.with(name, (index) -> { + public Ok setIndexInfo( + 
@PathParam("name") String name, + @QueryParam("upgrade") boolean upgrade, + @NotNull @Valid IndexInfoRequest request) + throws Exception { + return indexManager.with(name, upgrade, (index) -> { if (request.getMatchUpdateSeq().isPresent() && request.getUpdateSeq().isPresent()) { index.setUpdateSeq( @@ -107,9 +114,12 @@ public final class IndexResource { @POST @Path("/search") - public SearchResults searchIndex(@PathParam("name") String name, @NotNull @Valid SearchRequest request) + public SearchResults searchIndex( + @PathParam("name") String name, + @QueryParam("upgrade") boolean upgrade, + @NotNull @Valid SearchRequest request) throws Exception { - return indexManager.with(name, (index) -> { + return indexManager.with(name, upgrade, (index) -> { return index.search(request); }); } @@ -118,10 +128,11 @@ public final class IndexResource { @Path("/doc/{docId}") public Ok updateDoc( @PathParam("name") String name, + @QueryParam("upgrade") boolean upgrade, @PathParam("docId") String docId, @NotNull @Valid DocumentUpdateRequest request) throws Exception { - return indexManager.with(name, (index) -> { + return indexManager.with(name, upgrade, (index) -> { index.update(docId, request); return Ok.INSTANCE; }); diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java index 9206b83d9..10dd599e8 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java @@ -56,7 +56,7 @@ public class LuceneIndexTest { config.setUseCompoundFile(false); final IndexWriter writer = new IndexWriter(dir, config); final SearcherManager searcherManager = new SearcherManager(writer, null); - return new LuceneIndex(analyzer, writer, 0L, 0L, searcherManager); + return new LuceneIndex(analyzer, writer, 0L, 0L, false, searcherManager); } protected final void cleanup(final Index 
index) throws IOException { diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 319170a4c..0cc571362 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -19,16 +19,16 @@ -export([ analyze/2, - index_info/1, + index_info/2, create_index/2, delete_path/1, delete_path/2, - delete_doc/4, - purge_doc/4, - update_doc/6, + delete_doc/5, + purge_doc/5, + update_doc/7, search/2, - set_purge_seq/3, - set_update_seq/3, + set_purge_seq/4, + set_update_seq/4, jaxrs_error/2 ]). @@ -56,8 +56,8 @@ analyze(Text, Analyzer) when analyze(_, _) -> {error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty strings">>}}. -index_info(#index{} = Index) -> - Resp = send_if_enabled(index_path(Index), [], <<"GET">>), +index_info(#index{} = Index, Upgrade) -> + Resp = send_if_enabled(index_path(Index, Upgrade), [], <<"GET">>), case Resp of {ok, 200, _, RespBody} -> {ok, jiffy:decode(RespBody, [return_maps])}; @@ -98,8 +98,9 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when +delete_doc(#index{} = Index, Upgrade, DocId, MatchSeq, UpdateSeq) when is_binary(DocId), + is_boolean(Upgrade), is_integer(MatchSeq), MatchSeq >= 0, is_integer(UpdateSeq), @@ -107,7 +108,7 @@ delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when -> ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false}, Resp = send_if_enabled( - doc_path(Index, DocId), + doc_path(Index, Upgrade, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(ReqBody) @@ -121,8 +122,9 @@ delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when send_error(Reason) end. 
-purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when +purge_doc(#index{} = Index, Upgrade, DocId, MatchSeq, PurgeSeq) when is_binary(DocId), + is_boolean(Upgrade), is_integer(MatchSeq), MatchSeq >= 0, is_integer(PurgeSeq), @@ -130,7 +132,7 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when -> ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true}, Resp = send_if_enabled( - doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(ReqBody) + doc_path(Index, Upgrade, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(ReqBody) ), case Resp of {ok, 200, _, _} -> @@ -141,8 +143,9 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when send_error(Reason) end. -update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when +update_doc(#index{} = Index, Upgrade, DocId, MatchSeq, UpdateSeq, Partition, Fields) when is_binary(DocId), + is_boolean(Upgrade), is_integer(MatchSeq), MatchSeq >= 0, is_integer(UpdateSeq), @@ -157,7 +160,7 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when fields => Fields }, Resp = send_if_enabled( - doc_path(Index, DocId), + doc_path(Index, Upgrade, DocId), [?JSON_CONTENT_TYPE], <<"PUT">>, jiffy:encode(ReqBody) @@ -171,9 +174,10 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when send_error(Reason) end. -search(#index{} = Index, QueryArgs) -> +search(#index{} = Index, #{} = QueryArgs) -> + Upgrade = maps:get(upgrade, QueryArgs, false), Resp = send_if_enabled( - search_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(QueryArgs) + search_path(Index, Upgrade), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(QueryArgs) ), case Resp of {ok, 200, _, RespBody} -> @@ -187,23 +191,23 @@ search(#index{} = Index, QueryArgs) -> send_error(Reason) end. 
-set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) -> +set_update_seq(#index{} = Index, Upgrade, MatchSeq, UpdateSeq) -> ReqBody = #{ match_update_seq => MatchSeq, update_seq => UpdateSeq }, - set_seq(Index, ReqBody). + set_seq(Index, Upgrade, ReqBody). -set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) -> +set_purge_seq(#index{} = Index, Upgrade, MatchSeq, PurgeSeq) -> ReqBody = #{ match_purge_seq => MatchSeq, purge_seq => PurgeSeq }, - set_seq(Index, ReqBody). + set_seq(Index, Upgrade, ReqBody). -set_seq(#index{} = Index, ReqBody) -> +set_seq(#index{} = Index, Upgrade, ReqBody) -> Resp = send_if_enabled( - index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(ReqBody) + index_path(Index, Upgrade), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(ReqBody) ), case Resp of {ok, 200, _, _} -> @@ -217,20 +221,27 @@ set_seq(#index{} = Index, ReqBody) -> %% private functions index_path(Path) when is_binary(Path) -> - [<<"/index/">>, couch_util:url_encode(Path)]; -index_path(#index{} = Index) -> - [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))]. + [<<"/index/">>, couch_util:url_encode(Path)]. -doc_path(#index{} = Index, DocId) -> +index_path(#index{} = Index, false) -> + [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))]; +index_path(#index{} = Index, true) -> + [index_path(Index, false), <<"?upgrade=true">>]. + +doc_path(#index{} = Index, false, DocId) -> [ <<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index)), <<"/doc/">>, couch_util:url_encode(DocId) - ]. + ]; +doc_path(#index{} = Index, true, DocId) -> + [doc_path(Index, false, DocId), <<"?upgrade=true">>]. -search_path(#index{} = Index) -> - [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index)), <<"/search">>]. 
+search_path(#index{} = Index, false) -> + [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index)), <<"/search">>]; +search_path(#index{} = Index, true) -> + [search_path(Index, false), <<"?upgrade=true">>]. jaxrs_error(400, Body) -> {bad_request, message(Body)}; diff --git a/src/nouveau/src/nouveau_fabric_info.erl b/src/nouveau/src/nouveau_fabric_info.erl index 45bdf177e..50c38744a 100644 --- a/src/nouveau/src/nouveau_fabric_info.erl +++ b/src/nouveau/src/nouveau_fabric_info.erl @@ -15,28 +15,28 @@ -module(nouveau_fabric_info). --export([go/3]). +-export([go/4]). -include_lib("mem3/include/mem3.hrl"). -go(DbName, DDocId, IndexName) when is_binary(DDocId) -> +go(DbName, DDocId, IndexName, Upgrade) when is_binary(DDocId) -> {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", DDocId/binary>>, [ejson_body]), - go(DbName, DDoc, IndexName); -go(DbName, DDoc, IndexName) -> + go(DbName, DDoc, IndexName, Upgrade); +go(DbName, DDoc, IndexName, Upgrade) -> case nouveau_util:design_doc_to_index(DbName, DDoc, IndexName) of {ok, Index} -> - go(DbName, DDoc, IndexName, Index); + go(DbName, DDoc, IndexName, Index, Upgrade); {error, Reason} -> {error, Reason} end. 
-go(DbName, _DDoc, _IndexName, Index) -> +go(DbName, _DDoc, _IndexName, Index, Upgrade) -> Shards = mem3:shards(DbName), Counters0 = lists:map( fun(#shard{} = Shard) -> Ref = rexi:cast( Shard#shard.node, - {nouveau_rpc, info, [Shard#shard.name, Index]} + {nouveau_rpc, info, [Shard#shard.name, Index, Upgrade]} ), Shard#shard{ref = Ref} end, diff --git a/src/nouveau/src/nouveau_httpd.erl b/src/nouveau/src/nouveau_httpd.erl index 878e001b7..6ae470ade 100644 --- a/src/nouveau/src/nouveau_httpd.erl +++ b/src/nouveau/src/nouveau_httpd.erl @@ -75,7 +75,8 @@ handle_search_req_int(#httpd{method = 'GET', path_parts = [_, _, _, _, IndexName counts => chttpd:qs_value(Req, "counts"), update => chttpd:qs_value(Req, "update"), bookmark => chttpd:qs_value(Req, "bookmark"), - include_docs => chttpd:qs_value(Req, "include_docs") + include_docs => chttpd:qs_value(Req, "include_docs"), + upgrade => chttpd:qs_value(Req, "upgrade") }), handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, ?RETRY_LIMIT); handle_search_req_int( @@ -95,7 +96,8 @@ handle_search_req_int( counts => json_or_undefined(<<"counts">>, ReqBody), update => maps:get(<<"update">>, ReqBody, undefined), bookmark => maps:get(<<"bookmark">>, ReqBody, undefined), - include_docs => maps:get(<<"include_docs">>, ReqBody, undefined) + include_docs => maps:get(<<"include_docs">>, ReqBody, undefined), + upgrade => maps:get(<<"upgrade">>, ReqBody, undefined) }), handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, ?RETRY_LIMIT); handle_search_req_int(Req, _Db, _DDoc) -> @@ -136,7 +138,8 @@ handle_info_req( ) -> check_if_enabled(), DbName = couch_db:name(Db), - case nouveau_fabric_info:go(DbName, DDoc, IndexName) of + Upgrade = chttpd:qs_value(Req, "upgrade") == "true", + case nouveau_fabric_info:go(DbName, DDoc, IndexName, Upgrade) of {ok, IndexInfo} -> send_json( Req, @@ -260,6 +263,12 @@ validate_query_arg(include_docs, "false") -> false; validate_query_arg(include_docs, "true") -> true; +validate_query_arg(upgrade, 
undefined) -> + false; +validate_query_arg(upgrade, "false") -> + false; +validate_query_arg(upgrade, "true") -> + true; validate_query_arg(Key, Val) -> Msg = io_lib:format("Invalid value for ~p: ~p", [Key, Val]), throw({query_parse_error, ?l2b(Msg)}). diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl index 45f7a1e8d..2aa3b704f 100644 --- a/src/nouveau/src/nouveau_index_manager.erl +++ b/src/nouveau/src/nouveau_index_manager.erl @@ -24,7 +24,7 @@ %% public api -export([ - update_index/1 + update_index/2 ]). %% gen_server bits @@ -44,10 +44,10 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -update_index(#index{} = Index) -> +update_index(#index{} = Index, Upgrade) when is_boolean(Upgrade) -> case nouveau:enabled() of true -> - gen_server:call(?MODULE, {update, Index}, infinity); + gen_server:call(?MODULE, {update, Index, Upgrade}, infinity); false -> {error, nouveau_not_enabled} end. @@ -59,16 +59,19 @@ init(_) -> couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), {ok, nil}. 
-handle_call({update, #index{} = Index0}, From, State) -> +handle_call({update, #index{} = Index0, Upgrade}, From, State) when is_boolean(Upgrade) -> DbSig = {Index0#index.dbname, Index0#index.sig}, - case ets:lookup(?BY_DBSIG, DbSig) of + SigKey = {DbSig, Upgrade}, + case ets:lookup(?BY_DBSIG, SigKey) of [] -> - {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, update, [Index0]), + {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, update, [ + Index0, Upgrade + ]), Queue = queue:in(From, queue:new()), - true = ets:insert(?BY_DBSIG, {DbSig, Index0, Queue}), - true = ets:insert(?BY_REF, {IndexerRef, DbSig}); + true = ets:insert(?BY_DBSIG, {SigKey, Index0, Queue}), + true = ets:insert(?BY_REF, {IndexerRef, SigKey}); [{_DbSig, Index1, Queue}] -> - ets:insert(?BY_DBSIG, {DbSig, Index1, queue:in(From, Queue)}) + ets:insert(?BY_DBSIG, {SigKey, Index1, queue:in(From, Queue)}) end, {noreply, State}; handle_call(_Msg, _From, State) -> @@ -82,9 +85,9 @@ handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, State) -> [] -> % not one of ours, somehow... 
{noreply, State}; - [{_, DbSig}] -> + [{_, {_DbSig, Upgrade} = SigKey}] -> true = ets:delete(?BY_REF, IndexerRef), - [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, DbSig), + [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, SigKey), {{value, From}, Queue1} = queue:out(Queue0), if Reason /= ok -> @@ -104,13 +107,13 @@ handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, State) -> gen_server:reply(From, Reason), case queue:is_empty(Queue1) of true -> - true = ets:delete(?BY_DBSIG, DbSig); + true = ets:delete(?BY_DBSIG, SigKey); false -> {_IndexerPid, NewIndexerRef} = spawn_monitor(nouveau_index_updater, update, [ Index, Upgrade ]), - true = ets:insert(?BY_DBSIG, {DbSig, Index, Queue1}), - true = ets:insert(?BY_REF, {NewIndexerRef, DbSig}) + true = ets:insert(?BY_DBSIG, {SigKey, Index, Queue1}), + true = ets:insert(?BY_REF, {NewIndexerRef, SigKey}) end, {noreply, State} end; diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 3952a893f..a222cee6c 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -18,10 +18,10 @@ -include("nouveau.hrl"). %% public api --export([outdated/1, get_db_info/1]). +-export([get_db_info/1]). %% callbacks --export([update/1]). +-export([update/2]). -import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]). -import(nouveau_util, [index_path/1]). @@ -29,6 +29,7 @@ -record(acc, { db, index, + upgrade, proc, changes_done, total_changes, @@ -42,20 +43,10 @@ index_purge_seq }). -outdated(#index{} = Index) -> - case open_or_create_index(Index) of - {ok, #{} = Info} -> - #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info, - {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index), - DbUpdateSeq > IndexUpdateSeq orelse DbPurgeSeq > IndexPurgeSeq; - {error, Reason} -> - {error, Reason} end.
- -update(#index{} = Index) -> +update(#index{} = Index, Upgrade) when is_boolean(Upgrade) -> {ok, Db} = couch_db:open_int(Index#index.dbname, []), try - case open_or_create_index(Db, Index) of + case open_or_create_index(Db, Index, Upgrade) of {error, Reason} -> exit({error, Reason}); {ok, #{} = Info} -> @@ -70,7 +61,8 @@ update(#index{} = Index) -> {index, Index#index.name}, {progress, 0}, {changes_done, 0}, - {total_changes, TotalChanges} + {total_changes, TotalChanges}, + {upgrading, Upgrade} ]), %% update status every half second @@ -80,7 +72,7 @@ update(#index{} = Index) -> index_update_seq = IndexUpdateSeq, index_purge_seq = IndexPurgeSeq }, - {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0), + {ok, PurgeAcc1} = purge_index(Db, Index, Upgrade, PurgeAcc0), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), @@ -90,6 +82,7 @@ update(#index{} = Index) -> Acc0 = #acc{ db = Db, index = Index, + upgrade = Upgrade, proc = Proc, changes_done = 0, total_changes = TotalChanges, @@ -99,7 +92,7 @@ update(#index{} = Index) -> {ok, Acc1} = couch_db:fold_changes( Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), - exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) + exit(nouveau_api:set_update_seq(Index, Upgrade, Acc1#acc.update_seq, NewCurSeq)) after ret_os_process(Proc) end @@ -127,6 +120,7 @@ load_docs(FDI, #acc{} = Acc1) -> update_or_delete_index( Acc1#acc.db, Acc1#acc.index, + Acc1#acc.upgrade, Acc1#acc.update_seq, DI, Acc1#acc.proc @@ -142,11 +136,11 @@ load_docs(FDI, #acc{} = Acc1) -> end, {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(Db, #index{} = Index, Upgrade, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(Index, Upgrade, Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -160,22 +154,22 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) - end, case Fields of [] -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(Index, Upgrade, Id, MatchSeq, Seq); _ -> nouveau_api:update_doc( - Index, Id, MatchSeq, Seq, Partition, Fields + Index, Upgrade, Id, MatchSeq, Seq, Partition, Fields ) end end. -open_or_create_index(#index{} = Index) -> - case nouveau_api:index_info(Index) of +open_or_create_index(#index{} = Index, Upgrade) -> + case nouveau_api:index_info(Index, Upgrade) of {ok, #{} = Info} -> {ok, Info}; {error, {not_found, _}} -> case nouveau_api:create_index(Index, index_definition(Index)) of ok -> - nouveau_api:index_info(Index); + nouveau_api:index_info(Index, Upgrade); {error, Reason} -> {error, Reason} end; @@ -183,8 +177,8 @@ open_or_create_index(Db, #index{} = Index) -> -open_or_create_index(Db, #index{} = Index) -> - case open_or_create_index(Index) of +open_or_create_index(Db, #index{} = Index, Upgrade) -> + case open_or_create_index(Index, Upgrade) of {ok, #{} = Info} -> nouveau_util:maybe_create_local_purge_doc(Db, Index), {ok, Info}; @@ -208,7 +202,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }.
-purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> +purge_index(Db, Index, Upgrade, #purge_acc{} = PurgeAcc0) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -217,7 +211,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> ok = nouveau_api:purge_doc( - Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq + Index, Upgrade, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq ), PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq}; FDI -> @@ -230,6 +224,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> update_or_delete_index( Db, Index, + Upgrade, PurgeAcc1#purge_acc.index_update_seq, DI, Proc @@ -249,7 +244,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> ), DbPurgeSeq = couch_db:get_purge_seq(Db), ok = nouveau_api:set_purge_seq( - Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq + Index, Upgrade, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq ), update_local_doc(Db, Index, DbPurgeSeq), {ok, PurgeAcc3} diff --git a/src/nouveau/src/nouveau_rpc.erl b/src/nouveau/src/nouveau_rpc.erl index b7e0eb509..e325eddc7 100644 --- a/src/nouveau/src/nouveau_rpc.erl +++ b/src/nouveau/src/nouveau_rpc.erl @@ -17,7 +17,7 @@ -export([ search/3, - info/2, + info/3, cleanup/2 ]). @@ -63,8 +63,9 @@ search(DbName, #index{} = Index0, QueryArgs0, UpdateLatency) -> end. update_and_retry(DbName, Index, QueryArgs, UpdateLatency) -> + Upgrade = maps:get(upgrade, QueryArgs, false), T0 = erlang:monotonic_time(), - case nouveau_index_manager:update_index(Index#index{dbname = DbName}) of + case nouveau_index_manager:update_index(Index#index{dbname = DbName}, Upgrade) of ok -> T1 = erlang:monotonic_time(), search( @@ -78,10 +79,10 @@ update_and_retry(DbName, Index, QueryArgs, UpdateLatency) -> rexi:reply(Else) end. -info(DbName, #index{} = Index0) -> +info(DbName, #index{} = Index0, Upgrade) -> %% Incorporate the shard name into the record. 
Index1 = Index0#index{dbname = DbName}, - case nouveau_api:index_info(Index1) of + case nouveau_api:index_info(Index1, Upgrade) of {ok, Info0} -> Info1 = Info0#{signature => Index0#index.sig}, rexi:reply({ok, Info1});
