This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch fix-mem3_ops in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3893fe7cb1998812772b454548e9587837fcc511 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Mon Oct 6 14:07:26 2025 -0400 Fix props caching in mem3 Previously, in https://github.com/apache/couchdb/pull/5672 we incorrectly stored some of the properties. The properties for the dbs db were stored correclty as `ets:insert(mem3_opts, {db, opts})` while for the other dbs they were stored as `ets:insert(mem3_opts, opts)`. All the requests would still work fine but the cache wouldn't be used as the values would be loaded from the db doc every time. Fix the bug and also adjust the name of `Opts` variable, we're not storing props but opts so rename it to reflect reality and avoid confusion. While at it, update the test suite to use the `?TDEF_FE` macro and get rid of all the `_test(begin ... end)` boilerplate. --- src/mem3/src/mem3_shards.erl | 270 ++++++++++++++++++++----------------------- 1 file changed, 125 insertions(+), 145 deletions(-) diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 5f3b88b02..6f996d4d4 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -54,9 +54,9 @@ opts_for_db(DbName0) when is_binary(DbName0) -> try ets:lookup(?OPTS, DbName) of [] -> load_opts_from_disk(DbName); - [{_, Props}] -> + [{_, Opts}] -> gen_server:cast(?MODULE, {cache_hit, DbName}), - Props + Opts catch error:badarg -> load_opts_from_disk(DbName) @@ -567,7 +567,7 @@ shard_writer(DbName, DbOpts, Shards, IdleTimeout) -> try receive write -> - true = ets:insert(?OPTS, DbOpts), + true = ets:insert(?OPTS, {DbName, DbOpts}), true = ets:insert(?SHARDS, Shards); cancel -> ok @@ -615,7 +615,7 @@ cache_shards_db_props() -> -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). -define(DB, <<"eunit_db_name">>). -define(INFINITY, 99999999). @@ -630,16 +630,16 @@ mem3_shards_test_() -> fun setup/0, fun teardown/1, [ - t_maybe_spawn_shard_writer_already_exists(), - t_maybe_spawn_shard_writer_new(), - t_flush_writer_exists_normal(), - t_flush_writer_times_out(), - t_flush_writer_crashes(), - t_writer_deletes_itself_when_done(), - t_writer_does_not_delete_other_writers_for_same_shard(), - t_spawn_writer_in_load_shards_from_db(), - t_cache_insert_takes_new_update(), - t_cache_insert_ignores_stale_update_and_kills_worker() + ?TDEF_FE(t_maybe_spawn_shard_writer_already_exists), + ?TDEF_FE(t_maybe_spawn_shard_writer_new), + ?TDEF_FE(t_flush_writer_exists_normal), + ?TDEF_FE(t_flush_writer_times_out), + ?TDEF_FE(t_flush_writer_crashes), + ?TDEF_FE(t_writer_deletes_itself_when_done), + ?TDEF_FE(t_writer_does_not_delete_other_writers_for_same_shard), + ?TDEF_FE(t_spawn_writer_in_load_shards_from_db), + ?TDEF_FE(t_cache_insert_takes_new_update), + ?TDEF_FE(t_cache_insert_ignores_stale_update_and_kills_worker) ] } }. @@ -671,137 +671,117 @@ setup() -> teardown(_) -> ok. -t_maybe_spawn_shard_writer_already_exists() -> - ?_test(begin - ets:insert(?OPENERS, {?DB, self()}), - Shards = mock_shards(), - WRes = maybe_spawn_shard_writer(?DB, [{x, y}], Shards, ?INFINITY), - ?assertEqual(ignore, WRes) - end). - -t_maybe_spawn_shard_writer_new() -> - ?_test(begin - Shards = mock_shards(), - WPid = maybe_spawn_shard_writer(?DB, [{x, y}], Shards, 1000), - WRef = erlang:monitor(process, WPid), - ?assert(is_pid(WPid)), - ?assert(is_process_alive(WPid)), - WPid ! write, - ?assertEqual(normal, wait_writer_result(WRef)), - ?assertEqual(Shards, ets:tab2list(?SHARDS)), - ?assertEqual([{x, y}], ets:tab2list(?OPTS)) - end). - -t_flush_writer_exists_normal() -> - ?_test(begin - Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), - ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)), - ?assertEqual(Shards, ets:tab2list(?SHARDS)), - ?assertEqual([{x, y}], ets:tab2list(?OPTS)) - end). - -t_flush_writer_times_out() -> - ?_test(begin - WPid = spawn(fun() -> - receive - will_never_receive_this -> ok - end - end), - Error = {mem3_shards_write_timeout, ?DB}, - ?assertExit(Error, flush_write(?DB, WPid, 100)), - exit(WPid, kill) - end). - -t_flush_writer_crashes() -> - ?_test(begin - WPid = spawn(fun() -> - receive - write -> exit('kapow!') - end - end), - Error = {mem3_shards_bad_write, 'kapow!'}, - ?assertExit(Error, flush_write(?DB, WPid, 1000)) - end). - -t_writer_deletes_itself_when_done() -> - ?_test(begin - Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), - WRef = erlang:monitor(process, WPid), - ets:insert(?OPENERS, {?DB, WPid}), - WPid ! write, - ?assertEqual(normal, wait_writer_result(WRef)), - ?assertEqual(Shards, ets:tab2list(?SHARDS)), - ?assertEqual([{x, y}], ets:tab2list(?OPTS)), - ?assertEqual([], ets:tab2list(?OPENERS)) - end). - -t_writer_does_not_delete_other_writers_for_same_shard() -> - ?_test(begin - Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), - WRef = erlang:monitor(process, WPid), - ets:insert(?OPENERS, {?DB, WPid}), - % should not be deleted - ets:insert(?OPENERS, {?DB, self()}), - WPid ! write, - ?assertEqual(normal, wait_writer_result(WRef)), - ?assertEqual(Shards, ets:tab2list(?SHARDS)), - ?assertEqual([{x, y}], ets:tab2list(?OPTS)), - ?assertEqual(1, ets:info(?OPENERS, size)), - ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS)) - end). - -t_spawn_writer_in_load_shards_from_db() -> - ?_test(begin - meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}), - meck:expect(couch_db, get_update_seq, 1, 1), - meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), - % register to get cache_insert cast - erlang:register(?MODULE, self()), - load_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB), - meck:validate(couch_db), - meck:validate(mem3_util), - Cast = - receive - {'$gen_cast', Msg} -> Msg - after 1000 -> - timeout - end, - ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast), - {cache_insert, _, WPid, _} = Cast, - exit(WPid, kill), - ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS)), - meck:unload(couch_db), - meck:unload(mem3_util) - end). - -t_cache_insert_takes_new_update() -> - ?_test(begin - Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), - Msg = {cache_insert, ?DB, WPid, 2}, - {noreply, NewState} = handle_cast(Msg, mock_state(1)), - ?assertMatch(#st{cur_size = 1}, NewState), - ?assertEqual(Shards, ets:tab2list(?SHARDS)), - ?assertEqual([{x, y}], ets:tab2list(?OPTS)), - ?assertEqual([], ets:tab2list(?OPENERS)) - end). - -t_cache_insert_ignores_stale_update_and_kills_worker() -> - ?_test(begin - Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), - WRef = erlang:monitor(process, WPid), - Msg = {cache_insert, ?DB, WPid, 1}, - {noreply, NewState} = handle_cast(Msg, mock_state(2)), - ?assertEqual(normal, wait_writer_result(WRef)), - ?assertMatch(#st{cur_size = 0}, NewState), - ?assertEqual([], ets:tab2list(?SHARDS)), - ?assertEqual([], ets:tab2list(?OPTS)), - ?assertEqual([], ets:tab2list(?OPENERS)) - end). +t_maybe_spawn_shard_writer_already_exists(_) -> + ets:insert(?OPENERS, {?DB, self()}), + Shards = mock_shards(), + WRes = maybe_spawn_shard_writer(?DB, [{x, y}], Shards, ?INFINITY), + ?assertEqual(ignore, WRes). + +t_maybe_spawn_shard_writer_new(_) -> + Shards = mock_shards(), + WPid = maybe_spawn_shard_writer(?DB, [{x, y}], Shards, 1000), + WRef = erlang:monitor(process, WPid), + ?assert(is_pid(WPid)), + ?assert(is_process_alive(WPid)), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{?DB, [{x, y}]}], ets:tab2list(?OPTS)). + +t_flush_writer_exists_normal(_) -> + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), + ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{?DB, [{x, y}]}], ets:tab2list(?OPTS)). + +t_flush_writer_times_out(_) -> + WPid = spawn(fun() -> + receive + will_never_receive_this -> ok + end + end), + Error = {mem3_shards_write_timeout, ?DB}, + ?assertExit(Error, flush_write(?DB, WPid, 100)), + exit(WPid, kill). + +t_flush_writer_crashes(_) -> + WPid = spawn(fun() -> + receive + write -> exit('kapow!') + end + end), + Error = {mem3_shards_bad_write, 'kapow!'}, + ?assertExit(Error, flush_write(?DB, WPid, 1000)). + +t_writer_deletes_itself_when_done(_) -> + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + ets:insert(?OPENERS, {?DB, WPid}), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{?DB, [{x, y}]}], ets:tab2list(?OPTS)), + ?assertEqual([], ets:tab2list(?OPENERS)). + +t_writer_does_not_delete_other_writers_for_same_shard(_) -> + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + ets:insert(?OPENERS, {?DB, WPid}), + % should not be deleted + ets:insert(?OPENERS, {?DB, self()}), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{?DB, [{x, y}]}], ets:tab2list(?OPTS)), + ?assertEqual(1, ets:info(?OPENERS, size)), + ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS)). + +t_spawn_writer_in_load_shards_from_db(_) -> + meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}), + meck:expect(couch_db, get_update_seq, 1, 1), + meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), + % register to get cache_insert cast + erlang:register(?MODULE, self()), + load_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB), + meck:validate(couch_db), + meck:validate(mem3_util), + Cast = + receive + {'$gen_cast', Msg} -> Msg + after 1000 -> + timeout + end, + ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast), + {cache_insert, _, WPid, _} = Cast, + exit(WPid, kill), + ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS)), + meck:unload(couch_db), + meck:unload(mem3_util). + +t_cache_insert_takes_new_update(_) -> + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), + Msg = {cache_insert, ?DB, WPid, 2}, + {noreply, NewState} = handle_cast(Msg, mock_state(1)), + ?assertMatch(#st{cur_size = 1}, NewState), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{?DB, [{x, y}]}], ets:tab2list(?OPTS)), + ?assertEqual([], ets:tab2list(?OPENERS)). + +t_cache_insert_ignores_stale_update_and_kills_worker(_) -> + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + Msg = {cache_insert, ?DB, WPid, 1}, + {noreply, NewState} = handle_cast(Msg, mock_state(2)), + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertMatch(#st{cur_size = 0}, NewState), + ?assertEqual([], ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPTS)), + ?assertEqual([], ets:tab2list(?OPENERS)). mock_state(UpdateSeq) -> #st{
