Fix table load bug, fail properly on mnesia-ordered updates
This commit is contained in:
parent
d5dafb5b7e
commit
2339232f59
@ -168,6 +168,7 @@
|
||||
, alias
|
||||
, tab
|
||||
, type
|
||||
, on_error
|
||||
}).
|
||||
|
||||
-type data_tab() :: atom().
|
||||
@ -445,8 +446,9 @@ create_table(Alias, Tab, Props) ->
|
||||
{ok, Pid} = maybe_start_proc(Alias, Tab, Props),
|
||||
do_call(Pid, {create_table, Tab, Props}).
|
||||
|
||||
load_table(Alias, Tab, LoadReason, Opts) ->
|
||||
call(Alias, Tab, {load_table, LoadReason, Opts}).
|
||||
load_table(Alias, Tab, LoadReason, Props) ->
|
||||
{ok, Pid} = maybe_start_proc(Alias, Tab, Props),
|
||||
do_call(Pid, {load_table, LoadReason, Props}).
|
||||
|
||||
close_table(Alias, Tab) ->
|
||||
case mnesia_rocksdb_admin:get_ref(Tab, error) of
|
||||
@ -594,10 +596,6 @@ delete(Alias, Tab, Key) ->
|
||||
legacy -> call(Alias, Tab, {delete, Key});
|
||||
R -> db_delete(R, Key, [], R)
|
||||
end.
|
||||
%% call_if_legacy(Alias, Tab, {delete, Key}, fun() -> mrdb
|
||||
%% mrdb:delete(Tab, Key).
|
||||
%% %% opt_call(Alias, Tab, {delete, Key}),
|
||||
%% %% ok.
|
||||
|
||||
first(_Alias, Tab) ->
|
||||
mrdb:first(Tab).
|
||||
@ -609,7 +607,7 @@ fixtable(_Alias, _Tab, _Bool) ->
|
||||
insert(Alias, Tab, Obj) ->
|
||||
case access_type(Tab) of
|
||||
legacy -> call(Alias, Tab, {insert, Obj});
|
||||
R -> db_insert(R, Obj, [], R)
|
||||
R -> db_insert(R, Obj, [], [])
|
||||
end.
|
||||
|
||||
last(_Alias, Tab) ->
|
||||
@ -795,9 +793,9 @@ handle_call({create_table, Tab, Props}, _From,
|
||||
exit:{aborted, Error} ->
|
||||
{reply, {aborted, Error}, St}
|
||||
end;
|
||||
handle_call({load_table, _LoadReason, _Opts}, _From,
|
||||
handle_call({load_table, _LoadReason, Props}, _From,
|
||||
#st{alias = Alias, tab = Tab} = St) ->
|
||||
ok = mnesia_rocksdb_admin:load_table(Alias, Tab),
|
||||
ok = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
|
||||
{reply, ok, St};
|
||||
handle_call({write_info, Key, Value}, _From, #st{ref = Ref} = St) ->
|
||||
mrdb:write_info(Ref, Key, Value),
|
||||
@ -869,11 +867,11 @@ do_call(P, Req) ->
|
||||
|
||||
%% server-side end of insert/3.
|
||||
do_insert(Obj, #st{ref = Ref} = St) ->
|
||||
return_catch(fun() -> db_insert(Ref, Obj, [], St) end).
|
||||
db_insert(Ref, Obj, [], St).
|
||||
|
||||
%% server-side part
|
||||
do_delete(Key, #st{ref = Ref} = St) ->
|
||||
return_catch(fun() -> db_delete(Ref, Key, [], St) end).
|
||||
db_delete(Ref, Key, [], St).
|
||||
|
||||
proc_name(_Alias, Tab) ->
|
||||
list_to_atom("mnesia_ext_proc_" ++ mnesia_rocksdb_lib:tabname(Tab)).
|
||||
@ -885,21 +883,30 @@ whereis_proc(Alias, Tab) ->
|
||||
%% Db wrappers
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
||||
return_catch(F) when is_function(F, 0) ->
|
||||
try F()
|
||||
db_insert(Ref, Obj, Opts, St) ->
|
||||
try mrdb:insert(Ref, Obj, Opts) of
|
||||
ok -> ok;
|
||||
{error, _} = Error ->
|
||||
write_error(insert, [Ref, Obj, Opts], Error, St)
|
||||
catch
|
||||
throw:badarg ->
|
||||
badarg
|
||||
Cat:Exception:T ->
|
||||
write_error(insert, [Ref, Obj, Opts], {Cat, Exception, T}, St)
|
||||
end.
|
||||
|
||||
db_insert(Ref, Obj, Opts, St) ->
|
||||
write_result(mrdb:insert(Ref, Obj, Opts), insert, [Ref, Obj, Opts], St).
|
||||
|
||||
db_delete(Ref, K, Opts, St) ->
|
||||
write_result(mrdb:delete(Ref, K, Opts), delete, [Ref, K, Opts], St).
|
||||
try mrdb:delete(Ref, K, Opts) of
|
||||
ok -> ok;
|
||||
{error, _} = Error ->
|
||||
write_error(delete, [Ref, K, Opts], Error, St)
|
||||
catch
|
||||
Cat:Exception:T ->
|
||||
write_error(delete, [Ref, K, Opts], {Cat, Exception, T}, St)
|
||||
end.
|
||||
|
||||
write_result(ok, _, _, _) ->
|
||||
ok.
|
||||
write_error(_Op, _Args, _Error, #st{on_error = OnErr}) when OnErr =/= fatal ->
|
||||
ok;
|
||||
write_error(Op, Args, Error, _) ->
|
||||
mnesia:fatal("mnesia_rocksdb write_error: ~p ~p -> ~p", [Op, Args, Error]).
|
||||
|
||||
%% ----------------------------------------------------------------------------
|
||||
%% COMMON PRIVATE
|
||||
|
@ -7,7 +7,7 @@
|
||||
, remove_aliases/1
|
||||
, create_table/3 %% (Alias, Name, Props) -> {ok, Ref} | error()
|
||||
, delete_table/2 %% (Alias, Name) -> ok
|
||||
, load_table/2 %% (Alias, Name) -> ok
|
||||
, load_table/3 %% (Alias, Name, Props) -> ok
|
||||
, related_resources/2 %% (Alias, Name) -> [RelatedTab]
|
||||
, prep_close/2 %% (Alias, Tab) -> ok
|
||||
, get_ref/1 %% (Name) -> Ref | abort()
|
||||
@ -143,8 +143,8 @@ create_table(Alias, Name, Props) ->
|
||||
delete_table(Alias, Name) ->
|
||||
call(Alias, {delete_table, Name}).
|
||||
|
||||
load_table(Alias, Name) ->
|
||||
call(Alias, {load_table, Name}).
|
||||
load_table(Alias, Name, Props) ->
|
||||
call(Alias, {load_table, Name, Props}).
|
||||
|
||||
related_resources(Alias, Name) ->
|
||||
if is_atom(Name) ->
|
||||
@ -463,24 +463,32 @@ handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
end;
|
||||
handle_req(Alias, {load_table, Name}, Backend, St) ->
|
||||
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
|
||||
?log(debug, "load_table, ~p, ~p", [Name, Props]),
|
||||
try
|
||||
case find_cf(Alias, Name, Backend, St) of
|
||||
{ok, #{status := open}} ->
|
||||
?log(info, "load_table(~p) when table already loaded", [Name]),
|
||||
{reply, ok, St};
|
||||
{ok, TRec} ->
|
||||
case create_table_from_trec(Alias, Name, TRec, Backend, St) of
|
||||
{ok, TRec1, St1} ->
|
||||
TRec2 = TRec1#{status => open},
|
||||
St2 = update_cf(Alias, Name, TRec2, St1),
|
||||
?log(debug, "Table loaded ~p", [Name]),
|
||||
put_pt(Name, TRec2),
|
||||
{reply, ok, St2};
|
||||
{ok, #{status := created} = TRec} ->
|
||||
handle_load_table_req(Alias, Name, TRec, Backend, St);
|
||||
_ ->
|
||||
?log(debug, "load_table (~p) without preceding create_table", [Name]),
|
||||
case create_trec(Alias, Name, Props, Backend, St) of
|
||||
{ok, NewCf} ->
|
||||
?log(debug, "NewCf = ~p", [NewCf]),
|
||||
St1 = update_cf(Alias, Name, NewCf, St),
|
||||
Backend1 = maps:get(Alias, St1#st.backends),
|
||||
{ok, TRec} = find_cf(Alias, Name, Backend1, St1),
|
||||
handle_load_table_req(Alias, Name, TRec, Backend1, St1);
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
end;
|
||||
error ->
|
||||
{reply, {abort, {bad_type, Name}}, St}
|
||||
{reply, {abort, Error}, St}
|
||||
end
|
||||
end
|
||||
catch
|
||||
error:E:ST ->
|
||||
?log(error, "CAUGHT error:~p (Tab: ~p) / ~p", [E, Name, ST]),
|
||||
{reply, {error, E}, St}
|
||||
end;
|
||||
handle_req(_Alias, {prep_close, Name}, Backend, St) ->
|
||||
ok_reply(do_prep_close(Name, Backend, St), St);
|
||||
@ -533,6 +541,17 @@ handle_req(Alias, {migrate, Tabs0}, Backend, St) ->
|
||||
{reply, Error, St}
|
||||
end.
|
||||
|
||||
handle_load_table_req(Alias, Name, TRec, Backend, St) ->
|
||||
case create_table_from_trec(Alias, Name, TRec, Backend, St) of
|
||||
{ok, TRec1, St1} ->
|
||||
TRec2 = TRec1#{status => open},
|
||||
St2 = update_cf(Alias, Name, TRec2, St1),
|
||||
?log(debug, "Table loaded ~p", [Name]),
|
||||
put_pt(Name, TRec2),
|
||||
{reply, ok, St2};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
end.
|
||||
|
||||
%% if an index table has been created or deleted, make sure the main
|
||||
%% ref reflects it.
|
||||
|
Loading…
x
Reference in New Issue
Block a user