This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 6e4d6c31e Implement db doc updating
6e4d6c31e is described below
commit 6e4d6c31e071a115dabdb16651313de86fb470b8
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Sep 30 01:54:21 2025 -0400
Implement db doc updating
Resharding implements db doc updates with extra steps to ensure some
resilience in case the replication ring is broken, and to avoid generating
conflicts. However, the implementation was specific to resharding and used the
`#job{}` record as an argument, so change it to make it more general and expose
it as a top-level mem3 API.
The pattern is the same as the one used in the resharding logic:
* Changes, reads, and writes happen on the first live node -- the leader.
* Before reading, replicate all the updates from the other nodes to the leader.
  This handles the case when the replication ring may be broken, and avoids
  conflicts.
* After updating, the change is broadcast to all the live copies so that they
  all get the update immediately.
---
src/mem3/src/mem3.erl | 7 ++
src/mem3/src/mem3_db_doc_updater.erl | 107 ++++++++++++++++++++++++
src/mem3/src/mem3_reshard_dbdoc.erl | 135 +++++++------------------------
src/mem3/src/mem3_reshard_sup.erl | 3 -
src/mem3/src/mem3_sup.erl | 1 +
src/mem3/test/eunit/mem3_shards_test.erl | 69 +++++++++-------
6 files changed, 183 insertions(+), 139 deletions(-)
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index f9978e892..ddd507210 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -39,6 +39,7 @@
-export([db_is_current/1]).
-export([shard_creation_time/1]).
-export([generate_shard_suffix/0]).
+-export([get_db_doc/1, update_db_doc/1]).
%% For mem3 use only.
-export([name/1, node/1, range/1]).
@@ -579,6 +580,12 @@ strip_shard_suffix(DbName) when is_binary(DbName) ->
filename:rootname(DbName)
end.
+get_db_doc(DocId) ->
+ mem3_db_doc_updater:get_db_doc(DocId).
+
+update_db_doc(Doc) ->
+ mem3_db_doc_updater:update_db_doc(Doc).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/mem3/src/mem3_db_doc_updater.erl b/src/mem3/src/mem3_db_doc_updater.erl
new file mode 100644
index 000000000..f5537f8c9
--- /dev/null
+++ b/src/mem3/src/mem3_db_doc_updater.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_db_doc_updater).
+
+-behaviour(gen_server).
+
+-export([
+ get_db_doc/1,
+ update_db_doc/1,
+
+ start_link/0,
+
+ init/1,
+ handle_call/3,
+ handle_cast/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+% Early return shortcut
+%
+-define(THROW(RES), throw({reply, RES, nil})).
+
+get_db_doc(DocId) when is_binary(DocId) ->
+ Timeout = shard_update_timeout_msec(),
+ gen_server:call(first_node(), {get_db_doc, DocId}, Timeout).
+
+update_db_doc(#doc{} = Doc) ->
+ Timeout = shard_update_timeout_msec(),
+ gen_server:call(first_node(), {update_db_doc, Doc}, Timeout).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init(_) ->
+ {ok, nil}.
+
+handle_call({get_db_doc, DocId}, _From, nil = St) ->
+ {reply, get_db_doc_int(DocId), St};
+handle_call({update_db_doc, #doc{} = Doc}, _From, nil = St) ->
+ {reply, update_db_doc_int(Doc), St};
+handle_call(Msg, _From, nil = St) ->
+ {stop, {invalid_call, Msg}, invalid_call, St}.
+
+handle_cast(Msg, nil = St) ->
+ {stop, {invalid_cast, Msg}, St}.
+
+% Private
+
+update_db_doc_int(#doc{} = Doc) ->
+ ok = validate_coordinator(),
+ couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
+ try
+ Res = couch_db:update_doc(Db, Doc, [?ADMIN_CTX]),
+ ok = replicate_to_all_nodes(shard_update_timeout_msec()),
+ Res
+ catch
+ conflict ->
+ ?THROW({error, conflict})
+ end
+ end).
+
+get_db_doc_int(DocId) ->
+ ok = validate_coordinator(),
+ ok = replicate_from_all_nodes(shard_update_timeout_msec()),
+ couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
+ case couch_db:open_doc(Db, DocId, [ejson_body]) of
+ {ok, #doc{deleted = true}} -> ?THROW({error, not_found});
+ {ok, #doc{} = Doc} -> {ok, Doc};
+ {not_found, _} -> ?THROW({error, not_found})
+ end
+ end).
+
+validate_coordinator() ->
+ case hd(mem3_util:live_nodes()) =:= node() of
+ true -> ok;
+ false -> ?THROW({error, coordinator_changed})
+ end.
+
+replicate_from_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> ?THROW({error, Error})
+ end.
+
+replicate_to_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> ?THROW({error, Error})
+ end.
+
+shard_update_timeout_msec() ->
+ config:get_integer("mem3", "shard_update_timeout_msec", 300000).
+
+first_node() ->
+ FirstNode = hd(mem3_util:live_nodes()),
+ {?MODULE, FirstNode}.
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
index c39444806..e2f0d0c50 100644
--- a/src/mem3/src/mem3_reshard_dbdoc.erl
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -12,129 +12,48 @@
-module(mem3_reshard_dbdoc).
--behaviour(gen_server).
-
-export([
- update_shard_map/1,
-
- start_link/0,
-
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2
+ update_shard_map/1
]).
-include_lib("couch/include/couch_db.hrl").
-include("mem3_reshard.hrl").
--spec update_shard_map(#job{}) -> no_return | ok.
update_shard_map(#job{source = Source, target = Target} = Job) ->
- Node = hd(mem3_util:live_nodes()),
+ DocId = mem3:dbname(Source#shard.name),
JobStr = mem3_reshard_job:jobfmt(Job),
- LogMsg1 = "~p : ~p calling update_shard_map node:~p",
- couch_log:notice(LogMsg1, [?MODULE, JobStr, Node]),
- ServerRef = {?MODULE, Node},
- CallArg = {update_shard_map, Source, Target},
- TimeoutMSec = shard_update_timeout_msec(),
+ LogMsg1 = "~p : ~p calling update_shard_map",
+ couch_log:notice(LogMsg1, [?MODULE, JobStr]),
try
- case gen_server:call(ServerRef, CallArg, TimeoutMSec) of
- {ok, _} -> ok;
- {error, CallError} -> throw({error, CallError})
+ case mem3:get_db_doc(DocId) of
+ {ok, #doc{} = Doc} ->
+ #doc{body = Body} = Doc,
+ NewBody = update_shard_props(Body, Source, Target),
+ NewDoc = Doc#doc{body = NewBody},
+ case mem3:update_db_doc(NewDoc) of
+ {ok, _} ->
+ ok;
+ {error, UpdateError} ->
+ exit(UpdateError)
+ end,
+ LogMsg2 = "~p : ~p update_shard_map returned",
+ couch_log:notice(LogMsg2, [?MODULE, JobStr]),
+ TimeoutMSec = shard_update_timeout_msec(),
+ UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000),
+ case wait_source_removed(Source, 5, UntilSec) of
+ true ->
+ ok;
+ false ->
+ exit(shard_update_did_not_propagate)
+ end;
+ Error ->
+ exit(Error)
end
catch
_:Err ->
exit(Err)
- end,
- LogMsg2 = "~p : ~p update_shard_map on node:~p returned",
- couch_log:notice(LogMsg2, [?MODULE, JobStr, Node]),
- UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000),
- case wait_source_removed(Source, 5, UntilSec) of
- true -> ok;
- false -> exit(shard_update_did_not_propagate)
- end.
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-init(_) ->
- couch_log:notice("~p start init()", [?MODULE]),
- {ok, nil}.
-
-handle_call({update_shard_map, Source, Target}, _From, State) ->
- Res =
- try
- update_shard_map(Source, Target)
- catch
- throw:{error, Error} ->
- {error, Error}
- end,
- {reply, Res, State};
-handle_call(Call, From, State) ->
- couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
- {noreply, State}.
-
-handle_cast(Cast, State) ->
- couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
- {noreply, State}.
-
-handle_info(Info, State) ->
- couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
- {noreply, State}.
-
-% Private
-
-update_shard_map(Source, Target) ->
- ok = validate_coordinator(),
- ok = replicate_from_all_nodes(shard_update_timeout_msec()),
- DocId = mem3:dbname(Source#shard.name),
- OldDoc =
- case mem3_util:open_db_doc(DocId) of
- {ok, #doc{deleted = true}} ->
- throw({error, missing_source});
- {ok, #doc{} = Doc} ->
- Doc;
- {not_found, deleted} ->
- throw({error, missing_source});
- OpenErr ->
- throw({error, {shard_doc_open_error, OpenErr}})
- end,
- #doc{body = OldBody} = OldDoc,
- NewBody = update_shard_props(OldBody, Source, Target),
- {ok, _} = write_shard_doc(OldDoc, NewBody),
- ok = replicate_to_all_nodes(shard_update_timeout_msec()),
- {ok, NewBody}.
-
-validate_coordinator() ->
- case hd(mem3_util:live_nodes()) =:= node() of
- true -> ok;
- false -> throw({error, coordinator_changed})
- end.
-
-replicate_from_all_nodes(TimeoutMSec) ->
- case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of
- ok -> ok;
- Error -> throw({error, Error})
end.
-replicate_to_all_nodes(TimeoutMSec) ->
- case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of
- ok -> ok;
- Error -> throw({error, Error})
- end.
-
-write_shard_doc(#doc{id = Id} = Doc, Body) ->
- UpdatedDoc = Doc#doc{body = Body},
- couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
- try
- {ok, _} = couch_db:update_doc(Db, UpdatedDoc, [])
- catch
- conflict ->
- throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}})
- end
- end).
-
update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) ->
{ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}),
ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}},
diff --git a/src/mem3/src/mem3_reshard_sup.erl b/src/mem3/src/mem3_reshard_sup.erl
index 5a28359fb..42da19f7c 100644
--- a/src/mem3/src/mem3_reshard_sup.erl
+++ b/src/mem3/src/mem3_reshard_sup.erl
@@ -24,9 +24,6 @@ start_link() ->
init(_Args) ->
Children = [
- {mem3_reshard_dbdoc, {mem3_reshard_dbdoc, start_link, []}, permanent,
infinity, worker, [
- mem3_reshard_dbdoc
- ]},
{mem3_reshard_job_sup, {mem3_reshard_job_sup, start_link, []},
permanent, infinity,
supervisor, [mem3_reshard_job_sup]},
{mem3_reshard, {mem3_reshard, start_link, []}, permanent, brutal_kill,
worker, [
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index 96e8ac394..dc8bd854e 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -48,6 +48,7 @@ init(_Args) ->
child(mem3_sync),
child(mem3_sync_event_listener),
child(mem3_seeds),
+ child(mem3_db_doc_updater),
child(mem3_reshard_sup)
],
{ok, {{rest_for_one, 10, 1}, couch_epi:register_service(mem3_epi,
Children)}}.
diff --git a/src/mem3/test/eunit/mem3_shards_test.erl b/src/mem3/test/eunit/mem3_shards_test.erl
index 6d2766fa2..900098d8c 100644
--- a/src/mem3/test/eunit/mem3_shards_test.erl
+++ b/src/mem3/test/eunit/mem3_shards_test.erl
@@ -49,7 +49,8 @@ mem3_shards_db_create_props_test_() ->
fun setup/0,
fun teardown/1,
[
- fun partitioned_shards_recreated_properly/1
+ ?TDEF_FE(partitioned_shards_recreated_properly, ?TIMEOUT),
+ ?TDEF_FE(update_props, ?TIMEOUT)
]
}
}
@@ -61,33 +62,45 @@ mem3_shards_db_create_props_test_() ->
% properties.
% SEE: apache/couchdb#3631
partitioned_shards_recreated_properly(#{dbname := DbName, dbdoc := DbDoc}) ->
- {timeout, ?TIMEOUT,
- ?_test(begin
- #doc{body = {Body0}} = DbDoc,
- Body1 = [{<<"foo">>, <<"bar">>} | Body0],
- Shards = [Shard | _] = lists:sort(mem3:shards(DbName)),
- ShardName = Shard#shard.name,
- ?assert(is_partitioned(Shards)),
- ok = with_proc(fun() -> couch_server:delete(ShardName, []) end),
- ?assertThrow({not_found, no_db_file}, is_partitioned(Shard)),
- ok = mem3_util:update_db_doc(DbDoc#doc{body = {Body1}}),
- Shards =
- [Shard | _] = test_util:wait_value(
- fun() ->
- lists:sort(mem3:shards(DbName))
- end,
- Shards
- ),
- ?assertEqual(
- true,
- test_util:wait_value(
- fun() ->
- catch is_partitioned(Shard)
- end,
- true
- )
- )
- end)}.
+ #doc{body = {Body0}} = DbDoc,
+ Body1 = [{<<"foo">>, <<"bar">>} | Body0],
+ Shards = [Shard | _] = lists:sort(mem3:shards(DbName)),
+ ShardName = Shard#shard.name,
+ ?assert(is_partitioned(Shards)),
+ ok = with_proc(fun() -> couch_server:delete(ShardName, []) end),
+ ?assertThrow({not_found, no_db_file}, is_partitioned(Shard)),
+ ok = mem3_util:update_db_doc(DbDoc#doc{body = {Body1}}),
+ Shards =
+ [Shard | _] = test_util:wait_value(
+ fun() ->
+ lists:sort(mem3:shards(DbName))
+ end,
+ Shards
+ ),
+ ?assertEqual(
+ true,
+ test_util:wait_value(
+ fun() ->
+ catch is_partitioned(Shard)
+ end,
+ true
+ )
+ ).
+
+update_props(#{dbname := DbName, dbdoc := DbDoc}) ->
+ {ok, Doc} = mem3:get_db_doc(DbName),
+ ?assertEqual(DbDoc, Doc),
+ #doc{body = {Body0}} = Doc,
+ {Props} = couch_util:get_value(<<"props">>, Body0, {[]}),
+ Props1 = couch_util:set_value(<<"baz">>, Props, <<"bar">>),
+ Body1 = couch_util:set_value(<<"props">>, Body0, {Props1}),
+ ResUpdate = mem3:update_db_doc(Doc#doc{body = {Body1}}),
+ ?assertMatch({ok, _}, ResUpdate),
+ {ok, Doc2} = mem3:get_db_doc(DbName),
+ #doc{body = {Body2}} = Doc2,
+ {Props2} = couch_util:get_value(<<"props">>, Body2, {[]}),
+ ?assertEqual(<<"bar">>, couch_util:get_value(<<"baz">>, Props2)),
+ ?assertEqual({error, conflict}, mem3:update_db_doc(Doc#doc{body =
{Body1}})).
is_partitioned([#shard{} | _] = Shards) ->
lists:all(fun is_partitioned/1, Shards);