This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-streaming-index-update-alt in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ab03b34f6d9e8f75ca424d11ea1d5d52be6c6770 Author: Robert Newson <[email protected]> AuthorDate: Thu Feb 19 21:45:37 2026 +0000 stream index updates in one request for performance --- .../apache/couchdb/nouveau/NouveauApplication.java | 2 +- .../couchdb/nouveau/api/DocumentDeleteRequest.java | 4 +- .../couchdb/nouveau/api/DocumentRequest.java | 38 ++++++ .../couchdb/nouveau/api/DocumentUpdateRequest.java | 4 +- .../couchdb/nouveau/health/IndexHealthCheck.java | 2 +- .../couchdb/nouveau/resources/IndexResource.java | 35 ++++- .../nouveau/health/IndexHealthCheckTest.java | 5 +- .../couchdb/nouveau/lucene/LuceneIndexTest.java | 28 ++-- src/nouveau/src/nouveau_api.erl | 150 ++++++++++++--------- src/nouveau/src/nouveau_index_updater.erl | 26 ++-- 10 files changed, 199 insertions(+), 95 deletions(-) diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java index c2230d1eb..eb886d826 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java @@ -77,7 +77,7 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat environment.jersey().register(analyzeResource); // IndexResource - final IndexResource indexResource = new IndexResource(indexManager); + final IndexResource indexResource = new IndexResource(indexManager, environment.getObjectMapper()); environment.jersey().register(indexResource); // Health checks diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java index 82e9b716a..ddc067a16 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java @@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Positive; import jakarta.validation.constraints.PositiveOrZero; -public final class DocumentDeleteRequest { +public final class DocumentDeleteRequest extends DocumentRequest { @PositiveOrZero private final long matchSeq; @@ -28,9 +28,11 @@ public final class DocumentDeleteRequest { private final boolean purge; public DocumentDeleteRequest( + @JsonProperty("doc_id") final String id, @JsonProperty("match_seq") final long matchSeq, @JsonProperty("seq") final long seq, @JsonProperty("purge") final boolean purge) { + super(id); if (matchSeq < 0) { throw new IllegalArgumentException("matchSeq must be 0 or greater"); } diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java new file mode 100644 index 000000000..05e60988a --- /dev/null +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java @@ -0,0 +1,38 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.couchdb.nouveau.api; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; + +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = DocumentUpdateRequest.class, name = "update"), + @JsonSubTypes.Type(value = DocumentDeleteRequest.class, name = "delete"), +}) +public abstract class DocumentRequest { + + private final String id; + + protected DocumentRequest(final String id) { + this.id = id; + } + + public final String getId() { + return id; + } +} diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java index 82c196602..7b1db0c09 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java @@ -20,7 +20,7 @@ import jakarta.validation.constraints.Positive; import jakarta.validation.constraints.PositiveOrZero; import java.util.Collection; -public final class DocumentUpdateRequest { +public final class DocumentUpdateRequest extends DocumentRequest { @PositiveOrZero private final long matchSeq; @@ -35,10 +35,12 @@ public final class DocumentUpdateRequest { private final Collection<Field> fields; public DocumentUpdateRequest( + @JsonProperty("doc_id") final String id, @JsonProperty("match_seq") final long matchSeq, @JsonProperty("seq") final long seq, @JsonProperty("partition") final String partition, @JsonProperty("fields") final Collection<Field> fields) { + super(id); this.matchSeq = matchSeq; this.seq = seq; this.partition = partition; diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java index 7e5facb2e..3a70dd7a7 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java @@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck { indexResource.createIndex(name, new IndexDefinition(IndexDefinition.LATEST_LUCENE_VERSION, "standard", null)); try { final DocumentUpdateRequest documentUpdateRequest = - new DocumentUpdateRequest(0, 1, null, Collections.emptyList()); + new DocumentUpdateRequest("foo", 0, 1, null, Collections.emptyList()); indexResource.updateDoc(name, "foo", documentUpdateRequest); final SearchRequest searchRequest = new SearchRequest(); diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index 9ba382109..f59f29369 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -16,6 +16,8 @@ package org.apache.couchdb.nouveau.resources; import com.codahale.metrics.annotation.ExceptionMetered; import com.codahale.metrics.annotation.Metered; import com.codahale.metrics.annotation.ResponseMetered; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.servlet.http.HttpServletRequest; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import jakarta.ws.rs.Consumes; @@ -27,12 +29,14 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response.Status; import java.io.IOException; import java.util.List; import java.util.Objects; import org.apache.couchdb.nouveau.api.DocumentDeleteRequest; +import org.apache.couchdb.nouveau.api.DocumentRequest; import org.apache.couchdb.nouveau.api.DocumentUpdateRequest; import org.apache.couchdb.nouveau.api.IndexDefinition; import org.apache.couchdb.nouveau.api.IndexInfo; @@ -41,6 +45,8 @@ import org.apache.couchdb.nouveau.api.Ok; import org.apache.couchdb.nouveau.api.SearchRequest; import org.apache.couchdb.nouveau.api.SearchResults; import org.apache.couchdb.nouveau.core.IndexManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Path("/index/{name}") @Metered @@ -50,10 +56,15 @@ import org.apache.couchdb.nouveau.core.IndexManager; @Produces(MediaType.APPLICATION_JSON) public final class IndexResource { + private static final Logger LOGGER = LoggerFactory.getLogger(IndexResource.class); + private final IndexManager indexManager; - public IndexResource(final IndexManager indexManager) { + private final ObjectMapper objectMapper; + + public IndexResource(final IndexManager indexManager, final ObjectMapper objectMapper) { this.indexManager = Objects.requireNonNull(indexManager); + this.objectMapper = Objects.requireNonNull(objectMapper); } @PUT @@ -67,6 +78,7 @@ public final class IndexResource { return Ok.INSTANCE; } + @Deprecated(since = "2.5.2", forRemoval = true) @DELETE @Path("/doc/{docId}") public Ok deleteDoc( @@ -120,6 +132,7 @@ public final class IndexResource { }); } + @Deprecated(since = "2.5.2", forRemoval = true) @PUT @Path("/doc/{docId}") public Ok updateDoc( @@ -132,4 +145,24 @@ public final class IndexResource { return Ok.INSTANCE; }); } + + @POST + @Path("/update") + @Consumes({"application/json-seq"}) + public Ok updates(@PathParam("name") String name, @Context HttpServletRequest req) throws Exception { + var reader = req.getReader(); + return indexManager.with(name, (index) -> { + String line; + while ((line = reader.readLine()) != null) { + var docReq = objectMapper.readValue(line.trim(), DocumentRequest.class); + if (docReq instanceof DocumentUpdateRequest) { + index.update(docReq.getId(), (DocumentUpdateRequest) docReq); + } + if (docReq instanceof DocumentDeleteRequest) { + index.delete(docReq.getId(), (DocumentDeleteRequest) docReq); + } + } + return Ok.INSTANCE; + }); + } } diff --git a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java index 0c777fbab..77f47a52c 100644 --- a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java +++ b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java @@ -29,17 +29,18 @@ public class IndexHealthCheckTest { @Test public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception { var manager = new IndexManager(); + var objectMapper = new ObjectMapper(); manager.setCommitIntervalSeconds(30); manager.setIdleSeconds(60); manager.setMaxIndexesOpen(1); - manager.setObjectMapper(new ObjectMapper()); + manager.setObjectMapper(objectMapper); manager.setRootDir(tempDir); manager.setScheduledExecutorService(Executors.newScheduledThreadPool(2)); manager.setSearcherFactory(new SearcherFactory()); manager.start(); try { - var resource = new IndexResource(manager); + var resource = new IndexResource(manager, objectMapper); var check = new IndexHealthCheck(resource); var result = check.check(); assertTrue(result.isHealthy(), result.toString()); diff --git a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java index f87af2fe0..2d56b14e9 100644 --- a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java +++ b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java @@ -77,7 +77,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -97,7 +97,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -118,7 +118,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("bar", "baz", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -139,7 +139,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new DoubleField("bar", (double) i, false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -164,13 +164,13 @@ public class LuceneIndexTest { final int count = 50; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("bar", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } for (int i = count + 1; i <= (count * 2) + 5; i++) { final Collection<Field> fields = List.of(new StringField("bar", "baz", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } @@ -195,15 +195,15 @@ public class LuceneIndexTest { // get match seq wrong assertThrows( UpdatesOutOfOrderException.class, - () -> index.update("foo", new DocumentUpdateRequest(1, 2, null, fields))); + () -> index.update("foo", new DocumentUpdateRequest(null, 1, 2, null, fields))); // Go to 2. - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); // Should be prevented from going down to 1. assertThrows( UpdatesOutOfOrderException.class, - () -> index.update("foo", new DocumentUpdateRequest(2, 1, null, fields))); + () -> index.update("foo", new DocumentUpdateRequest(null, 2, 1, null, fields))); } finally { cleanup(index); } @@ -235,7 +235,7 @@ public class LuceneIndexTest { assertThat(info.getUpdateSeq()).isEqualTo(0); final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); index.commit(); info = index.info(); @@ -252,13 +252,13 @@ public class LuceneIndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(2, 3, false)); + index.delete("foo", new DocumentDeleteRequest(null, 2, 3, false)); index.commit(); info = index.info(); @@ -274,13 +274,13 @@ public class LuceneIndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(0, 3, true)); + index.delete("foo", new DocumentDeleteRequest(null, 0, 3, true)); index.commit(); info = index.info(); diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 2d140e580..56c736aac 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -26,6 +26,9 @@ delete_doc/4, purge_doc/4, update_doc/6, + start_update/1, + end_update/1, + cancel_update/1, search/2, set_purge_seq/3, set_update_seq/3, @@ -34,6 +37,7 @@ ]). -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). +-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}). analyze(Text, Analyzer) when is_binary(Text), is_binary(Analyzer) @@ -99,50 +103,54 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when +delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(UpdateSeq), UpdateSeq > 0 -> - ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false}, - Resp = send_if_enabled( - doc_path(Index, DocId), - [?JSON_CONTENT_TYPE], - <<"DELETE">>, - jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + Row = #{<<"@type">> => delete, doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, purge => false}, + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). -purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when +purge_doc({_, _} = PoolStreamRef, DocId, MatchSeq, PurgeSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(PurgeSeq), PurgeSeq > 0 -> - ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true}, - Resp = send_if_enabled( - doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> + Row = #{<<"@type">> => delete, doc_id => DocId, match_seq => MatchSeq, seq => PurgeSeq, purge => true}, + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). + +start_update(#index{} = Index) -> + case nouveau:enabled() of + true -> + gun_pool:post( + update_path(Index), + [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE] + ); + false -> + {error, nouveau_not_enabled} + end. + +end_update({_, _} = PoolStreamRef) -> + ok = gun_pool:data(PoolStreamRef, fin, <<>>), + case await(PoolStreamRef) of + {ok, 200, _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; {error, Reason} -> send_error(Reason) end. -update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when +cancel_update({_, _} = PoolStreamRef) -> + gun_pool:cancel(PoolStreamRef). + +update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, Fields) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, @@ -151,26 +159,16 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when (is_binary(Partition) orelse Partition == null), is_list(Fields) -> - ReqBody = #{ + Row = #{ + <<"@type">> => update, + doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, partition => Partition, fields => Fields }, - Resp = send_if_enabled( - doc_path(Index, DocId), - [?JSON_CONTENT_TYPE], - <<"PUT">>, - jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). search(#index{} = Index, QueryArgs) -> Resp = send_if_enabled( @@ -234,17 +232,12 @@ index_path(Path) when is_binary(Path) -> index_path(#index{} = Index) -> [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))]. -doc_path(#index{} = Index, DocId) -> - [ - <<"/index/">>, - couch_util:url_encode(nouveau_util:index_name(Index)), - <<"/doc/">>, - couch_util:url_encode(DocId) - ]. - search_path(#index{} = Index) -> [index_path(Index), <<"/search">>]. +update_path(#index{} = Index) -> + [index_path(Index), <<"/update">>]. + jaxrs_error(400, Body) -> {bad_request, message(Body)}; jaxrs_error(404, Body) -> @@ -291,20 +284,7 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) -> ) of {async, PoolStreamRef} -> - Timeout = config:get_integer("nouveau", "request_timeout", 30000), - case gun_pool:await(PoolStreamRef, Timeout) of - {response, fin, Status, RespHeaders} -> - {ok, Status, RespHeaders, []}; - {response, nofin, Status, RespHeaders} -> - case gun_pool:await_body(PoolStreamRef, Timeout) of - {ok, RespBody} -> - {ok, Status, RespHeaders, RespBody}; - {error, Reason} -> - {error, Reason} - end; - {error, Reason} -> - {error, Reason} - end; + await(PoolStreamRef); {error, no_connection_available, _Reason} when RemainingTries > 0 -> timer:sleep(1000), send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries - 1); @@ -314,3 +294,47 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) -> false -> {error, nouveau_not_enabled} end. + +await(PoolStreamRef) -> + Timeout = config:get_integer("nouveau", "request_timeout", 30000), + await(PoolStreamRef, Timeout). + +await(PoolStreamRef, Timeout) -> + case gun_pool:await(PoolStreamRef, Timeout) of + {response, fin, Status, RespHeaders} -> + {ok, Status, RespHeaders, []}; + {response, nofin, Status, RespHeaders} -> + case gun_pool:await_body(PoolStreamRef, Timeout) of + {ok, RespBody} -> + {ok, Status, RespHeaders, RespBody}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +encode_json_seq(Data) -> + [$\x{1e}, jiffy:encode(Data), $\n]. + +check_status({ConnPid, StreamRef} = PoolStreamRef) -> + MRef = monitor(process, ConnPid), + Res = receive + {gun_response, ConnPid, StreamRef, fin, Status, _Headers} -> + {error, Status}; + {gun_response, ConnPid, StreamRef, nofin, Status, _Headers} -> + case gun_pool:await_body(PoolStreamRef, MRef) of + {ok, Body} -> + {error, Status, Body}; + {ok, Body, _Trailers} -> + {error, Status, Body}; + {error, Reason} -> + {error, Reason} + end; + {'DOWN', MRef, process, ConnPid, Reason} -> + {error, Reason} + after 0 -> + ok + end, + demonitor(MRef, [flush]), + Res. diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 4bfea753a..288b18007 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -27,6 +27,7 @@ -import(nouveau_util, [index_path/1]). -record(acc, { + pool_stream_ref, db, index, proc, @@ -80,14 +81,15 @@ update(#index{} = Index) -> index_update_seq = IndexUpdateSeq, index_purge_seq = IndexPurgeSeq }, - {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0), - + {async, PoolStreamRef} = nouveau_api:start_update(Index), + {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, PurgeAcc0), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), Acc0 = #acc{ + pool_stream_ref = PoolStreamRef, db = Db, index = Index, proc = Proc, @@ -99,8 +101,10 @@ update(#index{} = Index) -> {ok, Acc1} = couch_db:fold_changes( Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), + ok = nouveau_api:end_update(PoolStreamRef), exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) after + nouveau_api:cancel_update(PoolStreamRef), ret_os_process(Proc) end end @@ -125,8 +129,8 @@ load_docs(FDI, #acc{} = Acc1) -> false -> case update_or_delete_index( + Acc1#acc.pool_stream_ref, Acc1#acc.db, - Acc1#acc.index, Acc1#acc.update_seq, DI, Acc1#acc.proc @@ -142,11 +146,11 @@ load_docs(FDI, #acc{} = Acc1) -> end, {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. -update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(PoolStreamRef, Db, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + ok = nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -160,10 +164,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) - end, case Fields of [] -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + ok = nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq); _ -> - nouveau_api:update_doc( - Index, Id, MatchSeq, Seq, Partition, Fields + ok = nouveau_api:update_doc( + PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields ) end end. @@ -209,7 +213,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }. -purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> +purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -218,7 +222,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> ok = nouveau_api:purge_doc( - Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq + PoolStreamRef, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq ), PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq}; FDI -> @@ -229,8 +233,8 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> PurgeAcc1; false -> update_or_delete_index( + PoolStreamRef, Db, - Index, PurgeAcc1#purge_acc.index_update_seq, DI, Proc
