This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-gun-http2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 61c03f93cf9f2b3ae37ad0287f0e5f97857c4e68
Author: Robert Newson <[email protected]>
AuthorDate: Thu Jun 26 22:16:09 2025 +0100

    nouveau: use http/2
    
    we lose the pipelining benefit for indexing as http/2 does not
    support it (and we need the requests that update the index to happen
    in the order we issue them).
---
 .gitignore                                         |   2 +
 .../java/org/apache/couchdb/nouveau/api/Ok.java    |  26 +++
 .../couchdb/nouveau/resources/IndexResource.java   |  26 ++-
 rebar.config.script                                |   2 +
 rel/nouveau.yaml                                   |   4 +-
 rel/reltool.config                                 |   4 +
 src/nouveau/src/nouveau.app.src                    |   2 +-
 src/nouveau/src/nouveau_api.erl                    | 244 +++++++--------------
 src/nouveau/src/nouveau_gun.erl                    | 142 ++++++++++++
 src/nouveau/src/nouveau_index_manager.erl          |  33 ---
 src/nouveau/src/nouveau_index_updater.erl          |  46 ++--
 src/nouveau/src/nouveau_sup.erl                    |   1 +
 src/nouveau/src/nouveau_util.erl                   |  10 +-
 13 files changed, 289 insertions(+), 253 deletions(-)

diff --git a/.gitignore b/.gitignore
index 080a7dd6f..d46777512 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,10 +53,12 @@ src/couch/priv/couchspawnkillable
 src/couch/priv/couch_ejson_compare/couch_ejson_compare.d
 src/couch/priv/couch_js/**/*.d
 src/couch/priv/icu_driver/couch_icu_driver.d
+src/cowlib/
 src/mango/src/mango_cursor_text.nocompile
 src/excoveralls/
 src/fauxton/
 src/folsom/
+src/gun/
 src/hackney/
 src/hqueue/
 src/ibrowse/
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/Ok.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/Ok.java
new file mode 100644
index 000000000..b393e1978
--- /dev/null
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/Ok.java
@@ -0,0 +1,26 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Ok {
+
+    public static final Ok INSTANCE = new Ok();
+
+    @JsonProperty
+    public boolean ok() {
+        return true;
+    }
+}
diff --git 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index a6ca2c47b..a52e00da9 100644
--- 
a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ 
b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -35,6 +35,7 @@ import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
 import org.apache.couchdb.nouveau.api.IndexDefinition;
 import org.apache.couchdb.nouveau.api.IndexInfo;
 import org.apache.couchdb.nouveau.api.IndexInfoRequest;
+import org.apache.couchdb.nouveau.api.Ok;
 import org.apache.couchdb.nouveau.api.SearchRequest;
 import org.apache.couchdb.nouveau.api.SearchResults;
 import org.apache.couchdb.nouveau.core.IndexManager;
@@ -54,27 +55,29 @@ public final class IndexResource {
     }
 
     @PUT
-    public void createIndex(@PathParam("name") String name, @NotNull @Valid 
IndexDefinition indexDefinition)
+    public Ok createIndex(@PathParam("name") String name, @NotNull @Valid 
IndexDefinition indexDefinition)
             throws IOException {
         indexManager.create(name, indexDefinition);
+        return Ok.INSTANCE;
     }
 
     @DELETE
     @Path("/doc/{docId}")
-    public void deleteDoc(
+    public Ok deleteDoc(
             @PathParam("name") String name,
             @PathParam("docId") String docId,
             @NotNull @Valid DocumentDeleteRequest request)
             throws Exception {
-        indexManager.with(name, (index) -> {
+        return indexManager.with(name, (index) -> {
             index.delete(docId, request);
-            return null;
+            return Ok.INSTANCE;
         });
     }
 
     @DELETE
-    public void deletePath(@PathParam("name") String path, @Valid final 
List<String> exclusions) throws IOException {
+    public Ok deletePath(@PathParam("name") String path, @Valid final 
List<String> exclusions) throws IOException {
         indexManager.deleteAll(path, exclusions);
+        return Ok.INSTANCE;
     }
 
     @GET
@@ -85,9 +88,8 @@ public final class IndexResource {
     }
 
     @POST
-    public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid 
IndexInfoRequest request)
-            throws Exception {
-        indexManager.with(name, (index) -> {
+    public Ok setIndexInfo(@PathParam("name") String name, @NotNull @Valid 
IndexInfoRequest request) throws Exception {
+        return indexManager.with(name, (index) -> {
             if (request.getMatchUpdateSeq().isPresent()
                     && request.getUpdateSeq().isPresent()) {
                 index.setUpdateSeq(
@@ -99,7 +101,7 @@ public final class IndexResource {
                         request.getMatchPurgeSeq().getAsLong(),
                         request.getPurgeSeq().getAsLong());
             }
-            return null;
+            return Ok.INSTANCE;
         });
     }
 
@@ -114,14 +116,14 @@ public final class IndexResource {
 
     @PUT
     @Path("/doc/{docId}")
-    public void updateDoc(
+    public Ok updateDoc(
             @PathParam("name") String name,
             @PathParam("docId") String docId,
             @NotNull @Valid DocumentUpdateRequest request)
             throws Exception {
-        indexManager.with(name, (index) -> {
+        return indexManager.with(name, (index) -> {
             index.update(docId, request);
-            return null;
+            return Ok.INSTANCE;
         });
     }
 }
diff --git a/rebar.config.script b/rebar.config.script
index 1f82036cd..0ac822c38 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -158,6 +158,8 @@ DepDescs = [
 {fauxton,          {url, "https://github.com/apache/couchdb-fauxton"},
                    {tag, "v1.3.4"}, [raw]},
 {ibrowse,          "ibrowse",          {tag, "CouchDB-4.4.2-6"}},
+{gun,              {url, "https://github.com/ninenines/gun"},
+                   {tag, "2.2.0"}},
 {jiffy,            "jiffy",            {tag, "1.1.2"}},
 {mochiweb,         "mochiweb",         {tag, "v3.2.2"}},
 {meck,             "meck",             {tag, "1.0.0"}},
diff --git a/rel/nouveau.yaml b/rel/nouveau.yaml
index 40837f12c..0f33d5f25 100644
--- a/rel/nouveau.yaml
+++ b/rel/nouveau.yaml
@@ -8,12 +8,12 @@ logging:
 
 server:
   applicationConnectors:
-    - type: http
+    - type: h2c
       bindHost: 127.0.0.1
       port: {{nouveau_port}}
       useDateHeader: false
   adminConnectors:
-    - type: http
+    - type: h2c
       bindHost: 127.0.0.1
       port: {{nouveau_admin_port}}
       useDateHeader: false
diff --git a/rel/reltool.config b/rel/reltool.config
index c1f0ea070..b85bd49b6 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -48,6 +48,8 @@
         ets_lru,
         fabric,
         global_changes,
+        gun,
+        cowlib,
         ibrowse,
         ioq,
         jiffy,
@@ -111,6 +113,8 @@
     {app, ets_lru, [{incl_cond, include}]},
     {app, fabric, [{incl_cond, include}]},
     {app, global_changes, [{incl_cond, include}]},
+    {app, gun, [{incl_cond, include}]},
+    {app, cowlib, [{incl_cond, include}]},
     {app, ibrowse, [{incl_cond, include}]},
     {app, ioq, [{incl_cond, include}]},
     {app, jiffy, [{incl_cond, include}]},
diff --git a/src/nouveau/src/nouveau.app.src b/src/nouveau/src/nouveau.app.src
index 0828437c1..e8ea54915 100644
--- a/src/nouveau/src/nouveau.app.src
+++ b/src/nouveau/src/nouveau.app.src
@@ -18,7 +18,7 @@
     {vsn, git},
     {applications, [
         config,
-        ibrowse,
+        gun,
         kernel,
         stdlib,
         mem3,
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index b700524f7..6ae85d0b9 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -23,13 +23,12 @@
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc_async/5,
-    purge_doc/5,
-    update_doc_async/7,
+    delete_doc/4,
+    purge_doc/4,
+    update_doc/6,
     search/2,
-    set_purge_seq/4,
-    set_update_seq/4,
-    drain_async_responses/2,
+    set_purge_seq/3,
+    set_update_seq/3,
     jaxrs_error/2
 ]).
 
@@ -40,13 +39,13 @@ analyze(Text, Analyzer) when
 ->
     ReqBody = {[{<<"text">>, Text}, {<<"analyzer">>, Analyzer}]},
     Resp = send_if_enabled(
-        nouveau_util:nouveau_url() ++ "/analyze",
+        "/analyze",
         [?JSON_CONTENT_TYPE],
-        post,
+        <<"POST">>,
         jiffy:encode(ReqBody)
     ),
     case Resp of
-        {ok, "200", _, RespBody} ->
+        {ok, 200, _, RespBody} ->
             Json = jiffy:decode(RespBody, [return_maps]),
             {ok, maps:get(<<"tokens">>, Json)};
         {ok, StatusCode, _, RespBody} ->
@@ -58,9 +57,9 @@ analyze(_, _) ->
     {error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty 
strings">>}}.
 
 index_info(#index{} = Index) ->
-    Resp = send_if_enabled(index_url(Index), [], get),
+    Resp = send_if_enabled(index_path(Index), [], <<"GET">>),
     case Resp of
-        {ok, "200", _, RespBody} ->
+        {ok, 200, _, RespBody} ->
             {ok, jiffy:decode(RespBody, [return_maps])};
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -70,10 +69,10 @@ index_info(#index{} = Index) ->
 
 create_index(#index{} = Index, IndexDefinition) ->
     Resp = send_if_enabled(
-        index_url(Index), [?JSON_CONTENT_TYPE], put, 
jiffy:encode(IndexDefinition)
+        index_path(Index), [?JSON_CONTENT_TYPE], <<"PUT">>, 
jiffy:encode(IndexDefinition)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 200, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -88,10 +87,10 @@ delete_path(Path, Exclusions) when
     is_binary(Path), is_list(Exclusions)
 ->
     Resp = send_if_enabled(
-        index_path(Path), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(Exclusions)
+        index_path(Path), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(Exclusions)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 200, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -99,8 +98,7 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when
-    is_pid(ConnPid),
+delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -108,19 +106,14 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, 
MatchSeq, UpdateSeq) when
     UpdateSeq > 0
 ->
     ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
-    send_direct_if_enabled(
-        ConnPid,
-        doc_url(Index, DocId),
+    send_if_enabled(
+        doc_path(Index, DocId),
         [?JSON_CONTENT_TYPE],
-        delete,
-        jiffy:encode(ReqBody),
-        [
-            {stream_to, self()}
-        ]
+        <<"DELETE">>,
+        jiffy:encode(ReqBody)
     ).
 
-purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when
-    is_pid(ConnPid),
+purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -128,11 +121,11 @@ purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, 
PurgeSeq) when
     PurgeSeq > 0
 ->
     ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
-    Resp = send_direct_if_enabled(
-        ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody), []
+    Resp = send_if_enabled(
+        doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(ReqBody)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 200, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -140,8 +133,7 @@ purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, 
PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
-    is_pid(ConnPid),
+update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) 
when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -156,25 +148,21 @@ update_doc_async(ConnPid, #index{} = Index, DocId, 
MatchSeq, UpdateSeq, Partitio
         partition => Partition,
         fields => Fields
     },
-    send_direct_if_enabled(
-        ConnPid,
-        doc_url(Index, DocId),
+    send_if_enabled(
+        doc_path(Index, DocId),
         [?JSON_CONTENT_TYPE],
-        put,
-        jiffy:encode(ReqBody),
-        [
-            {stream_to, self()}
-        ]
+        <<"PUT">>,
+        jiffy:encode(ReqBody)
     ).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
-        search_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(QueryArgs)
+        search_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(QueryArgs)
     ),
     case Resp of
-        {ok, "200", _, RespBody} ->
+        {ok, 200, _, RespBody} ->
             {ok, jiffy:decode(RespBody, [return_maps])};
-        {ok, "409", _, _} ->
+        {ok, 409, _, _} ->
             %% Index was not current enough.
             {error, stale_index};
         {ok, StatusCode, _, RespBody} ->
@@ -183,26 +171,26 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) ->
+set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) ->
     ReqBody = #{
         match_update_seq => MatchSeq,
         update_seq => UpdateSeq
     },
-    set_seq(ConnPid, Index, ReqBody).
+    set_seq(Index, ReqBody).
 
-set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) ->
+set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) ->
     ReqBody = #{
         match_purge_seq => MatchSeq,
         purge_seq => PurgeSeq
     },
-    set_seq(ConnPid, Index, ReqBody).
+    set_seq(Index, ReqBody).
 
-set_seq(ConnPid, #index{} = Index, ReqBody) ->
-    Resp = send_direct_if_enabled(
-        ConnPid, index_url(Index), [?JSON_CONTENT_TYPE], post, 
jiffy:encode(ReqBody), []
+set_seq(#index{} = Index, ReqBody) ->
+    Resp = send_if_enabled(
+        index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(ReqBody)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 200, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -210,90 +198,54 @@ set_seq(ConnPid, #index{} = Index, ReqBody) ->
             send_error(Reason)
     end.
 
-%% wait for enough async responses to reduce the Queue to Min length.
-drain_async_responses(Queue0, Min) when Min >= 0 ->
-    case queue:len(Queue0) > Min of
-        true ->
-            {{value, ReqId}, Queue1} = queue:out(Queue0),
-            wait_for_response(ReqId),
-            drain_async_responses(Queue1, Min);
-        false ->
-            Queue0
-    end.
-
-wait_for_response(ReqId) ->
-    case drain_async_response(ReqId) of
-        {ok, "204", _Headers, _Body} ->
-            ok;
-        {ok, StatusCode, _Headers, RespBody} ->
-            exit({error, jaxrs_error(StatusCode, RespBody)})
-    end.
-
-drain_async_response(ReqId) ->
-    drain_async_response(ReqId, undefined, undefined, undefined).
-
-drain_async_response(ReqId, Code0, Headers0, Body0) ->
-    receive
-        {ibrowse_async_headers, ReqId, Code1, Headers1} ->
-            drain_async_response(ReqId, Code1, Headers1, Body0);
-        {ibrowse_async_response, ReqId, Body1} ->
-            drain_async_response(ReqId, Code0, Headers0, Body1);
-        {ibrowse_async_response_end, ReqId} ->
-            {ok, Code0, Headers0, Body0}
-    end.
-
 %% private functions
 
-index_path(Path) ->
+index_path(Path) when is_binary(Path) ->
     lists:flatten(
         io_lib:format(
-            "~s/index/~s",
+            "/index/~s",
             [
-                nouveau_util:nouveau_url(),
                 couch_util:url_encode(Path)
             ]
         )
-    ).
-
-index_url(#index{} = Index) ->
+    );
+index_path(#index{} = Index) ->
     lists:flatten(
         io_lib:format(
-            "~s/index/~s",
+            "/index/~s",
             [
-                nouveau_util:nouveau_url(),
                 couch_util:url_encode(nouveau_util:index_name(Index))
             ]
         )
     ).
 
-doc_url(#index{} = Index, DocId) ->
+doc_path(#index{} = Index, DocId) ->
     lists:flatten(
         io_lib:format(
-            "~s/index/~s/doc/~s",
+            "/index/~s/doc/~s",
             [
-                nouveau_util:nouveau_url(),
                 couch_util:url_encode(nouveau_util:index_name(Index)),
                 couch_util:url_encode(DocId)
             ]
         )
     ).
 
-search_url(IndexName) ->
-    index_url(IndexName) ++ "/search".
+search_path(IndexName) ->
+    index_path(IndexName) ++ "/search".
 
-jaxrs_error("400", Body) ->
+jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
-jaxrs_error("404", Body) ->
+jaxrs_error(404, Body) ->
     {not_found, message(Body)};
-jaxrs_error("405", Body) ->
+jaxrs_error(405, Body) ->
     {method_not_allowed, message(Body)};
-jaxrs_error("409", Body) ->
+jaxrs_error(409, Body) ->
     {conflict, message(Body)};
-jaxrs_error("417", Body) ->
+jaxrs_error(417, Body) ->
     {expectation_failed, message(Body)};
-jaxrs_error("422", Body) ->
+jaxrs_error(422, Body) ->
     {bad_request, lists:join(" and ", errors(Body))};
-jaxrs_error("500", Body) ->
+jaxrs_error(500, Body) ->
     {internal_server_error, message(Body)}.
 
 send_error({conn_failed, _}) ->
@@ -309,71 +261,33 @@ errors(Body) ->
     Json = jiffy:decode(Body, [return_maps]),
     maps:get(<<"errors">>, Json).
 
-send_if_enabled(Url, Header, Method) ->
-    send_if_enabled(Url, Header, Method, []).
-
-send_if_enabled(Url, Header, Method, Body) ->
-    send_if_enabled(Url, Header, Method, Body, []).
-
-send_if_enabled(Url, Header, Method, Body, Options0) ->
-    case nouveau:enabled() of
-        true ->
-            Options1 = ibrowse_options(Options0),
-            retry_if_connection_closes(fun() ->
-                ibrowse:send_req(Url, Header, Method, Body, Options1)
-            end);
-        false ->
-            {error, nouveau_not_enabled}
-    end.
+send_if_enabled(Path, ReqHeaders, Method) ->
+    send_if_enabled(Path, ReqHeaders, Method, <<>>).
 
-send_direct_if_enabled(ConnPid, Url, Header, Method, Body, Options0) ->
+send_if_enabled(Path, ReqHeaders, Method, ReqBody) ->
     case nouveau:enabled() of
         true ->
-            Options1 = ibrowse_options(Options0),
-            retry_if_connection_closes(fun() ->
-                ibrowse:send_req_direct(ConnPid, Url, Header, Method, Body, 
Options1)
-            end);
+            case nouveau_gun:conn_info() of
+                disconnected ->
+                    {error, <<"no connection to nouveau server">>};
+                {ConnPid, MRef} ->
+                    StreamRef = gun:request(
+                        ConnPid,
+                        Method,
+                        Path,
+                        ReqHeaders,
+                        ReqBody
+                    ),
+                    case gun:await(ConnPid, StreamRef, MRef) of
+                        {response, fin, Status, RespHeaders} ->
+                            {ok, Status, RespHeaders, []};
+                        {response, nofin, Status, RespHeaders} ->
+                            {ok, RespBody} = gun:await_body(ConnPid, 
StreamRef, MRef),
+                            {ok, Status, RespHeaders, RespBody};
+                        {error, Reason} ->
+                            {error, Reason}
+                    end
+            end;
         false ->
             {error, nouveau_not_enabled}
     end.
-
-retry_if_connection_closes(Fun) ->
-    MaxRetries = max(1, config:get_integer("nouveau", "max_retries", 5)),
-    retry_if_connection_closes(Fun, MaxRetries).
-
-retry_if_connection_closes(_Fun, 0) ->
-    {error, connection_closed};
-retry_if_connection_closes(Fun, N) when is_integer(N), N > 0 ->
-    case Fun() of
-        {error, connection_closed} ->
-            couch_stats:increment_counter([nouveau, connection_closed_errors]),
-            timer:sleep(1000),
-            retry_if_connection_closes(Fun, N - 1);
-        Else ->
-            Else
-    end.
-
-ibrowse_options(BaseOptions) when is_list(BaseOptions) ->
-    CACertFile = config:get("nouveau", "ssl_cacert_file"),
-    KeyFile = config:get("nouveau", "ssl_key_file"),
-    CertFile = config:get("nouveau", "ssl_cert_file"),
-    Password = config:get("nouveau", "ssl_password"),
-    if
-        KeyFile /= undefined andalso CertFile /= undefined ->
-            CertKeyConf0 = #{
-                certfile => CertFile,
-                keyfile => KeyFile,
-                password => Password,
-                cacertfile => CACertFile
-            },
-            CertKeyConf1 = maps:filter(fun remove_undefined/2, CertKeyConf0),
-            SSLOptions = [{certs_keys, [CertKeyConf1]}],
-            [{ssl_options, SSLOptions} | BaseOptions];
-        true ->
-            BaseOptions
-    end.
-
-remove_undefined(_Key, undefined) ->
-    false;
-remove_undefined(_Key, _Value) ->
-    true.
diff --git a/src/nouveau/src/nouveau_gun.erl b/src/nouveau/src/nouveau_gun.erl
new file mode 100644
index 000000000..4de1c6189
--- /dev/null
+++ b/src/nouveau/src/nouveau_gun.erl
@@ -0,0 +1,142 @@
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% index manager ensures only one process is updating a nouveau index at a 
time.
+%% calling update_index will block until at least one attempt has been made to
+%% make the index as current as the database at the time update_index was 
called.
+
+-module(nouveau_gun).
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+-export([start_link/0]).
+-export([conn_info/0]).
+
+%%% gen_server callbacks
+-export([init/1]).
+-export([handle_call/3]).
+-export([handle_cast/2]).
+-export([handle_info/2]).
+
+% config_listener callbacks
+-export([handle_config_change/5]).
+-export([handle_config_terminate/3]).
+
+-record(state, {conn_pid, mref}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+conn_info() ->
+    persistent_term:get(?MODULE, disconnected).
+
+init(_) ->
+    {ConnPid, MRef} = start_gun(),
+    ok = config:listen_for_changes(?MODULE, nil),
+    {ok, #state{conn_pid = ConnPid, mref = MRef}}.
+
+handle_call(_Msg, _From, State) ->
+    {reply, unexpected_msg, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(restart_gun, State) ->
+    {NewConnPid, NewMRef} = start_gun(),
+    {noreply, State#state{conn_pid = NewConnPid, mref = NewMRef}};
+handle_info({gun_up, ConnPid, http2}, #state{conn_pid = ConnPid} = State) ->
+    persistent_term:put(?MODULE, {ConnPid, State#state.mref}),
+    couch_log:info("Connection to nouveau server established", []),
+    {noreply, State};
+handle_info({gun_down, _Pid, http2, Reason, _KilledStreams}, State) ->
+    couch_log:info("Connection to nouveau server lost: ~p", [Reason]),
+    {noreply, State};
+handle_info({'DOWN', MRef, process, _ConnPid, Reason}, #state{mref = MRef} = 
State) ->
+    persistent_term:erase(?MODULE),
+    couch_log:warning("Connection to nouveau server down for reason: ~p", 
[Reason]),
+    erlang:send_after(
+        500,
+        whereis(?MODULE),
+        restart_gun
+    ),
+    {noreply, State};
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
+handle_info(Msg, State) ->
+    couch_log:warning("~p received unexpected message: ~p", [?MODULE, Msg]),
+    {noreply, State}.
+
+handle_config_change("nouveau", "url", URL, _Persist, State) ->
+    start_gun(URL),
+    case conn_info() of
+        disconnected ->
+            ok;
+        {ConnPid, _MRef} ->
+            gun:close(ConnPid)
+    end,
+    {ok, State};
+handle_config_change(_Section, _Key, _Value, _Persist, State) ->
+    {ok, State}.
+
+handle_config_terminate(_Server, stop, _State) ->
+    ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+    erlang:send_after(
+        500,
+        whereis(?MODULE),
+        restart_config_listener
+    ).
+
+%% private functions
+
+start_gun() ->
+    start_gun(nouveau_util:nouveau_url()).
+
+start_gun(URL) ->
+    #{host := Host, port := Port, scheme := Scheme} = uri_string:parse(URL),
+    CACertFile = config:get("nouveau", "ssl_cacert_file"),
+    KeyFile = config:get("nouveau", "ssl_key_file"),
+    CertFile = config:get("nouveau", "ssl_cert_file"),
+    Password = config:get("nouveau", "ssl_password"),
+    Transport = scheme_to_transport(Scheme),
+    BaseOptions = #{transport => Transport, protocols => [http2]},
+    Options =
+        if
+            Transport == tls andalso KeyFile /= undefined andalso CertFile /= 
undefined ->
+                CertKeyConf0 = #{
+                    certfile => CertFile,
+                    keyfile => KeyFile,
+                    password => Password,
+                    cacertfile => CACertFile
+                },
+                CertKeyConf1 = maps:filter(fun remove_undefined/2, 
CertKeyConf0),
+                BaseOptions#{
+                    tls_opts => [{certs_keys, [CertKeyConf1]}]
+                };
+            true ->
+                BaseOptions
+        end,
+    {ok, ConnPid} = gun:open(Host, Port, Options),
+    MRef = monitor(process, ConnPid),
+    {ConnPid, MRef}.
+
+remove_undefined(_Key, Value) ->
+    Value /= undefined.
+
+scheme_to_transport("http") ->
+    tcp;
+scheme_to_transport("https") ->
+    tls.
diff --git a/src/nouveau/src/nouveau_index_manager.erl 
b/src/nouveau/src/nouveau_index_manager.erl
index eb18bc6f7..45f7a1e8d 100644
--- a/src/nouveau/src/nouveau_index_manager.erl
+++ b/src/nouveau/src/nouveau_index_manager.erl
@@ -36,9 +36,6 @@
     handle_info/2
 ]).
 
-% config_listener api
--export([handle_config_change/5, handle_config_terminate/3]).
-
 -export([handle_db_event/3]).
 
 -define(BY_DBSIG, nouveau_by_dbsig).
@@ -60,8 +57,6 @@ init(_) ->
     ets:new(?BY_DBSIG, [set, named_table]),
     ets:new(?BY_REF, [set, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
-    configure_ibrowse(nouveau_util:nouveau_url()),
-    ok = config:listen_for_changes(?MODULE, nil),
     {ok, nil}.
 
 handle_call({update, #index{} = Index0}, From, State) ->
@@ -131,31 +126,3 @@ handle_db_event(DbName, deleted, State) ->
     {ok, State};
 handle_db_event(_DbName, _Event, State) ->
     {ok, State}.
-
-handle_config_change("nouveau", "url", URL, _Persist, State) ->
-    configure_ibrowse(URL),
-    {ok, State};
-handle_config_change(_Section, _Key, _Value, _Persist, State) ->
-    {ok, State}.
-
-handle_config_terminate(_Server, stop, _State) ->
-    ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(
-        5000,
-        whereis(?MODULE),
-        restart_config_listener
-    ).
-
-configure_ibrowse(URL) ->
-    #{host := Host, port := Port} = uri_string:parse(URL),
-    ibrowse:set_max_sessions(
-        Host,
-        Port,
-        nouveau_util:max_sessions()
-    ),
-    ibrowse:set_max_pipeline_size(
-        Host,
-        Port,
-        nouveau_util:max_pipeline_size()
-    ).
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index efed245db..8514de0ed 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -33,10 +33,7 @@
     changes_done,
     total_changes,
     exclude_idrevs,
-    reqids,
-    conn_pid,
-    update_seq,
-    max_pipeline_size
+    update_seq
 }).
 
 -record(purge_acc, {
@@ -79,12 +76,11 @@ update(#index{} = Index) ->
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
-                {ok, ConnPid} = 
ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()),
                 PurgeAcc0 = #purge_acc{
                     index_update_seq = IndexUpdateSeq,
                     index_purge_seq = IndexPurgeSeq
                 },
-                {ok, PurgeAcc1} = purge_index(ConnPid, Db, Index, PurgeAcc0),
+                {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
 
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
@@ -98,18 +94,13 @@ update(#index{} = Index) ->
                         changes_done = 0,
                         total_changes = TotalChanges,
                         exclude_idrevs = PurgeAcc1#purge_acc.exclude_list,
-                        reqids = queue:new(),
-                        conn_pid = ConnPid,
-                        update_seq = PurgeAcc1#purge_acc.index_update_seq,
-                        max_pipeline_size = nouveau_util:max_pipeline_size()
+                        update_seq = PurgeAcc1#purge_acc.index_update_seq
                     },
                     {ok, Acc1} = couch_db:fold_changes(
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
-                    nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
-                    exit(nouveau_api:set_update_seq(ConnPid, Index, 
Acc1#acc.update_seq, NewCurSeq))
+                    exit(nouveau_api:set_update_seq(Index, 
Acc1#acc.update_seq, NewCurSeq))
                 after
-                    ibrowse:stop_worker_process(ConnPid),
                     ret_os_process(Proc)
                 end
         end
@@ -119,11 +110,7 @@ update(#index{} = Index) ->
 
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
     {ok, Acc};
-load_docs(FDI, #acc{} = Acc0) ->
-    %% block for responses so we stay under the max pipeline size
-    ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, 
Acc0#acc.max_pipeline_size),
-    Acc1 = Acc0#acc{reqids = ReqIds1},
-
+load_docs(FDI, #acc{} = Acc1) ->
     couch_task_status:update([
         {changes_done, Acc1#acc.changes_done},
         {progress, (Acc1#acc.changes_done * 100) div Acc1#acc.total_changes}
@@ -138,7 +125,6 @@ load_docs(FDI, #acc{} = Acc0) ->
             false ->
                 case
                     update_or_delete_index(
-                        Acc1#acc.conn_pid,
                         Acc1#acc.db,
                         Acc1#acc.index,
                         Acc1#acc.update_seq,
@@ -146,10 +132,9 @@ load_docs(FDI, #acc{} = Acc0) ->
                         Acc1#acc.proc
                     )
                 of
-                    {ibrowse_req_id, ReqId} ->
+                    {ok, _, _, _} ->
                         Acc1#acc{
-                            update_seq = DI#doc_info.high_seq,
-                            reqids = queue:in(ReqId, Acc1#acc.reqids)
+                            update_seq = DI#doc_info.high_seq
                         };
                     {error, Reason} ->
                         exit({error, Reason})
@@ -157,11 +142,11 @@ load_docs(FDI, #acc{} = Acc0) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = 
DI, Proc) ->
+update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) 
->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq);
+            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -175,10 +160,10 @@ update_or_delete_index(ConnPid, Db, #index{} = Index, 
MatchSeq, #doc_info{} = DI
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, 
Seq);
+                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
                 _ ->
-                    nouveau_api:update_doc_async(
-                        ConnPid, Index, Id, MatchSeq, Seq, Partition, Fields
+                    nouveau_api:update_doc(
+                        Index, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
@@ -223,7 +208,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -232,7 +217,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
                 case couch_db:get_full_doc_info(Db, Id) of
                     not_found ->
                         ok = nouveau_api:purge_doc(
-                            ConnPid, Index, Id, 
PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
+                            Index, Id, PurgeAcc1#purge_acc.index_purge_seq, 
PurgeSeq
                         ),
                         PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq};
                     FDI ->
@@ -243,7 +228,6 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
                                 PurgeAcc1;
                             false ->
                                 update_or_delete_index(
-                                    ConnPid,
                                     Db,
                                     Index,
                                     PurgeAcc1#purge_acc.index_update_seq,
@@ -265,7 +249,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
         ),
         DbPurgeSeq = couch_db:get_purge_seq(Db),
         ok = nouveau_api:set_purge_seq(
-            ConnPid, Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
+            Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
         ),
         update_local_doc(Db, Index, DbPurgeSeq),
         {ok, PurgeAcc3}
diff --git a/src/nouveau/src/nouveau_sup.erl b/src/nouveau/src/nouveau_sup.erl
index 3547b43fa..65afe744a 100644
--- a/src/nouveau/src/nouveau_sup.erl
+++ b/src/nouveau/src/nouveau_sup.erl
@@ -23,6 +23,7 @@ start_link() ->
 
 init(_Args) ->
     Children = [
+        child(nouveau_gun),
         child(nouveau_index_manager)
     ],
     {ok, {{one_for_one, 10, 1}, couch_epi:register_service(nouveau_epi, 
Children)}}.
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index b6dd0fcbd..ad0ce6663 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -27,9 +27,7 @@
     maybe_create_local_purge_doc/2,
     get_local_purge_doc_id/1,
     get_local_purge_doc_body/3,
-    nouveau_url/0,
-    max_sessions/0,
-    max_pipeline_size/0
+    nouveau_url/0
 ]).
 
 index_name(Path) when is_binary(Path) ->
@@ -198,9 +196,3 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
 
 nouveau_url() ->
     config:get("nouveau", "url", "http://127.0.0.1:5987";).
-
-max_sessions() ->
-    config:get_integer("nouveau", "max_sessions", 100).
-
-max_pipeline_size() ->
-    config:get_integer("nouveau", "max_pipeline_size", 1000).


Reply via email to