This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch lucene-10-again
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 30f66fb604694469657ec563aaa247ed36d078ce
Author: Robert Newson <[email protected]>
AuthorDate: Mon Aug 18 17:04:11 2025 +0100

    Allow ?upgrade=true on search to trigger upgrade
    
    TODO ugly code, don't like Upgrade arg plastered all over the place.
---
 .../org/apache/couchdb/nouveau/core/Index.java     |   8 ++
 .../apache/couchdb/nouveau/core/IndexManager.java  | 103 ++++++++++++++-------
 .../couchdb/nouveau/health/IndexHealthCheck.java   |   4 +-
 .../couchdb/nouveau/resources/IndexResource.java   |  27 ++++--
 .../couchdb/nouveau/lucene/LuceneIndexTest.java    |   2 +-
 src/nouveau/src/nouveau_api.erl                    |  69 ++++++++------
 src/nouveau/src/nouveau_fabric_info.erl            |  14 +--
 src/nouveau/src/nouveau_httpd.erl                  |  15 ++-
 src/nouveau/src/nouveau_index_manager.erl          |  31 ++++---
 src/nouveau/src/nouveau_index_updater.erl          |  53 +++++------
 src/nouveau/src/nouveau_rpc.erl                    |   9 +-
 11 files changed, 207 insertions(+), 128 deletions(-)

diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index b8a03e1d1..2e88241e6 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -52,6 +52,14 @@ public abstract class Index implements Closeable {
         return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize, 
upgradeRequired);
     }
 
+    public synchronized long getUpdateSeq() {
+        return updateSeq;
+    }
+
+    public synchronized long getPurgeSeq() {
+        return purgeSeq;
+    }
+
     protected abstract int doNumDocs() throws IOException;
 
     protected abstract long doDiskSize() throws IOException;
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
index da185aa2a..b5acaf993 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
@@ -38,6 +38,7 @@ import org.apache.couchdb.nouveau.api.IndexDefinition;
 import org.apache.couchdb.nouveau.lucene.LuceneAnalyzerFactory;
 import org.apache.couchdb.nouveau.lucene.LuceneIndex;
 import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -78,7 +79,8 @@ public final class IndexManager implements Managed {
         private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
         private ScheduledFuture<?> commitFuture;
         private HolderState state = HolderState.NOT_LOADED;
-        private Index index;
+        private Index latestIndex;
+        private Index legacyIndex;
     }
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexManager.class);
@@ -104,7 +106,7 @@ public final class IndexManager implements Managed {
 
     private StripedLock<String> createLock;
 
-    public <R> R with(final String name, final IndexFunction<Index, R> 
indexFun)
+    public <R> R with(final String name, final boolean upgrade, final 
IndexFunction<Index, R> indexFun)
             throws IOException, InterruptedException {
         evictIfOverCapacity();
 
@@ -123,21 +125,28 @@ public final class IndexManager implements Managed {
 
             // Load if not already loaded or remove if Lucene closed the index 
elsewhere.
             if (holder.state == HolderState.NOT_LOADED
-                    || (holder.state == HolderState.LOADED && 
!holder.index.isOpen())) {
+                    || (holder.state == HolderState.LOADED
+                            && (!holder.latestIndex.isOpen()
+                                    || (holder.legacyIndex != null && 
!holder.legacyIndex.isOpen())))) {
                 holder.lock.readLock().unlock();
                 holder.lock.writeLock().lock();
                 try {
-                    if (holder.state == HolderState.LOADED && 
!holder.index.isOpen()) {
+                    if (holder.state == HolderState.LOADED
+                            && (holder.legacyIndex != null && 
!holder.legacyIndex.isOpen())) {
                         LOGGER.info("removing closed index {}", name);
                         holder.state = HolderState.UNLOADED;
-                        holder.index = null;
+                        if (holder.latestIndex.isOpen() || 
holder.legacyIndex.isOpen()) {
+                            close(name, holder);
+                        }
+                        holder.latestIndex = null;
+                        holder.legacyIndex = null;
                         synchronized (cache) {
                             cache.remove(name, holder);
                         }
                         continue retry;
                     }
                     if (holder.state == HolderState.NOT_LOADED) {
-                        holder.index = load(name);
+                        load(name, holder);
                         holder.commitFuture = 
this.schedulerExecutorService.scheduleWithFixedDelay(
                                 commitFun(name, holder),
                                 commitIntervalSeconds,
@@ -145,6 +154,12 @@ public final class IndexManager implements Managed {
                                 TimeUnit.SECONDS);
                         holder.state = HolderState.LOADED;
                     }
+                    if (holder.state == HolderState.LOADED && 
upgradeComplete(holder)) {
+                        LOGGER.info("Upgrade of {} complete, removing legacy 
index", name);
+                        holder.legacyIndex.setDeleteOnClose(true);
+                        holder.legacyIndex.close();
+                        holder.legacyIndex = null;
+                    }
                     holder.lock.readLock().lock();
                 } finally {
                     holder.lock.writeLock().unlock();
@@ -159,7 +174,11 @@ public final class IndexManager implements Managed {
                         Thread.sleep(1000);
                         continue retry;
                     case LOADED:
-                        return indexFun.apply(holder.index);
+                        // Use latest index if upgrading or legacy index if it 
exists if not.
+                        var index = upgrade
+                                ? holder.latestIndex
+                                : holder.legacyIndex != null ? 
holder.legacyIndex : holder.latestIndex;
+                        return indexFun.apply(index);
                 }
             } finally {
                 holder.lock.readLock().unlock();
@@ -167,6 +186,14 @@ public final class IndexManager implements Managed {
         }
     }
 
+    private boolean upgradeComplete(final IndexHolder holder) throws 
IOException {
+        if (holder.legacyIndex == null) {
+            return false;
+        }
+        return holder.legacyIndex.getUpdateSeq() >= 
holder.latestIndex.getUpdateSeq()
+                && holder.legacyIndex.getPurgeSeq() >= 
holder.latestIndex.getPurgeSeq();
+    }
+
     private void evictIfOverCapacity() throws IOException, 
InterruptedException {
         while (true) {
             final String candidate;
@@ -190,7 +217,7 @@ public final class IndexManager implements Managed {
             switch (holder.state) {
                 case LOADED:
                     if (forceDelete) {
-                        holder.index.setDeleteOnClose(true);
+                        holder.latestIndex.setDeleteOnClose(true);
                     }
                     LOGGER.info("closing {}", name);
                     try {
@@ -199,7 +226,8 @@ public final class IndexManager implements Managed {
                         LOGGER.error("I/O exception when evicting {}", name, 
e);
                     }
                     holder.state = HolderState.UNLOADED;
-                    holder.index = null;
+                    holder.latestIndex = null;
+                    holder.legacyIndex = null;
                     break;
                 case NOT_LOADED:
                 case UNLOADED:
@@ -331,8 +359,11 @@ public final class IndexManager implements Managed {
         return () -> {
             holder.lock.readLock().lock();
             try {
-                if (holder.index.commit()) {
-                    LOGGER.info("committed {}", name);
+                if (holder.latestIndex.commit()) {
+                    LOGGER.info("committed {} (latest)", name);
+                }
+                if (holder.legacyIndex != null && holder.legacyIndex.commit()) 
{
+                    LOGGER.info("committed {} (legacy)", name);
                 }
             } catch (final IOException e) {
                 LOGGER.warn("I/O exception while committing " + name, e);
@@ -377,14 +408,31 @@ public final class IndexManager implements Managed {
         throw new WebApplicationException(name + " attempts to escape from 
index root directory", Status.BAD_REQUEST);
     }
 
-    private Index load(final String name) throws IOException {
+    private void load(final String name, final IndexHolder holder) throws 
IOException {
         LOGGER.info("opening {}", name);
         final Path path = indexPath(name);
         final IndexDefinition indexDefinition = loadIndexDefinition(name);
         final Analyzer analyzer = 
LuceneAnalyzerFactory.fromDefinition(indexDefinition);
-        final int version = getIndexVersion(path);
+        holder.latestIndex = load(name, Version.LATEST.major, analyzer);
+        if (indexExists(path.resolve(Integer.toString(Version.LATEST.major - 
1)))) {
+            LOGGER.warn("{} has a legacy index", name);
+            holder.legacyIndex = load(name, Version.LATEST.major - 1, 
analyzer);
+        }
+    }
+
+    // Same logic as DirectoryReader.indexExists() but without needing a 
Directory.
+    private boolean indexExists(final Path path) throws IOException {
+        if (!Files.exists(path)) {
+            return false;
+        }
+        try (var stream = Files.walk(path, 1)) {
+            return stream.anyMatch(f -> 
f.getFileName().toString().startsWith(IndexFileNames.SEGMENTS + "_"));
+        }
+    }
+
+    private LuceneIndex load(final String name, final int version, final 
Analyzer analyzer) throws IOException {
+        final Path indexPath = 
indexPath(name).resolve(Integer.toString(version));
         final boolean upgradeRequired = version < Version.LATEST.major;
-        final Path indexPath = path.resolve(Integer.toString(version));
         final Directory dir = new 
DirectIODirectory(FSDirectory.open(indexPath));
         final IndexWriterConfig config = new IndexWriterConfig(analyzer);
         config.setOpenMode(upgradeRequired ? OpenMode.APPEND : 
OpenMode.CREATE_OR_APPEND);
@@ -396,19 +444,6 @@ public final class IndexManager implements Managed {
         return new LuceneIndex(analyzer, writer, updateSeq, purgeSeq, 
upgradeRequired, searcherManager);
     }
 
-    /**
-     * Find highest version index on disk, or latest version if none.
-     */
-    private int getIndexVersion(final Path path) throws IOException {
-        if 
(Files.exists(path.resolve(Integer.toString(Version.LATEST.major)))) {
-            return Version.LATEST.major;
-        }
-        if (Files.exists(path.resolve(Integer.toString(Version.LATEST.major - 
1)))) {
-            return Version.LATEST.major - 1;
-        }
-        return Version.LATEST.major;
-    }
-
     private long getSeq(final IndexWriter writer, final String key) throws 
IOException {
         final Iterable<Map.Entry<String, String>> commitData = 
writer.getLiveCommitData();
         if (commitData == null) {
@@ -429,15 +464,21 @@ public final class IndexManager implements Managed {
         holder.commitFuture.cancel(true);
         IOUtils.runAll(
                 () -> {
-                    if (holder.index.commit()) {
-                        LOGGER.debug("committed {} before close", name);
+                    if (holder.latestIndex.commit()) {
+                        LOGGER.debug("committed {} (latest) before close", 
name);
+                    }
+                    if (holder.legacyIndex != null && 
holder.legacyIndex.commit()) {
+                        LOGGER.debug("committed {} (legacy) before close", 
name);
                     }
                 },
                 () -> {
-                    holder.index.close();
+                    holder.latestIndex.close();
+                    if (holder.legacyIndex != null) {
+                        holder.legacyIndex.close();
+                    }
                 },
                 () -> {
-                    if (holder.index.isDeleteOnClose()) {
+                    if (holder.latestIndex.isDeleteOnClose()) {
                         IOUtils.rm(indexRootPath(name));
                     }
                 });
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
index f20efe0c9..64733eb20 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java
@@ -43,13 +43,13 @@ public final class IndexHealthCheck extends HealthCheck {
         try {
             final DocumentUpdateRequest documentUpdateRequest =
                     new DocumentUpdateRequest(0, 1, null, 
Collections.emptyList());
-            indexResource.updateDoc(name, "foo", documentUpdateRequest);
+            indexResource.updateDoc(name, true, "foo", documentUpdateRequest);
 
             final SearchRequest searchRequest = new SearchRequest();
             searchRequest.setQuery("_id:foo");
             searchRequest.setMinUpdateSeq(1);
 
-            final SearchResults searchResults = 
indexResource.searchIndex(name, searchRequest);
+            final SearchResults searchResults = 
indexResource.searchIndex(name, true, searchRequest);
             if (searchResults.getTotalHits() == 1) {
                 return Result.healthy();
             } else {
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index a52e00da9..d66220496 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -26,6 +26,7 @@ import jakarta.ws.rs.PUT;
 import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.util.List;
@@ -65,10 +66,11 @@ public final class IndexResource {
     @Path("/doc/{docId}")
     public Ok deleteDoc(
             @PathParam("name") String name,
+            @QueryParam("upgrade") boolean upgrade,
             @PathParam("docId") String docId,
             @NotNull @Valid DocumentDeleteRequest request)
             throws Exception {
-        return indexManager.with(name, (index) -> {
+        return indexManager.with(name, upgrade, (index) -> {
             index.delete(docId, request);
             return Ok.INSTANCE;
         });
@@ -81,15 +83,20 @@ public final class IndexResource {
     }
 
     @GET
-    public IndexInfo getIndexInfo(@PathParam("name") String name) throws 
Exception {
-        return indexManager.with(name, (index) -> {
+    public IndexInfo getIndexInfo(@PathParam("name") String name, 
@QueryParam("upgrade") boolean upgrade)
+            throws Exception {
+        return indexManager.with(name, upgrade, (index) -> {
             return index.info();
         });
     }
 
     @POST
-    public Ok setIndexInfo(@PathParam("name") String name, @NotNull @Valid 
IndexInfoRequest request) throws Exception {
-        return indexManager.with(name, (index) -> {
+    public Ok setIndexInfo(
+            @PathParam("name") String name,
+            @QueryParam("upgrade") boolean upgrade,
+            @NotNull @Valid IndexInfoRequest request)
+            throws Exception {
+        return indexManager.with(name, upgrade, (index) -> {
             if (request.getMatchUpdateSeq().isPresent()
                     && request.getUpdateSeq().isPresent()) {
                 index.setUpdateSeq(
@@ -107,9 +114,12 @@ public final class IndexResource {
 
     @POST
     @Path("/search")
-    public SearchResults searchIndex(@PathParam("name") String name, @NotNull 
@Valid SearchRequest request)
+    public SearchResults searchIndex(
+            @PathParam("name") String name,
+            @QueryParam("upgrade") boolean upgrade,
+            @NotNull @Valid SearchRequest request)
             throws Exception {
-        return indexManager.with(name, (index) -> {
+        return indexManager.with(name, upgrade, (index) -> {
             return index.search(request);
         });
     }
@@ -118,10 +128,11 @@ public final class IndexResource {
     @Path("/doc/{docId}")
     public Ok updateDoc(
             @PathParam("name") String name,
+            @QueryParam("upgrade") boolean upgrade,
             @PathParam("docId") String docId,
             @NotNull @Valid DocumentUpdateRequest request)
             throws Exception {
-        return indexManager.with(name, (index) -> {
+        return indexManager.with(name, upgrade, (index) -> {
             index.update(docId, request);
             return Ok.INSTANCE;
         });
diff --git 
a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java 
b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
index 9206b83d9..10dd599e8 100644
--- 
a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
+++ 
b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java
@@ -56,7 +56,7 @@ public class LuceneIndexTest {
         config.setUseCompoundFile(false);
         final IndexWriter writer = new IndexWriter(dir, config);
         final SearcherManager searcherManager = new SearcherManager(writer, 
null);
-        return new LuceneIndex(analyzer, writer, 0L, 0L, searcherManager);
+        return new LuceneIndex(analyzer, writer, 0L, 0L, false, 
searcherManager);
     }
 
     protected final void cleanup(final Index index) throws IOException {
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 319170a4c..0cc571362 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -19,16 +19,16 @@
 
 -export([
     analyze/2,
-    index_info/1,
+    index_info/2,
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc/4,
-    purge_doc/4,
-    update_doc/6,
+    delete_doc/5,
+    purge_doc/5,
+    update_doc/7,
     search/2,
-    set_purge_seq/3,
-    set_update_seq/3,
+    set_purge_seq/4,
+    set_update_seq/4,
     jaxrs_error/2
 ]).
 
@@ -56,8 +56,8 @@ analyze(Text, Analyzer) when
 analyze(_, _) ->
     {error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty 
strings">>}}.
 
-index_info(#index{} = Index) ->
-    Resp = send_if_enabled(index_path(Index), [], <<"GET">>),
+index_info(#index{} = Index, Upgrade) ->
+    Resp = send_if_enabled(index_path(Index, Upgrade), [], <<"GET">>),
     case Resp of
         {ok, 200, _, RespBody} ->
             {ok, jiffy:decode(RespBody, [return_maps])};
@@ -98,8 +98,9 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
+delete_doc(#index{} = Index, Upgrade, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
+    is_boolean(Upgrade),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(UpdateSeq),
@@ -107,7 +108,7 @@ delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) 
when
 ->
     ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
     Resp = send_if_enabled(
-        doc_path(Index, DocId),
+        doc_path(Index, Upgrade, DocId),
         [?JSON_CONTENT_TYPE],
         <<"DELETE">>,
         jiffy:encode(ReqBody)
@@ -121,8 +122,9 @@ delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) 
when
             send_error(Reason)
     end.
 
-purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
+purge_doc(#index{} = Index, Upgrade, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
+    is_boolean(Upgrade),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(PurgeSeq),
@@ -130,7 +132,7 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
 ->
     ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
     Resp = send_if_enabled(
-        doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(ReqBody)
+        doc_path(Index, Upgrade, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(ReqBody)
     ),
     case Resp of
         {ok, 200, _, _} ->
@@ -141,8 +143,9 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) 
when
+update_doc(#index{} = Index, Upgrade, DocId, MatchSeq, UpdateSeq, Partition, 
Fields) when
     is_binary(DocId),
+    is_boolean(Upgrade),
     is_integer(MatchSeq),
     MatchSeq >= 0,
     is_integer(UpdateSeq),
@@ -157,7 +160,7 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
         fields => Fields
     },
     Resp = send_if_enabled(
-        doc_path(Index, DocId),
+        doc_path(Index, Upgrade, DocId),
         [?JSON_CONTENT_TYPE],
         <<"PUT">>,
         jiffy:encode(ReqBody)
@@ -171,9 +174,10 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
             send_error(Reason)
     end.
 
-search(#index{} = Index, QueryArgs) ->
+search(#index{} = Index, #{} = QueryArgs) ->
+    Upgrade = maps:get(upgrade, QueryArgs, false),
     Resp = send_if_enabled(
-        search_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(QueryArgs)
+        search_path(Index, Upgrade), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(QueryArgs)
     ),
     case Resp of
         {ok, 200, _, RespBody} ->
@@ -187,23 +191,23 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) ->
+set_update_seq(#index{} = Index, Upgrade, MatchSeq, UpdateSeq) ->
     ReqBody = #{
         match_update_seq => MatchSeq,
         update_seq => UpdateSeq
     },
-    set_seq(Index, ReqBody).
+    set_seq(Index, Upgrade, ReqBody).
 
-set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) ->
+set_purge_seq(#index{} = Index, Upgrade, MatchSeq, PurgeSeq) ->
     ReqBody = #{
         match_purge_seq => MatchSeq,
         purge_seq => PurgeSeq
     },
-    set_seq(Index, ReqBody).
+    set_seq(Index, Upgrade, ReqBody).
 
-set_seq(#index{} = Index, ReqBody) ->
+set_seq(#index{} = Index, Upgrade, ReqBody) ->
     Resp = send_if_enabled(
-        index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(ReqBody)
+        index_path(Index, Upgrade), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(ReqBody)
     ),
     case Resp of
         {ok, 200, _, _} ->
@@ -217,20 +221,27 @@ set_seq(#index{} = Index, ReqBody) ->
 %% private functions
 
 index_path(Path) when is_binary(Path) ->
-    [<<"/index/">>, couch_util:url_encode(Path)];
-index_path(#index{} = Index) ->
-    [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))].
+    [<<"/index/">>, couch_util:url_encode(Path)].
 
-doc_path(#index{} = Index, DocId) ->
+index_path(#index{} = Index, false) ->
+    [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))];
+index_path(#index{} = Index, true) ->
+    [index_path(Index, false), <<"?upgrade=true">>].
+
+doc_path(#index{} = Index, false, DocId) ->
     [
         <<"/index/">>,
         couch_util:url_encode(nouveau_util:index_name(Index)),
         <<"/doc/">>,
         couch_util:url_encode(DocId)
-    ].
+    ];
+doc_path(#index{} = Index, true, DocId) ->
+    [doc_path(Index, false, DocId), <<"?upgrade=true">>].
 
-search_path(#index{} = Index) ->
-    [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index)), 
<<"/search">>].
+search_path(#index{} = Index, false) ->
+    [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index)), 
<<"/search">>];
+search_path(#index{} = Index, true) ->
+    [search_path(Index, false), <<"?upgrade=true">>].
 
 jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
diff --git a/src/nouveau/src/nouveau_fabric_info.erl 
b/src/nouveau/src/nouveau_fabric_info.erl
index 45bdf177e..50c38744a 100644
--- a/src/nouveau/src/nouveau_fabric_info.erl
+++ b/src/nouveau/src/nouveau_fabric_info.erl
@@ -15,28 +15,28 @@
 
 -module(nouveau_fabric_info).
 
--export([go/3]).
+-export([go/4]).
 
 -include_lib("mem3/include/mem3.hrl").
 
-go(DbName, DDocId, IndexName) when is_binary(DDocId) ->
+go(DbName, DDocId, IndexName, Upgrade) when is_binary(DDocId) ->
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", DDocId/binary>>, 
[ejson_body]),
-    go(DbName, DDoc, IndexName);
-go(DbName, DDoc, IndexName) ->
+    go(DbName, DDoc, IndexName, Upgrade);
+go(DbName, DDoc, IndexName, Upgrade) ->
     case nouveau_util:design_doc_to_index(DbName, DDoc, IndexName) of
         {ok, Index} ->
-            go(DbName, DDoc, IndexName, Index);
+            go(DbName, DDoc, IndexName, Index, Upgrade);
         {error, Reason} ->
             {error, Reason}
     end.
 
-go(DbName, _DDoc, _IndexName, Index) ->
+go(DbName, _DDoc, _IndexName, Index, Upgrade) ->
     Shards = mem3:shards(DbName),
     Counters0 = lists:map(
         fun(#shard{} = Shard) ->
             Ref = rexi:cast(
                 Shard#shard.node,
-                {nouveau_rpc, info, [Shard#shard.name, Index]}
+                {nouveau_rpc, info, [Shard#shard.name, Index, Upgrade]}
             ),
             Shard#shard{ref = Ref}
         end,
diff --git a/src/nouveau/src/nouveau_httpd.erl 
b/src/nouveau/src/nouveau_httpd.erl
index 878e001b7..6ae470ade 100644
--- a/src/nouveau/src/nouveau_httpd.erl
+++ b/src/nouveau/src/nouveau_httpd.erl
@@ -75,7 +75,8 @@ handle_search_req_int(#httpd{method = 'GET', path_parts = [_, 
_, _, _, IndexName
         counts => chttpd:qs_value(Req, "counts"),
         update => chttpd:qs_value(Req, "update"),
         bookmark => chttpd:qs_value(Req, "bookmark"),
-        include_docs => chttpd:qs_value(Req, "include_docs")
+        include_docs => chttpd:qs_value(Req, "include_docs"),
+        upgrade => chttpd:qs_value(Req, "upgrade")
     }),
     handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, ?RETRY_LIMIT);
 handle_search_req_int(
@@ -95,7 +96,8 @@ handle_search_req_int(
         counts => json_or_undefined(<<"counts">>, ReqBody),
         update => maps:get(<<"update">>, ReqBody, undefined),
         bookmark => maps:get(<<"bookmark">>, ReqBody, undefined),
-        include_docs => maps:get(<<"include_docs">>, ReqBody, undefined)
+        include_docs => maps:get(<<"include_docs">>, ReqBody, undefined),
+        upgrade => maps:get(<<"upgrade">>, ReqBody, undefined)
     }),
     handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, ?RETRY_LIMIT);
 handle_search_req_int(Req, _Db, _DDoc) ->
@@ -136,7 +138,8 @@ handle_info_req(
 ) ->
     check_if_enabled(),
     DbName = couch_db:name(Db),
-    case nouveau_fabric_info:go(DbName, DDoc, IndexName) of
+    Upgrade = chttpd:qs_value(Req, "upgrade") == "true",
+    case nouveau_fabric_info:go(DbName, DDoc, IndexName, Upgrade) of
         {ok, IndexInfo} ->
             send_json(
                 Req,
@@ -260,6 +263,12 @@ validate_query_arg(include_docs, "false") ->
     false;
 validate_query_arg(include_docs, "true") ->
     true;
+validate_query_arg(upgrade, undefined) ->
+    false;
+validate_query_arg(upgrade, "false") ->
+    false;
+validate_query_arg(upgrade, "true") ->
+    true;
 validate_query_arg(Key, Val) ->
     Msg = io_lib:format("Invalid value for ~p: ~p", [Key, Val]),
     throw({query_parse_error, ?l2b(Msg)}).
diff --git a/src/nouveau/src/nouveau_index_manager.erl 
b/src/nouveau/src/nouveau_index_manager.erl
index 45f7a1e8d..2aa3b704f 100644
--- a/src/nouveau/src/nouveau_index_manager.erl
+++ b/src/nouveau/src/nouveau_index_manager.erl
@@ -24,7 +24,7 @@
 
 %% public api
 -export([
-    update_index/1
+    update_index/2
 ]).
 
 %% gen_server bits
@@ -44,10 +44,10 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-update_index(#index{} = Index) ->
+update_index(#index{} = Index, Upgrade) when is_boolean(Upgrade) ->
     case nouveau:enabled() of
         true ->
-            gen_server:call(?MODULE, {update, Index}, infinity);
+            gen_server:call(?MODULE, {update, Index, Upgrade}, infinity);
         false ->
             {error, nouveau_not_enabled}
     end.
@@ -59,16 +59,19 @@ init(_) ->
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
     {ok, nil}.
 
-handle_call({update, #index{} = Index0}, From, State) ->
+handle_call({update, #index{} = Index0, Upgrade}, From, State) when 
is_boolean(Upgrade) ->
     DbSig = {Index0#index.dbname, Index0#index.sig},
-    case ets:lookup(?BY_DBSIG, DbSig) of
+    SigKey = {DbSig, Upgrade},
+    case ets:lookup(?BY_DBSIG, SigKey) of
         [] ->
-            {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, 
update, [Index0]),
+            {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, 
update, [
+                Index0, Upgrade
+            ]),
             Queue = queue:in(From, queue:new()),
-            true = ets:insert(?BY_DBSIG, {DbSig, Index0, Queue}),
-            true = ets:insert(?BY_REF, {IndexerRef, DbSig});
+            true = ets:insert(?BY_DBSIG, {SigKey, Index0, Queue}),
+            true = ets:insert(?BY_REF, {IndexerRef, SigKey});
         [{_DbSig, Index1, Queue}] ->
-            ets:insert(?BY_DBSIG, {DbSig, Index1, queue:in(From, Queue)})
+            ets:insert(?BY_DBSIG, {SigKey, Index1, queue:in(From, Queue)})
     end,
     {noreply, State};
 handle_call(_Msg, _From, State) ->
@@ -82,9 +85,9 @@ handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, 
State) ->
         [] ->
             % not one of ours, somehow...
             {noreply, State};
-        [{_, DbSig}] ->
+        [{_, SigKey}] ->
             true = ets:delete(?BY_REF, IndexerRef),
-            [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, DbSig),
+            [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, SigKey),
             {{value, From}, Queue1} = queue:out(Queue0),
             if
                 Reason /= ok ->
@@ -104,13 +107,13 @@ handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, 
State) ->
             gen_server:reply(From, Reason),
             case queue:is_empty(Queue1) of
                 true ->
-                    true = ets:delete(?BY_DBSIG, DbSig);
+                    true = ets:delete(?BY_DBSIG, SigKey);
                 false ->
                     {_IndexerPid, NewIndexerRef} = 
spawn_monitor(nouveau_index_updater, update, [
                         Index
                     ]),
-                    true = ets:insert(?BY_DBSIG, {DbSig, Index, Queue1}),
-                    true = ets:insert(?BY_REF, {NewIndexerRef, DbSig})
+                    true = ets:insert(?BY_DBSIG, {SigKey, Index, Queue1}),
+                    true = ets:insert(?BY_REF, {NewIndexerRef, SigKey})
             end,
             {noreply, State}
     end;
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index 3952a893f..a222cee6c 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -18,10 +18,10 @@
 -include("nouveau.hrl").
 
 %% public api
--export([outdated/1, get_db_info/1]).
+-export([get_db_info/1]).
 
 %% callbacks
--export([update/1]).
+-export([update/2]).
 
 -import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]).
 -import(nouveau_util, [index_path/1]).
@@ -29,6 +29,7 @@
 -record(acc, {
     db,
     index,
+    upgrade,
     proc,
     changes_done,
     total_changes,
@@ -42,20 +43,10 @@
     index_purge_seq
 }).
 
-outdated(#index{} = Index) ->
-    case open_or_create_index(Index) of
-        {ok, #{} = Info} ->
-            #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
-            {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index),
-            DbUpdateSeq > IndexUpdateSeq orelse DbPurgeSeq > IndexPurgeSeq;
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-update(#index{} = Index) ->
+update(#index{} = Index, Upgrade) when is_boolean(Upgrade) ->
     {ok, Db} = couch_db:open_int(Index#index.dbname, []),
     try
-        case open_or_create_index(Db, Index) of
+        case open_or_create_index(Db, Index, Upgrade) of
             {error, Reason} ->
                 exit({error, Reason});
             {ok, #{} = Info} ->
@@ -70,7 +61,8 @@ update(#index{} = Index) ->
                     {index, Index#index.name},
                     {progress, 0},
                     {changes_done, 0},
-                    {total_changes, TotalChanges}
+                    {total_changes, TotalChanges},
+                    {upgrading, Upgrade}
                 ]),
 
                 %% update status every half second
@@ -80,7 +72,7 @@ update(#index{} = Index) ->
                     index_update_seq = IndexUpdateSeq,
                     index_purge_seq = IndexPurgeSeq
                 },
-                {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
+                {ok, PurgeAcc1} = purge_index(Db, Index, Upgrade, PurgeAcc0),
 
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
@@ -90,6 +82,7 @@ update(#index{} = Index) ->
                     Acc0 = #acc{
                         db = Db,
                         index = Index,
+                        upgrade = Upgrade,
                         proc = Proc,
                         changes_done = 0,
                         total_changes = TotalChanges,
@@ -99,7 +92,7 @@ update(#index{} = Index) ->
                     {ok, Acc1} = couch_db:fold_changes(
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
-                    exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq))
+                    exit(nouveau_api:set_update_seq(Index, Upgrade, Acc1#acc.update_seq, NewCurSeq))
                 after
                     ret_os_process(Proc)
                 end
@@ -127,6 +120,7 @@ load_docs(FDI, #acc{} = Acc1) ->
                     update_or_delete_index(
                         Acc1#acc.db,
                         Acc1#acc.index,
+                        Acc1#acc.upgrade,
                         Acc1#acc.update_seq,
                         DI,
                         Acc1#acc.proc
@@ -142,11 +136,11 @@ load_docs(FDI, #acc{} = Acc1) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) ->
+update_or_delete_index(Db, #index{} = Index, Upgrade, MatchSeq, #doc_info{} = DI, Proc) ->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
     case Del of
         true ->
-            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+            nouveau_api:delete_doc(Index, Upgrade, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -160,22 +154,22 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
+                    nouveau_api:delete_doc(Index, Upgrade, Id, MatchSeq, Seq);
                 _ ->
                     nouveau_api:update_doc(
-                        Index, Id, MatchSeq, Seq, Partition, Fields
+                        Index, Upgrade, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
 
-open_or_create_index(#index{} = Index) ->
-    case nouveau_api:index_info(Index) of
+open_or_create_index(#index{} = Index, Upgrade) ->
+    case nouveau_api:index_info(Index, Upgrade) of
         {ok, #{} = Info} ->
             {ok, Info};
         {error, {not_found, _}} ->
             case nouveau_api:create_index(Index, index_definition(Index)) of
                 ok ->
-                    nouveau_api:index_info(Index);
+                    nouveau_api:index_info(Index, upgrade);
                 {error, Reason} ->
                     {error, Reason}
             end;
@@ -183,8 +177,8 @@ open_or_create_index(#index{} = Index) ->
             {error, Reason}
     end.
 
-open_or_create_index(Db, #index{} = Index) ->
-    case open_or_create_index(Index) of
+open_or_create_index(Db, #index{} = Index, Upgrade) ->
+    case open_or_create_index(Index, Upgrade) of
         {ok, #{} = Info} ->
             nouveau_util:maybe_create_local_purge_doc(Db, Index),
             {ok, Info};
@@ -208,7 +202,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(Db, Index, Upgrade, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
@@ -217,7 +211,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                 case couch_db:get_full_doc_info(Db, Id) of
                     not_found ->
                         ok = nouveau_api:purge_doc(
-                            Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
+                            Index, Upgrade, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
                         ),
                         PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq};
                     FDI ->
@@ -230,6 +224,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
                                 update_or_delete_index(
                                     Db,
                                     Index,
+                                    Upgrade,
                                     PurgeAcc1#purge_acc.index_update_seq,
                                     DI,
                                     Proc
@@ -249,7 +244,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
         ),
         DbPurgeSeq = couch_db:get_purge_seq(Db),
         ok = nouveau_api:set_purge_seq(
-            Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
+            Index, Upgrade, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
         ),
         update_local_doc(Db, Index, DbPurgeSeq),
         {ok, PurgeAcc3}
diff --git a/src/nouveau/src/nouveau_rpc.erl b/src/nouveau/src/nouveau_rpc.erl
index b7e0eb509..e325eddc7 100644
--- a/src/nouveau/src/nouveau_rpc.erl
+++ b/src/nouveau/src/nouveau_rpc.erl
@@ -17,7 +17,7 @@
 
 -export([
     search/3,
-    info/2,
+    info/3,
     cleanup/2
 ]).
 
@@ -63,8 +63,9 @@ search(DbName, #index{} = Index0, QueryArgs0, UpdateLatency) ->
     end.
 
 update_and_retry(DbName, Index, QueryArgs, UpdateLatency) ->
+    Upgrade = maps:get(upgrade, QueryArgs, false),
     T0 = erlang:monotonic_time(),
-    case nouveau_index_manager:update_index(Index#index{dbname = DbName}) of
+    case nouveau_index_manager:update_index(Index#index{dbname = DbName}, Upgrade) of
         ok ->
             T1 = erlang:monotonic_time(),
             search(
@@ -78,10 +79,10 @@ update_and_retry(DbName, Index, QueryArgs, UpdateLatency) ->
             rexi:reply(Else)
     end.
 
-info(DbName, #index{} = Index0) ->
+info(DbName, #index{} = Index0, Upgrade) ->
     %% Incorporate the shard name into the record.
     Index1 = Index0#index{dbname = DbName},
-    case nouveau_api:index_info(Index1) of
+    case nouveau_api:index_info(Index1, Upgrade) of
         {ok, Info0} ->
             Info1 = Info0#{signature => Index0#index.sig},
             rexi:reply({ok, Info1});


Reply via email to