diff --git a/Makefile b/Makefile index 3dee3bf..91ff3d3 100644 --- a/Makefile +++ b/Makefile @@ -12,9 +12,13 @@ docs: check: $(REBAR3) dialyzer -test: +eunit: $(REBAR3) eunit $(suite) +ct: + $(REBAR3) ct $(suite) + +test: eunit ct conf_clean: @: diff --git a/README.md b/README.md index 86e5e09..94ae7dc 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,68 @@ This means that a prefix key identifies the start of the sequence of entries whose keys match the prefix. The backend uses this to optimize selects on prefix keys. +## Customization + +RocksDB supports a number of customization options. These can be specified +by providing a `{Key, Value}` list named `rocksdb_opts` under `user_properties`, +for example: + +```erlang +mnesia:create_table(foo, [{rocksdb_copies, [node()]}, + ... + {user_properties, + [{rocksdb_opts, [{max_open_files, 1024}]}] + }]) +``` + +Consult the [RocksDB documentation](https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning) +for information on configuration parameters. Also see the section below on handling write errors. + +The default configuration for tables in `mnesia_rocksdb` is: +``` +default_open_opts() -> + [ {create_if_missing, true} + , {cache_size, + list_to_integer(get_env_default("ROCKSDB_CACHE_SIZE", "32212254"))} + , {block_size, 1024} + , {max_open_files, 100} + , {write_buffer_size, + list_to_integer(get_env_default( + "ROCKSDB_WRITE_BUFFER_SIZE", "4194304"))} + , {compression, + list_to_atom(get_env_default("ROCKSDB_COMPRESSION", "true"))} + , {use_bloomfilter, true} + ]. +``` + +It is also possible, for larger databases, to produce a tuning parameter file. +This is experimental, and mostly copied from `mnesia_leveldb`. Consult the +source code in `mnesia_rocksdb_tuning.erl` and `mnesia_rocksdb_params.erl`. +Contributions are welcome. + +## Handling of errors in write operations + +The RocksDB update operations return either `ok` or `{error, any()}`. 
+Since the actual updates are performed after the 'point-of-no-return', +returning an `error` result will cause mnesia to behave unpredictably, since +the operations are expected to simply work. + +An `on_write_error` option can be provided, per-table, in the `rocksdb_opts` +user property (see [Customization](#customization) above). Supported values indicate at which level an error +indication should be reported. Mnesia may save reported events in RAM, and may +also print them, depending on the debug level (controlled with `mnesia:set_debug_level/1`). + +Mnesia debug levels are, in increasing detail, `none | verbose | debug | trace`. +The supported values for `on_write_error` are: + + | Value | Saved at debug level | Printed at debug level | Action | + | ------- | -------------------- | ---------------------- | --------- | + | debug | unless none | verbose, debug, trace | ignore | + | verbose | unless none | verbose, debug, trace | ignore | + | warning | always | always | ignore | + | error | always | always | exception | + | fatal | always | always | core dump | + ## Caveats Avoid placing `bag` tables in RocksDB. Although they work, each write diff --git a/rebar.config b/rebar.config index 7182ca7..456a159 100644 --- a/rebar.config +++ b/rebar.config @@ -9,6 +9,7 @@ [ {test, [ - {deps, [{proper, "1.2.0"}]} + {deps, [ {proper, "1.2.0"} + , {meck, "0.8.13"}]} ]} ]}. diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index a0a83cb..278c85e 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -165,25 +165,28 @@ %% RECORDS %% ---------------------------------------------------------------------------- --record(sel, {alias, % TODO: not used - tab, - ref, - keypat, - ms, % TODO: not used - compiled_ms, - limit, - key_only = false, % TODO: not used - direction = forward}). 
% TODO: not used +-record(sel, { alias % TODO: not used + , tab + , ref + , keypat + , ms % TODO: not used + , compiled_ms + , limit + , key_only = false % TODO: not used + , direction = forward}). % TODO: not used + +-type on_write_error() :: debug | verbose | warning | error | fatal. +-define(WRITE_ERR_DEFAULT, verbose). -record(st, { ets - , ref - , alias - , tab - , type - , size_warnings % integer() - , maintain_size % boolean() - , on_write_error = verbose :: debug | verbose | warning | error | fatal - }). + , ref + , alias + , tab + , type + , size_warnings % integer() + , maintain_size % boolean() + , on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error() + }). %% ---------------------------------------------------------------------------- %% CONVENIENCE API @@ -350,16 +353,12 @@ check_definition(Alias, Tab, Nodes, Props) -> end; ({user_properties, UPs} = P) -> RdbOpts = proplists:get_value(rocksdb_opts, UPs, []), - case proplists:get_value(on_write_error, RdbOpts) of - undefined -> + OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT), + case valid_mnesia_op(OWE) of + true -> P; - OWE -> - case valid_mnesia_op(OWE) of - true -> - P; - false -> - throw({error, {invalid, {on_write_error, OWE}}}) - end + false -> + throw({error, {invalid, {on_write_error, OWE}}}) end; (P) -> P end, Props) of @@ -416,7 +415,9 @@ load_table_(Alias, Tab, Type, RdbOpts) -> %% transform_table on a rocksdb_table that has indexing. ?dbg("ERR: table:~p already loaded pid:~p stack:~p~n", [Tab, _Pid, _Stack]), - ok + ok; + {error, Other} -> + mnesia:abort(Other) end. close_table(Alias, Tab) -> @@ -814,12 +815,7 @@ slot_iter_set(Res, _, _, _) when element(1, Res) =/= ok -> '$end_of_table'. update_counter(Alias, Tab, C, Val) when is_integer(Val) -> - case call(Alias, Tab, {update_counter, C, Val}) of - badarg -> - mnesia:abort(badarg); - Res -> - Res - end. + call(Alias, Tab, {update_counter, C, Val}). 
%% server-side part do_update_counter(C, Val, Ref, St) -> @@ -829,10 +825,13 @@ do_update_counter(C, Val, Ref, St) -> case decode_val(EncVal) of {_, _, Old} = Rec when is_integer(Old) -> Res = Old+Val, - return(Res, db_put(Ref, Enc, - encode_val( - setelement(3, Rec, Res)), - [], St)); + return_catch( + fun() -> + db_put(Ref, Enc, + encode_val( + setelement(3, Rec, Res)), + [], St) + end); _ -> badarg end; @@ -892,17 +891,22 @@ start_proc(Alias, Tab, Type, RdbOpts) -> init({Alias, Tab, Type, RdbOpts}) -> process_flag(trap_exit, true), - {ok, Ref, Ets} = do_load_table(Tab, RdbOpts), - St = #st{ ets = Ets - , ref = Ref - , alias = Alias - , tab = Tab - , type = Type - , size_warnings = 0 - , maintain_size = should_maintain_size(Tab) - }, - OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error), - {ok, recover_size_info(St#st{on_write_error = OnWriteError})}. + try + {ok, Ref, Ets} = do_load_table(Tab, RdbOpts), + St = #st{ ets = Ets + , ref = Ref + , alias = Alias + , tab = Tab + , type = Type + , size_warnings = 0 + , maintain_size = should_maintain_size(Tab) + }, + OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error), + {ok, recover_size_info(St#st{on_write_error = OnWriteError})} + catch + throw:badarg -> + {error, write_error} + end. 
do_load_table(Tab, RdbOpts) -> MPd = data_mountpoint(Tab), @@ -924,11 +928,11 @@ handle_call({write_info, Key, Value}, _From, #st{} = St) -> handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) -> {reply, do_update_counter(C, Incr, Ref, St), St}; handle_call({insert, Key, Val}, _From, St) -> - do_insert(Key, Val, St), - {reply, ok, St}; + Res = do_insert(Key, Val, St), + {reply, Res, St}; handle_call({delete, Key}, _From, St) -> - do_delete(Key, St), - {reply, ok, St}; + Res = do_delete(Key, St), + {reply, Res, St}; handle_call(clear_table, _From, #st{ets = Ets, tab = Tab, ref = Ref} = St) -> MPd = data_mountpoint(Tab), ?dbg("Attempting clear_table(~p)~n", [Tab]), @@ -1179,14 +1183,17 @@ size_warning(Alias, Tab) -> %% server-side end of insert/3. do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false} = St) -> - do_insert_bag(Ref, K, V, false, St); + return_catch(fun() -> do_insert_bag(Ref, K, V, false, St) end); do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) -> - CurSz = read_info(size, 0, Ets), - NewSz = do_insert_bag(Ref, K, V, CurSz, St), - ets_insert_info(Ets, size, NewSz), - ok; + return_catch( + fun() -> + CurSz = read_info(size, 0, Ets), + NewSz = do_insert_bag(Ref, K, V, CurSz, St), + ets_insert_info(Ets, size, NewSz), + ok + end); do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) -> - return(ok, db_put(Ref, K, V, [], St)); + return_catch(fun() -> db_put(Ref, K, V, [], St) end); do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> IsNew = case ?rocksdb:get(Ref, K, []) of @@ -1197,12 +1204,17 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> end, case IsNew of true -> - NewSz = read_info(size, 0, Ets) + 1, - {Ki, Vi} = info_obj(size, NewSz), - ?rocksdb:write(Ref, [{put, Ki, Vi}, {put, K, V}], []), - ets_insert_info(Ets, size, NewSz); + return_catch( + fun() -> + NewSz = read_info(size, 0, Ets) + 1, + {Ki, Vi} = info_obj(size, NewSz), + L = 
[{put, Ki, Vi}, {put, K, V}], + write_result(mnesia_rocksdb_lib:write(Ref, L, []), + write, [Ref, L, []], St), % may throw + ets_insert_info(Ets, size, NewSz) + end); false -> - return(ok, db_put(Ref, K, V, [], St)) + return_catch(fun() -> db_put(Ref, K, V, [], St) end) end, ok. @@ -1228,7 +1240,7 @@ do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); _ when TSz =:= false -> Key = <>, - return(ok, db_put(Ref, Key, V, [], St)); + db_put(Ref, Key, V, [], St); _ -> NewSz = TSz + 1, {Ki, Vi} = info_obj(size, NewSz), @@ -1239,23 +1251,29 @@ do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> %% server-side part do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false} = St) -> - do_delete_bag(byte_size(Key), Key, Ref, false, St); + return_catch(fun() -> do_delete_bag(byte_size(Key), Key, Ref, false, St) end); do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) -> - Sz = byte_size(Key), - CurSz = read_info(size, 0, Ets), - NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St), - ets_insert_info(Ets, size, NewSz), - ok; + return_catch( + fun() -> + Sz = byte_size(Key), + CurSz = read_info(size, 0, Ets), + NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St), + ets_insert_info(Ets, size, NewSz), + ok + end); do_delete(Key, #st{ref = Ref, maintain_size = false} = St) -> - db_delete(Ref, Key, [], St); + return_catch(fun() -> db_delete(Ref, Key, [], St) end); do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> CurSz = read_info(size, 0, Ets), case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of {ok, _} -> - NewSz = CurSz -1, - {Ki, Vi} = info_obj(size, NewSz), - ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), - ets_insert_info(Ets, size, NewSz); + return_catch( + fun() -> + NewSz = CurSz -1, + {Ki, Vi} = info_obj(size, NewSz), + ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), + ets_insert_info(Ets, 
size, NewSz) + end); not_found -> false end. @@ -1575,19 +1593,22 @@ keypat_pfx(_, _) -> %% ---------------------------------------------------------------------------- %% Db wrappers %% ---------------------------------------------------------------------------- -return(_Res, badarg) -> - badarg; -return(Res, _) -> - Res. + +return_catch(F) when is_function(F, 0) -> + try F() + catch + throw:badarg -> + badarg + end. db_put(Ref, K, V, Opts, St) -> - write_result(?rocksdb:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St). + write_result(mnesia_rocksdb_lib:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St). db_write(Ref, List, Opts, St) -> - write_result(?rocksdb:write(Ref, List, Opts, St), write, [Ref, List, Opts], St). + write_result(mnesia_rocksdb_lib:write(Ref, List, Opts), write, [Ref, List, Opts], St). db_delete(Ref, K, Opts, St) -> - write_result(?rocksdb:delete(Ref, K, Opts), delete, [Ref, K, Opts], St). + write_result(mnesia_rocksdb_lib:delete(Ref, K, Opts), delete, [Ref, K, Opts], St). write_result(ok, _, _, _) -> ok; @@ -1596,7 +1617,7 @@ write_result(Res, Op, Args, #st{on_write_error = Rpt}) -> mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n", [Op | Args] ++ [Res]), if Rpt == fatal; Rpt == error -> - badarg; + throw(badarg); true -> ok end. diff --git a/src/mnesia_rocksdb_lib.erl b/src/mnesia_rocksdb_lib.erl new file mode 100644 index 0000000..0a4a463 --- /dev/null +++ b/src/mnesia_rocksdb_lib.erl @@ -0,0 +1,17 @@ +%%% @doc RocksDB update wrappers, in separate module for easy tracing and mocking. +%%% +-module(mnesia_rocksdb_lib). + +-export([put/4, + write/3, + delete/3]). + + +put(Ref, K, V, Opts) -> + rocksdb:put(Ref, K, V, Opts). + +write(Ref, L, Opts) -> + rocksdb:write(Ref, L, Opts). + +delete(Ref, K, Opts) -> + rocksdb:delete(Ref, K, Opts). 
diff --git a/test/mnesia_rocksdb_SUITE.erl b/test/mnesia_rocksdb_SUITE.erl new file mode 100644 index 0000000..aed46cd --- /dev/null +++ b/test/mnesia_rocksdb_SUITE.erl @@ -0,0 +1,49 @@ +-module(mnesia_rocksdb_SUITE). + +-export([ + all/0 + , groups/0 + , suite/0 + , init_per_suite/1 + , end_per_suite/1 + , init_per_group/2 + , end_per_group/2 + , init_per_testcase/2 + , end_per_testcase/2 + ]). + +-export([error_handling/1]). + +-include_lib("common_test/include/ct.hrl"). + +suite() -> + []. + +all() -> + [{group, all_tests}]. + +groups() -> + [{all_tests, [sequence], [error_handling]}]. + + +error_handling(_Config) -> + mnesia_rocksdb_error_handling:run(). + + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_, Config) -> + Config. + +end_per_group(_, _Config) -> + ok. + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, _Config) -> + ok. diff --git a/test/mnesia_rocksdb_error_handling.erl b/test/mnesia_rocksdb_error_handling.erl new file mode 100644 index 0000000..84470a4 --- /dev/null +++ b/test/mnesia_rocksdb_error_handling.erl @@ -0,0 +1,103 @@ +-module(mnesia_rocksdb_error_handling). + +-export([run/0, + run/4]). + + +run() -> + setup(), + %% run only one test for 'fatal', to save time. + [run(Type, Op, L, MaintainSz) || MaintainSz <- [false, true], + Type <- [set, bag], + Op <- [insert, update, delete], + L <- levels()] + ++ [run(set, insert, fatal, false)]. + +run(Type, Op, Level, MaintainSz) -> + setup(), + {ok, Tab} = create_tab(Type, Level, MaintainSz), + mnesia:dirty_write({Tab, a, 1}), % pre-existing data + with_mock(Level, Op, Tab, fun() -> + try_write(Op, Type, Tab), + expect_error(Level, Tab) + end). + +levels() -> + [debug, verbose, warning, error]. + +setup() -> + mnesia:stop(), + start_mnesia(). 
+ +create_tab(Type, Level, MaintainSz) -> + TabName = tab_name(Type, Level, MaintainSz), + UserProps = user_props(Level, MaintainSz), + {atomic, ok} = mnesia:create_table(TabName, [{rdb, [node()]}, + {user_properties, UserProps}]), + {ok, TabName}. + +tab_name(Type, Level, MaintainSz) -> + binary_to_atom(iolist_to_binary( + ["t" | [["_", atom_to_list(A)] + || A <- [?MODULE, Type, Level, MaintainSz]]]), utf8). + +user_props(Level, MaintainSz) -> + [{maintain_sz, MaintainSz}, + {rocksdb_opts, [{on_write_error, Level}]}]. + +start_mnesia() -> + mnesia_rocksdb_tlib:start_mnesia(reset), + ok. + +with_mock(Level, Op, Tab, F) -> + mnesia:subscribe(system), + mnesia:set_debug_level(debug), + meck:new(mnesia_rocksdb_lib, [passthrough]), + meck:expect(mnesia_rocksdb_lib, put, 4, {error, some_put_error}), + meck:expect(mnesia_rocksdb_lib, write, 3, {error, some_write_error}), + meck:expect(mnesia_rocksdb_lib, delete, 3, {error,some_delete_error}), + try {Level, Op, Tab, F()} of + {_, _, _, ok} -> + ok; + Other -> + io:fwrite("OTHER: ~p~n", [Other]), + ok + catch + exit:{{aborted,_},_} -> + Level = error, + ok + after + mnesia:set_debug_level(none), + mnesia:unsubscribe(system), + meck:unload(mnesia_rocksdb_lib) + end. + +try_write(insert, set, Tab) -> + mnesia:dirty_write({Tab, b, 2}); +try_write(insert, bag, Tab) -> + mnesia:dirty_write({Tab, a, 2}); +try_write(update, _, Tab) -> + mnesia:dirty_write({Tab, a, 1}); +try_write(delete, _, Tab) -> + mnesia:dirty_delete({Tab, a}). + + +expect_error(Level, Tab) -> + Tag = rpt_tag(Level), + receive + {mnesia_system_event, {mnesia_fatal, Fmt, Args, _Core}} -> + Tag = mnesia_fatal, + io:fwrite("EVENT(~p, ~p):~n ~s", [Tag, Tab, io_lib:fwrite(Fmt, Args)]), + ok; + {mnesia_system_event, {Tag, Fmt, Args}} -> + io:fwrite("EVENT(~p, ~p):~n ~s", [Tag, Tab, io_lib:fwrite(Fmt, Args)]), + ok + after 1000 -> + error({expected_error, [Level, Tab]}) + end. 
+ +rpt_tag(fatal ) -> mnesia_fatal; +rpt_tag(error ) -> mnesia_error; +rpt_tag(warning) -> mnesia_warning; +rpt_tag(verbose) -> mnesia_info; +rpt_tag(debug ) -> mnesia_info.