diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 089e738..2481589 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -182,6 +182,7 @@ , type , size_warnings % integer() , maintain_size % boolean() + , on_write_error = verbose :: verbose | warning | error | fatal }). %% ---------------------------------------------------------------------------- @@ -338,21 +339,36 @@ prefixes(_) -> %% set is OK as ordered_set is a kind of set. check_definition(Alias, Tab, Nodes, Props) -> Id = {Alias, Nodes}, - Props1 = lists:map( - fun({type, T} = P) -> - if T==set; T==ordered_set; T==bag -> - P; - true -> - mnesia:abort({combine_error, - Tab, - [Id, {type,T}]}) - end; - ({user_properties, _} = P) -> - %% should perhaps verify rocksdb options... - P; - (P) -> P - end, Props), - {ok, Props1}. + try lists:map( + fun({type, T} = P) -> + if T==set; T==ordered_set; T==bag -> + P; + true -> + mnesia:abort({combine_error, + Tab, + [Id, {type,T}]}) + end; + ({user_properties, UPs} = P) -> + RdbOpts = proplists:get_value(rocksdb_opts, UPs, []), + case proplists:get_value(on_write_error, RdbOpts) of + undefined -> + P; + OWE -> + case valid_mnesia_op(OWE) of + true -> + P; + false -> + throw({error, {invalid, {on_write_error, OWE}}}) + end + end; + (P) -> P + end, Props) of + Props1 -> + {ok, Props1} + catch + throw:Error -> + Error + end. %% -> ok | {error, exists} create_table(_Alias, Tab, _Props) -> @@ -806,18 +822,17 @@ update_counter(Alias, Tab, C, Val) when is_integer(Val) -> end. %% server-side part -do_update_counter(C, Val, Ref) -> +do_update_counter(C, Val, Ref, St) -> Enc = encode_key(C), case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of {ok, EncVal} -> case decode_val(EncVal) of {_, _, Old} = Rec when is_integer(Old) -> Res = Old+Val, - ?rocksdb:put(Ref, Enc, - encode_val( - setelement(3, Rec, Res)), - []), - Res; + return(Res, db_put(Ref, Enc, + encode_val( + setelement(3, Rec, Res)), + [], St)); _ -> badarg end; @@ -879,14 +894,15 @@ init({Alias, Tab, Type, RdbOpts}) -> process_flag(trap_exit, true), {ok, Ref, Ets} = do_load_table(Tab, RdbOpts), St = #st{ ets = Ets - , ref = Ref - , alias = Alias - , tab = Tab - , type = Type - , size_warnings = 0 - , maintain_size = should_maintain_size(Tab) - }, - {ok, recover_size_info(St)}. + , ref = Ref + , alias = Alias + , tab = Tab + , type = Type + , size_warnings = 0 + , maintain_size = should_maintain_size(Tab) + }, + OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error), + {ok, recover_size_info(St#st{on_write_error = OnWriteError})}. do_load_table(Tab, RdbOpts) -> MPd = data_mountpoint(Tab), @@ -906,7 +922,7 @@ handle_call({write_info, Key, Value}, _From, #st{} = St) -> _ = write_info_(Key, Value, St), {reply, ok, St}; handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) -> - {reply, do_update_counter(C, Incr, Ref), St}; + {reply, do_update_counter(C, Incr, Ref, St), St}; handle_call({insert, Key, Val}, _From, St) -> do_insert(Key, Val, St), {reply, ok, St}; @@ -1162,16 +1178,16 @@ size_warning(Alias, Tab) -> gen_server:cast(ProcName, size_warning). %% server-side end of insert/3. -do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false}) -> - do_insert_bag(Ref, K, V, false); -do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true}) -> +do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false} = St) -> + do_insert_bag(Ref, K, V, false, St); +do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) -> CurSz = read_info(size, 0, Ets), - NewSz = do_insert_bag(Ref, K, V, CurSz), + NewSz = do_insert_bag(Ref, K, V, CurSz, St), ets_insert_info(Ets, size, NewSz), ok; -do_insert(K, V, #st{ref = Ref, maintain_size = false}) -> - ?rocksdb:put(Ref, K, V, []); -do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true}) -> +do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) -> + return(ok, db_put(Ref, K, V, [], St)); +do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> IsNew = case ?rocksdb:get(Ref, K, []) of {ok, _} -> @@ -1186,65 +1202,65 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true}) -> ?rocksdb:write(Ref, [{put, Ki, Vi}, {put, K, V}], []), ets_insert_info(Ets, size, NewSz); false -> - ?rocksdb:put(Ref, K, V, []) + return(ok, db_put(Ref, K, V, [], St)) end, ok. -do_insert_bag(Ref, K, V, CurSz) -> +do_insert_bag(Ref, K, V, CurSz, St) -> KSz = byte_size(K), with_iterator( Ref, fun(I) -> do_insert_bag_( - KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz) + KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) end). %% There's a potential access pattern that would force counters to %% creep upwards and eventually hit the limit. This could be addressed, %% with compaction. TODO. -do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz) when Prev < ?MAX_BAG -> +do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> case Res of {ok, <>, V} -> %% object exists TSz; {ok, <>, _} -> do_insert_bag_( - Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz); + Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); _ when TSz =:= false -> Key = <>, - ?rocksdb:put(Ref, Key, V, []); + return(ok, db_put(Ref, Key, V, [], St)); _ -> NewSz = TSz + 1, {Ki, Vi} = info_obj(size, NewSz), Key = <>, - ?rocksdb:write(Ref, [{put, Ki, Vi}, {put, Key, V}], []), + db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), NewSz end. %% server-side part -do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false}) -> - do_delete_bag(byte_size(Key), Key, Ref, false); -do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true}) -> +do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false} = St) -> + do_delete_bag(byte_size(Key), Key, Ref, false, St); +do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) -> Sz = byte_size(Key), CurSz = read_info(size, 0, Ets), - NewSz = do_delete_bag(Sz, Key, Ref, CurSz), + NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St), ets_insert_info(Ets, size, NewSz), ok; -do_delete(Key, #st{ref = Ref, maintain_size = false}) -> - ?rocksdb:delete(Ref, Key, []); -do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true}) -> +do_delete(Key, #st{ref = Ref, maintain_size = false} = St) -> + db_delete(Ref, Key, [], St); +do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> CurSz = read_info(size, 0, Ets), case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of {ok, _} -> NewSz = CurSz -1, {Ki, Vi} = info_obj(size, NewSz), - ok = ?rocksdb:write(Ref, [{delete, Key}, {put, Ki, Vi}], []), + ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), ets_insert_info(Ets, size, NewSz); not_found -> false end. -do_delete_bag(Sz, Key, Ref, TSz) -> +do_delete_bag(Sz, Key, Ref, TSz, St) -> Found = with_iterator( Ref, fun(I) -> @@ -1255,13 +1271,13 @@ do_delete_bag(Sz, Key, Ref, TSz) -> {[], _} -> TSz; {_, false} -> - ?rocksdb:write(Ref, [{delete, K} || K <- Found], []); + db_write(Ref, [{delete, K} || K <- Found], [], St); {_, _} -> N = length(Found), NewSz = TSz - N, {Ki, Vi} = info_obj(size, NewSz), - ?rocksdb:write(Ref, [{put, Ki, Vi} | - [{delete, K} || K <- Found]], []), + db_write(Ref, [{put, Ki, Vi} | + [{delete, K} || K <- Found]], [], St), NewSz end. @@ -1279,21 +1295,21 @@ do_delete_bag_(Sz, K, Res, Ref, I) -> end. do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type, - maintain_size = MaintainSize}) -> + maintain_size = MaintainSize} = St) -> Fun = fun(_, Key, Acc) -> [Key|Acc] end, Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30), case {Keys, MaintainSize} of {[], _} -> ok; {_, false} -> - ?rocksdb:write(Ref, [{delete, K} || K <- Keys], []), + db_write(Ref, [{delete, K} || K <- Keys], [], St), ok; {_, true} -> CurSz = read_info(size, 0, Ets), NewSz = max(CurSz - length(Keys), 0), {Ki, Vi} = info_obj(size, NewSz), - ?rocksdb:write(Ref, [{put, Ki, Vi} | - [{delete, K} || K <- Keys]], []), + db_write(Ref, [{put, Ki, Vi} | + [{delete, K} || K <- Keys]], [], St), ets_insert_info(Ets, size, NewSz), ok end. @@ -1339,8 +1355,8 @@ property(Tab, Prop, Default) -> exit:_ -> Default end. -write_info_(Item, Val, #st{ets = Ets, ref = Ref}) -> - rocksdb_insert_info(Ref, Item, Val), +write_info_(Item, Val, #st{ets = Ets, ref = Ref} = St) -> + rocksdb_insert_info(Ref, Item, Val, St), ets_insert_info(Ets, Item, Val). ets_insert_info(Ets, Item, Val) -> @@ -1349,14 +1365,14 @@ ets_insert_info(Ets, Item, Val) -> ets_delete_info(Ets, Item) -> ets:delete(Ets, {info, Item}). -rocksdb_insert_info(Ref, Item, Val) -> +rocksdb_insert_info(Ref, Item, Val, St) -> EncKey = info_key(Item), EncVal = encode_val(Val), - ?rocksdb:put(Ref, EncKey, EncVal, []). + db_put(Ref, EncKey, EncVal, [], St). -rocksdb_delete_info(Ref, Item) -> +rocksdb_delete_info(Ref, Item, St) -> EncKey = info_key(Item), - ?rocksdb:delete(Ref, EncKey, []). + db_delete(Ref, EncKey, [], St). info_obj(Item, Val) -> {info_key(Item), encode_val(Val)}. @@ -1364,8 +1380,8 @@ info_obj(Item, Val) -> info_key(Item) -> <>. -delete_info_(Item, #st{ets = Ets, ref = Ref}) -> - rocksdb_delete_info(Ref, Item), +delete_info_(Item, #st{ets = Ets, ref = Ref} = St) -> + rocksdb_delete_info(Ref, Item, St), ets_delete_info(Ets, Item). read_info(Item, Default, Ets) -> @@ -1555,6 +1571,48 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos) when is_tuple(HeadPat) -> keypat_pfx(_, _) -> <<>>. + +%% ---------------------------------------------------------------------------- +%% Db wrappers +%% ---------------------------------------------------------------------------- +return(_Res, badarg) -> + badarg; +return(Res, _) -> + Res. + +db_put(Ref, K, V, Opts, St) -> + write_result(?rocksdb:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St). + +db_write(Ref, List, Opts, St) -> + write_result(?rocksdb:write(Ref, List, Opts, St), write, [Ref, List, Opts], St). + +db_delete(Ref, K, Opts, St) -> + write_result(?rocksdb:delete(Ref, K, Opts), delete, [Ref, K, Opts], St). + +write_result(ok, _, _, _) -> + ok; +write_result(Res, Op, Args, #st{on_write_error = Rpt}) -> + mnesia_lib:Rpt("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n", + [Op | Args] ++ [Res]), + if Rpt == fatal; Rpt == error -> + badarg; + true -> + ok + end. + +rpt_fmt([_|T]) -> + lists:append(["~p" | [", ~p" || _ <- T]]). + +valid_mnesia_op(Op) -> + if Op==verbose + ; Op==warning + ; Op==error + ; Op==fatal -> + true; + true -> + false + end. + %% ---------------------------------------------------------------------------- %% COMMON PRIVATE %% ----------------------------------------------------------------------------