Fix batch- and tx-handling refs, formatting annot

This commit is contained in:
Ulf Wiger 2022-06-29 12:19:29 +02:00
parent b70654d0a2
commit e351a02175
14 changed files with 212 additions and 104 deletions

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
{application, mnesia_rocksdb, {application, mnesia_rocksdb,
[ [
{description, "RocksDB backend plugin for Mnesia"}, {description, "RocksDB backend plugin for Mnesia"},

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%%---------------------------------------------------------------- %%----------------------------------------------------------------
%% Copyright (c) 2013-2016 Klarna AB %% Copyright (c) 2013-2016 Klarna AB
%% %%
@ -257,8 +258,8 @@ show_table(Tab) ->
show_table(Tab, Limit) -> show_table(Tab, Limit) ->
Ref = get_ref(Tab), Ref = get_ref(Tab),
mrdb:with_rdb_iterator(Ref, fun(I) -> mrdb:with_rdb_iterator(Ref, fun(I) ->
i_show_table(I, first, Limit, Ref) i_show_table(I, first, Limit, Ref)
end). end).
i_show_table(_, _, 0, _) -> i_show_table(_, _, 0, _) ->
{error, skipped_some}; {error, skipped_some};
@ -941,7 +942,7 @@ write_error(_Op, _Args, _Error, #st{on_error = OnErr}) when OnErr =/= fatal ->
ok; ok;
write_error(Op, Args, Error, _) -> write_error(Op, Args, Error, _) ->
mnesia_lib:fatal("mnesia_rocksdb write_error: ~p ~p -> ~p", mnesia_lib:fatal("mnesia_rocksdb write_error: ~p ~p -> ~p",
[Op, Args, Error]). [Op, Args, Error]).
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
%% COMMON PRIVATE %% COMMON PRIVATE

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mnesia_rocksdb_admin). -module(mnesia_rocksdb_admin).
-behaviour(gen_server). -behaviour(gen_server).
@ -993,7 +994,7 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
Opts = rocksdb_opts_from_trec(TRec0), Opts = rocksdb_opts_from_trec(TRec0),
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
{ok, #{ cf_info := CfI }} -> {ok, #{ cf_info := CfI }} ->
DbRec = maps:get({ext,Alias,"default"}, CfI), DbRec = maps:get({ext,Alias,"default"}, CfI),
CfNames = maps:keys(CfI), CfNames = maps:keys(CfI),
DbRec1 = DbRec#{ cfs => CfNames, DbRec1 = DbRec#{ cfs => CfNames,
mountpoint => MP }, mountpoint => MP },

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%%---------------------------------------------------------------- %%----------------------------------------------------------------
%% Copyright (c) 2013-2016 Klarna AB %% Copyright (c) 2013-2016 Klarna AB
%% %%

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-include_lib("hut/include/hut.hrl"). -include_lib("hut/include/hut.hrl").
%% enable debugging messages through mnesia:set_debug_level(debug) %% enable debugging messages through mnesia:set_debug_level(debug)

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%%---------------------------------------------------------------- %%----------------------------------------------------------------
%% Copyright (c) 2013-2016 Klarna AB %% Copyright (c) 2013-2016 Klarna AB
%% %%

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%%---------------------------------------------------------------- %%----------------------------------------------------------------
%% Copyright (c) 2013-2016 Klarna AB %% Copyright (c) 2013-2016 Klarna AB
%% %%

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%%---------------------------------------------------------------- %%----------------------------------------------------------------
%% Copyright (c) 2013-2016 Klarna AB %% Copyright (c) 2013-2016 Klarna AB
%% %%

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%%---------------------------------------------------------------- %%----------------------------------------------------------------
%% Copyright (c) 2013-2016 Klarna AB %% Copyright (c) 2013-2016 Klarna AB
%% %%

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%% @doc Mid-level access API for Mnesia-managed rocksdb tables %% @doc Mid-level access API for Mnesia-managed rocksdb tables
%% %%
%% This module implements access functions for the mnesia_rocksdb %% This module implements access functions for the mnesia_rocksdb
@ -14,8 +15,7 @@
%% #{ name := <Logical table name> %% #{ name := <Logical table name>
%% , db_ref := <Rocksdb database Ref> %% , db_ref := <Rocksdb database Ref>
%% , cf_handle := <Rocksdb column family handle> %% , cf_handle := <Rocksdb column family handle>
%% , batch := <Batch reference, if any> %% , activity := Ongoing batch or transaction, if any (map())
%% , tx_handle := <Rocksdb transaction handle, if any>
%% , attr_pos := #{AttrName := Pos} %% , attr_pos := #{AttrName := Pos}
%% , mode := <Set to 'mnesia' for mnesia access flows> %% , mode := <Set to 'mnesia' for mnesia access flows>
%% , properties := <Mnesia table props in map format> %% , properties := <Mnesia table props in map format>
@ -37,7 +37,7 @@
-export([ get_ref/1 -export([ get_ref/1
, ensure_ref/1 , ensure_ref/2 , ensure_ref/1 , ensure_ref/2
, alias_of/1 , alias_of/1
, new_tx/1 , new_tx/2 , new_tx/1 , new_tx/2
, tx_ref/2 , tx_ref/2
, tx_commit/1 , tx_commit/1
@ -138,6 +138,13 @@
-type itr_handle() :: rocksdb:itr_handle(). -type itr_handle() :: rocksdb:itr_handle().
-type batch_handle() :: rocksdb:batch_handle(). -type batch_handle() :: rocksdb:batch_handle().
-type tx_activity() :: #{ type := 'tx'
, handle := tx_handle()
, attempt := non_neg_integer() }.
-type batch_activity() :: #{ type := 'batch'
, handle := batch_handle() }.
-type activity() :: tx_activity() | batch_activity().
-type pos() :: non_neg_integer(). -type pos() :: non_neg_integer().
-type properties() :: #{ record_name := atom() -type properties() :: #{ record_name := atom()
@ -165,8 +172,7 @@
, properties := properties() , properties := properties()
, mode => mnesia , mode => mnesia
, ix_vals_f => fun( (tuple()) -> [any()] ) , ix_vals_f => fun( (tuple()) -> [any()] )
, batch => batch_handle() , activity => activity()
, tx_handle => tx_handle()
, _ => _}. , _ => _}.
-type error() :: {error, any()}. -type error() :: {error, any()}.
@ -216,10 +222,10 @@ patterns() ->
-spec snapshot(alias() | ref_or_tab()) -> {ok, snapshot_handle()} | error(). -spec snapshot(alias() | ref_or_tab()) -> {ok, snapshot_handle()} | error().
snapshot(Name) when is_atom(Name) -> snapshot(Name) when is_atom(Name) ->
case mnesia_rocksdb_admin:get_ref(Name, error) of case mnesia_rocksdb_admin:get_ref(Name, error) of
error -> error ->
snapshot(get_ref({admin, Name})); snapshot(get_ref({admin, Name}));
Ref -> Ref ->
snapshot(Ref) snapshot(Ref)
end; end;
snapshot(#{db_ref := DbRef}) -> snapshot(#{db_ref := DbRef}) ->
rocksdb:snapshot(DbRef); rocksdb:snapshot(DbRef);
@ -259,15 +265,14 @@ activity(Type, Alias, F) ->
{tx, TxOpts} -> {tx, TxOpts} ->
TxCtxt = new_tx_context(TxOpts, DbRef), TxCtxt = new_tx_context(TxOpts, DbRef),
maps:merge( maps:merge(
#{ type => tx #{ alias => Alias
, alias => Alias
, db_ref => DbRef }, TxCtxt); , db_ref => DbRef }, TxCtxt);
batch -> batch ->
{ok, Batch} = rdb_batch(), Batch = get_batch_(DbRef),
#{ type => batch #{ activity => #{ type => batch
, handle => Batch }
, alias => Alias , alias => Alias
, db_ref => DbRef , db_ref => DbRef }
, handle => Batch }
end, end,
do_activity(F, Alias, Ctxt, false). do_activity(F, Alias, Ctxt, false).
@ -295,9 +300,10 @@ run_f(F, Ctxt, true, Alias) ->
F() F()
end). end).
incr_attempt(#{ type := tx, db_ref := DbRef, attempt := A } = C) -> incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
{ok, TxH} = rdb_transaction(DbRef, []), {ok, TxH} = rdb_transaction(DbRef, []),
C1 = C#{ attempt := A+1, handle := TxH }, Act1 = Act#{attempt := A+1, handle := TxH},
C1 = C#{ activity := Act1 },
case maps:is_key(snapshot, C) of case maps:is_key(snapshot, C) of
true -> true ->
{ok, SH} = rocksdb:snapshot(DbRef), {ok, SH} = rocksdb:snapshot(DbRef),
@ -363,7 +369,12 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
check_tx_opts(maps:merge(default_tx_opts(), Opts0)). check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
check_tx_opts(Opts) -> check_tx_opts(Opts) ->
check_retries(check_nosnap(Opts)). case maps:without([no_snapshot, retries], Opts) of
Other when map_size(Other) > 0 ->
abort({invalid_tx_opts, maps:keys(Other)});
_ ->
check_retries(check_nosnap(Opts))
end.
check_retries(#{retries := Retries} = Opts) -> check_retries(#{retries := Retries} = Opts) ->
if is_integer(Retries), Retries >= 0 -> if is_integer(Retries), Retries >= 0 ->
@ -382,7 +393,9 @@ new_tx_context(Opts, DbRef) ->
create_tx(Opts, DbRef) -> create_tx(Opts, DbRef) ->
{ok, TxH} = rdb_transaction(DbRef, []), {ok, TxH} = rdb_transaction(DbRef, []),
Opts#{handle => TxH, attempt => 1}. Opts#{activity => maps:merge(Opts, #{ type => tx
, handle => TxH
, attempt => 1})}.
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) -> maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
case NoSnap of case NoSnap of
@ -394,14 +407,14 @@ maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
end. end.
commit_and_pop(Res) -> commit_and_pop(Res) ->
#{type := Type, handle := H, db_ref := DbRef} = Ctxt = current_context(), #{activity := #{type := Type, handle := H} = A} = C = current_context(),
case Type of case Type of
tx -> tx ->
case rdb_transaction_commit_and_pop(H) of case rdb_transaction_commit_and_pop(H) of
ok -> ok ->
Res; Res;
{error, {error, "Resource busy" ++ _ = Busy}} -> {error, {error, "Resource busy" ++ _ = Busy}} ->
case Ctxt of case A of
#{retries := Retries, attempt := Att} #{retries := Retries, attempt := Att}
when Att =< Retries -> when Att =< Retries ->
throw({?MODULE, busy}); throw({?MODULE, busy});
@ -412,7 +425,7 @@ commit_and_pop(Res) ->
error(Reason) error(Reason)
end; end;
batch -> batch ->
case rdb_write_batch_and_pop(DbRef, H) of case rdb_write_batch_and_pop(H, C) of
ok -> Res; ok -> Res;
Other -> Other ->
Other Other
@ -422,10 +435,10 @@ commit_and_pop(Res) ->
abort_and_pop(Cat, Err) -> abort_and_pop(Cat, Err) ->
%% We can pop the context right away, since there is no %% We can pop the context right away, since there is no
%% complex failure handling (like retry-on-busy) for rollback. %% complex failure handling (like retry-on-busy) for rollback.
#{type := Type, handle := H} = pop_ctxt(), #{activity := #{type := Type, handle := H}} = pop_ctxt(),
case Type of case Type of
tx -> ok = rdb_transaction_rollback(H); tx -> ok = rdb_transaction_rollback(H);
batch -> ok = rdb_release_batch(H) batch -> ok = release_batches(H)
end, end,
case Cat of case Cat of
error -> error(Err); error -> error(Err);
@ -451,9 +464,9 @@ rdb_transaction_rollback(H) ->
rdb_batch() -> rdb_batch() ->
rocksdb:batch(). rocksdb:batch().
rdb_write_batch_and_pop(DbRef, H) -> rdb_write_batch_and_pop(BatchRef, C) ->
%% TODO: derive write_opts(R) %% TODO: derive write_opts(R)
try rocksdb:write_batch(DbRef, H, []) try write_batches(BatchRef, write_opts(C, []))
after after
pop_ctxt() pop_ctxt()
end. end.
@ -470,24 +483,26 @@ new_tx(Tab) ->
new_tx(Tab, []). new_tx(Tab, []).
-spec new_tx(ref_or_tab(), write_options()) -> db_ref(). -spec new_tx(ref_or_tab(), write_options()) -> db_ref().
new_tx(#{activity := _}, _) ->
abort(nested_context);
new_tx(Tab, Opts) -> new_tx(Tab, Opts) ->
#{db_ref := DbRef} = R = ensure_ref(Tab), #{db_ref := DbRef} = R = ensure_ref(Tab),
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)), {ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
R#{tx_handle => TxH}. R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref(). -spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
tx_ref(Tab, TxH) -> tx_ref(Tab, TxH) ->
case ensure_ref(Tab) of case ensure_ref(Tab) of
#{tx_handle := TxH} = R -> #{activity := #{type := tx, handle := TxH}} = R ->
R; R;
#{tx_handle := OtherTxH} -> #{activity := #{type := tx, handle := OtherTxH}} ->
error({tx_handle_conflict, OtherTxH}); error({tx_handle_conflict, OtherTxH});
R -> R ->
R#{tx_handle => TxH} R#{activity => #{type => tx, handle => TxH, attempt => 1}}
end. end.
-spec tx_commit(tx_handle() | db_ref()) -> ok. -spec tx_commit(tx_handle() | db_ref()) -> ok.
tx_commit(#{tx_handle := TxH}) -> tx_commit(#{activity := #{type := tx, handle := TxH}}) ->
rdb_transaction_commit(TxH); rdb_transaction_commit(TxH);
tx_commit(TxH) -> tx_commit(TxH) ->
rdb_transaction_commit(TxH). rdb_transaction_commit(TxH).
@ -497,8 +512,9 @@ get_ref(Tab) ->
mnesia_rocksdb_admin:get_ref(Tab). mnesia_rocksdb_admin:get_ref(Tab).
-spec ensure_ref(ref_or_tab()) -> db_ref(). -spec ensure_ref(ref_or_tab()) -> db_ref().
ensure_ref(#{activity := _} = R) -> R;
ensure_ref(Ref) when is_map(Ref) -> ensure_ref(Ref) when is_map(Ref) ->
Ref; maybe_tx_ctxt(get(ctxt()), Ref);
ensure_ref(Other) -> ensure_ref(Other) ->
maybe_tx_ctxt(get(ctxt()), get_ref(Other)). maybe_tx_ctxt(get(ctxt()), get_ref(Other)).
@ -507,19 +523,18 @@ ensure_ref(Ref, R) when is_map(Ref) ->
ensure_ref(Other, R) -> ensure_ref(Other, R) ->
inherit_ctxt(get_ref(Other), R). inherit_ctxt(get_ref(Other), R).
maybe_tx_ctxt(undefined, R) -> R; maybe_tx_ctxt(undefined, R) -> R;
maybe_tx_ctxt(_, #{batch := _} = R) -> R; maybe_tx_ctxt(_, #{activity := _} = R) -> R;
maybe_tx_ctxt(_, #{tx_handle := _} = R) -> R; maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
maybe_tx_ctxt([#{type := Type, handle := H} = C|_], R) ->
case Type of case Type of
tx -> tx ->
maps:merge(maps:with([snapshot], C), R#{tx_handle => H}); maps:merge(maps:with([snapshot], C), R#{activity => A});
batch -> _ ->
R#{batch => H} R#{activity => A}
end. end.
inherit_ctxt(Ref, R) -> inherit_ctxt(Ref, R) ->
maps:merge(Ref, maps:with([batch, tx_handle], R)). maps:merge(Ref, maps:with([snapshot, activity], R)).
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res. -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
with_iterator(Tab, Fun) -> with_iterator(Tab, Fun) ->
@ -915,12 +930,12 @@ as_batch(Tab, F) ->
as_batch(Tab, F, Opts) when is_function(F, 1), is_list(Opts) -> as_batch(Tab, F, Opts) when is_function(F, 1), is_list(Opts) ->
as_batch_(ensure_ref(Tab), F, Opts). as_batch_(ensure_ref(Tab), F, Opts).
as_batch_(#{batch := _} = R, F, _) -> as_batch_(#{activity := #{type := batch}} = R, F, _) ->
%% If already inside a batch, add to that batch (batches don't seem to nest) %% If already inside a batch, add to that batch (batches don't seem to nest)
F(R); F(R);
as_batch_(#{db_ref := DbRef} = R, F, Opts) -> as_batch_(#{db_ref := DbRef} = R, F, Opts) ->
BatchRef = get_batch_(DbRef), BatchRef = get_batch_(DbRef),
try F(R#{batch => BatchRef}) of try F(R#{activity => #{type => batch, handle => BatchRef}}) of
Res -> Res ->
case write_batches(BatchRef, write_opts(R, Opts)) of case write_batches(BatchRef, write_opts(R, Opts)) of
ok -> ok ->
@ -936,48 +951,53 @@ as_batch_(#{db_ref := DbRef} = R, F, Opts) ->
get_batch(#{db_ref := DbRef, batch := BatchRef}) -> get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
try {ok, get_batch_(DbRef, BatchRef)} try {ok, get_batch_(DbRef, BatchRef)}
catch catch
error:Reason -> error:Reason ->
{error, Reason} {error, Reason}
end; end;
get_batch(_) -> get_batch(_) ->
{error, badarg}. {error, badarg}.
get_batch_(DbRef) -> get_batch_(DbRef) ->
Ref = make_ref(), Ref = make_ref(),
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rdb_batch(),
put({mrdb_batch, Ref}, #{DbRef => Batch}), put({mrdb_batch, Ref}, #{DbRef => Batch}),
Ref. Ref.
get_batch_(DbRef, BatchRef) -> get_batch_(DbRef, BatchRef) ->
Key = {mrdb_batch, BatchRef}, Key = batch_ref_key(BatchRef),
case get(Key) of case get(Key) of
undefined -> undefined ->
error(stale_batch_ref); error(stale_batch_ref);
#{DbRef := Batch} -> #{DbRef := Batch} ->
Batch; Batch;
Map -> Map ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rdb:batch(),
put(Key, Map#{DbRef => Batch}), put(Key, Map#{DbRef => Batch}),
Batch Batch
end. end.
batch_ref_key(BatchRef) ->
{mrdb_batch, BatchRef}.
write_batches(BatchRef, Opts) -> write_batches(BatchRef, Opts) ->
case get({mrdb_batch, BatchRef}) of Key = batch_ref_key(BatchRef),
undefined -> case get(Key) of
error(stale_batch_ref); undefined ->
Map -> error(stale_batch_ref);
%% Some added complication since we deal with potentially Map ->
%% multiple DbRefs, and will want to return errors. %% Some added complication since we deal with potentially
ret_batch_write_acc( %% multiple DbRefs, and will want to return errors.
maps:fold( erase(Key),
fun(DbRef, Batch, Acc) -> ret_batch_write_acc(
case rocksdb:write_batch(DbRef, Batch, Opts) of maps:fold(
ok -> fun(DbRef, Batch, Acc) ->
Acc; case rocksdb:write_batch(DbRef, Batch, Opts) of
{error,E} -> ok ->
acc_batch_write_error(E, DbRef, Acc) Acc;
end {error,E} ->
end, ok, Map)) acc_batch_write_error(E, DbRef, Acc)
end
end, ok, Map))
end. end.
ret_batch_write_acc(ok) -> ret_batch_write_acc(ok) ->
@ -992,16 +1012,17 @@ acc_batch_write_error(E, DbRef, Es) when is_list(Es) ->
[{DbRef, E}|Es]. [{DbRef, E}|Es].
release_batches(BatchRef) -> release_batches(BatchRef) ->
case get({mrdb_batch, BatchRef}) of Key = batch_ref_key(BatchRef),
undefined -> case get(Key) of
error(stale_batch_ref); undefined ->
Map -> ok;
maps_foreach( Map ->
fun(_, Batch) -> erase(Key),
rocksdb:release_batch(Batch) maps_foreach(
end, Map), fun(_, Batch) ->
erase(BatchRef), rdb_release_batch(Batch)
ok end, Map),
ok
end. end.
%% maps:foreach/2 doesn't exist in OTP 22 ... %% maps:foreach/2 doesn't exist in OTP 22 ...
@ -1347,12 +1368,12 @@ rdb_put(R, K, V) -> rdb_put(R, K, V, []).
rdb_put(R, K, V, Opts) -> rdb_put(R, K, V, Opts) ->
rdb_put_(R, K, V, write_opts(R, Opts)). rdb_put_(R, K, V, write_opts(R, Opts)).
rdb_put_(#{batch := BatchRef, rdb_put_(#{activity := #{type := batch, handle := BatchRef},
db_ref := DbRef, db_ref := DbRef,
cf_handle := CfH}, K, V, _Opts) -> cf_handle := CfH}, K, V, _Opts) ->
Batch = get_batch_(DbRef, BatchRef), Batch = get_batch_(DbRef, BatchRef),
rocksdb:batch_put(Batch, CfH, K, V); rocksdb:batch_put(Batch, CfH, K, V);
rdb_put_(#{tx_handle := TxH, cf_handle := CfH}, K, V, _Opts) -> rdb_put_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH}, K, V, _Opts) ->
rocksdb:transaction_put(TxH, CfH, K, V); rocksdb:transaction_put(TxH, CfH, K, V);
rdb_put_(#{db_ref := DbRef, cf_handle := CfH}, K, V, WOpts) -> rdb_put_(#{db_ref := DbRef, cf_handle := CfH}, K, V, WOpts) ->
rocksdb:put(DbRef, CfH, K, V, WOpts). rocksdb:put(DbRef, CfH, K, V, WOpts).
@ -1361,9 +1382,9 @@ rdb_get(R, K) -> rdb_get(R, K, []).
rdb_get(R, K, Opts) -> rdb_get(R, K, Opts) ->
rdb_get_(R, K, read_opts(R, Opts)). rdb_get_(R, K, read_opts(R, Opts)).
rdb_get_(#{tx_handle := TxH, cf_handle := CfH, snapshot := SH}, K, _Opts) -> rdb_get_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH, snapshot := SH}, K, _Opts) ->
rocksdb:transaction_get(TxH, CfH, K, [{snapshot, SH}]); rocksdb:transaction_get(TxH, CfH, K, [{snapshot, SH}]);
rdb_get_(#{tx_handle := TxH, cf_handle := CfH}, K, _Opts) -> rdb_get_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH}, K, _Opts) ->
rocksdb:transaction_get(TxH, CfH, K, []); rocksdb:transaction_get(TxH, CfH, K, []);
rdb_get_(#{db_ref := DbRef, cf_handle := CfH}, K, ROpts) -> rdb_get_(#{db_ref := DbRef, cf_handle := CfH}, K, ROpts) ->
rocksdb:get(DbRef, CfH, K, ROpts). rocksdb:get(DbRef, CfH, K, ROpts).
@ -1372,12 +1393,12 @@ rdb_delete(R, K) -> rdb_delete(R, K, []).
rdb_delete(R, K, Opts) -> rdb_delete(R, K, Opts) ->
rdb_delete_(R, K, write_opts(R, Opts)). rdb_delete_(R, K, write_opts(R, Opts)).
rdb_delete_(#{batch := BatchRef, rdb_delete_(#{activity := #{type := batch, handle := BatchRef},
db_ref := DbRef, db_ref := DbRef,
cf_handle := CfH}, K, _Opts) -> cf_handle := CfH}, K, _Opts) ->
Batch = get_batch_(DbRef, BatchRef), Batch = get_batch_(DbRef, BatchRef),
rocksdb:batch_delete(Batch, CfH, K); rocksdb:batch_delete(Batch, CfH, K);
rdb_delete_(#{tx_handle := TxH, cf_handle := CfH}, K, _Opts) -> rdb_delete_(#{activity := #{type := tx, handle := TxH}, cf_handle := CfH}, K, _Opts) ->
rocksdb:transaction_delete(TxH, CfH, K); rocksdb:transaction_delete(TxH, CfH, K);
rdb_delete_(#{db_ref := DbRef, cf_handle := CfH}, K, WOpts) -> rdb_delete_(#{db_ref := DbRef, cf_handle := CfH}, K, WOpts) ->
rocksdb:delete(DbRef, CfH, K, WOpts). rocksdb:delete(DbRef, CfH, K, WOpts).
@ -1386,7 +1407,7 @@ rdb_iterator(R) -> rdb_iterator(R, []).
rdb_iterator(R, Opts) -> rdb_iterator(R, Opts) ->
rdb_iterator_(R, read_opts(R, Opts)). rdb_iterator_(R, read_opts(R, Opts)).
rdb_iterator_(#{db_ref := DbRef, tx_handle := TxH, cf_handle := CfH}, ROpts) -> rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts); rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) -> rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
rocksdb:iterator(DbRef, CfH, ROpts). rocksdb:iterator(DbRef, CfH, ROpts).

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mrdb_index). -module(mrdb_index).
-export([ -export([

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mrdb_mutex). -module(mrdb_mutex).
-export([ do/2 ]). -export([ do/2 ]).

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mrdb_select). -module(mrdb_select).
-export([ select/3 %% (Ref, MatchSpec, Limit) -export([ select/3 %% (Ref, MatchSpec, Limit)

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mnesia_rocksdb_SUITE). -module(mnesia_rocksdb_SUITE).
-export([ -export([
@ -16,7 +17,8 @@
, encoding_binary_binary/1 , encoding_binary_binary/1
, encoding_defaults/1 , encoding_defaults/1
]). ]).
-export([ mrdb_transactions/1 -export([ mrdb_batch/1
, mrdb_transactions/1
, mrdb_repeated_transactions/1 , mrdb_repeated_transactions/1
, mrdb_abort/1 , mrdb_abort/1
, mrdb_two_procs/1 , mrdb_two_procs/1
@ -43,7 +45,8 @@ groups() ->
, {checks, [sequence], [ encoding_sext_attrs , {checks, [sequence], [ encoding_sext_attrs
, encoding_binary_binary , encoding_binary_binary
, encoding_defaults ]} , encoding_defaults ]}
, {mrdb, [sequence], [ mrdb_transactions , {mrdb, [sequence], [ mrdb_batch
, mrdb_transactions
, mrdb_repeated_transactions , mrdb_repeated_transactions
, mrdb_abort , mrdb_abort
, mrdb_two_procs , mrdb_two_procs
@ -139,6 +142,43 @@ expect_error(F, Line, Type, Expected) ->
ok ok
end. end.
mrdb_batch(Config) ->
Created = create_tabs([{b, []}], Config),
D0 = get_dict(),
mrdb:activity(
batch, rdb,
fun() ->
[mrdb:insert(b, {b, K, K})
|| K <- lists:seq(1, 10)]
end),
dictionary_unchanged(D0),
[[{b,K,K}] = mrdb:read(b, K) || K <- lists:seq(1, 10)],
expect_error(
fun() -> mrdb:activity(
batch, rdb,
fun() ->
mrdb:insert(b, {b, 11, 11}),
error(willful_abort)
end)
end, ?LINE, error, '_'),
dictionary_unchanged(D0),
[] = mrdb:read(b, 11),
TRef = mrdb:get_ref(b),
mrdb:activity(
batch, rdb,
fun() ->
mrdb:insert(TRef, {b, 12, 12})
end),
dictionary_unchanged(D0),
[{b, 12, 12}] = mrdb:read(b, 12),
mrdb:as_batch(b, fun(R) ->
mrdb:insert(R, {b, 13, 13})
end),
dictionary_unchanged(D0),
[{b, 13, 13}] = mrdb:read(b, 13),
delete_tabs(Created),
ok.
mrdb_transactions(Config) -> mrdb_transactions(Config) ->
tr_ct:with_trace(fun mrdb_transactions_/1, Config, tr_ct:with_trace(fun mrdb_transactions_/1, Config,
tr_patterns( tr_patterns(
@ -149,13 +189,17 @@ mrdb_transactions_(Config) ->
Created = create_tabs([{tx, []}], Config), Created = create_tabs([{tx, []}], Config),
mrdb:insert(tx, {tx, a, 1}), mrdb:insert(tx, {tx, a, 1}),
[_] = mrdb:read(tx, a), [_] = mrdb:read(tx, a),
D0 = get_dict(),
mrdb:activity( mrdb:activity(
tx, rdb, tx, rdb,
fun() -> fun() ->
[{tx,a,N}] = mrdb:read(tx, a), [{tx,a,N}] = mrdb:read(tx, a),
N1 = N+1, N1 = N+1,
ok = mrdb:insert(tx, {tx,a,N1}) ok = mrdb:insert(tx, {tx,a,N1}),
[{tx,a,N1}] = mrdb:read(tx, a),
ok
end), end),
dictionary_unchanged(D0),
[{tx,a,2}] = mrdb:read(tx,a), [{tx,a,2}] = mrdb:read(tx,a),
delete_tabs(Created), delete_tabs(Created),
ok. ok.
@ -169,7 +213,9 @@ mrdb_repeated_transactions(Config) ->
N1 = N+1, N1 = N+1,
ok = mrdb:insert(rtx, {rtx, a, N1}) ok = mrdb:insert(rtx, {rtx, a, N1})
end, end,
D0 = get_dict(),
[ok = mrdb:activity(tx, rdb, Fun) || _ <- lists:seq(1,100)], [ok = mrdb:activity(tx, rdb, Fun) || _ <- lists:seq(1,100)],
dictionary_unchanged(D0),
[{rtx,a,100}] = mrdb:read(rtx, a), [{rtx,a,100}] = mrdb:read(rtx, a),
delete_tabs(Created), delete_tabs(Created),
ok. ok.
@ -178,6 +224,7 @@ mrdb_abort(Config) ->
Created = create_tabs([{tx_abort, []}], Config), Created = create_tabs([{tx_abort, []}], Config),
mrdb:insert(tx_abort, {tx_abort, a, 1}), mrdb:insert(tx_abort, {tx_abort, a, 1}),
Pre = mrdb:read(tx_abort, a), Pre = mrdb:read(tx_abort, a),
D0 = get_dict(),
TRes = try mrdb:activity( TRes = try mrdb:activity(
tx, rdb, tx, rdb,
fun() -> fun() ->
@ -190,6 +237,7 @@ mrdb_abort(Config) ->
error:abort_here -> error:abort_here ->
ok ok
end, end,
dictionary_unchanged(D0),
ok = TRes, ok = TRes,
Pre = mrdb:read(tx_abort, a), Pre = mrdb:read(tx_abort, a),
delete_tabs(Created), delete_tabs(Created),
@ -202,7 +250,7 @@ mrdb_two_procs(Config) ->
tr_patterns( tr_patterns(
mrdb, [ {mrdb, insert, 2, x} mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x} , {mrdb, read, 2, x}
, {mrdb, activity, x} ], tr_opts()))). , {mrdb, activity, x}], tr_opts()))).
mrdb_two_procs_(Config) -> mrdb_two_procs_(Config) ->
R = ?FUNCTION_NAME, R = ?FUNCTION_NAME,
@ -217,7 +265,9 @@ mrdb_two_procs_(Config) ->
end, end,
{POther, MRef} = spawn_opt( {POther, MRef} = spawn_opt(
fun() -> fun() ->
ok = mrdb:activity(tx, rdb, F0) D0 = get_dict(),
ok = mrdb:activity(tx, rdb, F0),
dictionary_unchanged(D0)
end, [monitor]), end, [monitor]),
F1 = fun() -> F1 = fun() ->
Pre = mrdb:read(R, a), Pre = mrdb:read(R, a),
@ -227,6 +277,7 @@ mrdb_two_procs_(Config) ->
ok = mrdb:insert(R, {R, a, 18}) ok = mrdb:insert(R, {R, a, 18})
end, end,
go_ahead_other(1, POther), go_ahead_other(1, POther),
Do0 = get_dict(),
try mrdb:activity({tx, #{no_snapshot => true, try mrdb:activity({tx, #{no_snapshot => true,
retries => 0}}, rdb, F1) of retries => 0}}, rdb, F1) of
ok -> error(unexpected) ok -> error(unexpected)
@ -234,6 +285,7 @@ mrdb_two_procs_(Config) ->
error:{error, "Resource busy" ++ _} -> error:{error, "Resource busy" ++ _} ->
ok ok
end, end,
dictionary_unchanged(Do0),
[{R, a, 17}] = mrdb:read(R, a), [{R, a, 17}] = mrdb:read(R, a),
delete_tabs(Created), delete_tabs(Created),
ok. ok.
@ -271,7 +323,9 @@ mrdb_two_procs_tx_restart_(Config) ->
ok = mrdb:insert(R, {R, a, 18}) ok = mrdb:insert(R, {R, a, 18})
end, end,
go_ahead_other(1, POther), go_ahead_other(1, POther),
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1), mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
dictionary_unchanged(Do0),
[{R, a, 18}] = mrdb:read(R, a), [{R, a, 18}] = mrdb:read(R, a),
delete_tabs(Created), delete_tabs(Created),
ok. ok.
@ -309,7 +363,9 @@ mrdb_two_procs_snap(Config) ->
end, end,
{POther, MRef} = {POther, MRef} =
spawn_opt(fun() -> spawn_opt(fun() ->
ok = mrdb:activity(tx, rdb, F0) D0 = get_dict(),
ok = mrdb:activity(tx, rdb, F0),
dictionary_unchanged(D0)
end, [monitor]), end, [monitor]),
F1 = fun() -> F1 = fun() ->
Att = get_attempt(), Att = get_attempt(),
@ -324,7 +380,9 @@ mrdb_two_procs_snap(Config) ->
mrdb:insert(R, {R, b, 18}), mrdb:insert(R, {R, b, 18}),
1477 1477
end, end,
Do0 = get_dict(),
1477 = mrdb:activity(tx, rdb, F1), 1477 = mrdb:activity(tx, rdb, F1),
dictionary_unchanged(Do0),
[{R, a, 17}] = mrdb:read(R, a), [{R, a, 17}] = mrdb:read(R, a),
[{R, b, 18}] = mrdb:read(R, b), [{R, b, 18}] = mrdb:read(R, b),
delete_tabs(Created), delete_tabs(Created),
@ -351,11 +409,13 @@ mrdb_three_procs_(Config) ->
end, end,
{P1, MRef1} = {P1, MRef1} =
spawn_opt(fun() -> spawn_opt(fun() ->
D0 = get_dict(),
do_when_p_allows( do_when_p_allows(
1, Parent, ?LINE, 1, Parent, ?LINE,
fun() -> fun() ->
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1) ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
end) end),
dictionary_unchanged(D0)
end, [monitor]), end, [monitor]),
F2 = fun() -> F2 = fun() ->
[A0] = mrdb:read(R, a), [A0] = mrdb:read(R, a),
@ -371,6 +431,7 @@ mrdb_three_procs_(Config) ->
end, end,
{P2, MRef2} = {P2, MRef2} =
spawn_opt(fun() -> spawn_opt(fun() ->
D0 = get_dict(),
try mrdb:activity( try mrdb:activity(
{tx, #{retries => 0, {tx, #{retries => 0,
no_snapshot => true}}, rdb, F2) of no_snapshot => true}}, rdb, F2) of
@ -378,8 +439,10 @@ mrdb_three_procs_(Config) ->
catch catch
error:{error, "Resource busy" ++ _} -> error:{error, "Resource busy" ++ _} ->
ok ok
end end,
dictionary_unchanged(D0)
end, [monitor]), end, [monitor]),
Do0 = get_dict(),
ok = mrdb:activity(tx, rdb, ok = mrdb:activity(tx, rdb,
fun() -> fun() ->
Att = get_attempt(), Att = get_attempt(),
@ -398,6 +461,7 @@ mrdb_three_procs_(Config) ->
await_other_down(P2, MRef2, ?LINE), await_other_down(P2, MRef2, ?LINE),
ok = mrdb:insert(R, {R, p0, 1}) ok = mrdb:insert(R, {R, p0, 1})
end), end),
dictionary_unchanged(Do0),
[{R, p1, 1}] = mrdb:read(R, p1), [{R, p1, 1}] = mrdb:read(R, p1),
[] = mrdb:read(R, p2), [] = mrdb:read(R, p2),
[A1] = mrdb:read(R, a), [A1] = mrdb:read(R, a),
@ -511,7 +575,7 @@ await_other_down_(P, MRef, Line) ->
end. end.
get_attempt() -> get_attempt() ->
#{attempt := Attempt} = mrdb:current_context(), #{activity := #{attempt := Attempt}} = mrdb:current_context(),
Attempt. Attempt.
create_tabs(Tabs, Config) -> create_tabs(Tabs, Config) ->
@ -529,3 +593,14 @@ delete_tabs(Tabs) ->
[{atomic,ok} = mnesia:delete_table(T) || T <- Tabs], [{atomic,ok} = mnesia:delete_table(T) || T <- Tabs],
ok. ok.
get_dict() ->
{dictionary, D} = process_info(self(), dictionary),
[X || {K,_} = X <- D,
K =/= log_timestamp].
dictionary_unchanged(Old) ->
New = get_dict(),
#{ deleted := []
, added := [] } = #{ deleted => Old -- New
, added => New -- Old },
ok.