Merge pull request #35 from aeternity/uw-different-mutex

rewrite transaction retry mutex
This commit is contained in:
Ulf Wiger 2022-11-04 10:06:58 +01:00 committed by GitHub
commit 0aecf5ef01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 474 additions and 106 deletions

View File

@ -2,4 +2,5 @@
{application,mnesia_rocksdb}.
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.

View File

@ -326,7 +326,6 @@ call(Alias, Req, Timeout) ->
end.
start_link() ->
mrdb_mutex:ensure_tab(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
@ -408,12 +407,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
%% We need to store the persistent ref explicitly here,
%% since mnesia knows nothing of our admin table.
AdminTab = {admin, Alias},
Stats = mrdb_stats:new(),
CfI = update_cf_info(AdminTab, #{ status => open
, name => AdminTab
, vsn => ?VSN
, encoding => {sext,{value,term}}
, attr_pos => #{key => 1,
value => 2}
, stats => Stats
, mountpoint => MP
, properties =>
#{ attributes => [key, val]

View File

@ -42,4 +42,5 @@ start_link() ->
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
, ?CHILD(mnesia_rocksdb_params, worker)]} }.

View File

@ -117,7 +117,9 @@
| index()
| retainer().
-type retries() :: non_neg_integer().
-type inner() :: non_neg_integer().
-type outer() :: non_neg_integer().
-type retries() :: outer() | {inner(), outer()}.
%% activity type 'ets' makes no sense in this context
-type mnesia_activity_type() :: transaction
@ -143,7 +145,7 @@
-type tx_activity() :: #{ type := 'tx'
, handle := tx_handle()
, attempt := non_neg_integer() }.
, attempt := 'undefined' | retries() }.
-type batch_activity() :: #{ type := 'batch'
, handle := batch_handle() }.
-type activity() :: tx_activity() | batch_activity().
@ -256,6 +258,14 @@ release_snapshot(SHandle) ->
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
%% the commit set. It will then be re-run again, until the retry count is exhausted.
%%
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
%% in line) if there are outer retries remaining. When all retries are exhaused, the activity aborts
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
%% attempt, but only on outer retries (the first retry is always an outer retry).
%%
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
%%
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
@ -277,32 +287,66 @@ activity(Type, Alias, F) ->
, alias => Alias
, db_ref => DbRef }
end,
do_activity(F, Alias, Ctxt, false).
do_activity(F, Alias, Ctxt).
do_activity(F, Alias, Ctxt, WithLock) ->
try run_f(F, Ctxt, WithLock, Alias) of
Res ->
try commit_and_pop(Res)
do_activity(F, Alias, Ctxt) ->
try try_f(F, Ctxt)
catch
throw:{?MODULE, busy} ->
do_activity(F, Alias, Ctxt, true)
end
retry_activity(F, Alias, Ctxt)
end.
try_f(F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
Cat:Err ->
abort_and_pop(Cat, Err)
end.
-spec run_f(_, #{'activity':=#{'handle':=_, 'type':='batch' | 'tx', 'attempt'=>1, 'no_snapshot'=>boolean(), 'retries'=>non_neg_integer(), _=>_}, 'alias':=_, 'db_ref':=_, 'no_snapshot'=>boolean(), 'retries'=>non_neg_integer(), _=>_}, boolean(), _) -> any().
run_f(F, Ctxt, false, _) ->
run_f(F, Ctxt) ->
push_ctxt(Ctxt),
F();
run_f(F, Ctxt, true, Alias) ->
push_ctxt(incr_attempt(Ctxt)),
mrdb_mutex:do(Alias, F).
F().
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
incr_attempt(0, {_,O}) when O > 0 ->
{outer, {0,1}};
incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O),
is_integer(Ri), is_integer(Ro) ->
if I < Ri -> {inner, {I+1, O}};
O < Ro -> {outer, {0, O+1}};
true ->
error
end;
incr_attempt(_, _) ->
error.
retry_activity(F, Alias, #{activity := #{ type := Type
, attempt := A
, retries := R} = Act} = Ctxt) ->
case incr_attempt(A, R) of
{RetryCtxt, A1} ->
Act1 = Act#{attempt := A1},
Ctxt1 = Ctxt#{activity := Act1},
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt1)
end;
error ->
return_abort(Type, error, retry_limit)
end.
retry_activity_(inner, F, Alias, Ctxt) ->
mrdb_stats:incr(Alias, inner_retries, 1),
try_f(F, Ctxt);
retry_activity_(outer, F, Alias, Ctxt) ->
mrdb_stats:incr(Alias, outer_retries, 1),
mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end).
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Act1 = Act#{attempt := A+1, handle := TxH},
Act1 = Act#{handle := TxH},
C1 = C#{ activity := Act1 },
case maps:is_key(snapshot, C) of
true ->
@ -375,9 +419,13 @@ check_tx_opts(Opts) ->
end.
check_retries(#{retries := Retries} = Opts) ->
if is_integer(Retries), Retries >= 0 ->
case Retries of
_ when is_integer(Retries), Retries >= 0 ->
Opts#{retries := {0, Retries}};
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
Inner >= 0, Outer >= 0 ->
Opts;
true ->
_ ->
error({invalid_tx_option, {retries, Retries}})
end.
@ -393,7 +441,7 @@ create_tx(Opts, DbRef) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Opts#{activity => maps:merge(Opts, #{ type => tx
, handle => TxH
, attempt => 1})}.
, attempt => 0 })}.
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
case NoSnap of
@ -413,8 +461,7 @@ commit_and_pop(Res) ->
Res;
{error, {error, "Resource busy" ++ _ = Busy}} ->
case A of
#{retries := Retries, attempt := Att}
when Att =< Retries ->
#{retries := {I,O}} when I > 0; O > 0 ->
throw({?MODULE, busy});
_ ->
error({error, Busy})
@ -530,7 +577,7 @@ new_tx(#{activity := _}, _) ->
new_tx(Tab, Opts) ->
#{db_ref := DbRef} = R = ensure_ref(Tab),
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
tx_ref(Tab, TxH) ->
@ -540,7 +587,7 @@ tx_ref(Tab, TxH) ->
#{activity := #{type := tx, handle := OtherTxH}} ->
error({tx_handle_conflict, OtherTxH});
R ->
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
end.
-spec tx_commit(tx_handle() | db_ref()) -> ok.

View File

@ -3,79 +3,107 @@
-export([ do/2 ]).
-export([ ensure_tab/0 ]).
-include_lib("eunit/include/eunit.hrl").
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
%% critical section.
%%
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
%% Releasing the resource is done by notifying the server.
%%
%% Previous implementations tested:
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
%% This is a pretty simple implementation, but it lacks ordering, and appeared
%% to sometimes lead to starvation.
%% * A duplicate_bag ets table: These are defined such that insertion order is
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
%% Unfortunately, performance plummeted when tested with > 25 concurrent
%% processes. While this is likely a very high number, given that we're talking
%% about contention in a system using optimistic concurrency, it's never good
%% if performance falls off a cliff.
do(Rsrc, F) when is_function(F, 0) ->
true = claim(Rsrc),
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
try F()
after
release(Rsrc)
mrdb_mutex_serializer:done(Rsrc, Ref)
end.
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
-ifdef(TEST).
mutex_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Check that all operations complete", fun swarm_do/0}
]}.
setup() ->
case whereis(mrdb_mutex_serializer) of
undefined ->
{ok, Pid} = mrdb_mutex_serializer:start_link(),
Pid;
Pid ->
Pid
end.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
false
end.
cleanup(Pid) ->
unlink(Pid),
exit(Pid, kill).
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
swarm_do() ->
Rsrc = ?LINE,
Pid = spawn(fun() -> collect([]) end),
L = lists:seq(1, 1000),
Evens = [X || X <- L, is_even(X)],
Pids = [spawn_monitor(fun() ->
send_even(Rsrc, N, Pid)
end) || N <- lists:seq(1,1000)],
await_pids(Pids),
Results = fetch(Pid),
{incorrect_results, []} = {incorrect_results, Results -- Evens},
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
_ ->
true
collect(Acc) ->
receive
{_, result, N} ->
collect([N|Acc]);
{From, fetch} ->
From ! {fetch_reply, Acc},
done
end.
fetch(Pid) ->
Pid ! {self(), fetch},
receive
{fetch_reply, Result} ->
Result
end.
is_even(N) ->
(N rem 2) =:= 0.
await_pids([{_, MRef}|Pids]) ->
receive
{'DOWN', MRef, _, _, _} ->
await_pids(Pids)
after 10000 ->
error(timeout)
end;
await_pids([]) ->
ok.
send_even(Rsrc, N, Pid) ->
do(Rsrc, fun() ->
case is_even(N) of
true ->
Pid ! {self(), result, N};
false ->
exit(not_even)
end
end).
-endif.

View File

@ -0,0 +1,98 @@
-module(mrdb_mutex_serializer).
-behavior(gen_server).
-export([wait/1,
done/2]).
-export([start_link/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(st, { queues = #{}
, empty_q = queue:new() }). %% perhaps silly optimization
wait(Rsrc) ->
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
done(Rsrc, Ref) ->
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
, empty_q = NewQ } = St) ->
MRef = erlang:monitor(process, Pid),
Q0 = maps:get(Rsrc, Queues, NewQ),
WasEmpty = queue:is_empty(Q0),
Q1 = queue:in({From, MRef}, Q0),
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
case WasEmpty of
true ->
{reply, {ok, MRef}, St1};
false ->
{noreply, St1}
end.
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
case maps:find(Rsrc, Queues) of
{ok, Q} ->
case queue:out(Q) of
{{value, {_From, MRef}}, Q1} ->
erlang:demonitor(MRef),
ok = maybe_dispatch_one(Q1),
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
{_, _} ->
%% Not the lock holder
{noreply, St}
end;
error ->
{noreply, St}
end.
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
Queues1 =
maps:map(
fun(_Rsrc, Q) ->
drop_or_filter(Q, MRef)
end, Queues),
{noreply, St#st{ queues = Queues1 }};
handle_info(_, St) ->
{noreply, St}.
terminate(_, _) ->
ok.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
%% In this function, we don't actually pop
maybe_dispatch_one(Q) ->
case queue:peek(Q) of
empty ->
ok;
{value, {From, MRef}} ->
gen_server:reply(From, {ok, MRef}),
ok
end.
drop_or_filter(Q, MRef) ->
case queue:peek(Q) of
{value, {_, MRef}} ->
Q1 = queue:drop(Q),
ok = maybe_dispatch_one(Q1),
Q1;
{value, _Other} ->
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
empty ->
Q
end.

74
src/mrdb_stats.erl Normal file
View File

@ -0,0 +1,74 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%% @doc Statistics API for the mnesia_rocksdb plugin
%%
%% Some counters are maintained for each active alias. Currently, the following
%% counters are supported:
%% * inner_retries
%% * outer_retries
%%
-module(mrdb_stats).
-export([new/0]).
-export([incr/3,
get/1,
get/2]).
-type alias() :: mnesia_rocksdb:alias().
-type db_ref() :: mrdb:db_ref().
-type counter() :: atom().
-type increment() :: integer().
-type counters() :: #{ counter() := integer() }.
new() ->
#{ ref => counters:new(map_size(ctr_meta()), [write_concurrency])
, meta => ctr_meta()}.
ctr_meta() ->
#{ inner_retries => 1
, outer_retries => 2 }.
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
%% @doc Increment `Ctr' counter for `Alias` with increment `N'.
%%
%% Note that the first argument may also be a `db_ref()' map,
%% corresponding to `mrdb:get_ref({admin, Alias})'.
%% @end
incr(Alias, Ctr, N) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
incr_(Ref, Meta, Ctr, N);
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
incr_(Ref, Meta, Ctr, N).
-spec get(alias() | db_ref(), counter()) -> integer().
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
%% @end
get(Alias, Ctr) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
get_(Ref, Meta, Ctr);
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
get_(Ref, Meta, Ctr).
-spec get(alias() | db_ref()) -> counters().
%% @doc Fetches all known counters for `Alias', in the form of a map,
%% `#{Counter => Value}'.
%% @end
get(Alias) when is_atom(Alias) ->
get_(mrdb:get_ref({admin, Alias}));
get(Ref) when is_map(Ref) ->
get_(Ref).
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
lists:foldl(
fun({K, P}, M) ->
M#{K := counters:get(Ref, P)}
end, Meta, maps:to_list(Meta)).
get_(Ref, Meta, Attr) ->
Ix = maps:get(Attr, Meta),
counters:get(Ref, Ix).
incr_(Ref, Meta, Attr, N) ->
Ix = maps:get(Attr, Meta),
counters:add(Ref, Ix, N).

View File

@ -24,6 +24,7 @@
, mrdb_abort/1
, mrdb_two_procs/1
, mrdb_two_procs_tx_restart/1
, mrdb_two_procs_tx_inner_restart/1
, mrdb_two_procs_snap/1
, mrdb_three_procs/1
]).
@ -53,6 +54,7 @@ groups() ->
, mrdb_abort
, mrdb_two_procs
, mrdb_two_procs_tx_restart
, mrdb_two_procs_tx_inner_restart
, mrdb_two_procs_snap
, mrdb_three_procs ]}
].
@ -287,10 +289,17 @@ mrdb_abort(Config) ->
mrdb_two_procs(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
tr_flags(
{self(), [call, sos, p]},
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {mrdb, activity, x}], tr_opts()))).
mrdb_two_procs_(Config) ->
@ -314,11 +323,16 @@ mrdb_two_procs_(Config) ->
Pre = mrdb:read(R, a),
go_ahead_other(POther),
await_other_down(POther, MRef, ?LINE),
%% The test proc is still inside the transaction, and POther
%% has terminated/committed. If we now read 'a', we should see
%% the value that POther wrote (no isolation).
[{R, a, 17}] = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
%% Our transaction should fail due to the resource conflict, and because
%% we're not using retries.
try mrdb:activity({tx, #{no_snapshot => true,
retries => 0}}, rdb, F1) of
ok -> error(unexpected)
@ -339,6 +353,7 @@ mrdb_two_procs_tx_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
check_stats(rdb),
mrdb:insert(R, {R, a, 1}),
Pre = mrdb:read(R, a),
F0 = fun() ->
@ -354,7 +369,7 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = [{R, a, 17}],
Att = get_attempt(),
Expected = case Att of
1 -> Pre;
0 -> Pre;
_ -> OtherWrite
end,
Expected = mrdb:read(R, a),
@ -363,14 +378,88 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
dictionary_unchanged(Do0),
check_stats(rdb),
[{R, a, 18}] = mrdb:read(R, a),
delete_tabs(Created),
ok.
mrdb_two_procs_tx_inner_restart(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
dbg_tr_opts()).
mrdb_two_procs_tx_inner_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
mrdb:insert(R, {R, a, 1}),
mrdb:insert(R, {R, b, 1}),
PreA = mrdb:read(R, a),
PreB = mrdb:read(R, b),
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
F0 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, a, 17}),
wait_for_other(Parent, ?LINE)
end,
F1 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, b, 147}),
wait_for_other(Parent, ?LINE)
end,
Spawn = fun(F) ->
Res = spawn_opt(
fun() ->
ok = mrdb:activity(tx, rdb, F)
end, [monitor]),
Res
end,
{P1, MRef1} = Spawn(F0),
{P2, MRef2} = Spawn(F1),
F2 = fun() ->
PostA = [{R, a, 17}], % once F0 (P1) has committed
PostB = [{R, b, 147}], % once F1 (P2) has committed
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
BRes = mrdb:read(R, b),
Att = get_attempt(),
ct:log("Att = ~p", [Att]),
case Att of
0 -> % This is the first run (no retry yet)
ARes = PreA,
BRes = PreB,
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
await_other_down(P1, MRef1, ?LINE),
PostA = mrdb:read(R, a); % now, P1's write should leak through here
{0, 1} -> % This is the first (outer) retry
go_ahead_other(0, P2), % Let P2 write
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
await_other_down(P2, MRef2, ?LINE),
PostA = mrdb:read(R, a), % now we should see writes from both P1
PostB = mrdb:read(R, b); % ... and P2
{1, 1} ->
PostA = mrdb:read(R, a),
PostB = mrdb:read(R, b),
ok
end,
mrdb:insert(R, {R, a, 18}),
mrdb:insert(R, {R, b, 18})
end,
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
check_stats(rdb),
dictionary_unchanged(Do0),
[{R, a, 18}] = mrdb:read(R, a),
[{R, b, 18}] = mrdb:read(R, b),
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
delete_tabs(Created),
ok.
%
%% For testing purposes, we use side-effects inside the transactions
@ -383,7 +472,7 @@ mrdb_two_procs_tx_restart_(Config) ->
%% attempt, and ignore the sync ops on retries.
%%
-define(IF_FIRST(N, Expr),
if N == 1 ->
if N == 0 ->
Expr;
true ->
ok
@ -413,8 +502,8 @@ mrdb_two_procs_snap(Config) ->
go_ahead_other(Att, POther),
ARes = mrdb:read(R, a),
ARes = case Att of
1 -> Pre;
2 -> [{R, a, 17}]
0 -> Pre;
_ -> [{R, a, 17}]
end,
await_other_down(POther, MRef, ?LINE),
PreB = mrdb:read(R, b),
@ -434,7 +523,7 @@ mrdb_two_procs_snap(Config) ->
%% We make sure that P2 commits before finishing the other two, and P3 and the
%% main thread sync, so as to maximize the contention for the retry lock.
mrdb_three_procs(Config) ->
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
mrdb_three_procs_(Config) ->
R = ?FUNCTION_NAME,
@ -452,7 +541,7 @@ mrdb_three_procs_(Config) ->
spawn_opt(fun() ->
D0 = get_dict(),
do_when_p_allows(
1, Parent, ?LINE,
0, Parent, ?LINE,
fun() ->
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
end),
@ -488,8 +577,8 @@ mrdb_three_procs_(Config) ->
fun() ->
Att = get_attempt(),
ARes = case Att of
1 -> [A0];
2 -> [A1]
0 -> [A0];
_ -> [A1]
end,
%% First, ensure that P2 tx is running
go_ahead_other(Att, P2),
@ -519,6 +608,7 @@ tr_opts() ->
, {?MODULE, await_other_down, 3, x}
, {?MODULE, do_when_p_allows, 4, x}
, {?MODULE, allow_p, 3, x}
, {?MODULE, check_stats, 1, x}
]}.
light_tr_opts() ->
@ -529,6 +619,22 @@ light_tr_opts() ->
, {mrdb, read, 2, x}
, {mrdb, activity, x} ], tr_opts())).
dbg_tr_opts() ->
tr_flags(
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb, current_context, 0, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {mrdb, activity, x} ], tr_opts())).
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
Opts#{patterns => Ps ++ Pats1}.
@ -542,12 +648,13 @@ wait_for_other(Parent, L) ->
wait_for_other(Att, Parent, L) ->
wait_for_other(Att, Parent, 1000, L).
wait_for_other(1, Parent, Timeout, L) ->
wait_for_other(0, Parent, Timeout, L) ->
MRef = monitor(process, Parent),
Parent ! {self(), ready},
receive
{Parent, cont} ->
demonitor(MRef),
Parent ! {self(), cont_ack},
ok;
{'DOWN', MRef, _, _, Reason} ->
ct:log("Parent died, Reason = ~p", [Reason]),
@ -586,7 +693,13 @@ go_ahead_other(Att, POther, Timeout) ->
go_ahead_other_(POther, Timeout) ->
receive
{POther, ready} ->
POther ! {self(), cont}
POther ! {self(), cont},
receive
{POther, cont_ack} ->
ok
after Timeout ->
error(cont_ack_timeout)
end
after Timeout ->
error(go_ahead_timeout)
end.
@ -645,3 +758,8 @@ dictionary_unchanged(Old) ->
, added := [] } = #{ deleted => Old -- New
, added => New -- Old },
ok.
check_stats(Alias) ->
Stats = mrdb_stats:get(Alias),
ct:log("Stats: ~p", [Stats]),
Stats.

View File

@ -1,6 +1,6 @@
-module(mrdb_bench).
-compile(export_all).
-compile([export_all, nowarn_export_all]).
init() ->
mnesia:delete_schema([node()]),