Add caller error store to error handling
During initialization of the backend the caller may provide a new configuration to define a target ETS table for storing errors which the backend encountered at runtime. These errors are written during handling of write errors.
This commit is contained in:
parent
49ff8bcca0
commit
63bae181e3
@ -173,19 +173,24 @@
|
|||||||
, compiled_ms
|
, compiled_ms
|
||||||
, limit
|
, limit
|
||||||
, key_only = false % TODO: not used
|
, key_only = false % TODO: not used
|
||||||
, direction = forward}). % TODO: not used
|
, direction = forward % TODO: not used
|
||||||
|
}).
|
||||||
|
|
||||||
-type on_write_error() :: debug | verbose | warning | error | fatal.
|
-type on_write_error() :: debug | verbose | warning | error | fatal.
|
||||||
|
-type on_write_error_store() :: atom() | undefined.
|
||||||
|
|
||||||
-define(WRITE_ERR_DEFAULT, verbose).
|
-define(WRITE_ERR_DEFAULT, verbose).
|
||||||
|
-define(WRITE_ERR_STORE_DEFAULT, undefined).
|
||||||
|
|
||||||
-record(st, { ets
|
-record(st, { ets
|
||||||
, ref
|
, ref
|
||||||
, alias
|
, alias
|
||||||
, tab
|
, tab
|
||||||
, type
|
, type
|
||||||
, size_warnings % integer()
|
, size_warnings :: integer()
|
||||||
, maintain_size % boolean()
|
, maintain_size :: boolean()
|
||||||
, on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error()
|
, on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error()
|
||||||
|
, on_write_error_store = ?WRITE_ERR_STORE_DEFAULT :: on_write_error_store()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% ----------------------------------------------------------------------------
|
%% ----------------------------------------------------------------------------
|
||||||
@ -342,33 +347,38 @@ prefixes(_) ->
|
|||||||
%% set is OK as ordered_set is a kind of set.
|
%% set is OK as ordered_set is a kind of set.
|
||||||
check_definition(Alias, Tab, Nodes, Props) ->
|
check_definition(Alias, Tab, Nodes, Props) ->
|
||||||
Id = {Alias, Nodes},
|
Id = {Alias, Nodes},
|
||||||
try lists:map(
|
try
|
||||||
fun({type, T} = P) ->
|
Props1 = lists:map(fun(E) -> check_definition_entry(Tab, Id, E) end, Props),
|
||||||
if T==set; T==ordered_set; T==bag ->
|
{ok, Props1}
|
||||||
P;
|
|
||||||
true ->
|
|
||||||
mnesia:abort({combine_error,
|
|
||||||
Tab,
|
|
||||||
[Id, {type,T}]})
|
|
||||||
end;
|
|
||||||
({user_properties, UPs} = P) ->
|
|
||||||
RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
|
|
||||||
OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
|
|
||||||
case valid_mnesia_op(OWE) of
|
|
||||||
true ->
|
|
||||||
P;
|
|
||||||
false ->
|
|
||||||
throw({error, {invalid, {on_write_error, OWE}}})
|
|
||||||
end;
|
|
||||||
(P) -> P
|
|
||||||
end, Props) of
|
|
||||||
Props1 ->
|
|
||||||
{ok, Props1}
|
|
||||||
catch
|
catch
|
||||||
throw:Error ->
|
throw:Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
check_definition_entry(_Tab, _Id, {type, T} = P) when T==set; T==ordered_set; T==bag ->
|
||||||
|
P;
|
||||||
|
check_definition_entry(Tab, Id, {type, T}) ->
|
||||||
|
mnesia:abort({combine_error, Tab, [Id, {type, T}]});
|
||||||
|
check_definition_entry(_Tab, _Id, {user_properties, UPs} = P) ->
|
||||||
|
RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
|
||||||
|
OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
|
||||||
|
OWEStore = proplists:get_value(on_write_error_store, RdbOpts, ?WRITE_ERR_STORE_DEFAULT),
|
||||||
|
case valid_mnesia_op(OWE) of
|
||||||
|
true ->
|
||||||
|
case OWEStore of
|
||||||
|
undefined ->
|
||||||
|
P;
|
||||||
|
V when is_atom(V) ->
|
||||||
|
P;
|
||||||
|
V ->
|
||||||
|
throw({error, {invalid_configuration, {on_write_error_store, V}}})
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
throw({error, {invalid_configuration, {on_write_error, OWE}}})
|
||||||
|
end;
|
||||||
|
check_definition_entry(_Tab, _Id, P) ->
|
||||||
|
P.
|
||||||
|
|
||||||
%% -> ok | {error, exists}
|
%% -> ok | {error, exists}
|
||||||
create_table(_Alias, Tab, _Props) ->
|
create_table(_Alias, Tab, _Props) ->
|
||||||
create_mountpoint(Tab).
|
create_mountpoint(Tab).
|
||||||
@ -893,6 +903,8 @@ init({Alias, Tab, Type, RdbOpts}) ->
|
|||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
try
|
try
|
||||||
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
|
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
|
||||||
|
OnWriteError = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
|
||||||
|
OnWriteErrorStore = proplists:get_value(on_write_error_store, RdbOpts, ?WRITE_ERR_STORE_DEFAULT),
|
||||||
St = #st{ ets = Ets
|
St = #st{ ets = Ets
|
||||||
, ref = Ref
|
, ref = Ref
|
||||||
, alias = Alias
|
, alias = Alias
|
||||||
@ -900,9 +912,10 @@ init({Alias, Tab, Type, RdbOpts}) ->
|
|||||||
, type = Type
|
, type = Type
|
||||||
, size_warnings = 0
|
, size_warnings = 0
|
||||||
, maintain_size = should_maintain_size(Tab)
|
, maintain_size = should_maintain_size(Tab)
|
||||||
|
, on_write_error = OnWriteError
|
||||||
|
, on_write_error_store = OnWriteErrorStore
|
||||||
},
|
},
|
||||||
OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error),
|
{ok, recover_size_info(St)}
|
||||||
{ok, recover_size_info(St#st{on_write_error = OnWriteError})}
|
|
||||||
catch
|
catch
|
||||||
throw:badarg ->
|
throw:badarg ->
|
||||||
{error, write_error}
|
{error, write_error}
|
||||||
@ -1612,16 +1625,35 @@ db_delete(Ref, K, Opts, St) ->
|
|||||||
|
|
||||||
write_result(ok, _, _, _) ->
|
write_result(ok, _, _, _) ->
|
||||||
ok;
|
ok;
|
||||||
write_result(Res, Op, Args, #st{on_write_error = Rpt}) ->
|
write_result(Res, Op, Args, #st{tab = Tab, on_write_error = Rpt, on_write_error_store = OWEStore}) ->
|
||||||
RptOp = rpt_op(Rpt),
|
RptOp = rpt_op(Rpt),
|
||||||
|
maybe_store_error(OWEStore, Res, Tab, Op, Args, erlang:system_time(second)),
|
||||||
mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n",
|
mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n",
|
||||||
[Op | Args] ++ [Res]),
|
[Op | Args] ++ [Res]),
|
||||||
if Rpt == fatal; Rpt == error ->
|
if Rpt == fatal; Rpt == error ->
|
||||||
throw(badarg);
|
throw(badarg);
|
||||||
true ->
|
true ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_store_error(undefined, _, _, _, _, _) ->
|
||||||
|
ok;
|
||||||
|
maybe_store_error(Table, Err, IntTable, put, [_, K, _, _], Time) ->
|
||||||
|
insert_error(Table, IntTable, K, Err, Time);
|
||||||
|
maybe_store_error(Table, Err, IntTable, delete, [_, K, _], Time) ->
|
||||||
|
insert_error(Table, IntTable, K, Err, Time);
|
||||||
|
maybe_store_error(Table, Err, IntTable, write, [_, List, _], Time) ->
|
||||||
|
lists:map(fun
|
||||||
|
({put, K, _}) ->
|
||||||
|
insert_error(Table, IntTable, K, Err, Time);
|
||||||
|
({delete, K}) ->
|
||||||
|
insert_error(Table, IntTable, K, Err, Time)
|
||||||
|
end, List).
|
||||||
|
|
||||||
|
insert_error(Table, {Type, _, _}, K, Err, Time) ->
|
||||||
|
{_, K1} = decode_key(K),
|
||||||
|
ets:insert(Table, {{Type, K1}, Err, Time}).
|
||||||
|
|
||||||
rpt_fmt([_|T]) ->
|
rpt_fmt([_|T]) ->
|
||||||
lists:append(["~p" | [", ~p" || _ <- T]]).
|
lists:append(["~p" | [", ~p" || _ <- T]]).
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user