diff --git a/README.md b/README.md index 94ae7dc..0a6187b 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,13 @@ # mnesia_rocksdb -A RocksDb backend for Mnesia + +A RocksDB backend for Mnesia. This permits Erlang/OTP applications to use RocksDB as a backend for mnesia tables. It is based on Klarna's `mnesia_eleveldb`. ## Prerequisites -- rocksdb + +- rocksdb (included as dependency) - Erlang/OTP 20.0 or newer (https://github.com/erlang/otp) ## Getting started @@ -76,13 +78,16 @@ Contributions are welcome. The RocksDB update operations return either `ok` or `{error, any()}`. Since the actual updates are performed after the 'point-of-no-return', -returning an `error` result will cause mnesia to behave unpredictably, since -the operations are expected to simply work. +returning an `error` result will cause mnesia to behave unpredictably, +since the operations are expected to simply work. + +### Option 1: `on_write_error` An `on_write_error` option can be provided, per-table, in the `rocksdb_opts` -user property (see [Customization](#customization) above). Supported values indicate at which level an error -indication should be reported. Mnesia may save reported events in RAM, and may -also print them, depending on the debug level (controlled with `mnesia:set_debug_level/1`). +user property (see [Customization](#customization) above). +Supported values indicate at which level an error indication should be reported. +Mnesia may save reported events in RAM, and may also print them, +depending on the debug level (controlled with `mnesia:set_debug_level/1`). Mnesia debug levels are, in increasing detail, `none | verbose | debug | trace` The supported values for `on_write_error` are: @@ -95,6 +100,22 @@ The supported values for `on_write_error` are: | error | always | always | exception | | fatal | always | always | core dump | +### Option 2: `on_write_error_store` + +An `on_write_error_store` option can be provided, per-table, in the `rocksdb_opts` +user property (see [Customization](#customization) above). +When set, the backend will use the value of the option as the name for an ETS table +which is used as storage for runtime write errors. The table must be set up outside +of the backend by the clients themselves. + +Entries to the table are in the form of a tuple `{{Table, Key}, Error, InsertedAt}` +where `Table` refers to the Mnesia table name, `Key` is the primary key being used by Mnesia, +`Error` is the error encountered by the backend, and `InsertedAt` refers to the time +the error was encountered as system time in milliseconds. + +The backend will only insert entries and otherwise not manage the table. Thus, clients +are expected to clean up the table during runtime to prevent memory leakage. + ## Caveats Avoid placing `bag` tables in RocksDB. Although they work, each write diff --git a/rebar3 b/rebar3 index a0deec9..00494f9 100755 Binary files a/rebar3 and b/rebar3 differ diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 278c85e..1411b27 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -23,7 +23,6 @@ -module(mnesia_rocksdb). - %% ---------------------------------------------------------------------------- %% BEHAVIOURS %% ---------------------------------------------------------------------------- @@ -31,7 +30,6 @@ -behaviour(mnesia_backend_type). -behaviour(gen_server). - %% ---------------------------------------------------------------------------- %% EXPORTS %% ---------------------------------------------------------------------------- @@ -64,13 +62,13 @@ %% schema level callbacks -export([semantics/2, - check_definition/4, - create_table/3, - load_table/4, - close_table/2, - sync_close_table/2, - delete_table/2, - info/3]). + check_definition/4, + create_table/3, + load_table/4, + close_table/2, + sync_close_table/2, + delete_table/2, + info/3]). %% table synch calls -export([sender_init/4, @@ -173,19 +171,24 @@ , compiled_ms , limit , key_only = false % TODO: not used - , direction = forward}). % TODO: not used + , direction = forward % TODO: not used + }). -type on_write_error() :: debug | verbose | warning | error | fatal. +-type on_write_error_store() :: atom() | undefined. + -define(WRITE_ERR_DEFAULT, verbose). +-define(WRITE_ERR_STORE_DEFAULT, undefined). -record(st, { ets , ref , alias , tab , type - , size_warnings % integer() - , maintain_size % boolean() + , size_warnings :: integer() + , maintain_size :: boolean() , on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error() + , on_write_error_store = ?WRITE_ERR_STORE_DEFAULT :: on_write_error_store() }). %% ---------------------------------------------------------------------------- @@ -209,7 +212,6 @@ register(Alias) -> default_alias() -> rocksdb_copies. - %% ---------------------------------------------------------------------------- %% DEBUG API %% ---------------------------------------------------------------------------- @@ -342,33 +344,38 @@ prefixes(_) -> %% set is OK as ordered_set is a kind of set. check_definition(Alias, Tab, Nodes, Props) -> Id = {Alias, Nodes}, - try lists:map( - fun({type, T} = P) -> - if T==set; T==ordered_set; T==bag -> - P; - true -> - mnesia:abort({combine_error, - Tab, - [Id, {type,T}]}) - end; - ({user_properties, UPs} = P) -> - RdbOpts = proplists:get_value(rocksdb_opts, UPs, []), - OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT), - case valid_mnesia_op(OWE) of - true -> - P; - false -> - throw({error, {invalid, {on_write_error, OWE}}}) - end; - (P) -> P - end, Props) of - Props1 -> - {ok, Props1} + try + Props1 = lists:map(fun(E) -> check_definition_entry(Tab, Id, E) end, Props), + {ok, Props1} catch throw:Error -> Error end. +check_definition_entry(_Tab, _Id, {type, T} = P) when T==set; T==ordered_set; T==bag -> + P; +check_definition_entry(Tab, Id, {type, T}) -> + mnesia:abort({combine_error, Tab, [Id, {type, T}]}); +check_definition_entry(_Tab, _Id, {user_properties, UPs} = P) -> + RdbOpts = proplists:get_value(rocksdb_opts, UPs, []), + OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT), + OWEStore = proplists:get_value(on_write_error_store, RdbOpts, ?WRITE_ERR_STORE_DEFAULT), + case valid_mnesia_op(OWE) of + true -> + case OWEStore of + undefined -> + P; + V when is_atom(V) -> + P; + V -> + throw({error, {invalid_configuration, {on_write_error_store, V}}}) + end; + false -> + throw({error, {invalid_configuration, {on_write_error, OWE}}}) + end; +check_definition_entry(_Tab, _Id, P) -> + P. + %% -> ok | {error, exists} create_table(_Alias, Tab, _Props) -> create_mountpoint(Tab). @@ -509,13 +516,13 @@ info(_Alias, Tab, memory) -> end; info(Alias, Tab, size) -> case retrieve_size(Alias, Tab) of - {ok, Size} -> - if Size < 10000 -> ok; - true -> size_warning(Alias, Tab) - end, - Size; - Error -> - Error + {ok, Size} -> + if Size < 10000 -> ok; + true -> size_warning(Alias, Tab) + end, + Size; + Error -> + Error end; info(_Alias, Tab, Item) -> case try_read_info(Tab, Item, undefined) of @@ -527,10 +534,10 @@ info(_Alias, Tab, Item) -> retrieve_size(_Alias, Tab) -> case try_read_info(Tab, size, 0) of - {ok, Size} -> - {ok, Size}; - Error -> - Error + {ok, Size} -> + {ok, Size}; + Error -> + Error end. try_read_info(Tab, Item, Default) -> @@ -627,10 +634,10 @@ first(Alias, Tab) -> %% PRIVATE ITERATOR i_first(I) -> case ?rocksdb:iterator_move(I, <>) of - {ok, First, _} -> - decode_key(First); - _ -> - '$end_of_table' + {ok, First, _} -> + decode_key(First); + _ -> + '$end_of_table' end. %% Not relevant for an ordered_set @@ -652,12 +659,12 @@ last(Alias, Tab) -> %% PRIVATE ITERATOR i_last(I) -> case ?rocksdb:iterator_move(I, last) of - {ok, << ?INFO_TAG, _/binary >>, _} -> - '$end_of_table'; - {ok, Last, _} -> - decode_key(Last); - _ -> - '$end_of_table' + {ok, << ?INFO_TAG, _/binary >>, _} -> + '$end_of_table'; + {ok, Last, _} -> + decode_key(Last); + _ -> + '$end_of_table' end. %% Since we replace the key with [] in the record, we have to put it back @@ -666,33 +673,34 @@ lookup(Alias, Tab, Key) -> Enc = encode_key(Key), {Ref, Type} = call(Alias, Tab, get_ref), case Type of - bag -> lookup_bag(Ref, Key, Enc, keypos(Tab)); - _ -> - case ?rocksdb:get(Ref, Enc, []) of - {ok, EncVal} -> - [setelement(keypos(Tab), decode_val(EncVal), Key)]; - _ -> - [] - end + bag -> + lookup_bag(Ref, Key, Enc, keypos(Tab)); + _ -> + case ?rocksdb:get(Ref, Enc, []) of + {ok, EncVal} -> + [setelement(keypos(Tab), decode_val(EncVal), Key)]; + _ -> + [] + end end. lookup_bag(Ref, K, Enc, KP) -> Sz = byte_size(Enc), with_iterator( Ref, fun(I) -> - lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc), - K, I, KP) - end). + lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc), + K, I, KP) + end). lookup_bag_(Sz, Enc, {ok, Enc, _}, K, I, KP) -> lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP); lookup_bag_(Sz, Enc, Res, K, I, KP) -> case Res of - {ok, <>, V} -> - [setelement(KP, decode_val(V), K)| - lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)]; - _ -> - [] + {ok, <>, V} -> + [setelement(KP, decode_val(V), K)| + lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)]; + _ -> + [] end. match_delete(Alias, Tab, Pat) when is_atom(Pat) -> @@ -749,23 +757,23 @@ prev(Alias, Tab, Key0) -> %% PRIVATE ITERATOR i_prev(I, Key) -> case ?rocksdb:iterator_move(I, Key) of - {ok, _, _} -> - i_move_to_prev(I, Key); - {error, invalid_iterator} -> - i_last(I) + {ok, _, _} -> + i_move_to_prev(I, Key); + {error, invalid_iterator} -> + i_last(I) end. %% PRIVATE ITERATOR i_move_to_prev(I, Key) -> case ?rocksdb:iterator_move(I, prev) of - {ok, << ?INFO_TAG, _/binary >>, _} -> - '$end_of_table'; - {ok, Prev, _} when Prev < Key -> - decode_key(Prev); - {ok, _, _} -> - i_move_to_prev(I, Key); - _ -> - '$end_of_table' + {ok, << ?INFO_TAG, _/binary >>, _} -> + '$end_of_table'; + {ok, Prev, _} when Prev < Key -> + decode_key(Prev); + {ok, _, _} -> + i_move_to_prev(I, Key); + _ -> + '$end_of_table' end. repair_continuation(Cont, _Ms) -> @@ -821,22 +829,22 @@ update_counter(Alias, Tab, C, Val) when is_integer(Val) -> do_update_counter(C, Val, Ref, St) -> Enc = encode_key(C), case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of - {ok, EncVal} -> - case decode_val(EncVal) of - {_, _, Old} = Rec when is_integer(Old) -> - Res = Old+Val, - return_catch( + {ok, EncVal} -> + case decode_val(EncVal) of + {_, _, Old} = Rec when is_integer(Old) -> + Res = Old+Val, + return_catch( fun() -> db_put(Ref, Enc, encode_val( setelement(3, Rec, Res)), [], St) end); - _ -> - badarg - end; - _ -> - badarg + _ -> + badarg + end; + _ -> + badarg end. %% PRIVATE @@ -893,6 +901,8 @@ init({Alias, Tab, Type, RdbOpts}) -> process_flag(trap_exit, true), try {ok, Ref, Ets} = do_load_table(Tab, RdbOpts), + OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT), + OWEStore = proplists:get_value(on_write_error_store, RdbOpts, ?WRITE_ERR_STORE_DEFAULT), St = #st{ ets = Ets , ref = Ref , alias = Alias @@ -900,9 +910,10 @@ init({Alias, Tab, Type, RdbOpts}) -> , type = Type , size_warnings = 0 , maintain_size = should_maintain_size(Tab) + , on_write_error = OWE + , on_write_error_store = OWEStore }, - OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error), - {ok, recover_size_info(St#st{on_write_error = OnWriteError})} + {ok, recover_size_info(St)} catch throw:badarg -> {error, write_error} @@ -1195,15 +1206,14 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) -> return_catch(fun() -> db_put(Ref, K, V, [], St) end); do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> - IsNew = - case ?rocksdb:get(Ref, K, []) of - {ok, _} -> - false; - _ -> - true - end, + IsNew = case ?rocksdb:get(Ref, K, []) of + {ok, _} -> + false; + _ -> + true + end, case IsNew of - true -> + true -> return_catch( fun() -> NewSz = read_info(size, 0, Ets) + 1, @@ -1213,8 +1223,8 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> write, [Ref, L, []], St), % may throw ets_insert_info(Ets, size, NewSz) end); - false -> - return_catch(fun() -> db_put(Ref, K, V, [], St) end) + false -> + return_catch(fun() -> db_put(Ref, K, V, [], St) end) end, ok. @@ -1222,9 +1232,9 @@ do_insert_bag(Ref, K, V, CurSz, St) -> KSz = byte_size(K), with_iterator( Ref, fun(I) -> - do_insert_bag_( - KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) - end). + do_insert_bag_( + KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) + end). %% There's a potential access pattern that would force counters to @@ -1232,21 +1242,21 @@ do_insert_bag(Ref, K, V, CurSz, St) -> %% with compaction. TODO. do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> case Res of - {ok, <>, V} -> - %% object exists - TSz; - {ok, <>, _} -> - do_insert_bag_( - Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); - _ when TSz =:= false -> - Key = <>, - db_put(Ref, Key, V, [], St); - _ -> - NewSz = TSz + 1, - {Ki, Vi} = info_obj(size, NewSz), - Key = <>, - db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), - NewSz + {ok, <>, V} -> + %% object exists + TSz; + {ok, <>, _} -> + do_insert_bag_( + Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); + _ when TSz =:= false -> + Key = <>, + db_put(Ref, Key, V, [], St); + _ -> + NewSz = TSz + 1, + {Ki, Vi} = info_obj(size, NewSz), + Key = <>, + db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), + NewSz end. %% server-side part @@ -1266,7 +1276,7 @@ do_delete(Key, #st{ref = Ref, maintain_size = false} = St) -> do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> CurSz = read_info(size, 0, Ets), case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of - {ok, _} -> + {ok, _} -> return_catch( fun() -> NewSz = CurSz -1, @@ -1274,85 +1284,84 @@ do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), ets_insert_info(Ets, size, NewSz) end); - not_found -> - false + not_found -> + false end. do_delete_bag(Sz, Key, Ref, TSz, St) -> - Found = - with_iterator( - Ref, fun(I) -> - do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key), - Ref, I) - end), + Found = with_iterator( + Ref, fun(I) -> + do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key), + Ref, I) + end), case {Found, TSz} of - {[], _} -> - TSz; - {_, false} -> - db_write(Ref, [{delete, K} || K <- Found], [], St); - {_, _} -> - N = length(Found), - NewSz = TSz - N, - {Ki, Vi} = info_obj(size, NewSz), - db_write(Ref, [{put, Ki, Vi} | + {[], _} -> + TSz; + {_, false} -> + db_write(Ref, [{delete, K} || K <- Found], [], St); + {_, _} -> + N = length(Found), + NewSz = TSz - N, + {Ki, Vi} = info_obj(size, NewSz), + db_write(Ref, [{put, Ki, Vi} | [{delete, K} || K <- Found]], [], St), - NewSz + NewSz end. do_delete_bag_(Sz, K, Res, Ref, I) -> case Res of - {ok, K, _} -> - do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), - Ref, I); - {ok, <> = Key, _} -> - [Key | - do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), - Ref, I)]; - _ -> - [] + {ok, K, _} -> + do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), + Ref, I); + {ok, <> = Key, _} -> + [Key | + do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), + Ref, I)]; + _ -> + [] end. do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type, - maintain_size = MaintainSize} = St) -> + maintain_size = MaintainSize} = St) -> Fun = fun(_, Key, Acc) -> [Key|Acc] end, Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30), case {Keys, MaintainSize} of - {[], _} -> - ok; - {_, false} -> - db_write(Ref, [{delete, K} || K <- Keys], [], St), - ok; - {_, true} -> - CurSz = read_info(size, 0, Ets), - NewSz = max(CurSz - length(Keys), 0), - {Ki, Vi} = info_obj(size, NewSz), - db_write(Ref, [{put, Ki, Vi} | + {[], _} -> + ok; + {_, false} -> + db_write(Ref, [{delete, K} || K <- Keys], [], St), + ok; + {_, true} -> + CurSz = read_info(size, 0, Ets), + NewSz = max(CurSz - length(Keys), 0), + {Ki, Vi} = info_obj(size, NewSz), + db_write(Ref, [{put, Ki, Vi} | [{delete, K} || K <- Keys]], [], St), - ets_insert_info(Ets, size, NewSz), - ok + ets_insert_info(Ets, size, NewSz), + ok end. recover_size_info(#st{ ref = Ref - , tab = Tab - , type = Type - , maintain_size = MaintainSize - } = St) -> + , tab = Tab + , type = Type + , maintain_size = MaintainSize + } = St) -> %% TODO: shall_update_size_info is obsolete, remove case shall_update_size_info(Tab) of - true -> - Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end, - 0, [{'_',[],['$_']}], 3), - write_info_(size, Sz, St); - false -> - case MaintainSize of - true -> - %% info initialized by rocksdb_to_ets/2 - %% TODO: if there is no stored size, recompute it - ignore; - false -> - %% size is not maintained, ensure it's marked accordingly - delete_info_(size, St) - end + true -> + Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end, + 0, [{'_',[],['$_']}], 3), + write_info_(size, Sz, St); + false -> + case MaintainSize of + true -> + %% info initialized by rocksdb_to_ets/2 + %% TODO: if there is no stored size, recompute it + ignore; + false -> + %% size is not maintained, ensure it's marked accordingly + delete_info_(size, St) + end end, St. @@ -1440,15 +1449,14 @@ do_select(Ref, Tab, _Type, MS, AccKeys, Limit) when is_boolean(AccKeys) -> i_do_select(I, #sel{keypat = Pfx, compiled_ms = MS, limit = Limit} = Sel, AccKeys, Acc) -> - StartKey = - case Pfx of - <<>> -> - <>; - _ -> - Pfx - end, + StartKey = case Pfx of + <<>> -> + <>; + _ -> + Pfx + end, select_traverse(?rocksdb:iterator_move(I, StartKey), Limit, - Pfx, MS, I, Sel, AccKeys, Acc). + Pfx, MS, I, Sel, AccKeys, Acc). needs_key_only([{HP,_,Body}]) -> BodyVars = lists:flatmap(fun extract_vars/1, Body), @@ -1512,18 +1520,18 @@ map_vars([], _) -> select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{tab = Tab} = Sel, AccKeys, Acc) -> case is_prefix(Pfx, K) of - true -> - Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)), - case ets:match_spec_run([Rec], MS) of - [] -> - select_traverse( - ?rocksdb:iterator_move(I, next), Limit, Pfx, MS, - I, Sel, AccKeys, Acc); - [Match] -> + true -> + Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)), + case ets:match_spec_run([Rec], MS) of + [] -> + select_traverse( + ?rocksdb:iterator_move(I, next), Limit, Pfx, MS, + I, Sel, AccKeys, Acc); + [Match] -> Acc1 = if AccKeys -> - [{K, Match}|Acc]; + [{K, Match}|Acc]; true -> - [Match|Acc] + [Match|Acc] end, traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) end; @@ -1550,22 +1558,22 @@ decr(infinity) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> {lists:reverse(Acc), fun() -> - with_iterator(Ref, - fun(NewI) -> + with_iterator(Ref, + fun(NewI) -> select_traverse(iterator_next(NewI, K), Limit, Pfx, MS, NewI, Sel, AccKeys, []) - end) + end) end}; traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> select_traverse(?rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). iterator_next(I, K) -> case ?rocksdb:iterator_move(I, K) of - {ok, K, _} -> - ?rocksdb:iterator_move(I, next); - Other -> - Other + {ok, K, _} -> + ?rocksdb:iterator_move(I, next); + Other -> + Other end. keypat([H|T], KeyPos) -> @@ -1612,16 +1620,37 @@ db_delete(Ref, K, Opts, St) -> write_result(ok, _, _, _) -> ok; -write_result(Res, Op, Args, #st{on_write_error = Rpt}) -> +write_result(Res, Op, Args, #st{tab = Tab, on_write_error = Rpt, on_write_error_store = OWEStore}) -> RptOp = rpt_op(Rpt), + maybe_store_error(OWEStore, Res, Tab, Op, Args, erlang:system_time(millisecond)), mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n", - [Op | Args] ++ [Res]), + [Op | Args] ++ [Res]), if Rpt == fatal; Rpt == error -> throw(badarg); true -> ok end. +maybe_store_error(undefined, _, _, _, _, _) -> + ok; +maybe_store_error(Table, Err, IntTable, put, [_, K, _, _], Time) -> + insert_error(Table, IntTable, K, Err, Time); +maybe_store_error(Table, Err, IntTable, delete, [_, K, _], Time) -> + insert_error(Table, IntTable, K, Err, Time); +maybe_store_error(Table, Err, IntTable, write, [_, List, _], Time) -> + lists:map(fun + ({put, K, _}) -> + insert_error(Table, IntTable, K, Err, Time); + ({delete, K}) -> + insert_error(Table, IntTable, K, Err, Time) + end, List). + +insert_error(Table, {Type, _, _}, K, Err, Time) -> + {_, K1} = decode_key(K), + ets:insert(Table, {{Type, K1}, Err, Time}); +insert_error(Table, Type, K, Err, Time) when is_atom(Type) -> + ets:insert(Table, {{Type, K}, Err, Time}). + rpt_fmt([_|T]) -> lists:append(["~p" | [", ~p" || _ <- T]]). @@ -1636,9 +1665,9 @@ valid_mnesia_op(Op) -> ; Op==warning ; Op==error ; Op==fatal -> - true; + true; true -> - false + false end. %% ---------------------------------------------------------------------------- diff --git a/test/mnesia_rocksdb_SUITE.erl b/test/mnesia_rocksdb_SUITE.erl index aed46cd..d7315da 100644 --- a/test/mnesia_rocksdb_SUITE.erl +++ b/test/mnesia_rocksdb_SUITE.erl @@ -29,7 +29,6 @@ groups() -> error_handling(_Config) -> mnesia_rocksdb_error_handling:run(). - init_per_suite(Config) -> Config. diff --git a/test/mnesia_rocksdb_error_handling.erl b/test/mnesia_rocksdb_error_handling.erl index 84470a4..4d45037 100644 --- a/test/mnesia_rocksdb_error_handling.erl +++ b/test/mnesia_rocksdb_error_handling.erl @@ -31,6 +31,14 @@ setup() -> create_tab(Type, Level, MaintainSz) -> TabName = tab_name(Type, Level, MaintainSz), + %% create error store before the table + case ets:info(?MODULE) of + undefined -> + ?MODULE = ets:new(?MODULE, [bag, public, named_table]), + ok; + _ -> + ok + end, UserProps = user_props(Level, MaintainSz), {atomic, ok} = mnesia:create_table(TabName, [{rdb, [node()]}, {user_properties, UserProps}]), @@ -43,7 +51,8 @@ tab_name(Type, Level, MaintainSz) -> user_props(Level, MaintainSz) -> [{maintain_sz, MaintainSz}, - {rocksdb_opts, [{on_write_error, Level}]}]. + {rocksdb_opts, [ {on_write_error, Level} + , {on_write_error_store, ?MODULE} ]}]. start_mnesia() -> mnesia_rocksdb_tlib:start_mnesia(reset), @@ -94,7 +103,11 @@ expect_error(Level, Tab) -> ok after 1000 -> error({expected_error, [Level, Tab]}) - end. + + end, + %% Also verify that an error entry has been written into the error store. + 1 = ets:select_delete(?MODULE, [{{{Tab, '_'}, '_', '_'}, [], [true]}]), + ok. rpt_tag(fatal ) -> mnesia_fatal; rpt_tag(error ) -> mnesia_error;