diff --git a/src/mnesia_rocksdb.app.src b/src/mnesia_rocksdb.app.src index ec1ad74..023ea02 100644 --- a/src/mnesia_rocksdb.app.src +++ b/src/mnesia_rocksdb.app.src @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- {application, mnesia_rocksdb, [ {description, "RocksDB backend plugin for Mnesia"}, diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 1fccb72..3ba11a5 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %%---------------------------------------------------------------- %% Copyright (c) 2013-2016 Klarna AB %% @@ -257,8 +258,8 @@ show_table(Tab) -> show_table(Tab, Limit) -> Ref = get_ref(Tab), mrdb:with_rdb_iterator(Ref, fun(I) -> - i_show_table(I, first, Limit, Ref) - end). + i_show_table(I, first, Limit, Ref) + end). i_show_table(_, _, 0, _) -> {error, skipped_some}; @@ -941,7 +942,7 @@ write_error(_Op, _Args, _Error, #st{on_error = OnErr}) when OnErr =/= fatal -> ok; write_error(Op, Args, Error, _) -> mnesia_lib:fatal("mnesia_rocksdb write_error: ~p ~p -> ~p", - [Op, Args, Error]). + [Op, Args, Error]). %% ---------------------------------------------------------------------------- %% COMMON PRIVATE diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index 4d11bf3..944c65e 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -module(mnesia_rocksdb_admin). -behaviour(gen_server). @@ -993,7 +994,7 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0, Opts = rocksdb_opts_from_trec(TRec0), case open_db_(MP, Alias, Opts, [], CreateIfMissing) of {ok, #{ cf_info := CfI }} -> - DbRec = maps:get({ext,Alias,"default"}, CfI), + DbRec = maps:get({ext,Alias,"default"}, CfI), CfNames = maps:keys(CfI), DbRec1 = DbRec#{ cfs => CfNames, mountpoint => MP }, diff --git a/src/mnesia_rocksdb_app.erl b/src/mnesia_rocksdb_app.erl index 9f84291..7e602a1 100644 --- a/src/mnesia_rocksdb_app.erl +++ b/src/mnesia_rocksdb_app.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %%---------------------------------------------------------------- %% Copyright (c) 2013-2016 Klarna AB %% diff --git a/src/mnesia_rocksdb_int.hrl b/src/mnesia_rocksdb_int.hrl index 3bfcfe7..7bf7697 100644 --- a/src/mnesia_rocksdb_int.hrl +++ b/src/mnesia_rocksdb_int.hrl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -include_lib("hut/include/hut.hrl"). %% enable debugging messages through mnesia:set_debug_level(debug) diff --git a/src/mnesia_rocksdb_params.erl b/src/mnesia_rocksdb_params.erl index 1730b04..33419c0 100644 --- a/src/mnesia_rocksdb_params.erl +++ b/src/mnesia_rocksdb_params.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %%---------------------------------------------------------------- %% Copyright (c) 2013-2016 Klarna AB %% diff --git a/src/mnesia_rocksdb_sup.erl b/src/mnesia_rocksdb_sup.erl index 3b6b23c..1b1706a 100644 --- a/src/mnesia_rocksdb_sup.erl +++ b/src/mnesia_rocksdb_sup.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %%---------------------------------------------------------------- %% Copyright (c) 2013-2016 Klarna AB %% diff --git a/src/mnesia_rocksdb_tuning.erl b/src/mnesia_rocksdb_tuning.erl index 0112e47..88d75c9 100644 --- a/src/mnesia_rocksdb_tuning.erl +++ b/src/mnesia_rocksdb_tuning.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %%---------------------------------------------------------------- %% Copyright (c) 2013-2016 Klarna AB %% diff --git a/src/mnesia_rocksdb_tuning.hrl b/src/mnesia_rocksdb_tuning.hrl index bc3b563..175e539 100644 --- a/src/mnesia_rocksdb_tuning.hrl +++ b/src/mnesia_rocksdb_tuning.hrl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %%---------------------------------------------------------------- %% Copyright (c) 2013-2016 Klarna AB %% diff --git a/src/mrdb.erl b/src/mrdb.erl index c390b5d..6d29d8a 100644 --- a/src/mrdb.erl +++ b/src/mrdb.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %% @doc Mid-level access API for Mnesia-managed rocksdb tables %% %% This module implements access functions for the mnesia_rocksdb @@ -14,8 +15,7 @@ %% #{ name := %% , db_ref := %% , cf_handle := -%% , batch := -%% , tx_handle := +%% , activity := Ongoing batch or transaction, if any (map()) %% , attr_pos := #{AttrName := Pos} %% , mode := %% , properties := @@ -37,7 +37,7 @@ -export([ get_ref/1 , ensure_ref/1 , ensure_ref/2 - , alias_of/1 + , alias_of/1 , new_tx/1 , new_tx/2 , tx_ref/2 , tx_commit/1 @@ -138,6 +138,13 @@ -type itr_handle() :: rocksdb:itr_handle(). -type batch_handle() :: rocksdb:batch_handle(). +-type tx_activity() :: #{ type := 'tx' + , handle := tx_handle() + , attempt := non_neg_integer() }. +-type batch_activity() :: #{ type := 'batch' + , handle := batch_handle() }. +-type activity() :: tx_activity() | batch_activity(). + -type pos() :: non_neg_integer(). -type properties() :: #{ record_name := atom() @@ -165,8 +172,7 @@ , properties := properties() , mode => mnesia , ix_vals_f => fun( (tuple()) -> [any()] ) - , batch => batch_handle() - , tx_handle => tx_handle() + , activity => activity() , _ => _}. -type error() :: {error, any()}. @@ -216,10 +222,10 @@ patterns() -> -spec snapshot(alias() | ref_or_tab()) -> {ok, snapshot_handle()} | error(). snapshot(Name) when is_atom(Name) -> case mnesia_rocksdb_admin:get_ref(Name, error) of - error -> - snapshot(get_ref({admin, Name})); - Ref -> - snapshot(Ref) + error -> + snapshot(get_ref({admin, Name})); + Ref -> + snapshot(Ref) end; snapshot(#{db_ref := DbRef}) -> rocksdb:snapshot(DbRef); @@ -259,15 +265,14 @@ activity(Type, Alias, F) -> {tx, TxOpts} -> TxCtxt = new_tx_context(TxOpts, DbRef), maps:merge( - #{ type => tx - , alias => Alias + #{ alias => Alias , db_ref => DbRef }, TxCtxt); batch -> - {ok, Batch} = rdb_batch(), - #{ type => batch + Batch = get_batch_(DbRef), + #{ activity => #{ type => batch + , handle => Batch } , alias => Alias - , db_ref => DbRef - , handle => Batch } + , db_ref => DbRef } end, do_activity(F, Alias, Ctxt, false). @@ -295,9 +300,10 @@ run_f(F, Ctxt, true, Alias) -> F() end). -incr_attempt(#{ type := tx, db_ref := DbRef, attempt := A } = C) -> +incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) -> {ok, TxH} = rdb_transaction(DbRef, []), - C1 = C#{ attempt := A+1, handle := TxH }, + Act1 = Act#{attempt := A+1, handle := TxH}, + C1 = C#{ activity := Act1 }, case maps:is_key(snapshot, C) of true -> {ok, SH} = rocksdb:snapshot(DbRef), @@ -363,7 +369,12 @@ apply_tx_opts(Opts0) when is_map(Opts0) -> check_tx_opts(maps:merge(default_tx_opts(), Opts0)). check_tx_opts(Opts) -> - check_retries(check_nosnap(Opts)). + case maps:without([no_snapshot, retries], Opts) of + Other when map_size(Other) > 0 -> + abort({invalid_tx_opts, maps:keys(Other)}); + _ -> + check_retries(check_nosnap(Opts)) + end. check_retries(#{retries := Retries} = Opts) -> if is_integer(Retries), Retries >= 0 -> @@ -382,7 +393,9 @@ new_tx_context(Opts, DbRef) -> create_tx(Opts, DbRef) -> {ok, TxH} = rdb_transaction(DbRef, []), - Opts#{handle => TxH, attempt => 1}. + Opts#{activity => maps:merge(Opts, #{ type => tx + , handle => TxH + , attempt => 1})}. maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) -> case NoSnap of @@ -394,14 +407,14 @@ maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) -> end. commit_and_pop(Res) -> - #{type := Type, handle := H, db_ref := DbRef} = Ctxt = current_context(), + #{activity := #{type := Type, handle := H} = A} = C = current_context(), case Type of tx -> case rdb_transaction_commit_and_pop(H) of ok -> Res; {error, {error, "Resource busy" ++ _ = Busy}} -> - case Ctxt of + case A of #{retries := Retries, attempt := Att} when Att =< Retries -> throw({?MODULE, busy}); @@ -412,7 +425,7 @@ commit_and_pop(Res) -> error(Reason) end; batch -> - case rdb_write_batch_and_pop(DbRef, H) of + case rdb_write_batch_and_pop(H, C) of ok -> Res; Other -> Other @@ -422,10 +435,10 @@ commit_and_pop(Res) -> abort_and_pop(Cat, Err) -> %% We can pop the context right away, since there is no %% complex failure handling (like retry-on-busy) for rollback. - #{type := Type, handle := H} = pop_ctxt(), + #{activity := #{type := Type, handle := H}} = pop_ctxt(), case Type of tx -> ok = rdb_transaction_rollback(H); - batch -> ok = rdb_release_batch(H) + batch -> ok = release_batches(H) end, case Cat of error -> error(Err); @@ -451,9 +464,9 @@ rdb_transaction_rollback(H) -> rdb_batch() -> rocksdb:batch(). -rdb_write_batch_and_pop(DbRef, H) -> +rdb_write_batch_and_pop(BatchRef, C) -> %% TODO: derive write_opts(R) - try rocksdb:write_batch(DbRef, H, []) + try write_batches(BatchRef, write_opts(C, [])) after pop_ctxt() end. @@ -470,24 +483,26 @@ new_tx(Tab) -> new_tx(Tab, []). -spec new_tx(ref_or_tab(), write_options()) -> db_ref(). +new_tx(#{activity := _}, _) -> + abort(nested_context); new_tx(Tab, Opts) -> #{db_ref := DbRef} = R = ensure_ref(Tab), {ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)), - R#{tx_handle => TxH}. + R#{activity => #{type => tx, handle => TxH, attempt => 1}}. -spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref(). tx_ref(Tab, TxH) -> case ensure_ref(Tab) of - #{tx_handle := TxH} = R -> + #{activity := #{type := tx, handle := TxH}} = R -> R; - #{tx_handle := OtherTxH} -> + #{activity := #{type := tx, handle := OtherTxH}} -> error({tx_handle_conflict, OtherTxH}); R -> - R#{tx_handle => TxH} + R#{activity => #{type => tx, handle => TxH, attempt => 1}} end. -spec tx_commit(tx_handle() | db_ref()) -> ok. -tx_commit(#{tx_handle := TxH}) -> +tx_commit(#{activity := #{type := tx, handle := TxH}}) -> rdb_transaction_commit(TxH); tx_commit(TxH) -> rdb_transaction_commit(TxH). @@ -497,8 +512,9 @@ get_ref(Tab) -> mnesia_rocksdb_admin:get_ref(Tab). -spec ensure_ref(ref_or_tab()) -> db_ref(). +ensure_ref(#{activity := _} = R) -> R; ensure_ref(Ref) when is_map(Ref) -> - Ref; + maybe_tx_ctxt(get(ctxt()), Ref); ensure_ref(Other) -> maybe_tx_ctxt(get(ctxt()), get_ref(Other)). @@ -507,19 +523,18 @@ ensure_ref(Ref, R) when is_map(Ref) -> ensure_ref(Other, R) -> inherit_ctxt(get_ref(Other), R). -maybe_tx_ctxt(undefined, R) -> R; -maybe_tx_ctxt(_, #{batch := _} = R) -> R; -maybe_tx_ctxt(_, #{tx_handle := _} = R) -> R; -maybe_tx_ctxt([#{type := Type, handle := H} = C|_], R) -> +maybe_tx_ctxt(undefined, R) -> R; +maybe_tx_ctxt(_, #{activity := _} = R) -> R; +maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) -> case Type of tx -> - maps:merge(maps:with([snapshot], C), R#{tx_handle => H}); - batch -> - R#{batch => H} + maps:merge(maps:with([snapshot], C), R#{activity => A}); + _ -> + R#{activity => A} end. inherit_ctxt(Ref, R) -> - maps:merge(Ref, maps:with([batch, tx_handle], R)). + maps:merge(Ref, maps:with([snapshot, activity], R)). -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res. with_iterator(Tab, Fun) -> @@ -915,12 +930,12 @@ as_batch(Tab, F) -> as_batch(Tab, F, Opts) when is_function(F, 1), is_list(Opts) -> as_batch_(ensure_ref(Tab), F, Opts). -as_batch_(#{batch := _} = R, F, _) -> +as_batch_(#{activity := #{type := batch}} = R, F, _) -> %% If already inside a batch, add to that batch (batches don't seem to nest) F(R); as_batch_(#{db_ref := DbRef} = R, F, Opts) -> BatchRef = get_batch_(DbRef), - try F(R#{batch => BatchRef}) of + try F(R#{activity => #{type => batch, handle => BatchRef}}) of Res -> case write_batches(BatchRef, write_opts(R, Opts)) of ok -> @@ -936,48 +951,53 @@ as_batch_(#{db_ref := DbRef} = R, F, Opts) -> get_batch(#{db_ref := DbRef, batch := BatchRef}) -> try {ok, get_batch_(DbRef, BatchRef)} catch - error:Reason -> - {error, Reason} + error:Reason -> + {error, Reason} end; get_batch(_) -> {error, badarg}. get_batch_(DbRef) -> Ref = make_ref(), - {ok, Batch} = rocksdb:batch(), + {ok, Batch} = rdb_batch(), put({mrdb_batch, Ref}, #{DbRef => Batch}), Ref. get_batch_(DbRef, BatchRef) -> - Key = {mrdb_batch, BatchRef}, + Key = batch_ref_key(BatchRef), case get(Key) of - undefined -> - error(stale_batch_ref); - #{DbRef := Batch} -> - Batch; - Map -> - {ok, Batch} = rocksdb:batch(), - put(Key, Map#{DbRef => Batch}), - Batch + undefined -> + error(stale_batch_ref); + #{DbRef := Batch} -> + Batch; + Map -> + {ok, Batch} = rdb:batch(), + put(Key, Map#{DbRef => Batch}), + Batch end. +batch_ref_key(BatchRef) -> + {mrdb_batch, BatchRef}. + write_batches(BatchRef, Opts) -> - case get({mrdb_batch, BatchRef}) of - undefined -> - error(stale_batch_ref); - Map -> - %% Some added complication since we deal with potentially - %% multiple DbRefs, and will want to return errors. - ret_batch_write_acc( - maps:fold( - fun(DbRef, Batch, Acc) -> - case rocksdb:write_batch(DbRef, Batch, Opts) of - ok -> - Acc; - {error,E} -> - acc_batch_write_error(E, DbRef, Acc) - end - end, ok, Map)) + Key = batch_ref_key(BatchRef), + case get(Key) of + undefined -> + error(stale_batch_ref); + Map -> + %% Some added complication since we deal with potentially + %% multiple DbRefs, and will want to return errors. + erase(Key), + ret_batch_write_acc( + maps:fold( + fun(DbRef, Batch, Acc) -> + case rocksdb:write_batch(DbRef, Batch, Opts) of + ok -> + Acc; + {error,E} -> + acc_batch_write_error(E, DbRef, Acc) + end + end, ok, Map)) end. ret_batch_write_acc(ok) -> @@ -992,16 +1012,17 @@ acc_batch_write_error(E, DbRef, Es) when is_list(Es) -> [{DbRef, E}|Es]. release_batches(BatchRef) -> - case get({mrdb_batch, BatchRef}) of - undefined -> - error(stale_batch_ref); - Map -> - maps_foreach( - fun(_, Batch) -> - rocksdb:release_batch(Batch) - end, Map), - erase(BatchRef), - ok + Key = batch_ref_key(BatchRef), + case get(Key) of + undefined -> + ok; + Map -> + erase(Key), + maps_foreach( + fun(_, Batch) -> + rdb_release_batch(Batch) + end, Map), + ok end. %% maps:foreach/2 doesn't exist in OTP 22 ... @@ -1347,12 +1368,12 @@ rdb_put(R, K, V) -> rdb_put(R, K, V, []). rdb_put(R, K, V, Opts) -> rdb_put_(R, K, V, write_opts(R, Opts)). -rdb_put_(#{batch := BatchRef, - db_ref := DbRef, - cf_handle := CfH}, K, V, _Opts) -> +rdb_put_(#{activity := #{type := batch, handle := BatchRef}, + db_ref := DbRef, + cf_handle := CfH}, K, V, _Opts) -> Batch = get_batch_(DbRef, BatchRef), rocksdb:batch_put(Batch, CfH, K, V); -rdb_put_(#{tx_handle := TxH, cf_handle := CfH}, K, V, _Opts) -> +rdb_put_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH}, K, V, _Opts) -> rocksdb:transaction_put(TxH, CfH, K, V); rdb_put_(#{db_ref := DbRef, cf_handle := CfH}, K, V, WOpts) -> rocksdb:put(DbRef, CfH, K, V, WOpts). @@ -1361,9 +1382,9 @@ rdb_get(R, K) -> rdb_get(R, K, []). rdb_get(R, K, Opts) -> rdb_get_(R, K, read_opts(R, Opts)). -rdb_get_(#{tx_handle := TxH, cf_handle := CfH, snapshot := SH}, K, _Opts) -> +rdb_get_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH, snapshot := SH}, K, _Opts) -> rocksdb:transaction_get(TxH, CfH, K, [{snapshot, SH}]); -rdb_get_(#{tx_handle := TxH, cf_handle := CfH}, K, _Opts) -> +rdb_get_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH}, K, _Opts) -> rocksdb:transaction_get(TxH, CfH, K, []); rdb_get_(#{db_ref := DbRef, cf_handle := CfH}, K, ROpts) -> rocksdb:get(DbRef, CfH, K, ROpts). @@ -1372,12 +1393,12 @@ rdb_delete(R, K) -> rdb_delete(R, K, []). rdb_delete(R, K, Opts) -> rdb_delete_(R, K, write_opts(R, Opts)). -rdb_delete_(#{batch := BatchRef, - db_ref := DbRef, - cf_handle := CfH}, K, _Opts) -> +rdb_delete_(#{activity := #{type := batch, handle := BatchRef}, + db_ref := DbRef, + cf_handle := CfH}, K, _Opts) -> Batch = get_batch_(DbRef, BatchRef), rocksdb:batch_delete(Batch, CfH, K); -rdb_delete_(#{tx_handle := TxH, cf_handle := CfH}, K, _Opts) -> +rdb_delete_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH}, K, _Opts) -> rocksdb:transaction_delete(TxH, CfH, K); rdb_delete_(#{db_ref := DbRef, cf_handle := CfH}, K, WOpts) -> rocksdb:delete(DbRef, CfH, K, WOpts). @@ -1386,7 +1407,7 @@ rdb_iterator(R) -> rdb_iterator(R, []). rdb_iterator(R, Opts) -> rdb_iterator_(R, read_opts(R, Opts)). -rdb_iterator_(#{db_ref := DbRef, tx_handle := TxH, cf_handle := CfH}, ROpts) -> +rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) -> rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts); rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) -> rocksdb:iterator(DbRef, CfH, ROpts). diff --git a/src/mrdb_index.erl b/src/mrdb_index.erl index bf42842..fcf1f6d 100644 --- a/src/mrdb_index.erl +++ b/src/mrdb_index.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -module(mrdb_index). -export([ diff --git a/src/mrdb_mutex.erl b/src/mrdb_mutex.erl index 98eae78..d745ae1 100644 --- a/src/mrdb_mutex.erl +++ b/src/mrdb_mutex.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -module(mrdb_mutex). -export([ do/2 ]). diff --git a/src/mrdb_select.erl b/src/mrdb_select.erl index 316a4f7..1dbcc5a 100644 --- a/src/mrdb_select.erl +++ b/src/mrdb_select.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -module(mrdb_select). -export([ select/3 %% (Ref, MatchSpec, Limit) diff --git a/test/mnesia_rocksdb_SUITE.erl b/test/mnesia_rocksdb_SUITE.erl index 64ddfd0..4fcc452 100644 --- a/test/mnesia_rocksdb_SUITE.erl +++ b/test/mnesia_rocksdb_SUITE.erl @@ -1,3 +1,4 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -module(mnesia_rocksdb_SUITE). -export([ @@ -16,7 +17,8 @@ , encoding_binary_binary/1 , encoding_defaults/1 ]). --export([ mrdb_transactions/1 +-export([ mrdb_batch/1 + , mrdb_transactions/1 , mrdb_repeated_transactions/1 , mrdb_abort/1 , mrdb_two_procs/1 @@ -43,7 +45,8 @@ groups() -> , {checks, [sequence], [ encoding_sext_attrs , encoding_binary_binary , encoding_defaults ]} - , {mrdb, [sequence], [ mrdb_transactions + , {mrdb, [sequence], [ mrdb_batch + , mrdb_transactions , mrdb_repeated_transactions , mrdb_abort , mrdb_two_procs @@ -139,6 +142,43 @@ expect_error(F, Line, Type, Expected) -> ok end. +mrdb_batch(Config) -> + Created = create_tabs([{b, []}], Config), + D0 = get_dict(), + mrdb:activity( + batch, rdb, + fun() -> + [mrdb:insert(b, {b, K, K}) + || K <- lists:seq(1, 10)] + end), + dictionary_unchanged(D0), + [[{b,K,K}] = mrdb:read(b, K) || K <- lists:seq(1, 10)], + expect_error( + fun() -> mrdb:activity( + batch, rdb, + fun() -> + mrdb:insert(b, {b, 11, 11}), + error(willful_abort) + end) + end, ?LINE, error, '_'), + dictionary_unchanged(D0), + [] = mrdb:read(b, 11), + TRef = mrdb:get_ref(b), + mrdb:activity( + batch, rdb, + fun() -> + mrdb:insert(TRef, {b, 12, 12}) + end), + dictionary_unchanged(D0), + [{b, 12, 12}] = mrdb:read(b, 12), + mrdb:as_batch(b, fun(R) -> + mrdb:insert(R, {b, 13, 13}) + end), + dictionary_unchanged(D0), + [{b, 13, 13}] = mrdb:read(b, 13), + delete_tabs(Created), + ok. + mrdb_transactions(Config) -> tr_ct:with_trace(fun mrdb_transactions_/1, Config, tr_patterns( @@ -149,13 +189,17 @@ mrdb_transactions_(Config) -> Created = create_tabs([{tx, []}], Config), mrdb:insert(tx, {tx, a, 1}), [_] = mrdb:read(tx, a), + D0 = get_dict(), mrdb:activity( tx, rdb, fun() -> [{tx,a,N}] = mrdb:read(tx, a), N1 = N+1, - ok = mrdb:insert(tx, {tx,a,N1}) + ok = mrdb:insert(tx, {tx,a,N1}), + [{tx,a,N1}] = mrdb:read(tx, a), + ok end), + dictionary_unchanged(D0), [{tx,a,2}] = mrdb:read(tx,a), delete_tabs(Created), ok. @@ -169,7 +213,9 @@ mrdb_repeated_transactions(Config) -> N1 = N+1, ok = mrdb:insert(rtx, {rtx, a, N1}) end, + D0 = get_dict(), [ok = mrdb:activity(tx, rdb, Fun) || _ <- lists:seq(1,100)], + dictionary_unchanged(D0), [{rtx,a,100}] = mrdb:read(rtx, a), delete_tabs(Created), ok. @@ -178,6 +224,7 @@ mrdb_abort(Config) -> Created = create_tabs([{tx_abort, []}], Config), mrdb:insert(tx_abort, {tx_abort, a, 1}), Pre = mrdb:read(tx_abort, a), + D0 = get_dict(), TRes = try mrdb:activity( tx, rdb, fun() -> @@ -190,6 +237,7 @@ mrdb_abort(Config) -> error:abort_here -> ok end, + dictionary_unchanged(D0), ok = TRes, Pre = mrdb:read(tx_abort, a), delete_tabs(Created), @@ -202,7 +250,7 @@ mrdb_two_procs(Config) -> tr_patterns( mrdb, [ {mrdb, insert, 2, x} , {mrdb, read, 2, x} - , {mrdb, activity, x} ], tr_opts()))). + , {mrdb, activity, x}], tr_opts()))). mrdb_two_procs_(Config) -> R = ?FUNCTION_NAME, @@ -217,7 +265,9 @@ mrdb_two_procs_(Config) -> end, {POther, MRef} = spawn_opt( fun() -> - ok = mrdb:activity(tx, rdb, F0) + D0 = get_dict(), + ok = mrdb:activity(tx, rdb, F0), + dictionary_unchanged(D0) end, [monitor]), F1 = fun() -> Pre = mrdb:read(R, a), @@ -227,6 +277,7 @@ mrdb_two_procs_(Config) -> ok = mrdb:insert(R, {R, a, 18}) end, go_ahead_other(1, POther), + Do0 = get_dict(), try mrdb:activity({tx, #{no_snapshot => true, retries => 0}}, rdb, F1) of ok -> error(unexpected) @@ -234,6 +285,7 @@ mrdb_two_procs_(Config) -> error:{error, "Resource busy" ++ _} -> ok end, + dictionary_unchanged(Do0), [{R, a, 17}] = mrdb:read(R, a), delete_tabs(Created), ok. @@ -271,7 +323,9 @@ mrdb_two_procs_tx_restart_(Config) -> ok = mrdb:insert(R, {R, a, 18}) end, go_ahead_other(1, POther), + Do0 = get_dict(), mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1), + dictionary_unchanged(Do0), [{R, a, 18}] = mrdb:read(R, a), delete_tabs(Created), ok. @@ -309,7 +363,9 @@ mrdb_two_procs_snap(Config) -> end, {POther, MRef} = spawn_opt(fun() -> - ok = mrdb:activity(tx, rdb, F0) + D0 = get_dict(), + ok = mrdb:activity(tx, rdb, F0), + dictionary_unchanged(D0) end, [monitor]), F1 = fun() -> Att = get_attempt(), @@ -324,7 +380,9 @@ mrdb_two_procs_snap(Config) -> mrdb:insert(R, {R, b, 18}), 1477 end, + Do0 = get_dict(), 1477 = mrdb:activity(tx, rdb, F1), + dictionary_unchanged(Do0), [{R, a, 17}] = mrdb:read(R, a), [{R, b, 18}] = mrdb:read(R, b), delete_tabs(Created), @@ -351,11 +409,13 @@ mrdb_three_procs_(Config) -> end, {P1, MRef1} = spawn_opt(fun() -> + D0 = get_dict(), do_when_p_allows( 1, Parent, ?LINE, fun() -> ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1) - end) + end), + dictionary_unchanged(D0) end, [monitor]), F2 = fun() -> [A0] = mrdb:read(R, a), @@ -371,6 +431,7 @@ mrdb_three_procs_(Config) -> end, {P2, MRef2} = spawn_opt(fun() -> + D0 = get_dict(), try mrdb:activity( {tx, #{retries => 0, no_snapshot => true}}, rdb, F2) of @@ -378,8 +439,10 @@ mrdb_three_procs_(Config) -> catch error:{error, "Resource busy" ++ _} -> ok - end + end, + dictionary_unchanged(D0) end, [monitor]), + Do0 = get_dict(), ok = mrdb:activity(tx, rdb, fun() -> Att = get_attempt(), @@ -398,6 +461,7 @@ mrdb_three_procs_(Config) -> await_other_down(P2, MRef2, ?LINE), ok = mrdb:insert(R, {R, p0, 1}) end), + dictionary_unchanged(Do0), [{R, p1, 1}] = mrdb:read(R, p1), [] = mrdb:read(R, p2), [A1] = mrdb:read(R, a), @@ -511,7 +575,7 @@ await_other_down_(P, MRef, Line) -> end. get_attempt() -> - #{attempt := Attempt} = mrdb:current_context(), + #{activity := #{attempt := Attempt}} = mrdb:current_context(), Attempt. create_tabs(Tabs, Config) -> @@ -529,3 +593,14 @@ delete_tabs(Tabs) -> [{atomic,ok} = mnesia:delete_table(T) || T <- Tabs], ok. +get_dict() -> + {dictionary, D} = process_info(self(), dictionary), + [X || {K,_} = X <- D, + K =/= log_timestamp]. + +dictionary_unchanged(Old) -> + New = get_dict(), + #{ deleted := [] + , added := [] } = #{ deleted => Old -- New + , added => New -- Old }, + ok.