From b70654d0a2e965a73a03507a50038ba45c948d89 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Wed, 13 Apr 2022 14:24:53 +0200 Subject: [PATCH] Default to accessing data via the table owner process --- src/mnesia_rocksdb.erl | 62 ++++++++++++++++++++++++++++-------- src/mnesia_rocksdb_admin.erl | 21 +++++++++--- src/mrdb_select.erl | 18 +++++++++-- 3 files changed, 81 insertions(+), 20 deletions(-) diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 1a34206..1fccb72 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -189,6 +189,8 @@ table/0, table_type/0]). + +-define(TRY(Expr, Msg, St), try_or_abort(fun() -> Expr end, Msg, St)). %% ---------------------------------------------------------------------------- %% CONVENIENCE API %% ---------------------------------------------------------------------------- @@ -585,8 +587,8 @@ chunk_fun() -> %% Whereas the return type, legacy | Ref, seems odd, it's a shortcut for %% performance reasons. access_type(Tab) -> - case get_ref(Tab) of - #{semantics := bag, vsn := 1} -> legacy; + case mrdb:ensure_ref(Tab) of + #{access_type := legacy} -> legacy; R -> R#{mode => mnesia} end. @@ -597,8 +599,11 @@ delete(Alias, Tab, Key) -> R -> db_delete(R, Key, [], R) end. -first(_Alias, Tab) -> - mrdb:first(Tab). +first(Alias, Tab) -> + case access_type(Tab) of + legacy -> call(Alias, Tab, first); + R -> mrdb:first(R) + end. %% Not relevant for an ordered_set fixtable(_Alias, _Tab, _Bool) -> @@ -610,13 +615,19 @@ insert(Alias, Tab, Obj) -> R -> db_insert(R, Obj, [], []) end. -last(_Alias, Tab) -> - mrdb:last(Tab). +last(Alias, Tab) -> + case access_type(Tab) of + legacy -> call(Alias, Tab, last); + R -> mrdb:last(R) + end. %% Since we replace the key with [] in the record, we have to put it back %% into the found record. -lookup(_Alias, Tab, Key) -> - mrdb:read(Tab, Key). +lookup(Alias, Tab, Key) -> + case access_type(Tab) of + legacy -> call(Alias, Tab, {read, Key}); + R -> mrdb:read(R, Key) + end. match_delete(Alias, Tab, Pat) -> case access_type(Tab) of @@ -634,11 +645,17 @@ match_delete_(#{name := {_, index, {_,bag}}, semantics := set} = R, Pat) -> match_delete_(R, Pat) -> mrdb:match_delete(R, Pat). -next(_Alias, Tab, Key) -> - mrdb:next(Tab, Key). +next(Alias, Tab, Key) -> + case access_type(Tab) of + legacy -> call(Alias, Tab, {next, Key}); + R -> mrdb:next(R, Key) + end. -prev(_Alias, Tab, Key) -> - mrdb:prev(Tab, Key). +prev(Alias, Tab, Key) -> + case access_type(Tab) of + legacy -> call(Alias, Tab, {prev, Key}); + R -> mrdb:prev(R, Key) + end. repair_continuation(Cont, _Ms) -> Cont. @@ -795,13 +812,23 @@ handle_call({create_table, Tab, Props}, _From, end; handle_call({load_table, _LoadReason, Props}, _From, #st{alias = Alias, tab = Tab} = St) -> - ok = mnesia_rocksdb_admin:load_table(Alias, Tab, Props), - {reply, ok, St}; + {ok, Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props), + {reply, ok, St#st{ref = maybe_set_ref_mode(Ref)}}; handle_call({write_info, Key, Value}, _From, #st{ref = Ref} = St) -> mrdb:write_info(Ref, Key, Value), {reply, ok, St}; handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) -> {reply, mrdb:update_counter(Ref, C, Incr), St}; +handle_call({read, Key} = M, _From, #st{ref = Ref} = St) -> + {reply, ?TRY(mrdb:read(Ref, Key), M, St), St}; +handle_call(first = M, _From, #st{ref = Ref} = St) -> + {reply, ?TRY(mrdb:first(Ref), M, St), St}; +handle_call({prev, Key} = M, _From, #st{ref = Ref} = St) -> + {reply, ?TRY(mrdb:prev(Ref, Key), M, St), St}; +handle_call({next, Key} = M, _From, #st{ref = Ref} = St) -> + {reply, ?TRY(mrdb:next(Ref, Key), M, St), St}; +handle_call(last = M, _From, #st{ref = Ref} = St) -> + {reply, ?TRY(mrdb:last(Ref), M, St), St}; handle_call({insert, Obj}, _From, St) -> Res = do_insert(Obj, St), {reply, Res, St}; @@ -838,6 +865,13 @@ terminate(_Reason, _St) -> %% GEN SERVER PRIVATE %% ---------------------------------------------------------------------------- +try_or_abort(F, Req, S) -> + try F() + catch error:E:ST -> + ?log(error, "CAUGHT error:~p / ~p; Req = ~p, St = ~p", [E, ST, Req, S]), + {abort, {caught, {E, ST}}} + end. + opt_call(Alias, Tab, Req) -> ProcName = proc_name(Alias, Tab), case whereis(ProcName) of diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index 5c1475d..4d11bf3 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -460,8 +460,8 @@ handle_req(Alias, {create_table, Name, Props}, Backend, St) -> handle_req(Alias, {load_table, Name, Props}, Backend, St) -> try case find_cf(Alias, Name, Backend, St) of - {ok, #{status := open}} -> - {reply, ok, St}; + {ok, #{status := open} = TRec} -> + {reply, {ok, TRec}, St}; {ok, #{status := created} = TRec} -> handle_load_table_req(Alias, Name, TRec, Backend, St); _ -> @@ -538,7 +538,7 @@ handle_load_table_req(Alias, Name, TRec, Backend, St) -> St2 = update_cf(Alias, Name, TRec2, St1), ?log(debug, "Table loaded ~p", [Name]), put_pt(Name, TRec2), - {reply, ok, St2}; + {reply, {ok, TRec2}, St2}; {error, _} = Error -> {reply, Error, St} end. @@ -1133,7 +1133,20 @@ check_version(TRec) -> check_version_and_encoding(#{} = TRec) -> Vsn = check_version(TRec), Encoding = get_encoding(Vsn, TRec), - TRec#{vsn => Vsn, encoding => Encoding}. + set_access_type(TRec#{vsn => Vsn, encoding => Encoding}). + +set_access_type(R) -> + R#{access_type => access_type_(R)}. + +access_type_(#{semantics := bag}) -> legacy; +access_type_(#{properties := #{user_properties := #{rocksdb_access_type := T}}}) -> + valid_access_type(T); +access_type_(_) -> + valid_access_type(application:get_env(mnesia_rocksdb, default_access_type, legacy)). + +valid_access_type(T) when T==legacy; T==direct -> T; +valid_access_type(T) -> + mrdb:abort({invalid_access_type, T}). %% This access function assumes that the requested user property is %% a 2-tuple. Mnesia allows user properties to be any non-empty tuple. diff --git a/src/mrdb_select.erl b/src/mrdb_select.erl index 30cb1eb..316a4f7 100644 --- a/src/mrdb_select.erl +++ b/src/mrdb_select.erl @@ -7,6 +7,8 @@ , rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit) ]). +-export([continuation_info/2]). + -import(mnesia_rocksdb_lib, [ keypos/1 , decode_key/2 , decode_val/3 @@ -46,9 +48,20 @@ mk_sel(#{name := Tab} = Ref, MS, Limit) -> select(Cont) -> case Cont of '$end_of_table' -> '$end_of_table'; - _ -> Cont() + _ when is_function(Cont, 1) -> + Cont(cont) end. +continuation_info(Item, C) when is_atom(Item), is_function(C, 1) -> + continuation_info_(Item, C(sel)); +continuation_info(_, _) -> undefined. + +continuation_info_(ref, #sel{ref = Ref}) -> Ref; +continuation_info_(ms, #sel{ms = MS }) -> MS; +continuation_info_(limit, #sel{limit = L }) -> L; +continuation_info_(direction, #sel{direction = Dir}) -> Dir; +continuation_info_(_, _) -> undefined. + fold(Ref, Fun, Acc, MS, Limit) -> {AccKeys, F} = if is_function(Fun, 3) -> @@ -227,7 +240,8 @@ decr(infinity) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> {lists:reverse(Acc), - fun() -> + fun(sel) -> Sel; + (cont) -> mrdb:with_rdb_iterator( Ref, fun(NewI) ->