From 2339232f597c70493e00b6bbdeb03bd5c4ed0c79 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Wed, 6 Apr 2022 14:52:58 +0200 Subject: [PATCH] Fix table load bug, fail properly on mnesia-ordered updates --- src/mnesia_rocksdb.erl | 49 +++++++++++++++++++--------------- src/mnesia_rocksdb_admin.erl | 51 +++++++++++++++++++++++++----------- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 80ce80e..2a77e0f 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -168,6 +168,7 @@ , alias , tab , type + , on_error }). -type data_tab() :: atom(). @@ -445,8 +446,9 @@ create_table(Alias, Tab, Props) -> {ok, Pid} = maybe_start_proc(Alias, Tab, Props), do_call(Pid, {create_table, Tab, Props}). -load_table(Alias, Tab, LoadReason, Opts) -> - call(Alias, Tab, {load_table, LoadReason, Opts}). +load_table(Alias, Tab, LoadReason, Props) -> + {ok, Pid} = maybe_start_proc(Alias, Tab, Props), + do_call(Pid, {load_table, LoadReason, Props}). close_table(Alias, Tab) -> case mnesia_rocksdb_admin:get_ref(Tab, error) of @@ -594,10 +596,6 @@ delete(Alias, Tab, Key) -> legacy -> call(Alias, Tab, {delete, Key}); R -> db_delete(R, Key, [], R) end. - %% call_if_legacy(Alias, Tab, {delete, Key}, fun() -> mrdb - %% mrdb:delete(Tab, Key). - %% %% opt_call(Alias, Tab, {delete, Key}), - %% %% ok. first(_Alias, Tab) -> mrdb:first(Tab). @@ -609,7 +607,7 @@ fixtable(_Alias, _Tab, _Bool) -> insert(Alias, Tab, Obj) -> case access_type(Tab) of legacy -> call(Alias, Tab, {insert, Obj}); - R -> db_insert(R, Obj, [], R) + R -> db_insert(R, Obj, [], []) end. last(_Alias, Tab) -> @@ -795,9 +793,9 @@ handle_call({create_table, Tab, Props}, _From, exit:{aborted, Error} -> {reply, {aborted, Error}, St} end; -handle_call({load_table, _LoadReason, _Opts}, _From, +handle_call({load_table, _LoadReason, Props}, _From, #st{alias = Alias, tab = Tab} = St) -> - ok = mnesia_rocksdb_admin:load_table(Alias, Tab), + ok = mnesia_rocksdb_admin:load_table(Alias, Tab, Props), {reply, ok, St}; handle_call({write_info, Key, Value}, _From, #st{ref = Ref} = St) -> mrdb:write_info(Ref, Key, Value), @@ -869,11 +867,11 @@ do_call(P, Req) -> %% server-side end of insert/3. do_insert(Obj, #st{ref = Ref} = St) -> - return_catch(fun() -> db_insert(Ref, Obj, [], St) end). + db_insert(Ref, Obj, [], St). %% server-side part do_delete(Key, #st{ref = Ref} = St) -> - return_catch(fun() -> db_delete(Ref, Key, [], St) end). + db_delete(Ref, Key, [], St). proc_name(_Alias, Tab) -> list_to_atom("mnesia_ext_proc_" ++ mnesia_rocksdb_lib:tabname(Tab)). @@ -885,21 +883,30 @@ whereis_proc(Alias, Tab) -> %% Db wrappers %% ---------------------------------------------------------------------------- -return_catch(F) when is_function(F, 0) -> - try F() +db_insert(Ref, Obj, Opts, St) -> + try mrdb:insert(Ref, Obj, Opts) of + ok -> ok; + {error, _} = Error -> + write_error(insert, [Ref, Obj, Opts], Error, St) catch - throw:badarg -> - badarg + Cat:Exception:T -> + write_error(insert, [Ref, Obj, Opts], {Cat, Exception, T}, St) end. -db_insert(Ref, Obj, Opts, St) -> - write_result(mrdb:insert(Ref, Obj, Opts), insert, [Ref, Obj, Opts], St). - db_delete(Ref, K, Opts, St) -> - write_result(mrdb:delete(Ref, K, Opts), delete, [Ref, K, Opts], St). + try mrdb:delete(Ref, K, Opts) of + ok -> ok; + {error, _} = Error -> + write_error(delete, [Ref, K, Opts], Error, St) + catch + Cat:Exception:T -> + write_error(delete, [Ref, K, Opts], {Cat, Exception, T}, St) + end. -write_result(ok, _, _, _) -> - ok. +write_error(_Op, _Args, _Error, #st{on_error = OnErr}) when OnErr =/= fatal -> + ok; +write_error(Op, Args, Error, _) -> + mnesia:fatal("mnesia_rocksdb write_error: ~p ~p -> ~p", [Op, Args, Error]). %% ---------------------------------------------------------------------------- %% COMMON PRIVATE diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index a3b3b57..1b7db78 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -7,7 +7,7 @@ , remove_aliases/1 , create_table/3 %% (Alias, Name, Props) -> {ok, Ref} | error() , delete_table/2 %% (Alias, Name) -> ok - , load_table/2 %% (Alias, Name) -> ok + , load_table/3 %% (Alias, Name, Props) -> ok , related_resources/2 %% (Alias, Name) -> [RelatedTab] , prep_close/2 %% (Alias, Tab) -> ok , get_ref/1 %% (Name) -> Ref | abort() @@ -143,8 +143,8 @@ create_table(Alias, Name, Props) -> delete_table(Alias, Name) -> call(Alias, {delete_table, Name}). -load_table(Alias, Name) -> - call(Alias, {load_table, Name}). +load_table(Alias, Name, Props) -> + call(Alias, {load_table, Name, Props}). related_resources(Alias, Name) -> if is_atom(Name) -> @@ -463,24 +463,32 @@ handle_req(Alias, {create_table, Name, Props}, Backend, St) -> {error, _} = Error -> {reply, Error, St} end; -handle_req(Alias, {load_table, Name}, Backend, St) -> +handle_req(Alias, {load_table, Name, Props}, Backend, St) -> + ?log(debug, "load_table, ~p, ~p", [Name, Props]), + try case find_cf(Alias, Name, Backend, St) of {ok, #{status := open}} -> ?log(info, "load_table(~p) when table already loaded", [Name]), {reply, ok, St}; - {ok, TRec} -> - case create_table_from_trec(Alias, Name, TRec, Backend, St) of - {ok, TRec1, St1} -> - TRec2 = TRec1#{status => open}, - St2 = update_cf(Alias, Name, TRec2, St1), - ?log(debug, "Table loaded ~p", [Name]), - put_pt(Name, TRec2), - {reply, ok, St2}; + {ok, #{status := created} = TRec} -> + handle_load_table_req(Alias, Name, TRec, Backend, St); + _ -> + ?log(debug, "load_table (~p) without preceding create_table", [Name]), + case create_trec(Alias, Name, Props, Backend, St) of + {ok, NewCf} -> + ?log(debug, "NewCf = ~p", [NewCf]), + St1 = update_cf(Alias, Name, NewCf, St), + Backend1 = maps:get(Alias, St1#st.backends), + {ok, TRec} = find_cf(Alias, Name, Backend1, St1), + handle_load_table_req(Alias, Name, TRec, Backend1, St1); {error, _} = Error -> - {reply, Error, St} - end; - error -> - {reply, {abort, {bad_type, Name}}, St} + {reply, {abort, Error}, St} + end + end + catch + error:E:ST -> + ?log(error, "CAUGHT error:~p (Tab: ~p) / ~p", [E, Name, ST]), + {reply, {error, E}, St} end; handle_req(_Alias, {prep_close, Name}, Backend, St) -> ok_reply(do_prep_close(Name, Backend, St), St); @@ -533,6 +541,17 @@ handle_req(Alias, {migrate, Tabs0}, Backend, St) -> {reply, Error, St} end. +handle_load_table_req(Alias, Name, TRec, Backend, St) -> + case create_table_from_trec(Alias, Name, TRec, Backend, St) of + {ok, TRec1, St1} -> + TRec2 = TRec1#{status => open}, + St2 = update_cf(Alias, Name, TRec2, St1), + ?log(debug, "Table loaded ~p", [Name]), + put_pt(Name, TRec2), + {reply, ok, St2}; + {error, _} = Error -> + {reply, Error, St} + end. %% if an index table has been created or deleted, make sure the main %% ref reflects it.