This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-gun-http2 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 61c03f93cf9f2b3ae37ad0287f0e5f97857c4e68 Author: Robert Newson <[email protected]> AuthorDate: Thu Jun 26 22:16:09 2025 +0100 nouveau: use http/2 we lose the pipelining benefit for indexing as http/2 does not support it (and we need the requests that update the index to happen in the order we issue them). --- .gitignore | 2 + .../java/org/apache/couchdb/nouveau/api/Ok.java | 26 +++ .../couchdb/nouveau/resources/IndexResource.java | 26 ++- rebar.config.script | 2 + rel/nouveau.yaml | 4 +- rel/reltool.config | 4 + src/nouveau/src/nouveau.app.src | 2 +- src/nouveau/src/nouveau_api.erl | 244 +++++++-------------- src/nouveau/src/nouveau_gun.erl | 142 ++++++++++++ src/nouveau/src/nouveau_index_manager.erl | 33 --- src/nouveau/src/nouveau_index_updater.erl | 46 ++-- src/nouveau/src/nouveau_sup.erl | 1 + src/nouveau/src/nouveau_util.erl | 10 +- 13 files changed, 289 insertions(+), 253 deletions(-) diff --git a/.gitignore b/.gitignore index 080a7dd6f..d46777512 100644 --- a/.gitignore +++ b/.gitignore @@ -53,10 +53,12 @@ src/couch/priv/couchspawnkillable src/couch/priv/couch_ejson_compare/couch_ejson_compare.d src/couch/priv/couch_js/**/*.d src/couch/priv/icu_driver/couch_icu_driver.d +src/cowlib/ src/mango/src/mango_cursor_text.nocompile src/excoveralls/ src/fauxton/ src/folsom/ +src/gun/ src/hackney/ src/hqueue/ src/ibrowse/ diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/Ok.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/Ok.java new file mode 100644 index 000000000..b393e1978 --- /dev/null +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/Ok.java @@ -0,0 +1,26 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.couchdb.nouveau.api; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Ok { + + public static final Ok INSTANCE = new Ok(); + + @JsonProperty + public boolean ok() { + return true; + } +} diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index a6ca2c47b..a52e00da9 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -35,6 +35,7 @@ import org.apache.couchdb.nouveau.api.DocumentUpdateRequest; import org.apache.couchdb.nouveau.api.IndexDefinition; import org.apache.couchdb.nouveau.api.IndexInfo; import org.apache.couchdb.nouveau.api.IndexInfoRequest; +import org.apache.couchdb.nouveau.api.Ok; import org.apache.couchdb.nouveau.api.SearchRequest; import org.apache.couchdb.nouveau.api.SearchResults; import org.apache.couchdb.nouveau.core.IndexManager; @@ -54,27 +55,29 @@ public final class IndexResource { } @PUT - public void createIndex(@PathParam("name") String name, @NotNull @Valid IndexDefinition indexDefinition) + public Ok createIndex(@PathParam("name") String name, @NotNull @Valid IndexDefinition indexDefinition) throws IOException { indexManager.create(name, indexDefinition); + return Ok.INSTANCE; } @DELETE @Path("/doc/{docId}") - public void deleteDoc( + public Ok deleteDoc( @PathParam("name") String name, @PathParam("docId") String docId, @NotNull @Valid DocumentDeleteRequest request) throws Exception { - indexManager.with(name, (index) -> { + return indexManager.with(name, (index) -> { index.delete(docId, request); - return null; + return Ok.INSTANCE; }); } @DELETE - public void deletePath(@PathParam("name") String path, @Valid final List<String> exclusions) throws IOException { + public Ok deletePath(@PathParam("name") String path, @Valid final List<String> exclusions) throws IOException { indexManager.deleteAll(path, exclusions); + return Ok.INSTANCE; } @GET @@ -85,9 +88,8 @@ public final class IndexResource { } @POST - public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request) - throws Exception { - indexManager.with(name, (index) -> { + public Ok setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request) throws Exception { + return indexManager.with(name, (index) -> { if (request.getMatchUpdateSeq().isPresent() && request.getUpdateSeq().isPresent()) { index.setUpdateSeq( @@ -99,7 +101,7 @@ public final class IndexResource { request.getMatchPurgeSeq().getAsLong(), request.getPurgeSeq().getAsLong()); } - return null; + return Ok.INSTANCE; }); } @@ -114,14 +116,14 @@ public final class IndexResource { @PUT @Path("/doc/{docId}") - public void updateDoc( + public Ok updateDoc( @PathParam("name") String name, @PathParam("docId") String docId, @NotNull @Valid DocumentUpdateRequest request) throws Exception { - indexManager.with(name, (index) -> { + return indexManager.with(name, (index) -> { index.update(docId, request); - return null; + return Ok.INSTANCE; }); } } diff --git a/rebar.config.script b/rebar.config.script index 1f82036cd..0ac822c38 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -158,6 +158,8 @@ DepDescs = [ {fauxton, {url, "https://github.com/apache/couchdb-fauxton"}, {tag, "v1.3.4"}, [raw]}, {ibrowse, "ibrowse", {tag, "CouchDB-4.4.2-6"}}, +{gun, {url, "https://github.com/ninenines/gun"}, + {tag, "2.2.0"}}, {jiffy, "jiffy", {tag, "1.1.2"}}, {mochiweb, "mochiweb", {tag, "v3.2.2"}}, {meck, "meck", {tag, "1.0.0"}}, diff --git a/rel/nouveau.yaml b/rel/nouveau.yaml index 40837f12c..0f33d5f25 100644 --- a/rel/nouveau.yaml +++ b/rel/nouveau.yaml @@ -8,12 +8,12 @@ logging: server: applicationConnectors: - - type: http + - type: h2c bindHost: 127.0.0.1 port: {{nouveau_port}} useDateHeader: false adminConnectors: - - type: http + - type: h2c bindHost: 127.0.0.1 port: {{nouveau_admin_port}} useDateHeader: false diff --git a/rel/reltool.config b/rel/reltool.config index c1f0ea070..b85bd49b6 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -48,6 +48,8 @@ ets_lru, fabric, global_changes, + gun, + cowlib, ibrowse, ioq, jiffy, @@ -111,6 +113,8 @@ {app, ets_lru, [{incl_cond, include}]}, {app, fabric, [{incl_cond, include}]}, {app, global_changes, [{incl_cond, include}]}, + {app, gun, [{incl_cond, include}]}, + {app, cowlib, [{incl_cond, include}]}, {app, ibrowse, [{incl_cond, include}]}, {app, ioq, [{incl_cond, include}]}, {app, jiffy, [{incl_cond, include}]}, diff --git a/src/nouveau/src/nouveau.app.src b/src/nouveau/src/nouveau.app.src index 0828437c1..e8ea54915 100644 --- a/src/nouveau/src/nouveau.app.src +++ b/src/nouveau/src/nouveau.app.src @@ -18,7 +18,7 @@ {vsn, git}, {applications, [ config, - ibrowse, + gun, kernel, stdlib, mem3, diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index b700524f7..6ae85d0b9 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -23,13 +23,12 @@ create_index/2, delete_path/1, delete_path/2, - delete_doc_async/5, - purge_doc/5, - update_doc_async/7, + delete_doc/4, + purge_doc/4, + update_doc/6, search/2, - set_purge_seq/4, - set_update_seq/4, - drain_async_responses/2, + set_purge_seq/3, + set_update_seq/3, jaxrs_error/2 ]). @@ -40,13 +39,13 @@ analyze(Text, Analyzer) when -> ReqBody = {[{<<"text">>, Text}, {<<"analyzer">>, Analyzer}]}, Resp = send_if_enabled( - nouveau_util:nouveau_url() ++ "/analyze", + "/analyze", [?JSON_CONTENT_TYPE], - post, + <<"POST">>, jiffy:encode(ReqBody) ), case Resp of - {ok, "200", _, RespBody} -> + {ok, 200, _, RespBody} -> Json = jiffy:decode(RespBody, [return_maps]), {ok, maps:get(<<"tokens">>, Json)}; {ok, StatusCode, _, RespBody} -> @@ -58,9 +57,9 @@ analyze(_, _) -> {error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty strings">>}}. index_info(#index{} = Index) -> - Resp = send_if_enabled(index_url(Index), [], get), + Resp = send_if_enabled(index_path(Index), [], <<"GET">>), case Resp of - {ok, "200", _, RespBody} -> + {ok, 200, _, RespBody} -> {ok, jiffy:decode(RespBody, [return_maps])}; {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; @@ -70,10 +69,10 @@ index_info(#index{} = Index) -> create_index(#index{} = Index, IndexDefinition) -> Resp = send_if_enabled( - index_url(Index), [?JSON_CONTENT_TYPE], put, jiffy:encode(IndexDefinition) + index_path(Index), [?JSON_CONTENT_TYPE], <<"PUT">>, jiffy:encode(IndexDefinition) ), case Resp of - {ok, "204", _, _} -> + {ok, 200, _, _} -> ok; {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; @@ -88,10 +87,10 @@ delete_path(Path, Exclusions) when is_binary(Path), is_list(Exclusions) -> Resp = send_if_enabled( - index_path(Path), [?JSON_CONTENT_TYPE], delete, jiffy:encode(Exclusions) + index_path(Path), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(Exclusions) ), case Resp of - {ok, "204", _, _} -> + {ok, 200, _, _} -> ok; {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; @@ -99,8 +98,7 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when - is_pid(ConnPid), +delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, @@ -108,19 +106,14 @@ delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when UpdateSeq > 0 -> ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false}, - send_direct_if_enabled( - ConnPid, - doc_url(Index, DocId), + send_if_enabled( + doc_path(Index, DocId), [?JSON_CONTENT_TYPE], - delete, - jiffy:encode(ReqBody), - [ - {stream_to, self()} - ] + <<"DELETE">>, + jiffy:encode(ReqBody) ). -purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when - is_pid(ConnPid), +purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, @@ -128,11 +121,11 @@ purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when PurgeSeq > 0 -> ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true}, - Resp = send_direct_if_enabled( - ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody), [] + Resp = send_if_enabled( + doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(ReqBody) ), case Resp of - {ok, "204", _, _} -> + {ok, 200, _, _} -> ok; {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; @@ -140,8 +133,7 @@ purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when send_error(Reason) end. -update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when - is_pid(ConnPid), +update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, @@ -156,25 +148,21 @@ update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, Partitio partition => Partition, fields => Fields }, - send_direct_if_enabled( - ConnPid, - doc_url(Index, DocId), + send_if_enabled( + doc_path(Index, DocId), [?JSON_CONTENT_TYPE], - put, - jiffy:encode(ReqBody), - [ - {stream_to, self()} - ] + <<"PUT">>, + jiffy:encode(ReqBody) ). search(#index{} = Index, QueryArgs) -> Resp = send_if_enabled( - search_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(QueryArgs) + search_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(QueryArgs) ), case Resp of - {ok, "200", _, RespBody} -> + {ok, 200, _, RespBody} -> {ok, jiffy:decode(RespBody, [return_maps])}; - {ok, "409", _, _} -> + {ok, 409, _, _} -> %% Index was not current enough. {error, stale_index}; {ok, StatusCode, _, RespBody} -> @@ -183,26 +171,26 @@ search(#index{} = Index, QueryArgs) -> send_error(Reason) end. -set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) -> +set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) -> ReqBody = #{ match_update_seq => MatchSeq, update_seq => UpdateSeq }, - set_seq(ConnPid, Index, ReqBody). + set_seq(Index, ReqBody). -set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) -> +set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) -> ReqBody = #{ match_purge_seq => MatchSeq, purge_seq => PurgeSeq }, - set_seq(ConnPid, Index, ReqBody). + set_seq(Index, ReqBody). -set_seq(ConnPid, #index{} = Index, ReqBody) -> - Resp = send_direct_if_enabled( - ConnPid, index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody), [] +set_seq(#index{} = Index, ReqBody) -> + Resp = send_if_enabled( + index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(ReqBody) ), case Resp of - {ok, "204", _, _} -> + {ok, 200, _, _} -> ok; {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; @@ -210,90 +198,54 @@ set_seq(ConnPid, #index{} = Index, ReqBody) -> send_error(Reason) end. -%% wait for enough async responses to reduce the Queue to Min length. -drain_async_responses(Queue0, Min) when Min >= 0 -> - case queue:len(Queue0) > Min of - true -> - {{value, ReqId}, Queue1} = queue:out(Queue0), - wait_for_response(ReqId), - drain_async_responses(Queue1, Min); - false -> - Queue0 - end. - -wait_for_response(ReqId) -> - case drain_async_response(ReqId) of - {ok, "204", _Headers, _Body} -> - ok; - {ok, StatusCode, _Headers, RespBody} -> - exit({error, jaxrs_error(StatusCode, RespBody)}) - end. - -drain_async_response(ReqId) -> - drain_async_response(ReqId, undefined, undefined, undefined). - -drain_async_response(ReqId, Code0, Headers0, Body0) -> - receive - {ibrowse_async_headers, ReqId, Code1, Headers1} -> - drain_async_response(ReqId, Code1, Headers1, Body0); - {ibrowse_async_response, ReqId, Body1} -> - drain_async_response(ReqId, Code0, Headers0, Body1); - {ibrowse_async_response_end, ReqId} -> - {ok, Code0, Headers0, Body0} - end. - %% private functions -index_path(Path) -> +index_path(Path) when is_binary(Path) -> lists:flatten( io_lib:format( - "~s/index/~s", + "/index/~s", [ - nouveau_util:nouveau_url(), couch_util:url_encode(Path) ] ) - ). - -index_url(#index{} = Index) -> + ); +index_path(#index{} = Index) -> lists:flatten( io_lib:format( - "~s/index/~s", + "/index/~s", [ - nouveau_util:nouveau_url(), couch_util:url_encode(nouveau_util:index_name(Index)) ] ) ). -doc_url(#index{} = Index, DocId) -> +doc_path(#index{} = Index, DocId) -> lists:flatten( io_lib:format( - "~s/index/~s/doc/~s", + "/index/~s/doc/~s", [ - nouveau_util:nouveau_url(), couch_util:url_encode(nouveau_util:index_name(Index)), couch_util:url_encode(DocId) ] ) ). -search_url(IndexName) -> - index_url(IndexName) ++ "/search". +search_path(IndexName) -> + index_path(IndexName) ++ "/search". -jaxrs_error("400", Body) -> +jaxrs_error(400, Body) -> {bad_request, message(Body)}; -jaxrs_error("404", Body) -> +jaxrs_error(404, Body) -> {not_found, message(Body)}; -jaxrs_error("405", Body) -> +jaxrs_error(405, Body) -> {method_not_allowed, message(Body)}; -jaxrs_error("409", Body) -> +jaxrs_error(409, Body) -> {conflict, message(Body)}; -jaxrs_error("417", Body) -> +jaxrs_error(417, Body) -> {expectation_failed, message(Body)}; -jaxrs_error("422", Body) -> +jaxrs_error(422, Body) -> {bad_request, lists:join(" and ", errors(Body))}; -jaxrs_error("500", Body) -> +jaxrs_error(500, Body) -> {internal_server_error, message(Body)}. send_error({conn_failed, _}) -> @@ -309,71 +261,33 @@ errors(Body) -> Json = jiffy:decode(Body, [return_maps]), maps:get(<<"errors">>, Json). -send_if_enabled(Url, Header, Method) -> - send_if_enabled(Url, Header, Method, []). - -send_if_enabled(Url, Header, Method, Body) -> - send_if_enabled(Url, Header, Method, Body, []). - -send_if_enabled(Url, Header, Method, Body, Options0) -> - case nouveau:enabled() of - true -> - Options1 = ibrowse_options(Options0), - retry_if_connection_closes(fun() -> - ibrowse:send_req(Url, Header, Method, Body, Options1) - end); - false -> - {error, nouveau_not_enabled} - end. +send_if_enabled(Path, ReqHeaders, Method) -> + send_if_enabled(Path, ReqHeaders, Method, <<>>). -send_direct_if_enabled(ConnPid, Url, Header, Method, Body, Options0) -> +send_if_enabled(Path, ReqHeaders, Method, ReqBody) -> case nouveau:enabled() of true -> - Options1 = ibrowse_options(Options0), - retry_if_connection_closes(fun() -> - ibrowse:send_req_direct(ConnPid, Url, Header, Method, Body, Options1) - end); + case nouveau_gun:conn_info() of + disconnected -> + {error, <<"no connection to nouveau server">>}; + {ConnPid, MRef} -> + StreamRef = gun:request( + ConnPid, + Method, + Path, + ReqHeaders, + ReqBody + ), + case gun:await(ConnPid, StreamRef, MRef) of + {response, fin, Status, RespHeaders} -> + {ok, Status, RespHeaders, []}; + {response, nofin, Status, RespHeaders} -> + {ok, RespBody} = gun:await_body(ConnPid, StreamRef, MRef), + {ok, Status, RespHeaders, RespBody}; + {error, Reason} -> + {error, Reason} + end + end; false -> {error, nouveau_not_enabled} end. - -retry_if_connection_closes(Fun) -> - MaxRetries = max(1, config:get_integer("nouveau", "max_retries", 5)), - retry_if_connection_closes(Fun, MaxRetries). - -retry_if_connection_closes(_Fun, 0) -> - {error, connection_closed}; -retry_if_connection_closes(Fun, N) when is_integer(N), N > 0 -> - case Fun() of - {error, connection_closed} -> - couch_stats:increment_counter([nouveau, connection_closed_errors]), - timer:sleep(1000), - retry_if_connection_closes(Fun, N - 1); - Else -> - Else - end. - -ibrowse_options(BaseOptions) when is_list(BaseOptions) -> - CACertFile = config:get("nouveau", "ssl_cacert_file"), - KeyFile = config:get("nouveau", "ssl_key_file"), - CertFile = config:get("nouveau", "ssl_cert_file"), - Password = config:get("nouveau", "ssl_password"), - if - KeyFile /= undefined andalso CertFile /= undefined -> - CertKeyConf0 = #{ - certfile => CertFile, - keyfile => KeyFile, - password => Password, - cacertfile => CACertFile - }, - CertKeyConf1 = maps:filter(fun remove_undefined/2, CertKeyConf0), - SSLOptions = [{certs_keys, [CertKeyConf1]}], - [{ssl_options, SSLOptions} | BaseOptions]; - true -> - BaseOptions - end. - -remove_undefined(_Key, undefined) -> - false; -remove_undefined(_Key, _Value) -> - true. diff --git a/src/nouveau/src/nouveau_gun.erl b/src/nouveau/src/nouveau_gun.erl new file mode 100644 index 000000000..4de1c6189 --- /dev/null +++ b/src/nouveau/src/nouveau_gun.erl @@ -0,0 +1,142 @@ +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +%% index manager ensures only one process is updating a nouveau index at a time. +%% calling update_index will block until at least one attempt has been made to +%% make the index as current as the database at the time update_index was called. + +-module(nouveau_gun). +-behaviour(gen_server). +-behaviour(config_listener). + +-export([start_link/0]). +-export([conn_info/0]). + +%%% gen_server callbacks +-export([init/1]). +-export([handle_call/3]). +-export([handle_cast/2]). +-export([handle_info/2]). + +% config_listener callbacks +-export([handle_config_change/5]). +-export([handle_config_terminate/3]). + +-record(state, {conn_pid, mref}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +conn_info() -> + persistent_term:get(?MODULE, disconnected). + +init(_) -> + {ConnPid, MRef} = start_gun(), + ok = config:listen_for_changes(?MODULE, nil), + {ok, #state{conn_pid = ConnPid, mref = MRef}}. + +handle_call(_Msg, _From, State) -> + {reply, unexpected_msg, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(restart_gun, State) -> + {NewConnPid, NewMRef} = start_gun(), + {noreply, State#state{conn_pid = NewConnPid, mref = NewMRef}}; +handle_info({gun_up, ConnPid, http2}, #state{conn_pid = ConnPid} = State) -> + persistent_term:put(?MODULE, {ConnPid, State#state.mref}), + couch_log:info("Connection to nouveau server established", []), + {noreply, State}; +handle_info({gun_down, _Pid, http2, Reason, _KilledStreams}, State) -> + couch_log:info("Connection to nouveau server lost: ~p", [Reason]), + {noreply, State}; +handle_info({'DOWN', MRef, process, _ConnPid, Reason}, #state{mref = MRef} = State) -> + persistent_term:erase(?MODULE), + couch_log:warning("Connection to nouveau server down for reason: ~p", [Reason]), + erlang:send_after( + 500, + whereis(?MODULE), + restart_gun + ), + {noreply, State}; +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; +handle_info(Msg, State) -> + couch_log:warning("~p received unexpected message: ~p", [?MODULE, Msg]), + {noreply, State}. + +handle_config_change("nouveau", "url", URL, _Persist, State) -> + start_gun(URL), + case conn_info() of + disconnected -> + ok; + {ConnPid, _MRef} -> + gun:close(ConnPid) + end, + {ok, State}; +handle_config_change(_Section, _Key, _Value, _Persist, State) -> + {ok, State}. + +handle_config_terminate(_Server, stop, _State) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after( + 500, + whereis(?MODULE), + restart_config_listener + ). + +%% private functions + +start_gun() -> + start_gun(nouveau_util:nouveau_url()). + +start_gun(URL) -> + #{host := Host, port := Port, scheme := Scheme} = uri_string:parse(URL), + CACertFile = config:get("nouveau", "ssl_cacert_file"), + KeyFile = config:get("nouveau", "ssl_key_file"), + CertFile = config:get("nouveau", "ssl_cert_file"), + Password = config:get("nouveau", "ssl_password"), + Transport = scheme_to_transport(Scheme), + BaseOptions = #{transport => Transport, protocols => [http2]}, + Options = + if + Transport == tls andalso KeyFile /= undefined andalso CertFile /= undefined -> + CertKeyConf0 = #{ + certfile => CertFile, + keyfile => KeyFile, + password => Password, + cacertfile => CACertFile + }, + CertKeyConf1 = maps:filter(fun remove_undefined/2, CertKeyConf0), + BaseOptions#{ + tls_opts => [{certs_keys, [CertKeyConf1]}] + }; + true -> + BaseOptions + end, + {ok, ConnPid} = gun:open(Host, Port, Options), + MRef = monitor(process, ConnPid), + {ConnPid, MRef}. + +remove_undefined(_Key, Value) -> + Value /= undefined. + +scheme_to_transport("http") -> + tcp; +scheme_to_transport("https") -> + tls. diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl index eb18bc6f7..45f7a1e8d 100644 --- a/src/nouveau/src/nouveau_index_manager.erl +++ b/src/nouveau/src/nouveau_index_manager.erl @@ -36,9 +36,6 @@ handle_info/2 ]). -% config_listener api --export([handle_config_change/5, handle_config_terminate/3]). - -export([handle_db_event/3]). -define(BY_DBSIG, nouveau_by_dbsig). @@ -60,8 +57,6 @@ init(_) -> ets:new(?BY_DBSIG, [set, named_table]), ets:new(?BY_REF, [set, named_table]), couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), - configure_ibrowse(nouveau_util:nouveau_url()), - ok = config:listen_for_changes(?MODULE, nil), {ok, nil}. handle_call({update, #index{} = Index0}, From, State) -> @@ -131,31 +126,3 @@ handle_db_event(DbName, deleted, State) -> {ok, State}; handle_db_event(_DbName, _Event, State) -> {ok, State}. - -handle_config_change("nouveau", "url", URL, _Persist, State) -> - configure_ibrowse(URL), - {ok, State}; -handle_config_change(_Section, _Key, _Value, _Persist, State) -> - {ok, State}. - -handle_config_terminate(_Server, stop, _State) -> - ok; -handle_config_terminate(_Server, _Reason, _State) -> - erlang:send_after( - 5000, - whereis(?MODULE), - restart_config_listener - ). - -configure_ibrowse(URL) -> - #{host := Host, port := Port} = uri_string:parse(URL), - ibrowse:set_max_sessions( - Host, - Port, - nouveau_util:max_sessions() - ), - ibrowse:set_max_pipeline_size( - Host, - Port, - nouveau_util:max_pipeline_size() - ). diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index efed245db..8514de0ed 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -33,10 +33,7 @@ changes_done, total_changes, exclude_idrevs, - reqids, - conn_pid, - update_seq, - max_pipeline_size + update_seq }). -record(purge_acc, { @@ -79,12 +76,11 @@ update(#index{} = Index) -> %% update status every half second couch_task_status:set_update_frequency(500), - {ok, ConnPid} = ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()), PurgeAcc0 = #purge_acc{ index_update_seq = IndexUpdateSeq, index_purge_seq = IndexPurgeSeq }, - {ok, PurgeAcc1} = purge_index(ConnPid, Db, Index, PurgeAcc0), + {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), @@ -98,18 +94,13 @@ update(#index{} = Index) -> changes_done = 0, total_changes = TotalChanges, exclude_idrevs = PurgeAcc1#purge_acc.exclude_list, - reqids = queue:new(), - conn_pid = ConnPid, - update_seq = PurgeAcc1#purge_acc.index_update_seq, - max_pipeline_size = nouveau_util:max_pipeline_size() + update_seq = PurgeAcc1#purge_acc.index_update_seq }, {ok, Acc1} = couch_db:fold_changes( Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), - nouveau_api:drain_async_responses(Acc1#acc.reqids, 0), - exit(nouveau_api:set_update_seq(ConnPid, Index, Acc1#acc.update_seq, NewCurSeq)) + exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) after - ibrowse:stop_worker_process(ConnPid), ret_os_process(Proc) end end @@ -119,11 +110,7 @@ update(#index{} = Index) -> load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) -> {ok, Acc}; -load_docs(FDI, #acc{} = Acc0) -> - %% block for responses so we stay under the max pipeline size - ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, Acc0#acc.max_pipeline_size), - Acc1 = Acc0#acc{reqids = ReqIds1}, - +load_docs(FDI, #acc{} = Acc1) -> couch_task_status:update([ {changes_done, Acc1#acc.changes_done}, {progress, (Acc1#acc.changes_done * 100) div Acc1#acc.total_changes} @@ -138,7 +125,6 @@ load_docs(FDI, #acc{} = Acc0) -> false -> case update_or_delete_index( - Acc1#acc.conn_pid, Acc1#acc.db, Acc1#acc.index, Acc1#acc.update_seq, @@ -146,10 +132,9 @@ load_docs(FDI, #acc{} = Acc0) -> Acc1#acc.proc ) of - {ibrowse_req_id, ReqId} -> + {ok, _, _, _} -> Acc1#acc{ - update_seq = DI#doc_info.high_seq, - reqids = queue:in(ReqId, Acc1#acc.reqids) + update_seq = DI#doc_info.high_seq }; {error, Reason} -> exit({error, Reason}) @@ -157,11 +142,11 @@ load_docs(FDI, #acc{} = Acc0) -> end, {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. -update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -175,10 +160,10 @@ update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = DI end, case Fields of [] -> - nouveau_api:delete_doc_async(ConnPid, Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); _ -> - nouveau_api:update_doc_async( - ConnPid, Index, Id, MatchSeq, Seq, Partition, Fields + nouveau_api:update_doc( + Index, Id, MatchSeq, Seq, Partition, Fields ) end end. @@ -223,7 +208,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }. -purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) -> +purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -232,7 +217,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> ok = nouveau_api:purge_doc( - ConnPid, Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq + Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq ), PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq}; FDI -> @@ -243,7 +228,6 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) -> PurgeAcc1; false -> update_or_delete_index( - ConnPid, Db, Index, PurgeAcc1#purge_acc.index_update_seq, @@ -265,7 +249,7 @@ purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) -> ), DbPurgeSeq = couch_db:get_purge_seq(Db), ok = nouveau_api:set_purge_seq( - ConnPid, Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq + Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq ), update_local_doc(Db, Index, DbPurgeSeq), {ok, PurgeAcc3} diff --git a/src/nouveau/src/nouveau_sup.erl b/src/nouveau/src/nouveau_sup.erl index 3547b43fa..65afe744a 100644 --- a/src/nouveau/src/nouveau_sup.erl +++ b/src/nouveau/src/nouveau_sup.erl @@ -23,6 +23,7 @@ start_link() -> init(_Args) -> Children = [ + child(nouveau_gun), child(nouveau_index_manager) ], {ok, {{one_for_one, 10, 1}, couch_epi:register_service(nouveau_epi, Children)}}. diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl index b6dd0fcbd..ad0ce6663 100644 --- a/src/nouveau/src/nouveau_util.erl +++ b/src/nouveau/src/nouveau_util.erl @@ -27,9 +27,7 @@ maybe_create_local_purge_doc/2, get_local_purge_doc_id/1, get_local_purge_doc_body/3, - nouveau_url/0, - max_sessions/0, - max_pipeline_size/0 + nouveau_url/0 ]). index_name(Path) when is_binary(Path) -> @@ -198,9 +196,3 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) -> nouveau_url() -> config:get("nouveau", "url", "http://127.0.0.1:5987"). - -max_sessions() -> - config:get_integer("nouveau", "max_sessions", 100). - -max_pipeline_size() -> - config:get_integer("nouveau", "max_pipeline_size", 1000).
