Debugging direct access API

This commit is contained in:
Ulf Wiger 2022-07-08 11:23:12 +02:00
parent 0da3169a01
commit 93a296f6b9
4 changed files with 265 additions and 85 deletions

View File

@ -145,14 +145,6 @@
-import(mrdb, [ with_iterator/2 -import(mrdb, [ with_iterator/2
]). ]).
-import(mnesia_rocksdb_admin, [ get_ref/1 ]).
%% -import(mnesia_rocksdb_lib, [ encode_key/2
%% , encode_val/3
%% , decode_key/2
%% , decode_val/3
%% ]).
-include("mnesia_rocksdb.hrl"). -include("mnesia_rocksdb.hrl").
-include("mnesia_rocksdb_int.hrl"). -include("mnesia_rocksdb_int.hrl").
@ -165,10 +157,10 @@
%% RECORDS %% RECORDS
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
-record(st, { ref -record(st, { alias
, alias
, tab , tab
, type , type
, status
, on_error , on_error
}). }).
@ -219,6 +211,10 @@ register(Alias) ->
{error, Reason} {error, Reason}
end. end.
get_ref(Tab) ->
R = mnesia_rocksdb_admin:get_ref(Tab),
R#{mode => mnesia}.
default_alias() -> default_alias() ->
rocksdb_copies. rocksdb_copies.
@ -778,33 +774,24 @@ start_proc(Alias, Tab, Type, ProcName, Props, RdbOpts) ->
gen_server:start_link({local, ProcName}, ?MODULE, gen_server:start_link({local, ProcName}, ?MODULE,
{Alias, Tab, Type, Props, RdbOpts}, []). {Alias, Tab, Type, Props, RdbOpts}, []).
init({Alias, Tab, Type, _Props, RdbOpts}) -> init({Alias, Tab, Type, _Props, _RdbOpts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%% In case of a process restart, we try to rebuild the state %% In case of a process restart, we try to rebuild the state
%% from the cf info held by the admin process. %% from the cf info held by the admin process.
Ref = case mnesia_rocksdb_admin:request_ref(Alias, Tab) of S0 = #st{ tab = Tab
{ok, Ref1} -> Ref1; , alias = Alias
{error, _} -> undefined , type = Type },
end, S = case mnesia_rocksdb_admin:request_ref(Alias, Tab) of
{ok, update_state(Ref, Alias, Tab, Type, RdbOpts, #st{})}. {ok, _Ref} -> S0#st{status = active};
{error, _} -> S0
update_state(Ref, Alias, Tab, Type, _RdbOpts, St) -> end,
St#st{ tab = Tab {ok, S}.
, alias = Alias
, type = Type
, ref = maybe_set_ref_mode(Ref)
}.
maybe_set_ref_mode(Ref) when is_map(Ref) ->
Ref#{mode => mnesia};
maybe_set_ref_mode(Ref) ->
Ref.
handle_call({create_table, Tab, Props}, _From, handle_call({create_table, Tab, Props}, _From,
#st{alias = Alias, tab = Tab} = St) -> #st{alias = Alias, tab = Tab} = St) ->
try mnesia_rocksdb_admin:create_table(Alias, Tab, Props) of try mnesia_rocksdb_admin:create_table(Alias, Tab, Props) of
{ok, Ref} -> {ok, _Ref} ->
{reply, ok, St#st{ref = maybe_set_ref_mode(Ref)}}; {reply, ok, St#st{status = active}};
Other -> Other ->
{reply, Other, St} {reply, Other, St}
catch catch
@ -813,38 +800,38 @@ handle_call({create_table, Tab, Props}, _From,
end; end;
handle_call({load_table, _LoadReason, Props}, _From, handle_call({load_table, _LoadReason, Props}, _From,
#st{alias = Alias, tab = Tab} = St) -> #st{alias = Alias, tab = Tab} = St) ->
{ok, Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props), {ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St#st{ref = maybe_set_ref_mode(Ref)}}; {reply, ok, St#st{status = active}};
handle_call({write_info, Key, Value}, _From, #st{ref = Ref} = St) -> handle_call({write_info, Key, Value}, _From, #st{tab = Tab} = St) ->
mrdb:write_info(Ref, Key, Value), mrdb:write_info(get_ref(Tab), Key, Value),
{reply, ok, St}; {reply, ok, St};
handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) -> handle_call({update_counter, C, Incr}, _From, #st{tab = Tab} = St) ->
{reply, mrdb:update_counter(Ref, C, Incr), St}; {reply, mrdb:update_counter(get_ref(Tab), C, Incr), St};
handle_call({read, Key} = M, _From, #st{ref = Ref} = St) -> handle_call({read, Key} = M, _From, #st{tab = Tab} = St) ->
{reply, ?TRY(mrdb:read(Ref, Key), M, St), St}; {reply, ?TRY(mrdb:read(get_ref(Tab), Key), M, St), St};
handle_call(first = M, _From, #st{ref = Ref} = St) -> handle_call(first = M, _From, #st{tab = Tab} = St) ->
{reply, ?TRY(mrdb:first(Ref), M, St), St}; {reply, ?TRY(mrdb:first(get_ref(Tab)), M, St), St};
handle_call({prev, Key} = M, _From, #st{ref = Ref} = St) -> handle_call({prev, Key} = M, _From, #st{tab = Tab} = St) ->
{reply, ?TRY(mrdb:prev(Ref, Key), M, St), St}; {reply, ?TRY(mrdb:prev(get_ref(Tab), Key), M, St), St};
handle_call({next, Key} = M, _From, #st{ref = Ref} = St) -> handle_call({next, Key} = M, _From, #st{tab = Tab} = St) ->
{reply, ?TRY(mrdb:next(Ref, Key), M, St), St}; {reply, ?TRY(mrdb:next(get_ref(Tab), Key), M, St), St};
handle_call(last = M, _From, #st{ref = Ref} = St) -> handle_call(last = M, _From, #st{tab = Tab} = St) ->
{reply, ?TRY(mrdb:last(Ref), M, St), St}; {reply, ?TRY(mrdb:last(get_ref(Tab)), M, St), St};
handle_call({insert, Obj}, _From, St) -> handle_call({insert, Obj}, _From, St) ->
Res = do_insert(Obj, St), Res = do_insert(Obj, St),
{reply, Res, St}; {reply, Res, St};
handle_call({delete, Key}, _From, St) -> handle_call({delete, Key}, _From, St) ->
Res = do_delete(Key, St), Res = do_delete(Key, St),
{reply, Res, St}; {reply, Res, St};
handle_call({match_delete, Pat}, _From, #st{ref = Ref} = St) -> handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
Res = mrdb:match_delete(Ref, Pat), Res = mrdb:match_delete(get_ref(Tab), Pat),
{reply, Res, St}; {reply, Res, St};
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) -> handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
_ = mnesia_rocksdb_admin:close_table(Alias, Tab), _ = mnesia_rocksdb_admin:close_table(Alias, Tab),
{reply, ok, St#st{ref = undefined}}; {reply, ok, St#st{status = undefined}};
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) -> handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
ok = mnesia_rocksdb_admin:delete_table(Alias, Tab), ok = mnesia_rocksdb_admin:delete_table(Alias, Tab),
{stop, normal, ok, St#st{ref = undefined}}. {stop, normal, ok, St#st{status = undefined}}.
handle_cast(_, St) -> handle_cast(_, St) ->
{noreply, St}. {noreply, St}.
@ -901,12 +888,12 @@ do_call(P, Req) ->
end. end.
%% server-side end of insert/3. %% server-side end of insert/3.
do_insert(Obj, #st{ref = Ref} = St) -> do_insert(Obj, #st{tab = Tab} = St) ->
db_insert(Ref, Obj, [], St). db_insert(get_ref(Tab), Obj, [], St).
%% server-side part %% server-side part
do_delete(Key, #st{ref = Ref} = St) -> do_delete(Key, #st{tab = Tab} = St) ->
db_delete(Ref, Key, [], St). db_delete(get_ref(Tab), Key, [], St).
proc_name(_Alias, Tab) -> proc_name(_Alias, Tab) ->
list_to_atom("mnesia_ext_proc_" ++ mnesia_rocksdb_lib:tabname(Tab)). list_to_atom("mnesia_ext_proc_" ++ mnesia_rocksdb_lib:tabname(Tab)).

View File

@ -15,6 +15,7 @@
, get_ref/2 %% (Name, Default -> Ref | Default , get_ref/2 %% (Name, Default -> Ref | Default
, request_ref/2 %% (Alias, Name) -> Ref , request_ref/2 %% (Alias, Name) -> Ref
, close_table/2 , close_table/2
, clear_table/1
]). ]).
-export([ migrate_standalone/2 ]). -export([ migrate_standalone/2 ]).
@ -34,7 +35,9 @@
, write_table_property/3 %% (Alias, Tab, Property) , write_table_property/3 %% (Alias, Tab, Property)
]). ]).
-export([meta/0]). -export([ meta/0
, get_cached_env/2
, set_and_cache_env/2 ]).
-include("mnesia_rocksdb.hrl"). -include("mnesia_rocksdb.hrl").
-include("mnesia_rocksdb_int.hrl"). -include("mnesia_rocksdb_int.hrl").
@ -73,7 +76,8 @@
| {remove_aliases, [alias()]} | {remove_aliases, [alias()]}
| {migrate, [{tabname(), map()}]} | {migrate, [{tabname(), map()}]}
| {prep_close, table()} | {prep_close, table()}
| {close_table, table()}. | {close_table, table()}
| {clear_table, table() | cf() }.
-type reason() :: any(). -type reason() :: any().
-type reply() :: any(). -type reply() :: any().
@ -126,6 +130,30 @@ erase_pt_list(Names) ->
Meta = meta(), Meta = meta(),
persistent_term:put(?PT_KEY, maps:without(Names, Meta)). persistent_term:put(?PT_KEY, maps:without(Names, Meta)).
check_application_defaults(Meta) ->
Value = application:get_env(mnesia_rocksdb, mnesia_compatible_aborts, false),
Meta#{ {mnesia_compatible_aborts} => Value }.
get_cached_env(Key, Default) ->
maps:get({Key}, meta(), Default).
set_and_cache_env_(Key, Value) when is_atom(Key) ->
Meta = meta(),
Prev = maps:get({Key}, Meta, undefined),
application:set_env(mnesia_rocksdb, Key, Value),
persistent_term:put(?PT_KEY, Meta#{{Key} => Value}),
Prev.
maybe_initial_meta() ->
case persistent_term:get(?PT_KEY, undefined) of
undefined ->
M = check_application_defaults(#{}),
persistent_term:put(?PT_KEY, M),
M;
M when is_map(M) ->
M
end.
meta() -> meta() ->
persistent_term:get(?PT_KEY, #{}). persistent_term:get(?PT_KEY, #{}).
@ -137,6 +165,9 @@ prep_close(_, _) ->
get_pt(Name, Default) -> get_pt(Name, Default) ->
maps:get(Name, meta(), Default). maps:get(Name, meta(), Default).
set_and_cache_env(Key, Value) ->
gen_server:call(?MODULE, {set_and_cache_env, Key, Value}).
create_table(Alias, Name, Props) -> create_table(Alias, Name, Props) ->
call(Alias, {create_table, Name, Props}). call(Alias, {create_table, Name, Props}).
@ -171,6 +202,16 @@ request_ref(Alias, Name) ->
close_table(Alias, Name) -> close_table(Alias, Name) ->
call(Alias, {close_table, Name}). call(Alias, {close_table, Name}).
clear_table(#{alias := Alias, name := Name}) ->
case Name of
{admin, _} -> mnesia:abort({bad_type, Name});
{_} -> mnesia:abort({no_exists, Name});
_ ->
call(Alias, {clear_table, Name})
end;
clear_table(Name) ->
clear_table(get_ref(Name)).
add_aliases(Aliases) -> add_aliases(Aliases) ->
call([], {add_aliases, Aliases}). call([], {add_aliases, Aliases}).
@ -276,6 +317,7 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) -> init([]) ->
_ = maybe_initial_meta(), %% bootstrap pt
Opts = default_opts(), Opts = default_opts(),
process_flag(trap_exit, true), process_flag(trap_exit, true),
mnesia:subscribe({table, schema, simple}), mnesia:subscribe({table, schema, simple}),
@ -317,7 +359,10 @@ recover_tab({T, #{db_ref := DbRef,
update_cf(Alias, T, R, St); update_cf(Alias, T, R, St);
false -> false ->
error({cannot_access_table, T}) error({cannot_access_table, T})
end. end;
recover_tab(_, St) ->
St.
%% TODO: generalize %% TODO: generalize
get_aliases() -> get_aliases() ->
@ -376,6 +421,9 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
end. end.
-spec handle_call({alias(), req()}, any(), st()) -> gen_server_reply(). -spec handle_call({alias(), req()}, any(), st()) -> gen_server_reply().
handle_call({set_and_cache_env, Key, Value}, _From, St) ->
Res = set_and_cache_env_(Key, Value),
{reply, Res, St};
handle_call({[], {add_aliases, Aliases}}, _From, St) -> handle_call({[], {add_aliases, Aliases}}, _From, St) ->
St1 = do_add_aliases(Aliases, St), St1 = do_add_aliases(Aliases, St),
{reply, ok, St1}; {reply, ok, St1};
@ -492,6 +540,23 @@ handle_req(Alias, {delete_table, Name}, Backend, St) ->
{error, not_found} -> {error, not_found} ->
{reply, ok, St} {reply, ok, St}
end; end;
handle_req(Alias, {clear_table, Name}, Backend, #st{} = St) ->
case find_cf(Alias, Name, Backend, St) of
{ok, #{ status := open
, type := column_family
, db_ref := DbRef
, cf_handle := CfH} = Cf} ->
CfName = tab_to_cf_name(Name),
ok = rocksdb:drop_column_family(DbRef, CfH),
{ok, CfH1} = rocksdb:create_column_family(DbRef, CfName, cfopts()),
ok = rocksdb:destroy_column_family(DbRef, CfH),
Cf1 = Cf#{cf_handle := CfH1},
St1 = update_cf(Alias, Name, Cf1, St),
put_pt(Name, Cf1),
{reply, ok, St1};
_ ->
{reply, {error, not_found}, St}
end;
handle_req(Alias, {get_ref, Name}, Backend, #st{} = St) -> handle_req(Alias, {get_ref, Name}, Backend, #st{} = St) ->
case find_cf(Alias, Name, Backend, St) of case find_cf(Alias, Name, Backend, St) of
{ok, #{status := open} = Ref} -> {ok, #{status := open} = Ref} ->

View File

@ -53,6 +53,7 @@
, delete/2 , delete/3 , delete/2 , delete/3
, delete_object/2, delete_object/3 , delete_object/2, delete_object/3
, match_delete/2 , match_delete/2
, clear_table/1
, batch_write/2 , batch_write/3 , batch_write/2 , batch_write/3
, update_counter/3, update_counter/4 , update_counter/3, update_counter/4
, as_batch/2 , as_batch/3 , as_batch/2 , as_batch/3
@ -285,7 +286,7 @@ do_activity(F, Alias, Ctxt, WithLock) ->
do_activity(F, Alias, Ctxt, true) do_activity(F, Alias, Ctxt, true)
end end
catch catch
Cat:Err when Cat==error; Cat==exit -> Cat:Err ->
abort_and_pop(Cat, Err) abort_and_pop(Cat, Err)
end. end.
@ -315,20 +316,18 @@ incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef }
ctxt() -> {?MODULE, ctxt}. ctxt() -> {?MODULE, ctxt}.
push_ctxt(C) -> push_ctxt(C) ->
K = ctxt(), C1 = case get_ctxt() of
C1 = case get(K) of
undefined -> [C]; undefined -> [C];
C0 -> [C|C0] C0 -> [C|C0]
end, end,
put(K, C1), put_ctxt(C1),
ok. ok.
pop_ctxt() -> pop_ctxt() ->
K = ctxt(), case get_ctxt() of
case get(K) of
undefined -> error(no_ctxt); undefined -> error(no_ctxt);
[C] -> erase(K) , maybe_release_snapshot(C); [C] -> erase_ctxt() , maybe_release_snapshot(C);
[H|T] -> put(K, T), maybe_release_snapshot(H) [H|T] -> put_ctxt(T), maybe_release_snapshot(H)
end. end.
maybe_release_snapshot(#{snapshot := SH} = C) -> maybe_release_snapshot(#{snapshot := SH} = C) ->
@ -342,7 +341,7 @@ maybe_release_snapshot(C) ->
C. C.
current_context() -> current_context() ->
case get(ctxt()) of case pdict_get(ctxt()) of
[C|_] -> [C|_] ->
C; C;
undefined -> undefined ->
@ -432,6 +431,7 @@ commit_and_pop(Res) ->
end end
end. end.
-spec abort_and_pop(atom(), any()) -> no_return().
abort_and_pop(Cat, Err) -> abort_and_pop(Cat, Err) ->
%% We can pop the context right away, since there is no %% We can pop the context right away, since there is no
%% complex failure handling (like retry-on-busy) for rollback. %% complex failure handling (like retry-on-busy) for rollback.
@ -440,12 +440,48 @@ abort_and_pop(Cat, Err) ->
tx -> ok = rdb_transaction_rollback(H); tx -> ok = rdb_transaction_rollback(H);
batch -> ok = release_batches(H) batch -> ok = release_batches(H)
end, end,
case Cat of return_abort(Type, Cat, Err).
error -> error(Err);
exit -> exit(Err) -spec return_abort(batch | tx, atom(), any()) -> no_return().
%% throw -> throw(Err) return_abort(batch, Cat, Err) ->
re_throw(Cat, Err);
return_abort(tx, Cat, Err) ->
case mnesia_compatible_aborts() of
true ->
%% Mnesia always captures stack traces, but this could actually become a
%% performance issue in some cases (generally, it's better not to lazily
%% produce stack traces.) Since we want to pushe the option checking for
%% mnesia-abort-style compatibility to AFTER detecting an abort, we don't
%% order a stack trace initially, and instead insert an empty list.
%% (The exact stack trace wouldn't be the same anyway.)
Err1 =
case Cat of
error -> {fix_error(Err), []};
exit -> fix_error(Err);
throw -> {throw, Err}
end,
exit({aborted, Err1});
false ->
re_throw(Cat, Err)
end. end.
-spec re_throw(atom(), any()) -> no_return().
re_throw(Cat, Err) ->
case Cat of
error -> error(Err);
exit -> exit(Err);
throw -> throw(Err)
end.
mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
fix_error({aborted, Err}) ->
Err;
fix_error(Err) ->
Err.
rdb_transaction(DbRef, Opts) -> rdb_transaction(DbRef, Opts) ->
rocksdb:transaction(DbRef, Opts). rocksdb:transaction(DbRef, Opts).
@ -476,7 +512,12 @@ rdb_release_batch(H) ->
%% @doc Aborts an ongoing {@link activity/2} %% @doc Aborts an ongoing {@link activity/2}
abort(Reason) -> abort(Reason) ->
erlang:error({mrdb_abort, Reason}). case mnesia_compatible_aborts() of
true ->
mnesia:abort(Reason);
false ->
erlang:error({mrdb_abort, Reason})
end.
-spec new_tx(table() | db_ref()) -> db_ref(). -spec new_tx(table() | db_ref()) -> db_ref().
new_tx(Tab) -> new_tx(Tab) ->
@ -514,9 +555,18 @@ get_ref(Tab) ->
-spec ensure_ref(ref_or_tab()) -> db_ref(). -spec ensure_ref(ref_or_tab()) -> db_ref().
ensure_ref(#{activity := _} = R) -> R; ensure_ref(#{activity := _} = R) -> R;
ensure_ref(Ref) when is_map(Ref) -> ensure_ref(Ref) when is_map(Ref) ->
maybe_tx_ctxt(get(ctxt()), Ref); maybe_tx_ctxt(get_ctxt(), Ref);
ensure_ref(Other) -> ensure_ref(Other) ->
maybe_tx_ctxt(get(ctxt()), get_ref(Other)). maybe_tx_ctxt(get_ctxt(), get_ref(Other)).
get_ctxt() ->
pdict_get(ctxt()).
put_ctxt(C) ->
pdict_put(ctxt(), C).
erase_ctxt() ->
pdict_erase(ctxt()).
ensure_ref(Ref, R) when is_map(Ref) -> ensure_ref(Ref, R) when is_map(Ref) ->
inherit_ctxt(Ref, R); inherit_ctxt(Ref, R);
@ -933,6 +983,9 @@ as_batch(Tab, F, Opts) when is_function(F, 1), is_list(Opts) ->
as_batch_(#{activity := #{type := batch}} = R, F, _) -> as_batch_(#{activity := #{type := batch}} = R, F, _) ->
%% If already inside a batch, add to that batch (batches don't seem to nest) %% If already inside a batch, add to that batch (batches don't seem to nest)
F(R); F(R);
as_batch_(#{activity := #{type := tx}} = R, F, _) ->
%% If already inside a tx, we need to respect the tx context
F(R);
as_batch_(#{db_ref := DbRef} = R, F, Opts) -> as_batch_(#{db_ref := DbRef} = R, F, Opts) ->
BatchRef = get_batch_(DbRef), BatchRef = get_batch_(DbRef),
try F(R#{activity => #{type => batch, handle => BatchRef}}) of try F(R#{activity => #{type => batch, handle => BatchRef}}) of
@ -960,34 +1013,41 @@ get_batch(_) ->
get_batch_(DbRef) -> get_batch_(DbRef) ->
Ref = make_ref(), Ref = make_ref(),
{ok, Batch} = rdb_batch(), {ok, Batch} = rdb_batch(),
put({mrdb_batch, Ref}, #{DbRef => Batch}), pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
Ref. Ref.
get_batch_(DbRef, BatchRef) -> get_batch_(DbRef, BatchRef) ->
Key = batch_ref_key(BatchRef), Key = batch_ref_key(BatchRef),
case get(Key) of case pdict_get(Key) of
undefined -> undefined ->
error(stale_batch_ref); error(stale_batch_ref);
#{DbRef := Batch} -> #{DbRef := Batch} ->
Batch; Batch;
Map -> Map ->
{ok, Batch} = rdb_batch(), {ok, Batch} = rdb_batch(),
put(Key, Map#{DbRef => Batch}), pdict_put(Key, Map#{DbRef => Batch}),
Batch Batch
end. end.
pdict_get(K) -> get(K).
pdict_put(K, V) -> put(K, V).
pdict_erase(K) -> erase(K).
batch_ref_key(BatchRef) -> batch_ref_key(BatchRef) ->
{mrdb_batch, BatchRef}. {mrdb_batch, BatchRef}.
write_batches(BatchRef, Opts) -> write_batches(BatchRef, Opts) ->
Key = batch_ref_key(BatchRef), Key = batch_ref_key(BatchRef),
case get(Key) of case pdict_get(Key) of
undefined -> undefined ->
error(stale_batch_ref); error(stale_batch_ref);
Map -> Map ->
%% Some added complication since we deal with potentially %% Some added complication since we deal with potentially
%% multiple DbRefs, and will want to return errors. %% multiple DbRefs, and will want to return errors.
erase(Key), pdict_erase(Key),
ret_batch_write_acc( ret_batch_write_acc(
maps:fold( maps:fold(
fun(DbRef, Batch, Acc) -> fun(DbRef, Batch, Acc) ->
@ -1013,11 +1073,11 @@ acc_batch_write_error(E, DbRef, Es) when is_list(Es) ->
release_batches(BatchRef) -> release_batches(BatchRef) ->
Key = batch_ref_key(BatchRef), Key = batch_ref_key(BatchRef),
case get(Key) of case pdict_get(Key) of
undefined -> undefined ->
ok; ok;
Map -> Map ->
erase(Key), pdict_erase(Key),
maps_foreach( maps_foreach(
fun(_, Batch) -> fun(_, Batch) ->
rdb_release_batch(Batch) rdb_release_batch(Batch)
@ -1149,13 +1209,21 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
select(Cont) -> select(Cont) ->
mrdb_select:select(Cont). mrdb_select:select(Cont).
clear_table(Tab) ->
match_delete(Tab, '_').
match_delete(Tab, Pat) -> match_delete(Tab, Pat) ->
Ref = ensure_ref(Tab), Ref = ensure_ref(Tab),
MatchSpec = [{Pat, [], [true]}], case Pat of
as_batch(Ref, fun(R) -> '_' ->
%% call select() with AccKeys=true, returning [{Key, _}] delete_whole_table(Ref);
match_delete_(mrdb_select:select(Ref, MatchSpec, true, 30), R) _ ->
end). MatchSpec = [{Pat, [], [true]}],
as_batch(Ref, fun(R) ->
%% call select() with AccKeys=true, returning [{Key, _}]
match_delete_(mrdb_select:select(Ref, MatchSpec, true, 30), R)
end)
end.
match_delete_({L, Cont}, Ref) -> match_delete_({L, Cont}, Ref) ->
[rdb_delete(Ref, K, []) || {K,_} <- L], [rdb_delete(Ref, K, []) || {K,_} <- L],
@ -1163,6 +1231,25 @@ match_delete_({L, Cont}, Ref) ->
match_delete_('$end_of_table', _) -> match_delete_('$end_of_table', _) ->
ok. ok.
delete_whole_table(Ref) ->
mnesia_rocksdb_admin:clear_table(Ref).
%% with_rdb_iterator(
%% Ref,
%% fun(I) ->
%% case i_move(I, first) of
%% {ok, First, _} ->
%% {ok, Last, _} = i_move(I, last),
%% %% Pad Last, since delete_range excludes EndKey
%% delete_range(Ref, First, <<Last/binary, 0>>);
%% _ ->
%% ok
%% end
%% end).
%% N.B. rocksdb:delete_range/5 isn't implemented yet
%% delete_range(#{db_ref := DbRef, cf_handle := CfH} = R, Start, End) ->
%% rocksdb:delete_range(DbRef, CfH, Start, End, write_opts(R, [])).
fold(Tab, Fun, Acc) -> fold(Tab, Fun, Acc) ->
fold(Tab, Fun, Acc, [{'_', [], ['$_']}]). fold(Tab, Fun, Acc, [{'_', [], ['$_']}]).

View File

@ -19,6 +19,7 @@
]). ]).
-export([ mrdb_batch/1 -export([ mrdb_batch/1
, mrdb_transactions/1 , mrdb_transactions/1
, mrdb_abort_reasons/1
, mrdb_repeated_transactions/1 , mrdb_repeated_transactions/1
, mrdb_abort/1 , mrdb_abort/1
, mrdb_two_procs/1 , mrdb_two_procs/1
@ -47,6 +48,7 @@ groups() ->
, encoding_defaults ]} , encoding_defaults ]}
, {mrdb, [sequence], [ mrdb_batch , {mrdb, [sequence], [ mrdb_batch
, mrdb_transactions , mrdb_transactions
, mrdb_abort_reasons
, mrdb_repeated_transactions , mrdb_repeated_transactions
, mrdb_abort , mrdb_abort
, mrdb_two_procs , mrdb_two_procs
@ -204,6 +206,45 @@ mrdb_transactions_(Config) ->
delete_tabs(Created), delete_tabs(Created),
ok. ok.
mrdb_abort_reasons(_Config) ->
Prev = mnesia_rocksdb_admin:set_and_cache_env(mnesia_compatible_aborts, true),
X = some_value,
compare_txs('throw', fun() -> throw(X) end),
compare_txs('exit' , fun() -> exit(X) end),
compare_txs('error', fun() -> error(X) end),
compare_txs('abort', fun() -> mnesia:abort(X) end),
compare_txs('abort' , fun() -> mrdb:abort(X) end),
mnesia_rocksdb_admin:set_and_cache_env(mnesia_compatible_aborts, Prev),
ok.
compare_txs(Type, F) ->
{caught, exit, {aborted, EMn}} = mnesia_tx(F),
{caught, exit, {aborted, EMr}} = mrdb_tx(F),
ct:log("Mnesia = ~p/~p", [Type, EMn]),
ct:log("Mrdb = ~p/~p", [Type, EMr]),
case {Type, EMn, EMr} of
{error, {some_value, [_|_]}, {some_value, []}} -> ok;
{throw, {throw, some_value}, {throw, some_value}} -> ok;
{exit, some_value, some_value} -> ok;
{abort, some_value, some_value} -> ok
end.
mnesia_tx(F) ->
try
mnesia:activity(transaction, F)
catch
C:E ->
{caught, C, E}
end.
mrdb_tx(F) ->
try
mrdb:activity(transaction, rdb, F)
catch
C:E ->
{caught, C, E}
end.
mrdb_repeated_transactions(Config) -> mrdb_repeated_transactions(Config) ->
Created = create_tabs([{rtx, []}], Config), Created = create_tabs([{rtx, []}], Config),
mrdb:insert(rtx, {rtx, a, 0}), mrdb:insert(rtx, {rtx, a, 0}),