This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-gun-http2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a20968f6ef2ce9194c6f1bba6650772ee245cd5b
Author: Robert Newson <[email protected]>
AuthorDate: Thu Jun 26 22:16:09 2025 +0100

    nouveau: switch from ibrowse to gun
    
    This allows http/2 if nouveau is configured for it
    
    We lose the pipelining benefit for indexing as http/2 does not
    support it (and we need the requests that update the index to happen
    in the order we issue them).
---
 .gitignore                                |   2 +
 dev/run                                   |  36 ++++-
 rebar.config.script                       |   2 +
 rel/nouveau.yaml                          |   8 +-
 rel/reltool.config                        |   4 +
 src/nouveau/src/nouveau.app.src           |   2 +-
 src/nouveau/src/nouveau_api.erl           | 240 ++++++++++--------------------
 src/nouveau/src/nouveau_gun.erl           | 106 +++++++++++++
 src/nouveau/src/nouveau_index_manager.erl |  33 ----
 src/nouveau/src/nouveau_index_updater.erl |  46 ++----
 src/nouveau/src/nouveau_sup.erl           |   1 +
 src/nouveau/src/nouveau_util.erl          |   6 +-
 12 files changed, 248 insertions(+), 238 deletions(-)

diff --git a/.gitignore b/.gitignore
index 080a7dd6f..d46777512 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,10 +53,12 @@ src/couch/priv/couchspawnkillable
 src/couch/priv/couch_ejson_compare/couch_ejson_compare.d
 src/couch/priv/couch_js/**/*.d
 src/couch/priv/icu_driver/couch_icu_driver.d
+src/cowlib/
 src/mango/src/mango_cursor_text.nocompile
 src/excoveralls/
 src/fauxton/
 src/folsom/
+src/gun/
 src/hackney/
 src/hqueue/
 src/ibrowse/
diff --git a/dev/run b/dev/run
index 478fb2ae9..729b071df 100755
--- a/dev/run
+++ b/dev/run
@@ -367,7 +367,7 @@ def setup_configs(ctx):
             "prometheus_port": prometheus_port,
             "uuid": "fake_uuid_for_dev",
             "nouveau_enable": str(ctx["with_nouveau"]).lower(),
-            "nouveau_url": "http://127.0.0.1:5987",
+            "nouveau_url": "https://127.0.0.1:5987",
             "_default": "",
         }
         write_config(ctx, node, env)
@@ -422,10 +422,44 @@ def generate_nouveau_config(ctx):
     src = os.path.join(ctx["rootdir"], "rel", "nouveau.yaml")
     tgt = os.path.join(ctx["devdir"], "lib", "nouveau.yaml")
 
+    # Generate certificates for nouveau server
+    sp.run([
+        "openssl",
+        "req", "-x509",
+        "-newkey", "rsa:2048",
+        "-keyout", "lib/nouveau.key",
+        "-out", "lib/nouveau.cert",
+        "-sha256",
+        "-days", "3650",
+        "-nodes",
+        "-subj", "/CN=127.0.0.1"],
+           capture_output=True,
+           check=True,
+           cwd=ctx["devdir"])
+    password = str(uuid.uuid4())
+    sp.run([
+        "openssl",
+        "pkcs12",
+        "-export",
+        "-in", "lib/nouveau.cert",
+        "-inkey", "lib/nouveau.key",
+        "-out", "lib/nouveau.pkcs12",
+        "-passout", "stdin",
+        "-name", "nouveau",
+        "-noiter",
+        "-nomaciter"],
+           input=password,
+           text=True,
+           capture_output=True,
+           check=True,
+           cwd=ctx["devdir"])
+
     config = {
         "nouveau_index_dir": os.path.join(ctx["devdir"], "lib", "nouveau"),
         "nouveau_port": 5987,
         "nouveau_admin_port": 5988,
+        "keystore_path": os.path.join(ctx["devdir"], "lib", "nouveau.pkcs12"),
+        "keystore_password": password
     }
 
     with open(src) as handle:
diff --git a/rebar.config.script b/rebar.config.script
index 1f82036cd..0ac822c38 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -158,6 +158,8 @@ DepDescs = [
 {fauxton,          {url, "https://github.com/apache/couchdb-fauxton"},
                    {tag, "v1.3.4"}, [raw]},
 {ibrowse,          "ibrowse",          {tag, "CouchDB-4.4.2-6"}},
+{gun,              {url, "https://github.com/ninenines/gun"},
+                   {tag, "2.2.0"}},
 {jiffy,            "jiffy",            {tag, "1.1.2"}},
 {mochiweb,         "mochiweb",         {tag, "v3.2.2"}},
 {meck,             "meck",             {tag, "1.0.0"}},
diff --git a/rel/nouveau.yaml b/rel/nouveau.yaml
index 40837f12c..a547b9fdb 100644
--- a/rel/nouveau.yaml
+++ b/rel/nouveau.yaml
@@ -8,15 +8,19 @@ logging:
 
 server:
   applicationConnectors:
-    - type: http
+    - type: h2
       bindHost: 127.0.0.1
       port: {{nouveau_port}}
       useDateHeader: false
+      keyStorePath: {{keystore_path}}
+      keyStorePassword: {{keystore_password}}
   adminConnectors:
-    - type: http
+    - type: h2
       bindHost: 127.0.0.1
       port: {{nouveau_admin_port}}
       useDateHeader: false
+      keyStorePath: {{keystore_path}}
+      keyStorePassword: {{keystore_password}}
   gzip:
     includedMethods:
       - GET
diff --git a/rel/reltool.config b/rel/reltool.config
index c1f0ea070..b85bd49b6 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -48,6 +48,8 @@
         ets_lru,
         fabric,
         global_changes,
+        gun,
+        cowlib,
         ibrowse,
         ioq,
         jiffy,
@@ -111,6 +113,8 @@
     {app, ets_lru, [{incl_cond, include}]},
     {app, fabric, [{incl_cond, include}]},
     {app, global_changes, [{incl_cond, include}]},
+    {app, gun, [{incl_cond, include}]},
+    {app, cowlib, [{incl_cond, include}]},
     {app, ibrowse, [{incl_cond, include}]},
     {app, ioq, [{incl_cond, include}]},
     {app, jiffy, [{incl_cond, include}]},
diff --git a/src/nouveau/src/nouveau.app.src b/src/nouveau/src/nouveau.app.src
index 0828437c1..e8ea54915 100644
--- a/src/nouveau/src/nouveau.app.src
+++ b/src/nouveau/src/nouveau.app.src
@@ -18,7 +18,7 @@
     {vsn, git},
     {applications, [
         config,
-        ibrowse,
+        gun,
         kernel,
         stdlib,
         mem3,
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index b700524f7..044251ec5 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -23,13 +23,12 @@
     create_index/2,
     delete_path/1,
     delete_path/2,
-    delete_doc_async/5,
-    purge_doc/5,
-    update_doc_async/7,
+    delete_doc/4,
+    purge_doc/4,
+    update_doc/6,
     search/2,
-    set_purge_seq/4,
-    set_update_seq/4,
-    drain_async_responses/2,
+    set_purge_seq/3,
+    set_update_seq/3,
     jaxrs_error/2
 ]).
 
@@ -40,13 +39,13 @@ analyze(Text, Analyzer) when
 ->
     ReqBody = {[{<<"text">>, Text}, {<<"analyzer">>, Analyzer}]},
     Resp = send_if_enabled(
-        nouveau_util:nouveau_url() ++ "/analyze",
+        "/analyze",
         [?JSON_CONTENT_TYPE],
-        post,
+        <<"POST">>,
         jiffy:encode(ReqBody)
     ),
     case Resp of
-        {ok, "200", _, RespBody} ->
+        {ok, 200, _, RespBody} ->
             Json = jiffy:decode(RespBody, [return_maps]),
             {ok, maps:get(<<"tokens">>, Json)};
         {ok, StatusCode, _, RespBody} ->
@@ -58,9 +57,9 @@ analyze(_, _) ->
     {error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty 
strings">>}}.
 
 index_info(#index{} = Index) ->
-    Resp = send_if_enabled(index_url(Index), [], get),
+    Resp = send_if_enabled(index_path(Index), [], <<"GET">>),
     case Resp of
-        {ok, "200", _, RespBody} ->
+        {ok, 200, _, RespBody} ->
             {ok, jiffy:decode(RespBody, [return_maps])};
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -70,10 +69,10 @@ index_info(#index{} = Index) ->
 
 create_index(#index{} = Index, IndexDefinition) ->
     Resp = send_if_enabled(
-        index_url(Index), [?JSON_CONTENT_TYPE], put, 
jiffy:encode(IndexDefinition)
+        index_path(Index), [?JSON_CONTENT_TYPE], <<"PUT">>, 
jiffy:encode(IndexDefinition)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 204, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -88,10 +87,10 @@ delete_path(Path, Exclusions) when
     is_binary(Path), is_list(Exclusions)
 ->
     Resp = send_if_enabled(
-        index_path(Path), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(Exclusions)
+        index_path(Path), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(Exclusions)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 204, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -99,8 +98,7 @@ delete_path(Path, Exclusions) when
             send_error(Reason)
     end.
 
-delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when
-    is_pid(ConnPid),
+delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -108,19 +106,14 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, 
MatchSeq, UpdateSeq) when
     UpdateSeq > 0
 ->
     ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
-    send_direct_if_enabled(
-        ConnPid,
-        doc_url(Index, DocId),
+    send_if_enabled(
+        doc_path(Index, DocId),
         [?JSON_CONTENT_TYPE],
-        delete,
-        jiffy:encode(ReqBody),
-        [
-            {stream_to, self()}
-        ]
+        <<"DELETE">>,
+        jiffy:encode(ReqBody)
     ).
 
-purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when
-    is_pid(ConnPid),
+purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -128,11 +121,11 @@ purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, 
PurgeSeq) when
     PurgeSeq > 0
 ->
     ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
-    Resp = send_direct_if_enabled(
-        ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, 
jiffy:encode(ReqBody), []
+    Resp = send_if_enabled(
+        doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, 
jiffy:encode(ReqBody)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 204, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -140,8 +133,7 @@ purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, 
PurgeSeq) when
             send_error(Reason)
     end.
 
-update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, 
Partition, Fields) when
-    is_pid(ConnPid),
+update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) 
when
     is_binary(DocId),
     is_integer(MatchSeq),
     MatchSeq >= 0,
@@ -156,25 +148,21 @@ update_doc_async(ConnPid, #index{} = Index, DocId, 
MatchSeq, UpdateSeq, Partitio
         partition => Partition,
         fields => Fields
     },
-    send_direct_if_enabled(
-        ConnPid,
-        doc_url(Index, DocId),
+    send_if_enabled(
+        doc_path(Index, DocId),
         [?JSON_CONTENT_TYPE],
-        put,
-        jiffy:encode(ReqBody),
-        [
-            {stream_to, self()}
-        ]
+        <<"PUT">>,
+        jiffy:encode(ReqBody)
     ).
 
 search(#index{} = Index, QueryArgs) ->
     Resp = send_if_enabled(
-        search_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(QueryArgs)
+        search_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(QueryArgs)
     ),
     case Resp of
-        {ok, "200", _, RespBody} ->
+        {ok, 200, _, RespBody} ->
             {ok, jiffy:decode(RespBody, [return_maps])};
-        {ok, "409", _, _} ->
+        {ok, 409, _, _} ->
             %% Index was not current enough.
             {error, stale_index};
         {ok, StatusCode, _, RespBody} ->
@@ -183,26 +171,26 @@ search(#index{} = Index, QueryArgs) ->
             send_error(Reason)
     end.
 
-set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) ->
+set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) ->
     ReqBody = #{
         match_update_seq => MatchSeq,
         update_seq => UpdateSeq
     },
-    set_seq(ConnPid, Index, ReqBody).
+    set_seq(Index, ReqBody).
 
-set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) ->
+set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) ->
     ReqBody = #{
         match_purge_seq => MatchSeq,
         purge_seq => PurgeSeq
     },
-    set_seq(ConnPid, Index, ReqBody).
+    set_seq(Index, ReqBody).
 
-set_seq(ConnPid, #index{} = Index, ReqBody) ->
-    Resp = send_direct_if_enabled(
-        ConnPid, index_url(Index), [?JSON_CONTENT_TYPE], post, 
jiffy:encode(ReqBody), []
+set_seq(#index{} = Index, ReqBody) ->
+    Resp = send_if_enabled(
+        index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, 
jiffy:encode(ReqBody)
     ),
     case Resp of
-        {ok, "204", _, _} ->
+        {ok, 204, _, _} ->
             ok;
         {ok, StatusCode, _, RespBody} ->
             {error, jaxrs_error(StatusCode, RespBody)};
@@ -210,90 +198,54 @@ set_seq(ConnPid, #index{} = Index, ReqBody) ->
             send_error(Reason)
     end.
 
-%% wait for enough async responses to reduce the Queue to Min length.
-drain_async_responses(Queue0, Min) when Min >= 0 ->
-    case queue:len(Queue0) > Min of
-        true ->
-            {{value, ReqId}, Queue1} = queue:out(Queue0),
-            wait_for_response(ReqId),
-            drain_async_responses(Queue1, Min);
-        false ->
-            Queue0
-    end.
-
-wait_for_response(ReqId) ->
-    case drain_async_response(ReqId) of
-        {ok, "204", _Headers, _Body} ->
-            ok;
-        {ok, StatusCode, _Headers, RespBody} ->
-            exit({error, jaxrs_error(StatusCode, RespBody)})
-    end.
-
-drain_async_response(ReqId) ->
-    drain_async_response(ReqId, undefined, undefined, undefined).
-
-drain_async_response(ReqId, Code0, Headers0, Body0) ->
-    receive
-        {ibrowse_async_headers, ReqId, Code1, Headers1} ->
-            drain_async_response(ReqId, Code1, Headers1, Body0);
-        {ibrowse_async_response, ReqId, Body1} ->
-            drain_async_response(ReqId, Code0, Headers0, Body1);
-        {ibrowse_async_response_end, ReqId} ->
-            {ok, Code0, Headers0, Body0}
-    end.
-
 %% private functions
 
-index_path(Path) ->
+index_path(Path) when is_binary(Path) ->
     lists:flatten(
         io_lib:format(
-            "~s/index/~s",
+            "/index/~s",
             [
-                nouveau_util:nouveau_url(),
                 couch_util:url_encode(Path)
             ]
         )
-    ).
-
-index_url(#index{} = Index) ->
+    );
+index_path(#index{} = Index) ->
     lists:flatten(
         io_lib:format(
-            "~s/index/~s",
+            "/index/~s",
             [
-                nouveau_util:nouveau_url(),
                 couch_util:url_encode(nouveau_util:index_name(Index))
             ]
         )
     ).
 
-doc_url(#index{} = Index, DocId) ->
+doc_path(#index{} = Index, DocId) ->
     lists:flatten(
         io_lib:format(
-            "~s/index/~s/doc/~s",
+            "/index/~s/doc/~s",
             [
-                nouveau_util:nouveau_url(),
                 couch_util:url_encode(nouveau_util:index_name(Index)),
                 couch_util:url_encode(DocId)
             ]
         )
     ).
 
-search_url(IndexName) ->
-    index_url(IndexName) ++ "/search".
+search_path(IndexName) ->
+    index_path(IndexName) ++ "/search".
 
-jaxrs_error("400", Body) ->
+jaxrs_error(400, Body) ->
     {bad_request, message(Body)};
-jaxrs_error("404", Body) ->
+jaxrs_error(404, Body) ->
     {not_found, message(Body)};
-jaxrs_error("405", Body) ->
+jaxrs_error(405, Body) ->
     {method_not_allowed, message(Body)};
-jaxrs_error("409", Body) ->
+jaxrs_error(409, Body) ->
     {conflict, message(Body)};
-jaxrs_error("417", Body) ->
+jaxrs_error(417, Body) ->
     {expectation_failed, message(Body)};
-jaxrs_error("422", Body) ->
+jaxrs_error(422, Body) ->
     {bad_request, lists:join(" and ", errors(Body))};
-jaxrs_error("500", Body) ->
+jaxrs_error(500, Body) ->
     {internal_server_error, message(Body)}.
 
 send_error({conn_failed, _}) ->
@@ -309,71 +261,29 @@ errors(Body) ->
     Json = jiffy:decode(Body, [return_maps]),
     maps:get(<<"errors">>, Json).
 
-send_if_enabled(Url, Header, Method) ->
-    send_if_enabled(Url, Header, Method, []).
-
-send_if_enabled(Url, Header, Method, Body) ->
-    send_if_enabled(Url, Header, Method, Body, []).
-
-send_if_enabled(Url, Header, Method, Body, Options0) ->
-    case nouveau:enabled() of
-        true ->
-            Options1 = ibrowse_options(Options0),
-            retry_if_connection_closes(fun() ->
-                ibrowse:send_req(Url, Header, Method, Body, Options1)
-            end);
-        false ->
-            {error, nouveau_not_enabled}
-    end.
+send_if_enabled(Path, ReqHeaders, Method) ->
+    send_if_enabled(Path, ReqHeaders, Method, <<>>).
 
-send_direct_if_enabled(ConnPid, Url, Header, Method, Body, Options0) ->
+send_if_enabled(Path, ReqHeaders, Method, ReqBody) ->
     case nouveau:enabled() of
         true ->
-            Options1 = ibrowse_options(Options0),
-            retry_if_connection_closes(fun() ->
-                ibrowse:send_req_direct(ConnPid, Url, Header, Method, Body, 
Options1)
-            end);
+            {ok, ConnPid} = nouveau_gun:conn_pid(),
+            StreamRef = gun:request(
+                ConnPid,
+                Method,
+                Path,
+                ReqHeaders,
+                ReqBody
+            ),
+            case gun:await(ConnPid, StreamRef) of
+                {response, fin, Status, RespHeaders} ->
+                    {ok, Status, RespHeaders, []};
+                {response, nofin, Status, RespHeaders} ->
+                    {ok, RespBody} = gun:await_body(ConnPid, StreamRef),
+                    {ok, Status, RespHeaders, RespBody};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
         false ->
             {error, nouveau_not_enabled}
     end.
-
-retry_if_connection_closes(Fun) ->
-    MaxRetries = max(1, config:get_integer("nouveau", "max_retries", 5)),
-    retry_if_connection_closes(Fun, MaxRetries).
-
-retry_if_connection_closes(_Fun, 0) ->
-    {error, connection_closed};
-retry_if_connection_closes(Fun, N) when is_integer(N), N > 0 ->
-    case Fun() of
-        {error, connection_closed} ->
-            couch_stats:increment_counter([nouveau, connection_closed_errors]),
-            timer:sleep(1000),
-            retry_if_connection_closes(Fun, N - 1);
-        Else ->
-            Else
-    end.
-
-ibrowse_options(BaseOptions) when is_list(BaseOptions) ->
-    CACertFile = config:get("nouveau", "ssl_cacert_file"),
-    KeyFile = config:get("nouveau", "ssl_key_file"),
-    CertFile = config:get("nouveau", "ssl_cert_file"),
-    Password = config:get("nouveau", "ssl_password"),
-    if
-        KeyFile /= undefined andalso CertFile /= undefined ->
-            CertKeyConf0 = #{
-                certfile => CertFile,
-                keyfile => KeyFile,
-                password => Password,
-                cacertfile => CACertFile
-            },
-            CertKeyConf1 = maps:filter(fun remove_undefined/2, CertKeyConf0),
-            SSLOptions = [{certs_keys, [CertKeyConf1]}],
-            [{ssl_options, SSLOptions} | BaseOptions];
-        true ->
-            BaseOptions
-    end.
-
-remove_undefined(_Key, undefined) ->
-    false;
-remove_undefined(_Key, _Value) ->
-    true.
diff --git a/src/nouveau/src/nouveau_gun.erl b/src/nouveau/src/nouveau_gun.erl
new file mode 100644
index 000000000..6d6995d84
--- /dev/null
+++ b/src/nouveau/src/nouveau_gun.erl
@@ -0,0 +1,106 @@
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% nouveau_gun owns the gun connection to the nouveau server. It opens the
+%% connection at startup, publishes the result via persistent_term, and
+%% reopens the connection whenever the configured nouveau url changes.
+
+-module(nouveau_gun).
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+-export([start_link/0]).
+-export([conn_pid/0]).
+
+%%% gen_server callbacks
+-export([init/1]).
+-export([handle_call/3]).
+-export([handle_cast/2]).
+-export([handle_info/2]).
+
+% config_listener callbacks
+-export([handle_config_change/5]).
+-export([handle_config_terminate/3]).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+conn_pid() ->
+    persistent_term:get(?MODULE).
+
+init(_) ->
+    ConnPid = start_gun(nouveau_util:nouveau_url()),
+    ok = config:listen_for_changes(?MODULE, ConnPid),
+    persistent_term:put(?MODULE, ConnPid),
+    {ok, ConnPid}.
+
+handle_call(_Msg, _From, State) ->
+    {reply, unexpected_msg, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+handle_config_change("nouveau", "url", URL, _Persist, OldConnPid) ->
+    gun:stop(OldConnPid),
+    NewConnPid = start_gun(URL),
+    persistent_term:put(?MODULE, NewConnPid),
+    {ok, NewConnPid};
+handle_config_change(_Section, _Key, _Value, _Persist, ConnPid) ->
+    {ok, ConnPid}.
+
+handle_config_terminate(_Server, stop, _State) ->
+    ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+    erlang:send_after(
+        500,
+        whereis(?MODULE),
+        restart_config_listener
+    ).
+
+%% private functions
+
+start_gun(URL) ->
+    #{host := Host, port := Port} = uri_string:parse(URL),
+    CACertFile = config:get("nouveau", "ssl_cacert_file"),
+    KeyFile = config:get("nouveau", "ssl_key_file"),
+    CertFile = config:get("nouveau", "ssl_cert_file"),
+    Password = config:get("nouveau", "ssl_password"),
+    Verify = list_to_existing_atom(config:get("nouveau", "ssl_verify", 
"verify_none")),
+    BaseOptions = #{transport => tls},
+    BaseTLSOptions = [{verify, Verify}],
+    Options =
+        if
+            KeyFile /= undefined andalso CertFile /= undefined ->
+                CertKeyConf0 = #{
+                    certfile => CertFile,
+                    keyfile => KeyFile,
+                    password => Password,
+                    cacertfile => CACertFile
+                },
+                CertKeyConf1 = maps:filter(fun remove_undefined/2, 
CertKeyConf0),
+                BaseOptions#{tls_opts => [{certs_keys, [CertKeyConf1]} | 
BaseTLSOptions]};
+            true ->
+                BaseOptions#{tls_opts => BaseTLSOptions}
+        end,
+    gun:open(Host, Port, Options).
+
+remove_undefined(_Key, Value) ->
+    Value /= undefined.
diff --git a/src/nouveau/src/nouveau_index_manager.erl 
b/src/nouveau/src/nouveau_index_manager.erl
index eb18bc6f7..45f7a1e8d 100644
--- a/src/nouveau/src/nouveau_index_manager.erl
+++ b/src/nouveau/src/nouveau_index_manager.erl
@@ -36,9 +36,6 @@
     handle_info/2
 ]).
 
-% config_listener api
--export([handle_config_change/5, handle_config_terminate/3]).
-
 -export([handle_db_event/3]).
 
 -define(BY_DBSIG, nouveau_by_dbsig).
@@ -60,8 +57,6 @@ init(_) ->
     ets:new(?BY_DBSIG, [set, named_table]),
     ets:new(?BY_REF, [set, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
-    configure_ibrowse(nouveau_util:nouveau_url()),
-    ok = config:listen_for_changes(?MODULE, nil),
     {ok, nil}.
 
 handle_call({update, #index{} = Index0}, From, State) ->
@@ -131,31 +126,3 @@ handle_db_event(DbName, deleted, State) ->
     {ok, State};
 handle_db_event(_DbName, _Event, State) ->
     {ok, State}.
-
-handle_config_change("nouveau", "url", URL, _Persist, State) ->
-    configure_ibrowse(URL),
-    {ok, State};
-handle_config_change(_Section, _Key, _Value, _Persist, State) ->
-    {ok, State}.
-
-handle_config_terminate(_Server, stop, _State) ->
-    ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(
-        5000,
-        whereis(?MODULE),
-        restart_config_listener
-    ).
-
-configure_ibrowse(URL) ->
-    #{host := Host, port := Port} = uri_string:parse(URL),
-    ibrowse:set_max_sessions(
-        Host,
-        Port,
-        nouveau_util:max_sessions()
-    ),
-    ibrowse:set_max_pipeline_size(
-        Host,
-        Port,
-        nouveau_util:max_pipeline_size()
-    ).
diff --git a/src/nouveau/src/nouveau_index_updater.erl 
b/src/nouveau/src/nouveau_index_updater.erl
index efed245db..8514de0ed 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -33,10 +33,7 @@
     changes_done,
     total_changes,
     exclude_idrevs,
-    reqids,
-    conn_pid,
-    update_seq,
-    max_pipeline_size
+    update_seq
 }).
 
 -record(purge_acc, {
@@ -79,12 +76,11 @@ update(#index{} = Index) ->
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
-                {ok, ConnPid} = 
ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()),
                 PurgeAcc0 = #purge_acc{
                     index_update_seq = IndexUpdateSeq,
                     index_purge_seq = IndexPurgeSeq
                 },
-                {ok, PurgeAcc1} = purge_index(ConnPid, Db, Index, PurgeAcc0),
+                {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0),
 
                 NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
@@ -98,18 +94,13 @@ update(#index{} = Index) ->
                         changes_done = 0,
                         total_changes = TotalChanges,
                         exclude_idrevs = PurgeAcc1#purge_acc.exclude_list,
-                        reqids = queue:new(),
-                        conn_pid = ConnPid,
-                        update_seq = PurgeAcc1#purge_acc.index_update_seq,
-                        max_pipeline_size = nouveau_util:max_pipeline_size()
+                        update_seq = PurgeAcc1#purge_acc.index_update_seq
                     },
                     {ok, Acc1} = couch_db:fold_changes(
                         Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
                     ),
-                    nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
-                    exit(nouveau_api:set_update_seq(ConnPid, Index, 
Acc1#acc.update_seq, NewCurSeq))
+                    exit(nouveau_api:set_update_seq(Index, 
Acc1#acc.update_seq, NewCurSeq))
                 after
-                    ibrowse:stop_worker_process(ConnPid),
                     ret_os_process(Proc)
                 end
         end
@@ -119,11 +110,7 @@ update(#index{} = Index) ->
 
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
     {ok, Acc};
-load_docs(FDI, #acc{} = Acc0) ->
-    %% block for responses so we stay under the max pipeline size
-    ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, 
Acc0#acc.max_pipeline_size),
-    Acc1 = Acc0#acc{reqids = ReqIds1},
-
+load_docs(FDI, #acc{} = Acc1) ->
     couch_task_status:update([
         {changes_done, Acc1#acc.changes_done},
         {progress, (Acc1#acc.changes_done * 100) div Acc1#acc.total_changes}
@@ -138,7 +125,6 @@ load_docs(FDI, #acc{} = Acc0) ->
             false ->
                 case
                     update_or_delete_index(
-                        Acc1#acc.conn_pid,
                         Acc1#acc.db,
                         Acc1#acc.index,
                         Acc1#acc.update_seq,
@@ -146,10 +132,9 @@ load_docs(FDI, #acc{} = Acc0) ->
                         Acc1#acc.proc
                     )
                 of
-                    {ibrowse_req_id, ReqId} ->
+                    {ok, _, _, _} ->
                         Acc1#acc{
-                            update_seq = DI#doc_info.high_seq,
-                            reqids = queue:in(ReqId, Acc1#acc.reqids)
+                            update_seq = DI#doc_info.high_seq
                         };
                     {error, Reason} ->
                         exit({error, Reason})
@@ -157,11 +142,11 @@ load_docs(FDI, #acc{} = Acc0) ->
         end,
     {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.
 
-update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = 
DI, Proc) ->
+update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) 
->
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} 
= DI,
     case Del of
         true ->
-            nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq);
+            nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
         false ->
             {ok, Doc} = couch_db:open_doc(Db, DI, []),
             Json = couch_doc:to_json_obj(Doc, []),
@@ -175,10 +160,10 @@ update_or_delete_index(ConnPid, Db, #index{} = Index, 
MatchSeq, #doc_info{} = DI
                 end,
             case Fields of
                 [] ->
-                    nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, 
Seq);
+                    nouveau_api:delete_doc(Index, Id, MatchSeq, Seq);
                 _ ->
-                    nouveau_api:update_doc_async(
-                        ConnPid, Index, Id, MatchSeq, Seq, Partition, Fields
+                    nouveau_api:update_doc(
+                        Index, Id, MatchSeq, Seq, Partition, Fields
                     )
             end
     end.
@@ -223,7 +208,7 @@ index_definition(#index{} = Index) ->
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
 
-purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
+purge_index(Db, Index, #purge_acc{} = PurgeAcc0) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, 
<<"nouveau">>]),
@@ -232,7 +217,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
                 case couch_db:get_full_doc_info(Db, Id) of
                     not_found ->
                         ok = nouveau_api:purge_doc(
-                            ConnPid, Index, Id, 
PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
+                            Index, Id, PurgeAcc1#purge_acc.index_purge_seq, 
PurgeSeq
                         ),
                         PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq};
                     FDI ->
@@ -243,7 +228,6 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
                                 PurgeAcc1;
                             false ->
                                 update_or_delete_index(
-                                    ConnPid,
                                     Db,
                                     Index,
                                     PurgeAcc1#purge_acc.index_update_seq,
@@ -265,7 +249,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
         ),
         DbPurgeSeq = couch_db:get_purge_seq(Db),
         ok = nouveau_api:set_purge_seq(
-            ConnPid, Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
+            Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
         ),
         update_local_doc(Db, Index, DbPurgeSeq),
         {ok, PurgeAcc3}
diff --git a/src/nouveau/src/nouveau_sup.erl b/src/nouveau/src/nouveau_sup.erl
index 3547b43fa..65afe744a 100644
--- a/src/nouveau/src/nouveau_sup.erl
+++ b/src/nouveau/src/nouveau_sup.erl
@@ -23,6 +23,7 @@ start_link() ->
 
 init(_Args) ->
     Children = [
+        child(nouveau_gun),
         child(nouveau_index_manager)
     ],
     {ok, {{one_for_one, 10, 1}, couch_epi:register_service(nouveau_epi, 
Children)}}.
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index b6dd0fcbd..b467b54a9 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -28,8 +28,7 @@
     get_local_purge_doc_id/1,
     get_local_purge_doc_body/3,
     nouveau_url/0,
-    max_sessions/0,
-    max_pipeline_size/0
+    max_sessions/0
 ]).
 
 index_name(Path) when is_binary(Path) ->
@@ -201,6 +200,3 @@ nouveau_url() ->
 
 max_sessions() ->
     config:get_integer("nouveau", "max_sessions", 100).
-
-max_pipeline_size() ->
-    config:get_integer("nouveau", "max_pipeline_size", 1000).

Reply via email to