diff --git a/doc/edoc-info b/doc/edoc-info index b425e49..9993c3f 100644 --- a/doc/edoc-info +++ b/doc/edoc-info @@ -2,4 +2,5 @@ {application,mnesia_rocksdb}. {modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app, mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup, - mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}. + mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex, + mrdb_mutex_serializer,mrdb_select,mrdb_stats]}. diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index 5c1cd43..b2be125 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -326,7 +326,6 @@ call(Alias, Req, Timeout) -> end. start_link() -> - mrdb_mutex:ensure_tab(), gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> @@ -408,12 +407,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs %% We need to store the persistent ref explicitly here, %% since mnesia knows nothing of our admin table. AdminTab = {admin, Alias}, + Stats = mrdb_stats:new(), CfI = update_cf_info(AdminTab, #{ status => open , name => AdminTab , vsn => ?VSN , encoding => {sext,{value,term}} , attr_pos => #{key => 1, value => 2} + , stats => Stats , mountpoint => MP , properties => #{ attributes => [key, val] diff --git a/src/mnesia_rocksdb_sup.erl b/src/mnesia_rocksdb_sup.erl index 1b1706a..4d33130 100644 --- a/src/mnesia_rocksdb_sup.erl +++ b/src/mnesia_rocksdb_sup.erl @@ -42,4 +42,5 @@ start_link() -> %% =================================================================== init([]) -> - {ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }. + {ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker) + , ?CHILD(mnesia_rocksdb_params, worker)]} }. diff --git a/src/mrdb.erl b/src/mrdb.erl index 4b982a6..99a2a81 100644 --- a/src/mrdb.erl +++ b/src/mrdb.erl @@ -117,7 +117,9 @@ | index() | retainer(). --type retries() :: non_neg_integer(). +-type inner() :: non_neg_integer(). 
+-type outer() :: non_neg_integer(). +-type retries() :: outer() | {inner(), outer()}. %% activity type 'ets' makes no sense in this context -type mnesia_activity_type() :: transaction @@ -143,7 +145,7 @@ -type tx_activity() :: #{ type := 'tx' , handle := tx_handle() - , attempt := non_neg_integer() }. + , attempt := 'undefined' | retries() }. -type batch_activity() :: #{ type := 'batch' , handle := batch_handle() }. -type activity() :: tx_activity() | batch_activity(). @@ -256,6 +258,14 @@ release_snapshot(SHandle) -> %% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with %% the commit set. It will then be re-run again, until the retry count is exhausted. %% +%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a +%% single number, `Retries', is analogous to `{0, Retries}'. Each outer retry requests a 'mutex lock' by +%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries +%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last +%% in line) if there are outer retries remaining. When all retries are exhausted, the activity aborts +%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first +%% attempt, but only on outer retries (the first retry is always an outer retry). +%% %% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'. %% %% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and @@ -277,32 +287,66 @@ activity(Type, Alias, F) -> , alias => Alias , db_ref => DbRef } end, - do_activity(F, Alias, Ctxt, false). + do_activity(F, Alias, Ctxt). -do_activity(F, Alias, Ctxt, WithLock) -> - try run_f(F, Ctxt, WithLock, Alias) of +do_activity(F, Alias, Ctxt) -> + try try_f(F, Ctxt) + catch + throw:{?MODULE, busy} -> + retry_activity(F, Alias, Ctxt) + end. 
+ +try_f(F, Ctxt) -> + try run_f(F, Ctxt) of Res -> - try commit_and_pop(Res) - catch - throw:{?MODULE, busy} -> - do_activity(F, Alias, Ctxt, true) - end + commit_and_pop(Res) catch Cat:Err -> abort_and_pop(Cat, Err) end. --spec run_f(_, #{'activity':=#{'handle':=_, 'type':='batch' | 'tx', 'attempt'=>1, 'no_snapshot'=>boolean(), 'retries'=>non_neg_integer(), _=>_}, 'alias':=_, 'db_ref':=_, 'no_snapshot'=>boolean(), 'retries'=>non_neg_integer(), _=>_}, boolean(), _) -> any(). -run_f(F, Ctxt, false, _) -> +run_f(F, Ctxt) -> push_ctxt(Ctxt), - F(); -run_f(F, Ctxt, true, Alias) -> - push_ctxt(incr_attempt(Ctxt)), - mrdb_mutex:do(Alias, F). + F(). -incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) -> +incr_attempt(0, {_,O}) when O > 0 -> + {outer, {0,1}}; +incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O), + is_integer(Ri), is_integer(Ro) -> + if I < Ri -> {inner, {I+1, O}}; + O < Ro -> {outer, {0, O+1}}; + true -> + error + end; +incr_attempt(_, _) -> + error. + +retry_activity(F, Alias, #{activity := #{ type := Type + , attempt := A + , retries := R} = Act} = Ctxt) -> + case incr_attempt(A, R) of + {RetryCtxt, A1} -> + Act1 = Act#{attempt := A1}, + Ctxt1 = Ctxt#{activity := Act1}, + try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1)) + catch + throw:{?MODULE, busy} -> + retry_activity(F, Alias, Ctxt1) + end; + error -> + return_abort(Type, error, retry_limit) + end. + +retry_activity_(inner, F, Alias, Ctxt) -> + mrdb_stats:incr(Alias, inner_retries, 1), + try_f(F, Ctxt); +retry_activity_(outer, F, Alias, Ctxt) -> + mrdb_stats:incr(Alias, outer_retries, 1), + mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end). 
+ +restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) -> {ok, TxH} = rdb_transaction(DbRef, []), - Act1 = Act#{attempt := A+1, handle := TxH}, + Act1 = Act#{handle := TxH}, C1 = C#{ activity := Act1 }, case maps:is_key(snapshot, C) of true -> @@ -375,10 +419,14 @@ check_tx_opts(Opts) -> end. check_retries(#{retries := Retries} = Opts) -> - if is_integer(Retries), Retries >= 0 -> - Opts; - true -> - error({invalid_tx_option, {retries, Retries}}) + case Retries of + _ when is_integer(Retries), Retries >= 0 -> + Opts#{retries := {0, Retries}}; + {Inner, Outer} when is_integer(Inner), is_integer(Outer), + Inner >= 0, Outer >= 0 -> + Opts; + _ -> + error({invalid_tx_option, {retries, Retries}}) end. check_nosnap(#{no_snapshot := NoSnap} = Opts) -> @@ -393,7 +441,7 @@ create_tx(Opts, DbRef) -> {ok, TxH} = rdb_transaction(DbRef, []), Opts#{activity => maps:merge(Opts, #{ type => tx , handle => TxH - , attempt => 1})}. + , attempt => 0 })}. maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) -> case NoSnap of @@ -413,8 +461,7 @@ commit_and_pop(Res) -> Res; {error, {error, "Resource busy" ++ _ = Busy}} -> case A of - #{retries := Retries, attempt := Att} - when Att =< Retries -> + #{retries := {I,O}} when I > 0; O > 0 -> throw({?MODULE, busy}); _ -> error({error, Busy}) @@ -530,7 +577,7 @@ new_tx(#{activity := _}, _) -> new_tx(Tab, Opts) -> #{db_ref := DbRef} = R = ensure_ref(Tab), {ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)), - R#{activity => #{type => tx, handle => TxH, attempt => 1}}. + R#{activity => #{type => tx, handle => TxH, attempt => 0}}. -spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref(). tx_ref(Tab, TxH) -> @@ -540,7 +587,7 @@ tx_ref(Tab, TxH) -> #{activity := #{type := tx, handle := OtherTxH}} -> error({tx_handle_conflict, OtherTxH}); R -> - R#{activity => #{type => tx, handle => TxH, attempt => 1}} + R#{activity => #{type => tx, handle => TxH, attempt => 0}} end. 
-spec tx_commit(tx_handle() | db_ref()) -> ok. diff --git a/src/mrdb_mutex.erl b/src/mrdb_mutex.erl index d745ae1..b32aca4 100644 --- a/src/mrdb_mutex.erl +++ b/src/mrdb_mutex.erl @@ -3,79 +3,107 @@ -export([ do/2 ]). --export([ ensure_tab/0 ]). +-include_lib("eunit/include/eunit.hrl"). --define(LOCK_TAB, ?MODULE). - -%% We use a wrapping ets counter (default: 0) as a form of semaphor. -%% The claim operation is done using an atomic list of two updates: -%% first, incrementing with 0 - this returns the previous value -%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back, -%% regardless of previous value. This means that if [0,1] is returned, the resource -%% was not locked previously; if [1,1] is returned, it was. +%% We use a gen_server-based FIFO queue (one queue per alias) to manage the +%% critical section. %% -%% Releasing the resource is done by deleting the resource. If we just decrement, -%% we will end up with lingering unlocked resources, so we might as well delete. -%% Either operation is atomic, and the claim op creates the object if it's missing. +%% Releasing the resource is done by notifying the server. +%% +%% Previous implementations tested: +%% * A counter (in ets), incremented atomically first with 0, then 1. This lets +%% the caller know if the 'semaphore' was 1 or zero before. If it was already +%% 1, busy-loop over the counter until it reads as a transition from 0 to 1. +%% This is a pretty simple implementation, but it lacks ordering, and appeared +%% to sometimes lead to starvation. +%% * A duplicate_bag ets table: These are defined such that insertion order is +%% preserved. Thus, they can serve as a mutex with FIFO characteristics. +%% Unfortunately, performance plummeted when tested with > 25 concurrent +%% processes. While this is likely a very high number, given that we're talking +%% about contention in a system using optimistic concurrency, it's never good +%% if performance falls off a cliff. 
do(Rsrc, F) when is_function(F, 0) -> - true = claim(Rsrc), + {ok, Ref} = mrdb_mutex_serializer:wait(Rsrc), try F() after - release(Rsrc) + mrdb_mutex_serializer:done(Rsrc, Ref) end. -claim(Rsrc) -> - case claim_(Rsrc) of - true -> true; - false -> busy_wait(Rsrc, 1000) +-ifdef(TEST). + +mutex_test_() -> + {foreach, + fun setup/0, + fun cleanup/1, + [ + {"Check that all operations complete", fun swarm_do/0} + ]}. + +setup() -> + case whereis(mrdb_mutex_serializer) of + undefined -> + {ok, Pid} = mrdb_mutex_serializer:start_link(), + Pid; + Pid -> + Pid end. -claim_(Rsrc) -> - case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0}, - {2, 1, 1, 1}], {Rsrc, 0}) of - [0, 1] -> - %% have lock - true; - [1, 1] -> - false - end. +cleanup(Pid) -> + unlink(Pid), + exit(Pid, kill). -%% The busy-wait function makes use of the fact that we can read a timer to find out -%% if it still has time remaining. This reduces the need for selective receive, looking -%% for a timeout message. We yield, then retry the claim op. Yielding at least used to -%% also be necessary for the `read_timer/1` value to refresh. -%% -busy_wait(Rsrc, Timeout) -> - Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}), - do_wait(Rsrc, Ref). - -do_wait(Rsrc, Ref) -> - erlang:yield(), - case erlang:read_timer(Ref) of - false -> - erlang:cancel_timer(Ref), - error(lock_wait_timeout); - _ -> - case claim_(Rsrc) of - true -> - erlang:cancel_timer(Ref), - ok; - false -> - do_wait(Rsrc, Ref) - end - end. - -release(Rsrc) -> - ets:delete(?LOCK_TAB, Rsrc), +swarm_do() -> + Rsrc = ?LINE, + Pid = spawn(fun() -> collect([]) end), + L = lists:seq(1, 1000), + Evens = [X || X <- L, is_even(X)], + Pids = [spawn_monitor(fun() -> + send_even(Rsrc, N, Pid) + end) || N <- lists:seq(1,1000)], + await_pids(Pids), + Results = fetch(Pid), + {incorrect_results, []} = {incorrect_results, Results -- Evens}, + {missing_correct_results, []} = {missing_correct_results, Evens -- Results}, ok. 
- -%% Called by the process holding the ets table. -ensure_tab() -> - case ets:info(?LOCK_TAB, name) of - undefined -> - ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]); - _ -> - true +collect(Acc) -> + receive + {_, result, N} -> + collect([N|Acc]); + {From, fetch} -> + From ! {fetch_reply, Acc}, + done end. + +fetch(Pid) -> + Pid ! {self(), fetch}, + receive + {fetch_reply, Result} -> + Result + end. + +is_even(N) -> + (N rem 2) =:= 0. + +await_pids([{_, MRef}|Pids]) -> + receive + {'DOWN', MRef, _, _, _} -> + await_pids(Pids) + after 10000 -> + error(timeout) + end; +await_pids([]) -> + ok. + +send_even(Rsrc, N, Pid) -> + do(Rsrc, fun() -> + case is_even(N) of + true -> + Pid ! {self(), result, N}; + false -> + exit(not_even) + end + end). + +-endif. diff --git a/src/mrdb_mutex_serializer.erl b/src/mrdb_mutex_serializer.erl new file mode 100644 index 0000000..eb2e5ac --- /dev/null +++ b/src/mrdb_mutex_serializer.erl @@ -0,0 +1,98 @@ +-module(mrdb_mutex_serializer). + +-behavior(gen_server). + +-export([wait/1, + done/2]). + +-export([start_link/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(st, { queues = #{} + , empty_q = queue:new() }). %% perhaps silly optimization + +wait(Rsrc) -> + gen_server:call(?MODULE, {wait, Rsrc}, infinity). + +done(Rsrc, Ref) -> + gen_server:cast(?MODULE, {done, Rsrc, Ref}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #st{}}. + +handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues + , empty_q = NewQ } = St) -> + MRef = erlang:monitor(process, Pid), + Q0 = maps:get(Rsrc, Queues, NewQ), + WasEmpty = queue:is_empty(Q0), + Q1 = queue:in({From, MRef}, Q0), + St1 = St#st{ queues = Queues#{Rsrc => Q1} }, + case WasEmpty of + true -> + {reply, {ok, MRef}, St1}; + false -> + {noreply, St1} + end. 
+ +handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) -> + case maps:find(Rsrc, Queues) of + {ok, Q} -> + case queue:out(Q) of + {{value, {_From, MRef}}, Q1} -> + erlang:demonitor(MRef), + ok = maybe_dispatch_one(Q1), + {noreply, St#st{ queues = Queues#{Rsrc := Q1} }}; + {_, _} -> + %% Not the lock holder + {noreply, St} + end; + error -> + {noreply, St} + end. + +handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) -> + Queues1 = + maps:map( + fun(_Rsrc, Q) -> + drop_or_filter(Q, MRef) + end, Queues), + {noreply, St#st{ queues = Queues1 }}; +handle_info(_, St) -> + {noreply, St}. + +terminate(_, _) -> + ok. + +code_change(_FromVsn, St, _Extra) -> + {ok, St}. + +%% In this function, we don't actually pop +maybe_dispatch_one(Q) -> + case queue:peek(Q) of + empty -> + ok; + {value, {From, MRef}} -> + gen_server:reply(From, {ok, MRef}), + ok + end. + +drop_or_filter(Q, MRef) -> + case queue:peek(Q) of + {value, {_, MRef}} -> + Q1 = queue:drop(Q), + ok = maybe_dispatch_one(Q1), + Q1; + {value, _Other} -> + queue:filter(fun({_,M}) -> M =/= MRef end, Q); + empty -> + Q + end. diff --git a/src/mrdb_stats.erl b/src/mrdb_stats.erl new file mode 100644 index 0000000..78c5d3d --- /dev/null +++ b/src/mrdb_stats.erl @@ -0,0 +1,74 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% @doc Statistics API for the mnesia_rocksdb plugin +%% +%% Some counters are maintained for each active alias. Currently, the following +%% counters are supported: +%% * inner_retries +%% * outer_retries +%% +-module(mrdb_stats). + +-export([new/0]). + +-export([incr/3, + get/1, + get/2]). + +-type alias() :: mnesia_rocksdb:alias(). +-type db_ref() :: mrdb:db_ref(). +-type counter() :: atom(). +-type increment() :: integer(). + +-type counters() :: #{ counter() := integer() }. + +new() -> + #{ ref => counters:new(map_size(ctr_meta()), [write_concurrency]) + , meta => ctr_meta()}. + +ctr_meta() -> + #{ inner_retries => 1 + , outer_retries => 2 }. 
+ +-spec incr(alias() | db_ref(), counter(), increment()) -> ok. +%% @doc Increment `Ctr' counter for `Alias' with increment `N'. +%% +%% Note that the first argument may also be a `db_ref()' map, +%% corresponding to `mrdb:get_ref({admin, Alias})'. +%% @end +incr(Alias, Ctr, N) when is_atom(Alias) -> + #{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}), + incr_(Ref, Meta, Ctr, N); +incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) -> + incr_(Ref, Meta, Ctr, N). + +-spec get(alias() | db_ref(), counter()) -> integer(). +%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'. +%% @end +get(Alias, Ctr) when is_atom(Alias) -> + #{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}), + get_(Ref, Meta, Ctr); +get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) -> + get_(Ref, Meta, Ctr). + +-spec get(alias() | db_ref()) -> counters(). +%% @doc Fetches all known counters for `Alias', in the form of a map, +%% `#{Counter => Value}'. +%% @end +get(Alias) when is_atom(Alias) -> + get_(mrdb:get_ref({admin, Alias})); +get(Ref) when is_map(Ref) -> + get_(Ref). + +get_(#{stats := #{ref := Ref, meta := Meta}}) -> + lists:foldl( + fun({K, P}, M) -> + M#{K := counters:get(Ref, P)} + end, Meta, maps:to_list(Meta)). + +get_(Ref, Meta, Attr) -> + Ix = maps:get(Attr, Meta), + counters:get(Ref, Ix). + +incr_(Ref, Meta, Attr, N) -> + Ix = maps:get(Attr, Meta), + counters:add(Ref, Ix, N). diff --git a/test/mnesia_rocksdb_SUITE.erl b/test/mnesia_rocksdb_SUITE.erl index 5c4d1eb..3927646 100644 --- a/test/mnesia_rocksdb_SUITE.erl +++ b/test/mnesia_rocksdb_SUITE.erl @@ -24,6 +24,7 @@ , mrdb_abort/1 , mrdb_two_procs/1 , mrdb_two_procs_tx_restart/1 + , mrdb_two_procs_tx_inner_restart/1 , mrdb_two_procs_snap/1 , mrdb_three_procs/1 ]). @@ -53,6 +54,7 @@ groups() -> , mrdb_abort , mrdb_two_procs , mrdb_two_procs_tx_restart + , mrdb_two_procs_tx_inner_restart , mrdb_two_procs_snap , mrdb_three_procs ]} ]. 
@@ -287,10 +289,17 @@ mrdb_abort(Config) -> mrdb_two_procs(Config) -> tr_ct:with_trace(fun mrdb_two_procs_/1, Config, tr_flags( - {self(), [call, sos, p]}, + {self(), [call, sos, p, 'receive']}, tr_patterns( mrdb, [ {mrdb, insert, 2, x} , {mrdb, read, 2, x} + , {mrdb, retry_activity, 3, x} + , {mrdb, try_f, 2, x} + , {mrdb, incr_attempt, 2, x} + , {mrdb_mutex, do, 2, x} + , {mrdb_mutex_serializer, do, 2, x} + , {?MODULE, wait_for_other, 2, x} + , {?MODULE, go_ahead_other, 1, x} , {mrdb, activity, x}], tr_opts()))). mrdb_two_procs_(Config) -> @@ -314,11 +323,16 @@ mrdb_two_procs_(Config) -> Pre = mrdb:read(R, a), go_ahead_other(POther), await_other_down(POther, MRef, ?LINE), + %% The test proc is still inside the transaction, and POther + %% has terminated/committed. If we now read 'a', we should see + %% the value that POther wrote (no isolation). [{R, a, 17}] = mrdb:read(R, a), ok = mrdb:insert(R, {R, a, 18}) end, - go_ahead_other(1, POther), + go_ahead_other(0, POther), Do0 = get_dict(), + %% Our transaction should fail due to the resource conflict, and because + %% we're not using retries. try mrdb:activity({tx, #{no_snapshot => true, retries => 0}}, rdb, F1) of ok -> error(unexpected) @@ -339,6 +353,7 @@ mrdb_two_procs_tx_restart_(Config) -> R = ?FUNCTION_NAME, Parent = self(), Created = create_tabs([{R, []}], Config), + check_stats(rdb), mrdb:insert(R, {R, a, 1}), Pre = mrdb:read(R, a), F0 = fun() -> @@ -354,7 +369,7 @@ mrdb_two_procs_tx_restart_(Config) -> OtherWrite = [{R, a, 17}], Att = get_attempt(), Expected = case Att of - 1 -> Pre; + 0 -> Pre; _ -> OtherWrite end, Expected = mrdb:read(R, a), @@ -363,14 +378,88 @@ mrdb_two_procs_tx_restart_(Config) -> OtherWrite = mrdb:read(R, a), ok = mrdb:insert(R, {R, a, 18}) end, - go_ahead_other(1, POther), + go_ahead_other(0, POther), Do0 = get_dict(), mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1), dictionary_unchanged(Do0), + check_stats(rdb), [{R, a, 18}] = mrdb:read(R, a), delete_tabs(Created), ok. 
+mrdb_two_procs_tx_inner_restart(Config) -> + tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config, + dbg_tr_opts()). + +mrdb_two_procs_tx_inner_restart_(Config) -> + R = ?FUNCTION_NAME, + Parent = self(), + Created = create_tabs([{R, []}], Config), + mrdb:insert(R, {R, a, 1}), + mrdb:insert(R, {R, b, 1}), + PreA = mrdb:read(R, a), + PreB = mrdb:read(R, b), + #{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb), + F0 = fun() -> + wait_for_other(Parent, ?LINE), + ok = mrdb:insert(R, {R, a, 17}), + wait_for_other(Parent, ?LINE) + end, + F1 = fun() -> + wait_for_other(Parent, ?LINE), + ok = mrdb:insert(R, {R, b, 147}), + wait_for_other(Parent, ?LINE) + end, + + Spawn = fun(F) -> + Res = spawn_opt( + fun() -> + ok = mrdb:activity(tx, rdb, F) + end, [monitor]), + Res + end, + {P1, MRef1} = Spawn(F0), + {P2, MRef2} = Spawn(F1), + F2 = fun() -> + PostA = [{R, a, 17}], % once F0 (P1) has committed + PostB = [{R, b, 147}], % once F1 (P2) has committed + ARes = mrdb:read(R, a), % We need to read first to establish a pre-image + BRes = mrdb:read(R, b), + Att = get_attempt(), + ct:log("Att = ~p", [Att]), + case Att of + 0 -> % This is the first run (no retry yet) + ARes = PreA, + BRes = PreB, + go_ahead_other(0, P1), % Having our pre-image, now let P1 write + go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal) + await_other_down(P1, MRef1, ?LINE), + PostA = mrdb:read(R, a); % now, P1's write should leak through here + {0, 1} -> % This is the first (outer) retry + go_ahead_other(0, P2), % Let P2 write + go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal) + await_other_down(P2, MRef2, ?LINE), + PostA = mrdb:read(R, a), % now we should see writes from both P1 + PostB = mrdb:read(R, b); % ... 
and P2 + {1, 1} -> + PostA = mrdb:read(R, a), + PostB = mrdb:read(R, b), + ok + end, + mrdb:insert(R, {R, a, 18}), + mrdb:insert(R, {R, b, 18}) + end, + Do0 = get_dict(), + mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2), + check_stats(rdb), + dictionary_unchanged(Do0), + [{R, a, 18}] = mrdb:read(R, a), + [{R, b, 18}] = mrdb:read(R, b), + #{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb), + {restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}}, + delete_tabs(Created), + ok. + % %% For testing purposes, we use side-effects inside the transactions @@ -383,7 +472,7 @@ mrdb_two_procs_tx_restart_(Config) -> %% attempt, and ignore the sync ops on retries. %% -define(IF_FIRST(N, Expr), - if N == 1 -> + if N == 0 -> Expr; true -> ok @@ -413,8 +502,8 @@ mrdb_two_procs_snap(Config) -> go_ahead_other(Att, POther), ARes = mrdb:read(R, a), ARes = case Att of - 1 -> Pre; - 2 -> [{R, a, 17}] + 0 -> Pre; + _ -> [{R, a, 17}] end, await_other_down(POther, MRef, ?LINE), PreB = mrdb:read(R, b), @@ -434,7 +523,7 @@ mrdb_two_procs_snap(Config) -> %% We make sure that P2 commits before finishing the other two, and P3 and the %% main thread sync, so as to maximize the contention for the retry lock. mrdb_three_procs(Config) -> - tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()). + tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()). 
mrdb_three_procs_(Config) -> R = ?FUNCTION_NAME, @@ -452,7 +541,7 @@ mrdb_three_procs_(Config) -> spawn_opt(fun() -> D0 = get_dict(), do_when_p_allows( - 1, Parent, ?LINE, + 0, Parent, ?LINE, fun() -> ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1) end), @@ -488,8 +577,8 @@ mrdb_three_procs_(Config) -> fun() -> Att = get_attempt(), ARes = case Att of - 1 -> [A0]; - 2 -> [A1] + 0 -> [A0]; + _ -> [A1] end, %% First, ensure that P2 tx is running go_ahead_other(Att, P2), @@ -519,6 +608,7 @@ tr_opts() -> , {?MODULE, await_other_down, 3, x} , {?MODULE, do_when_p_allows, 4, x} , {?MODULE, allow_p, 3, x} + , {?MODULE, check_stats, 1, x} ]}. light_tr_opts() -> @@ -529,6 +619,22 @@ light_tr_opts() -> , {mrdb, read, 2, x} , {mrdb, activity, x} ], tr_opts())). +dbg_tr_opts() -> + tr_flags( + {self(), [call, sos, p, 'receive']}, + tr_patterns( + mrdb, [ {mrdb, insert, 2, x} + , {mrdb, read, 2, x} + , {mrdb, retry_activity, 3, x} + , {mrdb, try_f, 2, x} + , {mrdb, incr_attempt, 2, x} + , {mrdb, current_context, 0, x} + , {mrdb_mutex, do, 2, x} + , {mrdb_mutex_serializer, do, 2, x} + , {?MODULE, wait_for_other, 2, x} + , {?MODULE, go_ahead_other, 1, x} + , {mrdb, activity, x} ], tr_opts())). + tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) -> Pats1 = [P || P <- Pats, element(1,P) =/= Mod], Opts#{patterns => Ps ++ Pats1}. @@ -542,12 +648,13 @@ wait_for_other(Parent, L) -> wait_for_other(Att, Parent, L) -> wait_for_other(Att, Parent, 1000, L). -wait_for_other(1, Parent, Timeout, L) -> +wait_for_other(0, Parent, Timeout, L) -> MRef = monitor(process, Parent), Parent ! {self(), ready}, receive {Parent, cont} -> demonitor(MRef), + Parent ! {self(), cont_ack}, ok; {'DOWN', MRef, _, _, Reason} -> ct:log("Parent died, Reason = ~p", [Reason]), @@ -586,7 +693,13 @@ go_ahead_other(Att, POther, Timeout) -> go_ahead_other_(POther, Timeout) -> receive {POther, ready} -> - POther ! {self(), cont} + POther ! 
{self(), cont}, + receive + {POther, cont_ack} -> + ok + after Timeout -> + error(cont_ack_timeout) + end after Timeout -> error(go_ahead_timeout) end. @@ -645,3 +758,8 @@ dictionary_unchanged(Old) -> , added := [] } = #{ deleted => Old -- New , added => New -- Old }, ok. + +check_stats(Alias) -> + Stats = mrdb_stats:get(Alias), + ct:log("Stats: ~p", [Stats]), + Stats. diff --git a/test/mrdb_bench.erl b/test/mrdb_bench.erl index 2ddc23e..ed76644 100644 --- a/test/mrdb_bench.erl +++ b/test/mrdb_bench.erl @@ -1,6 +1,6 @@ -module(mrdb_bench). --compile(export_all). +-compile([export_all, nowarn_export_all]). init() -> mnesia:delete_schema([node()]),