Default to accessing data via the table owner process

This commit is contained in:
Ulf Wiger 2022-04-13 14:24:53 +02:00
parent 0830cecbf9
commit b70654d0a2
3 changed files with 81 additions and 20 deletions

View File

@ -189,6 +189,8 @@
table/0,
table_type/0]).
-define(TRY(Expr, Msg, St), try_or_abort(fun() -> Expr end, Msg, St)).
%% ----------------------------------------------------------------------------
%% CONVENIENCE API
%% ----------------------------------------------------------------------------
@ -585,8 +587,8 @@ chunk_fun() ->
%% Whereas the return type, legacy | Ref, seems odd, it's a shortcut for
%% performance reasons.
access_type(Tab) ->
case get_ref(Tab) of
#{semantics := bag, vsn := 1} -> legacy;
case mrdb:ensure_ref(Tab) of
#{access_type := legacy} -> legacy;
R ->
R#{mode => mnesia}
end.
@ -597,8 +599,11 @@ delete(Alias, Tab, Key) ->
R -> db_delete(R, Key, [], R)
end.
first(_Alias, Tab) ->
mrdb:first(Tab).
first(Alias, Tab) ->
case access_type(Tab) of
legacy -> call(Alias, Tab, first);
R -> mrdb:first(R)
end.
%% Not relevant for an ordered_set
fixtable(_Alias, _Tab, _Bool) ->
@ -610,13 +615,19 @@ insert(Alias, Tab, Obj) ->
R -> db_insert(R, Obj, [], [])
end.
last(_Alias, Tab) ->
mrdb:last(Tab).
last(Alias, Tab) ->
case access_type(Tab) of
legacy -> call(Alias, Tab, last);
R -> mrdb:last(R)
end.
%% Since we replace the key with [] in the record, we have to put it back
%% into the found record.
lookup(_Alias, Tab, Key) ->
mrdb:read(Tab, Key).
lookup(Alias, Tab, Key) ->
case access_type(Tab) of
legacy -> call(Alias, Tab, {read, Key});
R -> mrdb:read(R, Key)
end.
match_delete(Alias, Tab, Pat) ->
case access_type(Tab) of
@ -634,11 +645,17 @@ match_delete_(#{name := {_, index, {_,bag}}, semantics := set} = R, Pat) ->
match_delete_(R, Pat) ->
mrdb:match_delete(R, Pat).
next(_Alias, Tab, Key) ->
mrdb:next(Tab, Key).
next(Alias, Tab, Key) ->
case access_type(Tab) of
legacy -> call(Alias, Tab, {next, Key});
R -> mrdb:next(R, Key)
end.
prev(_Alias, Tab, Key) ->
mrdb:prev(Tab, Key).
prev(Alias, Tab, Key) ->
case access_type(Tab) of
legacy -> call(Alias, Tab, {prev, Key});
R -> mrdb:prev(R, Key)
end.
repair_continuation(Cont, _Ms) ->
Cont.
@ -795,13 +812,23 @@ handle_call({create_table, Tab, Props}, _From,
end;
handle_call({load_table, _LoadReason, Props}, _From,
#st{alias = Alias, tab = Tab} = St) ->
ok = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St};
{ok, Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St#st{ref = maybe_set_ref_mode(Ref)}};
handle_call({write_info, Key, Value}, _From, #st{ref = Ref} = St) ->
mrdb:write_info(Ref, Key, Value),
{reply, ok, St};
handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) ->
{reply, mrdb:update_counter(Ref, C, Incr), St};
handle_call({read, Key} = M, _From, #st{ref = Ref} = St) ->
{reply, ?TRY(mrdb:read(Ref, Key), M, St), St};
handle_call(first = M, _From, #st{ref = Ref} = St) ->
{reply, ?TRY(mrdb:first(Ref), M, St), St};
handle_call({prev, Key} = M, _From, #st{ref = Ref} = St) ->
{reply, ?TRY(mrdb:prev(Ref, Key), M, St), St};
handle_call({next, Key} = M, _From, #st{ref = Ref} = St) ->
{reply, ?TRY(mrdb:next(Ref, Key), M, St), St};
handle_call(last = M, _From, #st{ref = Ref} = St) ->
{reply, ?TRY(mrdb:last(Ref), M, St), St};
handle_call({insert, Obj}, _From, St) ->
Res = do_insert(Obj, St),
{reply, Res, St};
@ -838,6 +865,13 @@ terminate(_Reason, _St) ->
%% GEN SERVER PRIVATE
%% ----------------------------------------------------------------------------
try_or_abort(F, Req, S) ->
try F()
catch error:E:ST ->
?log(error, "CAUGHT error:~p / ~p; Req = ~p, St = ~p", [E, ST, Req, S]),
{abort, {caught, {E, ST}}}
end.
opt_call(Alias, Tab, Req) ->
ProcName = proc_name(Alias, Tab),
case whereis(ProcName) of

View File

@ -460,8 +460,8 @@ handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
try
case find_cf(Alias, Name, Backend, St) of
{ok, #{status := open}} ->
{reply, ok, St};
{ok, #{status := open} = TRec} ->
{reply, {ok, TRec}, St};
{ok, #{status := created} = TRec} ->
handle_load_table_req(Alias, Name, TRec, Backend, St);
_ ->
@ -538,7 +538,7 @@ handle_load_table_req(Alias, Name, TRec, Backend, St) ->
St2 = update_cf(Alias, Name, TRec2, St1),
?log(debug, "Table loaded ~p", [Name]),
put_pt(Name, TRec2),
{reply, ok, St2};
{reply, {ok, TRec2}, St2};
{error, _} = Error ->
{reply, Error, St}
end.
@ -1133,7 +1133,20 @@ check_version(TRec) ->
check_version_and_encoding(#{} = TRec) ->
Vsn = check_version(TRec),
Encoding = get_encoding(Vsn, TRec),
TRec#{vsn => Vsn, encoding => Encoding}.
set_access_type(TRec#{vsn => Vsn, encoding => Encoding}).
set_access_type(R) ->
R#{access_type => access_type_(R)}.
access_type_(#{semantics := bag}) -> legacy;
access_type_(#{properties := #{user_properties := #{rocksdb_access_type := T}}}) ->
valid_access_type(T);
access_type_(_) ->
valid_access_type(application:get_env(mnesia_rocksdb, default_access_type, legacy)).
valid_access_type(T) when T==legacy; T==direct -> T;
valid_access_type(T) ->
mrdb:abort({invalid_access_type, T}).
%% This access function assumes that the requested user property is
%% a 2-tuple. Mnesia allows user properties to be any non-empty tuple.

View File

@ -7,6 +7,8 @@
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
]).
-export([continuation_info/2]).
-import(mnesia_rocksdb_lib, [ keypos/1
, decode_key/2
, decode_val/3
@ -46,9 +48,20 @@ mk_sel(#{name := Tab} = Ref, MS, Limit) ->
select(Cont) ->
case Cont of
'$end_of_table' -> '$end_of_table';
_ -> Cont()
_ when is_function(Cont, 1) ->
Cont(cont)
end.
continuation_info(Item, C) when is_atom(Item), is_function(C, 1) ->
continuation_info_(Item, C(sel));
continuation_info(_, _) -> undefined.
continuation_info_(ref, #sel{ref = Ref}) -> Ref;
continuation_info_(ms, #sel{ms = MS }) -> MS;
continuation_info_(limit, #sel{limit = L }) -> L;
continuation_info_(direction, #sel{direction = Dir}) -> Dir;
continuation_info_(_, _) -> undefined.
fold(Ref, Fun, Acc, MS, Limit) ->
{AccKeys, F} =
if is_function(Fun, 3) ->
@ -227,7 +240,8 @@ decr(infinity) ->
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc),
fun() ->
fun(sel) -> Sel;
(cont) ->
mrdb:with_rdb_iterator(
Ref,
fun(NewI) ->