Check return values on update operations.

- put(), write() and delete() wrapped, checking for non-ok returns
- Option {on_write_error, verbose | warning | error | fatal}
  added to the rocksdb_opts user property. The corresponding function
  mnesia_lib:Op(Fmt, Args) will be called if an error return is spotted.
This commit is contained in:
Ulf Wiger 2019-09-24 15:45:07 +02:00
parent ad8e7b63ca
commit c8da9ce31b

View File

@ -182,6 +182,7 @@
, type , type
, size_warnings % integer() , size_warnings % integer()
, maintain_size % boolean() , maintain_size % boolean()
, on_write_error = verbose :: verbose | warning | error | fatal
}). }).
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
@ -338,21 +339,36 @@ prefixes(_) ->
%% set is OK as ordered_set is a kind of set. %% set is OK as ordered_set is a kind of set.
check_definition(Alias, Tab, Nodes, Props) -> check_definition(Alias, Tab, Nodes, Props) ->
Id = {Alias, Nodes}, Id = {Alias, Nodes},
Props1 = lists:map( try lists:map(
fun({type, T} = P) -> fun({type, T} = P) ->
if T==set; T==ordered_set; T==bag -> if T==set; T==ordered_set; T==bag ->
P; P;
true -> true ->
mnesia:abort({combine_error, mnesia:abort({combine_error,
Tab, Tab,
[Id, {type,T}]}) [Id, {type,T}]})
end; end;
({user_properties, _} = P) -> ({user_properties, UPs} = P) ->
%% should perhaps verify rocksdb options... RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
P; case proplists:get_value(on_write_error, RdbOpts) of
(P) -> P undefined ->
end, Props), P;
{ok, Props1}. OWE ->
case valid_mnesia_op(OWE) of
true ->
P;
false ->
throw({error, {invalid, {on_write_error, OWE}}})
end
end;
(P) -> P
end, Props) of
Props1 ->
{ok, Props1}
catch
throw:Error ->
Error
end.
%% -> ok | {error, exists} %% -> ok | {error, exists}
create_table(_Alias, Tab, _Props) -> create_table(_Alias, Tab, _Props) ->
@ -806,18 +822,17 @@ update_counter(Alias, Tab, C, Val) when is_integer(Val) ->
end. end.
%% server-side part %% server-side part
do_update_counter(C, Val, Ref) -> do_update_counter(C, Val, Ref, St) ->
Enc = encode_key(C), Enc = encode_key(C),
case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of
{ok, EncVal} -> {ok, EncVal} ->
case decode_val(EncVal) of case decode_val(EncVal) of
{_, _, Old} = Rec when is_integer(Old) -> {_, _, Old} = Rec when is_integer(Old) ->
Res = Old+Val, Res = Old+Val,
?rocksdb:put(Ref, Enc, return(Res, db_put(Ref, Enc,
encode_val( encode_val(
setelement(3, Rec, Res)), setelement(3, Rec, Res)),
[]), [], St));
Res;
_ -> _ ->
badarg badarg
end; end;
@ -879,14 +894,15 @@ init({Alias, Tab, Type, RdbOpts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts), {ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
St = #st{ ets = Ets St = #st{ ets = Ets
, ref = Ref , ref = Ref
, alias = Alias , alias = Alias
, tab = Tab , tab = Tab
, type = Type , type = Type
, size_warnings = 0 , size_warnings = 0
, maintain_size = should_maintain_size(Tab) , maintain_size = should_maintain_size(Tab)
}, },
{ok, recover_size_info(St)}. OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error),
{ok, recover_size_info(St#st{on_write_error = OnWriteError})}.
do_load_table(Tab, RdbOpts) -> do_load_table(Tab, RdbOpts) ->
MPd = data_mountpoint(Tab), MPd = data_mountpoint(Tab),
@ -906,7 +922,7 @@ handle_call({write_info, Key, Value}, _From, #st{} = St) ->
_ = write_info_(Key, Value, St), _ = write_info_(Key, Value, St),
{reply, ok, St}; {reply, ok, St};
handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) -> handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) ->
{reply, do_update_counter(C, Incr, Ref), St}; {reply, do_update_counter(C, Incr, Ref, St), St};
handle_call({insert, Key, Val}, _From, St) -> handle_call({insert, Key, Val}, _From, St) ->
do_insert(Key, Val, St), do_insert(Key, Val, St),
{reply, ok, St}; {reply, ok, St};
@ -1162,16 +1178,16 @@ size_warning(Alias, Tab) ->
gen_server:cast(ProcName, size_warning). gen_server:cast(ProcName, size_warning).
%% server-side end of insert/3. %% server-side end of insert/3.
do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false}) -> do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false} = St) ->
do_insert_bag(Ref, K, V, false); do_insert_bag(Ref, K, V, false, St);
do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true}) -> do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
NewSz = do_insert_bag(Ref, K, V, CurSz), NewSz = do_insert_bag(Ref, K, V, CurSz, St),
ets_insert_info(Ets, size, NewSz), ets_insert_info(Ets, size, NewSz),
ok; ok;
do_insert(K, V, #st{ref = Ref, maintain_size = false}) -> do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) ->
?rocksdb:put(Ref, K, V, []); return(ok, db_put(Ref, K, V, [], St));
do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true}) -> do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
IsNew = IsNew =
case ?rocksdb:get(Ref, K, []) of case ?rocksdb:get(Ref, K, []) of
{ok, _} -> {ok, _} ->
@ -1186,65 +1202,65 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true}) ->
?rocksdb:write(Ref, [{put, Ki, Vi}, {put, K, V}], []), ?rocksdb:write(Ref, [{put, Ki, Vi}, {put, K, V}], []),
ets_insert_info(Ets, size, NewSz); ets_insert_info(Ets, size, NewSz);
false -> false ->
?rocksdb:put(Ref, K, V, []) return(ok, db_put(Ref, K, V, [], St))
end, end,
ok. ok.
do_insert_bag(Ref, K, V, CurSz) -> do_insert_bag(Ref, K, V, CurSz, St) ->
KSz = byte_size(K), KSz = byte_size(K),
with_iterator( with_iterator(
Ref, fun(I) -> Ref, fun(I) ->
do_insert_bag_( do_insert_bag_(
KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz) KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St)
end). end).
%% There's a potential access pattern that would force counters to %% There's a potential access pattern that would force counters to
%% creep upwards and eventually hit the limit. This could be addressed, %% creep upwards and eventually hit the limit. This could be addressed,
%% with compaction. TODO. %% with compaction. TODO.
do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz) when Prev < ?MAX_BAG -> do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG ->
case Res of case Res of
{ok, <<K:Sz/binary, _:?BAG_CNT>>, V} -> {ok, <<K:Sz/binary, _:?BAG_CNT>>, V} ->
%% object exists %% object exists
TSz; TSz;
{ok, <<K:Sz/binary, N:?BAG_CNT>>, _} -> {ok, <<K:Sz/binary, N:?BAG_CNT>>, _} ->
do_insert_bag_( do_insert_bag_(
Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz); Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St);
_ when TSz =:= false -> _ when TSz =:= false ->
Key = <<K/binary, (Prev+1):?BAG_CNT>>, Key = <<K/binary, (Prev+1):?BAG_CNT>>,
?rocksdb:put(Ref, Key, V, []); return(ok, db_put(Ref, Key, V, [], St));
_ -> _ ->
NewSz = TSz + 1, NewSz = TSz + 1,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
Key = <<K/binary, (Prev+1):?BAG_CNT>>, Key = <<K/binary, (Prev+1):?BAG_CNT>>,
?rocksdb:write(Ref, [{put, Ki, Vi}, {put, Key, V}], []), db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St),
NewSz NewSz
end. end.
%% server-side part %% server-side part
do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false}) -> do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false} = St) ->
do_delete_bag(byte_size(Key), Key, Ref, false); do_delete_bag(byte_size(Key), Key, Ref, false, St);
do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true}) -> do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) ->
Sz = byte_size(Key), Sz = byte_size(Key),
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
NewSz = do_delete_bag(Sz, Key, Ref, CurSz), NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St),
ets_insert_info(Ets, size, NewSz), ets_insert_info(Ets, size, NewSz),
ok; ok;
do_delete(Key, #st{ref = Ref, maintain_size = false}) -> do_delete(Key, #st{ref = Ref, maintain_size = false} = St) ->
?rocksdb:delete(Ref, Key, []); db_delete(Ref, Key, [], St);
do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true}) -> do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of
{ok, _} -> {ok, _} ->
NewSz = CurSz -1, NewSz = CurSz -1,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
ok = ?rocksdb:write(Ref, [{delete, Key}, {put, Ki, Vi}], []), ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St),
ets_insert_info(Ets, size, NewSz); ets_insert_info(Ets, size, NewSz);
not_found -> not_found ->
false false
end. end.
do_delete_bag(Sz, Key, Ref, TSz) -> do_delete_bag(Sz, Key, Ref, TSz, St) ->
Found = Found =
with_iterator( with_iterator(
Ref, fun(I) -> Ref, fun(I) ->
@ -1255,13 +1271,13 @@ do_delete_bag(Sz, Key, Ref, TSz) ->
{[], _} -> {[], _} ->
TSz; TSz;
{_, false} -> {_, false} ->
?rocksdb:write(Ref, [{delete, K} || K <- Found], []); db_write(Ref, [{delete, K} || K <- Found], [], St);
{_, _} -> {_, _} ->
N = length(Found), N = length(Found),
NewSz = TSz - N, NewSz = TSz - N,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
?rocksdb:write(Ref, [{put, Ki, Vi} | db_write(Ref, [{put, Ki, Vi} |
[{delete, K} || K <- Found]], []), [{delete, K} || K <- Found]], [], St),
NewSz NewSz
end. end.
@ -1279,21 +1295,21 @@ do_delete_bag_(Sz, K, Res, Ref, I) ->
end. end.
do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type, do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type,
maintain_size = MaintainSize}) -> maintain_size = MaintainSize} = St) ->
Fun = fun(_, Key, Acc) -> [Key|Acc] end, Fun = fun(_, Key, Acc) -> [Key|Acc] end,
Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30), Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30),
case {Keys, MaintainSize} of case {Keys, MaintainSize} of
{[], _} -> {[], _} ->
ok; ok;
{_, false} -> {_, false} ->
?rocksdb:write(Ref, [{delete, K} || K <- Keys], []), db_write(Ref, [{delete, K} || K <- Keys], [], St),
ok; ok;
{_, true} -> {_, true} ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
NewSz = max(CurSz - length(Keys), 0), NewSz = max(CurSz - length(Keys), 0),
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
?rocksdb:write(Ref, [{put, Ki, Vi} | db_write(Ref, [{put, Ki, Vi} |
[{delete, K} || K <- Keys]], []), [{delete, K} || K <- Keys]], [], St),
ets_insert_info(Ets, size, NewSz), ets_insert_info(Ets, size, NewSz),
ok ok
end. end.
@ -1339,8 +1355,8 @@ property(Tab, Prop, Default) ->
exit:_ -> Default exit:_ -> Default
end. end.
write_info_(Item, Val, #st{ets = Ets, ref = Ref}) -> write_info_(Item, Val, #st{ets = Ets, ref = Ref} = St) ->
rocksdb_insert_info(Ref, Item, Val), rocksdb_insert_info(Ref, Item, Val, St),
ets_insert_info(Ets, Item, Val). ets_insert_info(Ets, Item, Val).
ets_insert_info(Ets, Item, Val) -> ets_insert_info(Ets, Item, Val) ->
@ -1349,14 +1365,14 @@ ets_insert_info(Ets, Item, Val) ->
ets_delete_info(Ets, Item) -> ets_delete_info(Ets, Item) ->
ets:delete(Ets, {info, Item}). ets:delete(Ets, {info, Item}).
rocksdb_insert_info(Ref, Item, Val) -> rocksdb_insert_info(Ref, Item, Val, St) ->
EncKey = info_key(Item), EncKey = info_key(Item),
EncVal = encode_val(Val), EncVal = encode_val(Val),
?rocksdb:put(Ref, EncKey, EncVal, []). db_put(Ref, EncKey, EncVal, [], St).
rocksdb_delete_info(Ref, Item) -> rocksdb_delete_info(Ref, Item, St) ->
EncKey = info_key(Item), EncKey = info_key(Item),
?rocksdb:delete(Ref, EncKey, []). db_delete(Ref, EncKey, [], St).
info_obj(Item, Val) -> info_obj(Item, Val) ->
{info_key(Item), encode_val(Val)}. {info_key(Item), encode_val(Val)}.
@ -1364,8 +1380,8 @@ info_obj(Item, Val) ->
info_key(Item) -> info_key(Item) ->
<<?INFO_TAG, (encode_key(Item))/binary>>. <<?INFO_TAG, (encode_key(Item))/binary>>.
delete_info_(Item, #st{ets = Ets, ref = Ref}) -> delete_info_(Item, #st{ets = Ets, ref = Ref} = St) ->
rocksdb_delete_info(Ref, Item), rocksdb_delete_info(Ref, Item, St),
ets_delete_info(Ets, Item). ets_delete_info(Ets, Item).
read_info(Item, Default, Ets) -> read_info(Item, Default, Ets) ->
@ -1555,6 +1571,48 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos) when is_tuple(HeadPat) ->
keypat_pfx(_, _) -> keypat_pfx(_, _) ->
<<>>. <<>>.
%% ----------------------------------------------------------------------------
%% Db wrappers
%% ----------------------------------------------------------------------------
return(_Res, badarg) ->
badarg;
return(Res, _) ->
Res.
db_put(Ref, K, V, Opts, St) ->
write_result(?rocksdb:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St).
db_write(Ref, List, Opts, St) ->
write_result(?rocksdb:write(Ref, List, Opts, St), write, [Ref, List, Opts], St).
db_delete(Ref, K, Opts, St) ->
write_result(?rocksdb:delete(Ref, K, Opts), delete, [Ref, K, Opts], St).
write_result(ok, _, _, _) ->
ok;
write_result(Res, Op, Args, #st{on_write_error = Rpt}) ->
mnesia_lib:Rpt("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n",
[Op | Args] ++ [Res]),
if Rpt == fatal; Rpt == error ->
badarg;
true ->
ok
end.
rpt_fmt([_|T]) ->
lists:append(["~p" | [", ~p" || _ <- T]]).
valid_mnesia_op(Op) ->
if Op==verbose
; Op==warning
; Op==error
; Op==fatal ->
true;
true ->
false
end.
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
%% COMMON PRIVATE %% COMMON PRIVATE
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------