Add tests for error handling
Document reason for breaking out update functions
This commit is contained in:
+104
-83
@@ -165,25 +165,28 @@
|
||||
%% RECORDS
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
||||
-record(sel, {alias, % TODO: not used
|
||||
tab,
|
||||
ref,
|
||||
keypat,
|
||||
ms, % TODO: not used
|
||||
compiled_ms,
|
||||
limit,
|
||||
key_only = false, % TODO: not used
|
||||
direction = forward}). % TODO: not used
|
||||
-record(sel, { alias % TODO: not used
|
||||
, tab
|
||||
, ref
|
||||
, keypat
|
||||
, ms % TODO: not used
|
||||
, compiled_ms
|
||||
, limit
|
||||
, key_only = false % TODO: not used
|
||||
, direction = forward}). % TODO: not used
|
||||
|
||||
-type on_write_error() :: debug | verbose | warning | error | fatal.
|
||||
-define(WRITE_ERR_DEFAULT, verbose).
|
||||
|
||||
-record(st, { ets
|
||||
, ref
|
||||
, alias
|
||||
, tab
|
||||
, type
|
||||
, size_warnings % integer()
|
||||
, maintain_size % boolean()
|
||||
, on_write_error = verbose :: debug | verbose | warning | error | fatal
|
||||
}).
|
||||
, ref
|
||||
, alias
|
||||
, tab
|
||||
, type
|
||||
, size_warnings % integer()
|
||||
, maintain_size % boolean()
|
||||
, on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error()
|
||||
}).
|
||||
|
||||
%% ----------------------------------------------------------------------------
|
||||
%% CONVENIENCE API
|
||||
@@ -350,16 +353,12 @@ check_definition(Alias, Tab, Nodes, Props) ->
|
||||
end;
|
||||
({user_properties, UPs} = P) ->
|
||||
RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
|
||||
case proplists:get_value(on_write_error, RdbOpts) of
|
||||
undefined ->
|
||||
OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
|
||||
case valid_mnesia_op(OWE) of
|
||||
true ->
|
||||
P;
|
||||
OWE ->
|
||||
case valid_mnesia_op(OWE) of
|
||||
true ->
|
||||
P;
|
||||
false ->
|
||||
throw({error, {invalid, {on_write_error, OWE}}})
|
||||
end
|
||||
false ->
|
||||
throw({error, {invalid, {on_write_error, OWE}}})
|
||||
end;
|
||||
(P) -> P
|
||||
end, Props) of
|
||||
@@ -416,7 +415,9 @@ load_table_(Alias, Tab, Type, RdbOpts) ->
|
||||
%% transform_table on a rocksdb_table that has indexing.
|
||||
?dbg("ERR: table:~p already loaded pid:~p stack:~p~n",
|
||||
[Tab, _Pid, _Stack]),
|
||||
ok
|
||||
ok;
|
||||
{error, Other} ->
|
||||
mnesia:abort(Other)
|
||||
end.
|
||||
|
||||
close_table(Alias, Tab) ->
|
||||
@@ -814,12 +815,7 @@ slot_iter_set(Res, _, _, _) when element(1, Res) =/= ok ->
|
||||
'$end_of_table'.
|
||||
|
||||
update_counter(Alias, Tab, C, Val) when is_integer(Val) ->
|
||||
case call(Alias, Tab, {update_counter, C, Val}) of
|
||||
badarg ->
|
||||
mnesia:abort(badarg);
|
||||
Res ->
|
||||
Res
|
||||
end.
|
||||
call(Alias, Tab, {update_counter, C, Val}).
|
||||
|
||||
%% server-side part
|
||||
do_update_counter(C, Val, Ref, St) ->
|
||||
@@ -829,10 +825,13 @@ do_update_counter(C, Val, Ref, St) ->
|
||||
case decode_val(EncVal) of
|
||||
{_, _, Old} = Rec when is_integer(Old) ->
|
||||
Res = Old+Val,
|
||||
return(Res, db_put(Ref, Enc,
|
||||
encode_val(
|
||||
setelement(3, Rec, Res)),
|
||||
[], St));
|
||||
return_catch(
|
||||
fun() ->
|
||||
db_put(Ref, Enc,
|
||||
encode_val(
|
||||
setelement(3, Rec, Res)),
|
||||
[], St)
|
||||
end);
|
||||
_ ->
|
||||
badarg
|
||||
end;
|
||||
@@ -892,17 +891,22 @@ start_proc(Alias, Tab, Type, RdbOpts) ->
|
||||
|
||||
init({Alias, Tab, Type, RdbOpts}) ->
|
||||
process_flag(trap_exit, true),
|
||||
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
|
||||
St = #st{ ets = Ets
|
||||
, ref = Ref
|
||||
, alias = Alias
|
||||
, tab = Tab
|
||||
, type = Type
|
||||
, size_warnings = 0
|
||||
, maintain_size = should_maintain_size(Tab)
|
||||
},
|
||||
OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error),
|
||||
{ok, recover_size_info(St#st{on_write_error = OnWriteError})}.
|
||||
try
|
||||
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
|
||||
St = #st{ ets = Ets
|
||||
, ref = Ref
|
||||
, alias = Alias
|
||||
, tab = Tab
|
||||
, type = Type
|
||||
, size_warnings = 0
|
||||
, maintain_size = should_maintain_size(Tab)
|
||||
},
|
||||
OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error),
|
||||
{ok, recover_size_info(St#st{on_write_error = OnWriteError})}
|
||||
catch
|
||||
throw:badarg ->
|
||||
{error, write_error}
|
||||
end.
|
||||
|
||||
do_load_table(Tab, RdbOpts) ->
|
||||
MPd = data_mountpoint(Tab),
|
||||
@@ -924,11 +928,11 @@ handle_call({write_info, Key, Value}, _From, #st{} = St) ->
|
||||
handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) ->
|
||||
{reply, do_update_counter(C, Incr, Ref, St), St};
|
||||
handle_call({insert, Key, Val}, _From, St) ->
|
||||
do_insert(Key, Val, St),
|
||||
{reply, ok, St};
|
||||
Res = do_insert(Key, Val, St),
|
||||
{reply, Res, St};
|
||||
handle_call({delete, Key}, _From, St) ->
|
||||
do_delete(Key, St),
|
||||
{reply, ok, St};
|
||||
Res = do_delete(Key, St),
|
||||
{reply, Res, St};
|
||||
handle_call(clear_table, _From, #st{ets = Ets, tab = Tab, ref = Ref} = St) ->
|
||||
MPd = data_mountpoint(Tab),
|
||||
?dbg("Attempting clear_table(~p)~n", [Tab]),
|
||||
@@ -1179,14 +1183,17 @@ size_warning(Alias, Tab) ->
|
||||
|
||||
%% server-side end of insert/3.
|
||||
do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false} = St) ->
|
||||
do_insert_bag(Ref, K, V, false, St);
|
||||
return_catch(fun() -> do_insert_bag(Ref, K, V, false, St) end);
|
||||
do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_insert_bag(Ref, K, V, CurSz, St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok;
|
||||
return_catch(
|
||||
fun() ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_insert_bag(Ref, K, V, CurSz, St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok
|
||||
end);
|
||||
do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) ->
|
||||
return(ok, db_put(Ref, K, V, [], St));
|
||||
return_catch(fun() -> db_put(Ref, K, V, [], St) end);
|
||||
do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
|
||||
IsNew =
|
||||
case ?rocksdb:get(Ref, K, []) of
|
||||
@@ -1197,12 +1204,17 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
|
||||
end,
|
||||
case IsNew of
|
||||
true ->
|
||||
NewSz = read_info(size, 0, Ets) + 1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
?rocksdb:write(Ref, [{put, Ki, Vi}, {put, K, V}], []),
|
||||
ets_insert_info(Ets, size, NewSz);
|
||||
return_catch(
|
||||
fun() ->
|
||||
NewSz = read_info(size, 0, Ets) + 1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
L = [{put, Ki, Vi}, {put, K, V}],
|
||||
write_result(mnesia_rocksdb_lib:write(Ref, L, []),
|
||||
write, [Ref, L, []], St), % may throw
|
||||
ets_insert_info(Ets, size, NewSz)
|
||||
end);
|
||||
false ->
|
||||
return(ok, db_put(Ref, K, V, [], St))
|
||||
return_catch(fun() -> db_put(Ref, K, V, [], St) end)
|
||||
end,
|
||||
ok.
|
||||
|
||||
@@ -1228,7 +1240,7 @@ do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG ->
|
||||
Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St);
|
||||
_ when TSz =:= false ->
|
||||
Key = <<K/binary, (Prev+1):?BAG_CNT>>,
|
||||
return(ok, db_put(Ref, Key, V, [], St));
|
||||
db_put(Ref, Key, V, [], St);
|
||||
_ ->
|
||||
NewSz = TSz + 1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
@@ -1239,23 +1251,29 @@ do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG ->
|
||||
|
||||
%% server-side part
|
||||
do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false} = St) ->
|
||||
do_delete_bag(byte_size(Key), Key, Ref, false, St);
|
||||
return_catch(fun() -> do_delete_bag(byte_size(Key), Key, Ref, false, St) end);
|
||||
do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) ->
|
||||
Sz = byte_size(Key),
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok;
|
||||
return_catch(
|
||||
fun() ->
|
||||
Sz = byte_size(Key),
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok
|
||||
end);
|
||||
do_delete(Key, #st{ref = Ref, maintain_size = false} = St) ->
|
||||
db_delete(Ref, Key, [], St);
|
||||
return_catch(fun() -> db_delete(Ref, Key, [], St) end);
|
||||
do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of
|
||||
{ok, _} ->
|
||||
NewSz = CurSz -1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St),
|
||||
ets_insert_info(Ets, size, NewSz);
|
||||
return_catch(
|
||||
fun() ->
|
||||
NewSz = CurSz -1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St),
|
||||
ets_insert_info(Ets, size, NewSz)
|
||||
end);
|
||||
not_found ->
|
||||
false
|
||||
end.
|
||||
@@ -1575,19 +1593,22 @@ keypat_pfx(_, _) ->
|
||||
%% ----------------------------------------------------------------------------
|
||||
%% Db wrappers
|
||||
%% ----------------------------------------------------------------------------
|
||||
return(_Res, badarg) ->
|
||||
badarg;
|
||||
return(Res, _) ->
|
||||
Res.
|
||||
|
||||
return_catch(F) when is_function(F, 0) ->
|
||||
try F()
|
||||
catch
|
||||
throw:badarg ->
|
||||
badarg
|
||||
end.
|
||||
|
||||
db_put(Ref, K, V, Opts, St) ->
|
||||
write_result(?rocksdb:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St).
|
||||
write_result(mnesia_rocksdb_lib:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St).
|
||||
|
||||
db_write(Ref, List, Opts, St) ->
|
||||
write_result(?rocksdb:write(Ref, List, Opts, St), write, [Ref, List, Opts], St).
|
||||
write_result(mnesia_rocksdb_lib:write(Ref, List, Opts), write, [Ref, List, Opts], St).
|
||||
|
||||
db_delete(Ref, K, Opts, St) ->
|
||||
write_result(?rocksdb:delete(Ref, K, Opts), delete, [Ref, K, Opts], St).
|
||||
write_result(mnesia_rocksdb_lib:delete(Ref, K, Opts), delete, [Ref, K, Opts], St).
|
||||
|
||||
write_result(ok, _, _, _) ->
|
||||
ok;
|
||||
@@ -1596,7 +1617,7 @@ write_result(Res, Op, Args, #st{on_write_error = Rpt}) ->
|
||||
mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n",
|
||||
[Op | Args] ++ [Res]),
|
||||
if Rpt == fatal; Rpt == error ->
|
||||
badarg;
|
||||
throw(badarg);
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
%%% @doc RocksDB update wrappers, in separate module for easy tracing and mocking.
|
||||
%%%
|
||||
-module(mnesia_rocksdb_lib).
|
||||
|
||||
-export([put/4,
|
||||
write/3,
|
||||
delete/3]).
|
||||
|
||||
|
||||
put(Ref, K, V, Opts) ->
|
||||
rocksdb:put(Ref, K, V, Opts).
|
||||
|
||||
write(Ref, L, Opts) ->
|
||||
rocksdb:write(Ref, L, Opts).
|
||||
|
||||
delete(Ref, K, Opts) ->
|
||||
rocksdb:delete(Ref, K, Opts).
|
||||
Reference in New Issue
Block a user