diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 3ba11a5..8cb4b6d 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -145,14 +145,6 @@ -import(mrdb, [ with_iterator/2 ]). --import(mnesia_rocksdb_admin, [ get_ref/1 ]). - -%% -import(mnesia_rocksdb_lib, [ encode_key/2 -%% , encode_val/3 -%% , decode_key/2 -%% , decode_val/3 -%% ]). - -include("mnesia_rocksdb.hrl"). -include("mnesia_rocksdb_int.hrl"). @@ -165,10 +157,10 @@ %% RECORDS %% ---------------------------------------------------------------------------- --record(st, { ref - , alias +-record(st, { alias , tab , type + , status , on_error }). @@ -219,6 +211,10 @@ register(Alias) -> {error, Reason} end. +get_ref(Tab) -> + R = mnesia_rocksdb_admin:get_ref(Tab), + R#{mode => mnesia}. + default_alias() -> rocksdb_copies. @@ -778,33 +774,24 @@ start_proc(Alias, Tab, Type, ProcName, Props, RdbOpts) -> gen_server:start_link({local, ProcName}, ?MODULE, {Alias, Tab, Type, Props, RdbOpts}, []). -init({Alias, Tab, Type, _Props, RdbOpts}) -> +init({Alias, Tab, Type, _Props, _RdbOpts}) -> process_flag(trap_exit, true), %% In case of a process restart, we try to rebuild the state %% from the cf info held by the admin process. - Ref = case mnesia_rocksdb_admin:request_ref(Alias, Tab) of - {ok, Ref1} -> Ref1; - {error, _} -> undefined - end, - {ok, update_state(Ref, Alias, Tab, Type, RdbOpts, #st{})}. - -update_state(Ref, Alias, Tab, Type, _RdbOpts, St) -> - St#st{ tab = Tab - , alias = Alias - , type = Type - , ref = maybe_set_ref_mode(Ref) - }. - -maybe_set_ref_mode(Ref) when is_map(Ref) -> - Ref#{mode => mnesia}; -maybe_set_ref_mode(Ref) -> - Ref. + S0 = #st{ tab = Tab + , alias = Alias + , type = Type }, + S = case mnesia_rocksdb_admin:request_ref(Alias, Tab) of + {ok, _Ref} -> S0#st{status = active}; + {error, _} -> S0 + end, + {ok, S}. handle_call({create_table, Tab, Props}, _From, #st{alias = Alias, tab = Tab} = St) -> try mnesia_rocksdb_admin:create_table(Alias, Tab, Props) of - {ok, Ref} -> - {reply, ok, St#st{ref = maybe_set_ref_mode(Ref)}}; + {ok, _Ref} -> + {reply, ok, St#st{status = active}}; Other -> {reply, Other, St} catch @@ -813,38 +800,38 @@ handle_call({create_table, Tab, Props}, _From, end; handle_call({load_table, _LoadReason, Props}, _From, #st{alias = Alias, tab = Tab} = St) -> - {ok, Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props), - {reply, ok, St#st{ref = maybe_set_ref_mode(Ref)}}; -handle_call({write_info, Key, Value}, _From, #st{ref = Ref} = St) -> - mrdb:write_info(Ref, Key, Value), + {ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props), + {reply, ok, St#st{status = active}}; +handle_call({write_info, Key, Value}, _From, #st{tab = Tab} = St) -> + mrdb:write_info(get_ref(Tab), Key, Value), {reply, ok, St}; -handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) -> - {reply, mrdb:update_counter(Ref, C, Incr), St}; -handle_call({read, Key} = M, _From, #st{ref = Ref} = St) -> - {reply, ?TRY(mrdb:read(Ref, Key), M, St), St}; -handle_call(first = M, _From, #st{ref = Ref} = St) -> - {reply, ?TRY(mrdb:first(Ref), M, St), St}; -handle_call({prev, Key} = M, _From, #st{ref = Ref} = St) -> - {reply, ?TRY(mrdb:prev(Ref, Key), M, St), St}; -handle_call({next, Key} = M, _From, #st{ref = Ref} = St) -> - {reply, ?TRY(mrdb:next(Ref, Key), M, St), St}; -handle_call(last = M, _From, #st{ref = Ref} = St) -> - {reply, ?TRY(mrdb:last(Ref), M, St), St}; +handle_call({update_counter, C, Incr}, _From, #st{tab = Tab} = St) -> + {reply, mrdb:update_counter(get_ref(Tab), C, Incr), St}; +handle_call({read, Key} = M, _From, #st{tab = Tab} = St) -> + {reply, ?TRY(mrdb:read(get_ref(Tab), Key), M, St), St}; +handle_call(first = M, _From, #st{tab = Tab} = St) -> + {reply, ?TRY(mrdb:first(get_ref(Tab)), M, St), St}; +handle_call({prev, Key} = M, _From, #st{tab = Tab} = St) -> + {reply, ?TRY(mrdb:prev(get_ref(Tab), Key), M, St), St}; +handle_call({next, Key} = M, _From, #st{tab = Tab} = St) -> + {reply, ?TRY(mrdb:next(get_ref(Tab), Key), M, St), St}; +handle_call(last = M, _From, #st{tab = Tab} = St) -> + {reply, ?TRY(mrdb:last(get_ref(Tab)), M, St), St}; handle_call({insert, Obj}, _From, St) -> Res = do_insert(Obj, St), {reply, Res, St}; handle_call({delete, Key}, _From, St) -> Res = do_delete(Key, St), {reply, Res, St}; -handle_call({match_delete, Pat}, _From, #st{ref = Ref} = St) -> - Res = mrdb:match_delete(Ref, Pat), +handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) -> + Res = mrdb:match_delete(get_ref(Tab), Pat), {reply, Res, St}; handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) -> _ = mnesia_rocksdb_admin:close_table(Alias, Tab), - {reply, ok, St#st{ref = undefined}}; + {reply, ok, St#st{status = undefined}}; handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) -> ok = mnesia_rocksdb_admin:delete_table(Alias, Tab), - {stop, normal, ok, St#st{ref = undefined}}. + {stop, normal, ok, St#st{status = undefined}}. handle_cast(_, St) -> {noreply, St}. @@ -901,12 +888,12 @@ do_call(P, Req) -> end. %% server-side end of insert/3. -do_insert(Obj, #st{ref = Ref} = St) -> - db_insert(Ref, Obj, [], St). +do_insert(Obj, #st{tab = Tab} = St) -> + db_insert(get_ref(Tab), Obj, [], St). %% server-side part -do_delete(Key, #st{ref = Ref} = St) -> - db_delete(Ref, Key, [], St). +do_delete(Key, #st{tab = Tab} = St) -> + db_delete(get_ref(Tab), Key, [], St). proc_name(_Alias, Tab) -> list_to_atom("mnesia_ext_proc_" ++ mnesia_rocksdb_lib:tabname(Tab)). diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index 944c65e..e39563c 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -15,6 +15,7 @@ , get_ref/2 %% (Name, Default -> Ref | Default , request_ref/2 %% (Alias, Name) -> Ref , close_table/2 + , clear_table/1 ]). -export([ migrate_standalone/2 ]). @@ -34,7 +35,9 @@ , write_table_property/3 %% (Alias, Tab, Property) ]). --export([meta/0]). +-export([ meta/0 + , get_cached_env/2 + , set_and_cache_env/2 ]). -include("mnesia_rocksdb.hrl"). -include("mnesia_rocksdb_int.hrl"). @@ -73,7 +76,8 @@ | {remove_aliases, [alias()]} | {migrate, [{tabname(), map()}]} | {prep_close, table()} - | {close_table, table()}. + | {close_table, table()} + | {clear_table, table() | cf() }. -type reason() :: any(). -type reply() :: any(). @@ -126,6 +130,30 @@ erase_pt_list(Names) -> Meta = meta(), persistent_term:put(?PT_KEY, maps:without(Names, Meta)). +check_application_defaults(Meta) -> + Value = application:get_env(mnesia_rocksdb, mnesia_compatible_aborts, false), + Meta#{ {mnesia_compatible_aborts} => Value }. + +get_cached_env(Key, Default) -> + maps:get({Key}, meta(), Default). + +set_and_cache_env_(Key, Value) when is_atom(Key) -> + Meta = meta(), + Prev = maps:get({Key}, Meta, undefined), + application:set_env(mnesia_rocksdb, Key, Value), + persistent_term:put(?PT_KEY, Meta#{{Key} => Value}), + Prev. + +maybe_initial_meta() -> + case persistent_term:get(?PT_KEY, undefined) of + undefined -> + M = check_application_defaults(#{}), + persistent_term:put(?PT_KEY, M), + M; + M when is_map(M) -> + M + end. + meta() -> persistent_term:get(?PT_KEY, #{}). @@ -137,6 +165,9 @@ prep_close(_, _) -> get_pt(Name, Default) -> maps:get(Name, meta(), Default). +set_and_cache_env(Key, Value) -> + gen_server:call(?MODULE, {set_and_cache_env, Key, Value}). + create_table(Alias, Name, Props) -> call(Alias, {create_table, Name, Props}). @@ -171,6 +202,16 @@ request_ref(Alias, Name) -> close_table(Alias, Name) -> call(Alias, {close_table, Name}). +clear_table(#{alias := Alias, name := Name}) -> + case Name of + {admin, _} -> mnesia:abort({bad_type, Name}); + {_} -> mnesia:abort({no_exists, Name}); + _ -> + call(Alias, {clear_table, Name}) + end; +clear_table(Name) -> + clear_table(get_ref(Name)). + add_aliases(Aliases) -> call([], {add_aliases, Aliases}). @@ -276,6 +317,7 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> + _ = maybe_initial_meta(), %% bootstrap pt Opts = default_opts(), process_flag(trap_exit, true), mnesia:subscribe({table, schema, simple}), @@ -317,7 +359,10 @@ recover_tab({T, #{db_ref := DbRef, update_cf(Alias, T, R, St); false -> error({cannot_access_table, T}) - end. + end; +recover_tab(_, St) -> + St. + %% TODO: generalize get_aliases() -> @@ -376,6 +421,9 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs end. -spec handle_call({alias(), req()}, any(), st()) -> gen_server_reply(). +handle_call({set_and_cache_env, Key, Value}, _From, St) -> + Res = set_and_cache_env_(Key, Value), + {reply, Res, St}; handle_call({[], {add_aliases, Aliases}}, _From, St) -> St1 = do_add_aliases(Aliases, St), {reply, ok, St1}; @@ -492,6 +540,23 @@ handle_req(Alias, {delete_table, Name}, Backend, St) -> {error, not_found} -> {reply, ok, St} end; +handle_req(Alias, {clear_table, Name}, Backend, #st{} = St) -> + case find_cf(Alias, Name, Backend, St) of + {ok, #{ status := open + , type := column_family + , db_ref := DbRef + , cf_handle := CfH} = Cf} -> + CfName = tab_to_cf_name(Name), + ok = rocksdb:drop_column_family(DbRef, CfH), + {ok, CfH1} = rocksdb:create_column_family(DbRef, CfName, cfopts()), + ok = rocksdb:destroy_column_family(DbRef, CfH), + Cf1 = Cf#{cf_handle := CfH1}, + St1 = update_cf(Alias, Name, Cf1, St), + put_pt(Name, Cf1), + {reply, ok, St1}; + _ -> + {reply, {error, not_found}, St} + end; handle_req(Alias, {get_ref, Name}, Backend, #st{} = St) -> case find_cf(Alias, Name, Backend, St) of {ok, #{status := open} = Ref} -> diff --git a/src/mrdb.erl b/src/mrdb.erl index d80cb63..cbd80e9 100644 --- a/src/mrdb.erl +++ b/src/mrdb.erl @@ -53,6 +53,7 @@ , delete/2 , delete/3 , delete_object/2, delete_object/3 , match_delete/2 + , clear_table/1 , batch_write/2 , batch_write/3 , update_counter/3, update_counter/4 , as_batch/2 , as_batch/3 @@ -285,7 +286,7 @@ do_activity(F, Alias, Ctxt, WithLock) -> do_activity(F, Alias, Ctxt, true) end catch - Cat:Err when Cat==error; Cat==exit -> + Cat:Err -> abort_and_pop(Cat, Err) end. @@ -315,20 +316,18 @@ incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } ctxt() -> {?MODULE, ctxt}. push_ctxt(C) -> - K = ctxt(), - C1 = case get(K) of + C1 = case get_ctxt() of undefined -> [C]; C0 -> [C|C0] end, - put(K, C1), + put_ctxt(C1), ok. pop_ctxt() -> - K = ctxt(), - case get(K) of + case get_ctxt() of undefined -> error(no_ctxt); - [C] -> erase(K) , maybe_release_snapshot(C); - [H|T] -> put(K, T), maybe_release_snapshot(H) + [C] -> erase_ctxt() , maybe_release_snapshot(C); + [H|T] -> put_ctxt(T), maybe_release_snapshot(H) end. maybe_release_snapshot(#{snapshot := SH} = C) -> @@ -342,7 +341,7 @@ maybe_release_snapshot(C) -> C. current_context() -> - case get(ctxt()) of + case pdict_get(ctxt()) of [C|_] -> C; undefined -> @@ -432,6 +431,7 @@ commit_and_pop(Res) -> end end. +-spec abort_and_pop(atom(), any()) -> no_return(). abort_and_pop(Cat, Err) -> %% We can pop the context right away, since there is no %% complex failure handling (like retry-on-busy) for rollback. @@ -440,12 +440,48 @@ abort_and_pop(Cat, Err) -> tx -> ok = rdb_transaction_rollback(H); batch -> ok = release_batches(H) end, - case Cat of - error -> error(Err); - exit -> exit(Err) - %% throw -> throw(Err) + return_abort(Type, Cat, Err). + +-spec return_abort(batch | tx, atom(), any()) -> no_return(). +return_abort(batch, Cat, Err) -> + re_throw(Cat, Err); +return_abort(tx, Cat, Err) -> + case mnesia_compatible_aborts() of + true -> + %% Mnesia always captures stack traces, but this could actually become a + %% performance issue in some cases (generally, it's better not to lazily + %% produce stack traces.) Since we want to pushe the option checking for + %% mnesia-abort-style compatibility to AFTER detecting an abort, we don't + %% order a stack trace initially, and instead insert an empty list. + %% (The exact stack trace wouldn't be the same anyway.) + Err1 = + case Cat of + error -> {fix_error(Err), []}; + exit -> fix_error(Err); + throw -> {throw, Err} + end, + exit({aborted, Err1}); + false -> + re_throw(Cat, Err) end. +-spec re_throw(atom(), any()) -> no_return(). +re_throw(Cat, Err) -> + case Cat of + error -> error(Err); + exit -> exit(Err); + throw -> throw(Err) + end. + + +mnesia_compatible_aborts() -> + mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false). + +fix_error({aborted, Err}) -> + Err; +fix_error(Err) -> + Err. + rdb_transaction(DbRef, Opts) -> rocksdb:transaction(DbRef, Opts). @@ -476,7 +512,12 @@ rdb_release_batch(H) -> %% @doc Aborts an ongoing {@link activity/2} abort(Reason) -> - erlang:error({mrdb_abort, Reason}). + case mnesia_compatible_aborts() of + true -> + mnesia:abort(Reason); + false -> + erlang:error({mrdb_abort, Reason}) + end. -spec new_tx(table() | db_ref()) -> db_ref(). new_tx(Tab) -> @@ -514,9 +555,18 @@ get_ref(Tab) -> -spec ensure_ref(ref_or_tab()) -> db_ref(). ensure_ref(#{activity := _} = R) -> R; ensure_ref(Ref) when is_map(Ref) -> - maybe_tx_ctxt(get(ctxt()), Ref); + maybe_tx_ctxt(get_ctxt(), Ref); ensure_ref(Other) -> - maybe_tx_ctxt(get(ctxt()), get_ref(Other)). + maybe_tx_ctxt(get_ctxt(), get_ref(Other)). + +get_ctxt() -> + pdict_get(ctxt()). + +put_ctxt(C) -> + pdict_put(ctxt(), C). + +erase_ctxt() -> + pdict_erase(ctxt()). ensure_ref(Ref, R) when is_map(Ref) -> inherit_ctxt(Ref, R); @@ -933,6 +983,9 @@ as_batch(Tab, F, Opts) when is_function(F, 1), is_list(Opts) -> as_batch_(#{activity := #{type := batch}} = R, F, _) -> %% If already inside a batch, add to that batch (batches don't seem to nest) F(R); +as_batch_(#{activity := #{type := tx}} = R, F, _) -> + %% If already inside a tx, we need to respect the tx context + F(R); as_batch_(#{db_ref := DbRef} = R, F, Opts) -> BatchRef = get_batch_(DbRef), try F(R#{activity => #{type => batch, handle => BatchRef}}) of @@ -960,34 +1013,41 @@ get_batch(_) -> get_batch_(DbRef) -> Ref = make_ref(), {ok, Batch} = rdb_batch(), - put({mrdb_batch, Ref}, #{DbRef => Batch}), + pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}), Ref. get_batch_(DbRef, BatchRef) -> Key = batch_ref_key(BatchRef), - case get(Key) of + case pdict_get(Key) of undefined -> error(stale_batch_ref); #{DbRef := Batch} -> Batch; Map -> {ok, Batch} = rdb_batch(), - put(Key, Map#{DbRef => Batch}), + pdict_put(Key, Map#{DbRef => Batch}), Batch end. + +pdict_get(K) -> get(K). + +pdict_put(K, V) -> put(K, V). + +pdict_erase(K) -> erase(K). + batch_ref_key(BatchRef) -> {mrdb_batch, BatchRef}. write_batches(BatchRef, Opts) -> Key = batch_ref_key(BatchRef), - case get(Key) of + case pdict_get(Key) of undefined -> error(stale_batch_ref); Map -> %% Some added complication since we deal with potentially %% multiple DbRefs, and will want to return errors. - erase(Key), + pdict_erase(Key), ret_batch_write_acc( maps:fold( fun(DbRef, Batch, Acc) -> @@ -1013,11 +1073,11 @@ acc_batch_write_error(E, DbRef, Es) when is_list(Es) -> release_batches(BatchRef) -> Key = batch_ref_key(BatchRef), - case get(Key) of + case pdict_get(Key) of undefined -> ok; Map -> - erase(Key), + pdict_erase(Key), maps_foreach( fun(_, Batch) -> rdb_release_batch(Batch) @@ -1149,13 +1209,21 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 -> select(Cont) -> mrdb_select:select(Cont). +clear_table(Tab) -> + match_delete(Tab, '_'). + match_delete(Tab, Pat) -> Ref = ensure_ref(Tab), - MatchSpec = [{Pat, [], [true]}], - as_batch(Ref, fun(R) -> - %% call select() with AccKeys=true, returning [{Key, _}] - match_delete_(mrdb_select:select(Ref, MatchSpec, true, 30), R) - end). + case Pat of + '_' -> + delete_whole_table(Ref); + _ -> + MatchSpec = [{Pat, [], [true]}], + as_batch(Ref, fun(R) -> + %% call select() with AccKeys=true, returning [{Key, _}] + match_delete_(mrdb_select:select(Ref, MatchSpec, true, 30), R) + end) + end. match_delete_({L, Cont}, Ref) -> [rdb_delete(Ref, K, []) || {K,_} <- L], @@ -1163,6 +1231,25 @@ match_delete_({L, Cont}, Ref) -> match_delete_('$end_of_table', _) -> ok. +delete_whole_table(Ref) -> + mnesia_rocksdb_admin:clear_table(Ref). +%% with_rdb_iterator( +%% Ref, +%% fun(I) -> +%% case i_move(I, first) of +%% {ok, First, _} -> +%% {ok, Last, _} = i_move(I, last), +%% %% Pad Last, since delete_range excludes EndKey +%% delete_range(Ref, First, <>); +%% _ -> +%% ok +%% end +%% end). + +%% N.B. rocksdb:delete_range/5 isn't implemented yet +%% delete_range(#{db_ref := DbRef, cf_handle := CfH} = R, Start, End) -> +%% rocksdb:delete_range(DbRef, CfH, Start, End, write_opts(R, [])). + fold(Tab, Fun, Acc) -> fold(Tab, Fun, Acc, [{'_', [], ['$_']}]). diff --git a/test/mnesia_rocksdb_SUITE.erl b/test/mnesia_rocksdb_SUITE.erl index 4fcc452..5c4d1eb 100644 --- a/test/mnesia_rocksdb_SUITE.erl +++ b/test/mnesia_rocksdb_SUITE.erl @@ -19,6 +19,7 @@ ]). -export([ mrdb_batch/1 , mrdb_transactions/1 + , mrdb_abort_reasons/1 , mrdb_repeated_transactions/1 , mrdb_abort/1 , mrdb_two_procs/1 @@ -47,6 +48,7 @@ groups() -> , encoding_defaults ]} , {mrdb, [sequence], [ mrdb_batch , mrdb_transactions + , mrdb_abort_reasons , mrdb_repeated_transactions , mrdb_abort , mrdb_two_procs @@ -204,6 +206,45 @@ mrdb_transactions_(Config) -> delete_tabs(Created), ok. +mrdb_abort_reasons(_Config) -> + Prev = mnesia_rocksdb_admin:set_and_cache_env(mnesia_compatible_aborts, true), + X = some_value, + compare_txs('throw', fun() -> throw(X) end), + compare_txs('exit' , fun() -> exit(X) end), + compare_txs('error', fun() -> error(X) end), + compare_txs('abort', fun() -> mnesia:abort(X) end), + compare_txs('abort' , fun() -> mrdb:abort(X) end), + mnesia_rocksdb_admin:set_and_cache_env(mnesia_compatible_aborts, Prev), + ok. + +compare_txs(Type, F) -> + {caught, exit, {aborted, EMn}} = mnesia_tx(F), + {caught, exit, {aborted, EMr}} = mrdb_tx(F), + ct:log("Mnesia = ~p/~p", [Type, EMn]), + ct:log("Mrdb = ~p/~p", [Type, EMr]), + case {Type, EMn, EMr} of + {error, {some_value, [_|_]}, {some_value, []}} -> ok; + {throw, {throw, some_value}, {throw, some_value}} -> ok; + {exit, some_value, some_value} -> ok; + {abort, some_value, some_value} -> ok + end. + +mnesia_tx(F) -> + try + mnesia:activity(transaction, F) + catch + C:E -> + {caught, C, E} + end. + +mrdb_tx(F) -> + try + mrdb:activity(transaction, rdb, F) + catch + C:E -> + {caught, C, E} + end. + mrdb_repeated_transactions(Config) -> Created = create_tabs([{rtx, []}], Config), mrdb:insert(rtx, {rtx, a, 0}),