Merge pull request #15 from aeternity/tb-add-error-store

Add caller error store to error handling
This commit is contained in:
Tino Breddin 2019-11-21 14:37:32 +01:00 committed by GitHub
commit 8d3079ff25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 284 additions and 222 deletions

View File

@ -1,11 +1,13 @@
# mnesia_rocksdb # mnesia_rocksdb
A RocksDb backend for Mnesia
A RocksDB backend for Mnesia.
This permits Erlang/OTP applications to use RocksDB as a backend for This permits Erlang/OTP applications to use RocksDB as a backend for
mnesia tables. It is based on Klarna's `mnesia_eleveldb`. mnesia tables. It is based on Klarna's `mnesia_eleveldb`.
## Prerequisites ## Prerequisites
- rocksdb
- rocksdb (included as dependency)
- Erlang/OTP 20.0 or newer (https://github.com/erlang/otp) - Erlang/OTP 20.0 or newer (https://github.com/erlang/otp)
## Getting started ## Getting started
@ -76,13 +78,16 @@ Contributions are welcome.
The RocksDB update operations return either `ok` or `{error, any()}`. The RocksDB update operations return either `ok` or `{error, any()}`.
Since the actual updates are performed after the 'point-of-no-return', Since the actual updates are performed after the 'point-of-no-return',
returning an `error` result will cause mnesia to behave unpredictably, since returning an `error` result will cause mnesia to behave unpredictably,
the operations are expected to simply work. since the operations are expected to simply work.
### Option 1: `on_write_error`
An `on_write_error` option can be provided, per-table, in the `rocksdb_opts` An `on_write_error` option can be provided, per-table, in the `rocksdb_opts`
user property (see [Customization](#customization) above). Supported values indicate at which level an error user property (see [Customization](#customization) above).
indication should be reported. Mnesia may save reported events in RAM, and may Supported values indicate at which level an error indication should be reported.
also print them, depending on the debug level (controlled with `mnesia:set_debug_level/1`). Mnesia may save reported events in RAM, and may also print them,
depending on the debug level (controlled with `mnesia:set_debug_level/1`).
Mnesia debug levels are, in increasing detail, `none | verbose | debug | trace` Mnesia debug levels are, in increasing detail, `none | verbose | debug | trace`
The supported values for `on_write_error` are: The supported values for `on_write_error` are:
@ -95,6 +100,22 @@ The supported values for `on_write_error` are:
| error | always | always | exception | | error | always | always | exception |
| fatal | always | always | core dump | | fatal | always | always | core dump |
### Option 2: `on_write_error_store`
An `on_write_error_store` option can be provided, per-table, in the `rocksdb_opts`
user property (see [Customization](#customization) above).
When set, the backend will use the value of the option as the name for an ETS table
which is used as storage for runtime write errors. The table must be set up outside
of the backend by the clients themselves.
Entries in the table take the form of a tuple `{{Table, Key}, Error, InsertedAt}`
where `Table` refers to the Mnesia table name, `Key` is the primary key being used by Mnesia,
`Error` is the error encountered by the backend, and `InsertedAt` refers to the time
the error was encountered as system time in milliseconds.
The backend will only insert entries and otherwise not manage the table. Thus, clients
are expected to clean up the table during runtime to prevent memory leakage.
## Caveats ## Caveats
Avoid placing `bag` tables in RocksDB. Although they work, each write Avoid placing `bag` tables in RocksDB. Although they work, each write

BIN
rebar3

Binary file not shown.

View File

@ -23,7 +23,6 @@
-module(mnesia_rocksdb). -module(mnesia_rocksdb).
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
%% BEHAVIOURS %% BEHAVIOURS
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
@ -31,7 +30,6 @@
-behaviour(mnesia_backend_type). -behaviour(mnesia_backend_type).
-behaviour(gen_server). -behaviour(gen_server).
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
%% EXPORTS %% EXPORTS
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
@ -64,13 +62,13 @@
%% schema level callbacks %% schema level callbacks
-export([semantics/2, -export([semantics/2,
check_definition/4, check_definition/4,
create_table/3, create_table/3,
load_table/4, load_table/4,
close_table/2, close_table/2,
sync_close_table/2, sync_close_table/2,
delete_table/2, delete_table/2,
info/3]). info/3]).
%% table synch calls %% table synch calls
-export([sender_init/4, -export([sender_init/4,
@ -173,19 +171,24 @@
, compiled_ms , compiled_ms
, limit , limit
, key_only = false % TODO: not used , key_only = false % TODO: not used
, direction = forward}). % TODO: not used , direction = forward % TODO: not used
}).
-type on_write_error() :: debug | verbose | warning | error | fatal. -type on_write_error() :: debug | verbose | warning | error | fatal.
-type on_write_error_store() :: atom() | undefined.
-define(WRITE_ERR_DEFAULT, verbose). -define(WRITE_ERR_DEFAULT, verbose).
-define(WRITE_ERR_STORE_DEFAULT, undefined).
-record(st, { ets -record(st, { ets
, ref , ref
, alias , alias
, tab , tab
, type , type
, size_warnings % integer() , size_warnings :: integer()
, maintain_size % boolean() , maintain_size :: boolean()
, on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error() , on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error()
, on_write_error_store = ?WRITE_ERR_STORE_DEFAULT :: on_write_error_store()
}). }).
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
@ -209,7 +212,6 @@ register(Alias) ->
default_alias() -> default_alias() ->
rocksdb_copies. rocksdb_copies.
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
%% DEBUG API %% DEBUG API
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
@ -342,33 +344,38 @@ prefixes(_) ->
%% set is OK as ordered_set is a kind of set. %% set is OK as ordered_set is a kind of set.
check_definition(Alias, Tab, Nodes, Props) -> check_definition(Alias, Tab, Nodes, Props) ->
Id = {Alias, Nodes}, Id = {Alias, Nodes},
try lists:map( try
fun({type, T} = P) -> Props1 = lists:map(fun(E) -> check_definition_entry(Tab, Id, E) end, Props),
if T==set; T==ordered_set; T==bag -> {ok, Props1}
P;
true ->
mnesia:abort({combine_error,
Tab,
[Id, {type,T}]})
end;
({user_properties, UPs} = P) ->
RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
case valid_mnesia_op(OWE) of
true ->
P;
false ->
throw({error, {invalid, {on_write_error, OWE}}})
end;
(P) -> P
end, Props) of
Props1 ->
{ok, Props1}
catch catch
throw:Error -> throw:Error ->
Error Error
end. end.
%% Validate a single property from a table definition.
%% Returns the property unchanged when acceptable; calls mnesia:abort/1
%% for an unsupported table type, and throws {error, ...} for an invalid
%% rocksdb option value. Properties this backend does not care about are
%% passed through untouched.
check_definition_entry(_Tab, _Id, {type, T} = Prop) when T==set; T==ordered_set; T==bag ->
    Prop;
check_definition_entry(Tab, Id, {type, T}) ->
    %% Any other table type cannot be combined with this backend.
    mnesia:abort({combine_error, Tab, [Id, {type, T}]});
check_definition_entry(_Tab, _Id, {user_properties, UPs} = Prop) ->
    %% Inspect the rocksdb-specific options embedded in user_properties.
    RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
    OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
    OWEStore = proplists:get_value(on_write_error_store, RdbOpts, ?WRITE_ERR_STORE_DEFAULT),
    %% on_write_error must name a valid mnesia reporting level.
    case valid_mnesia_op(OWE) of
        false -> throw({error, {invalid_configuration, {on_write_error, OWE}}});
        true  -> ok
    end,
    %% on_write_error_store must be an ETS table name (an atom);
    %% `undefined` is an atom, so the "no store configured" default passes too.
    case is_atom(OWEStore) of
        false -> throw({error, {invalid_configuration, {on_write_error_store, OWEStore}}});
        true  -> ok
    end,
    Prop;
check_definition_entry(_Tab, _Id, Prop) ->
    Prop.
%% -> ok | {error, exists} %% -> ok | {error, exists}
create_table(_Alias, Tab, _Props) -> create_table(_Alias, Tab, _Props) ->
create_mountpoint(Tab). create_mountpoint(Tab).
@ -509,13 +516,13 @@ info(_Alias, Tab, memory) ->
end; end;
info(Alias, Tab, size) -> info(Alias, Tab, size) ->
case retrieve_size(Alias, Tab) of case retrieve_size(Alias, Tab) of
{ok, Size} -> {ok, Size} ->
if Size < 10000 -> ok; if Size < 10000 -> ok;
true -> size_warning(Alias, Tab) true -> size_warning(Alias, Tab)
end, end,
Size; Size;
Error -> Error ->
Error Error
end; end;
info(_Alias, Tab, Item) -> info(_Alias, Tab, Item) ->
case try_read_info(Tab, Item, undefined) of case try_read_info(Tab, Item, undefined) of
@ -527,10 +534,10 @@ info(_Alias, Tab, Item) ->
retrieve_size(_Alias, Tab) -> retrieve_size(_Alias, Tab) ->
case try_read_info(Tab, size, 0) of case try_read_info(Tab, size, 0) of
{ok, Size} -> {ok, Size} ->
{ok, Size}; {ok, Size};
Error -> Error ->
Error Error
end. end.
try_read_info(Tab, Item, Default) -> try_read_info(Tab, Item, Default) ->
@ -627,10 +634,10 @@ first(Alias, Tab) ->
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_first(I) -> i_first(I) ->
case ?rocksdb:iterator_move(I, <<?DATA_START>>) of case ?rocksdb:iterator_move(I, <<?DATA_START>>) of
{ok, First, _} -> {ok, First, _} ->
decode_key(First); decode_key(First);
_ -> _ ->
'$end_of_table' '$end_of_table'
end. end.
%% Not relevant for an ordered_set %% Not relevant for an ordered_set
@ -652,12 +659,12 @@ last(Alias, Tab) ->
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_last(I) -> i_last(I) ->
case ?rocksdb:iterator_move(I, last) of case ?rocksdb:iterator_move(I, last) of
{ok, << ?INFO_TAG, _/binary >>, _} -> {ok, << ?INFO_TAG, _/binary >>, _} ->
'$end_of_table'; '$end_of_table';
{ok, Last, _} -> {ok, Last, _} ->
decode_key(Last); decode_key(Last);
_ -> _ ->
'$end_of_table' '$end_of_table'
end. end.
%% Since we replace the key with [] in the record, we have to put it back %% Since we replace the key with [] in the record, we have to put it back
@ -666,33 +673,34 @@ lookup(Alias, Tab, Key) ->
Enc = encode_key(Key), Enc = encode_key(Key),
{Ref, Type} = call(Alias, Tab, get_ref), {Ref, Type} = call(Alias, Tab, get_ref),
case Type of case Type of
bag -> lookup_bag(Ref, Key, Enc, keypos(Tab)); bag ->
_ -> lookup_bag(Ref, Key, Enc, keypos(Tab));
case ?rocksdb:get(Ref, Enc, []) of _ ->
{ok, EncVal} -> case ?rocksdb:get(Ref, Enc, []) of
[setelement(keypos(Tab), decode_val(EncVal), Key)]; {ok, EncVal} ->
_ -> [setelement(keypos(Tab), decode_val(EncVal), Key)];
[] _ ->
end []
end
end. end.
lookup_bag(Ref, K, Enc, KP) -> lookup_bag(Ref, K, Enc, KP) ->
Sz = byte_size(Enc), Sz = byte_size(Enc),
with_iterator( with_iterator(
Ref, fun(I) -> Ref, fun(I) ->
lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc), lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc),
K, I, KP) K, I, KP)
end). end).
lookup_bag_(Sz, Enc, {ok, Enc, _}, K, I, KP) -> lookup_bag_(Sz, Enc, {ok, Enc, _}, K, I, KP) ->
lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP); lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP);
lookup_bag_(Sz, Enc, Res, K, I, KP) -> lookup_bag_(Sz, Enc, Res, K, I, KP) ->
case Res of case Res of
{ok, <<Enc:Sz/binary, _:?BAG_CNT>>, V} -> {ok, <<Enc:Sz/binary, _:?BAG_CNT>>, V} ->
[setelement(KP, decode_val(V), K)| [setelement(KP, decode_val(V), K)|
lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)]; lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)];
_ -> _ ->
[] []
end. end.
match_delete(Alias, Tab, Pat) when is_atom(Pat) -> match_delete(Alias, Tab, Pat) when is_atom(Pat) ->
@ -749,23 +757,23 @@ prev(Alias, Tab, Key0) ->
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_prev(I, Key) -> i_prev(I, Key) ->
case ?rocksdb:iterator_move(I, Key) of case ?rocksdb:iterator_move(I, Key) of
{ok, _, _} -> {ok, _, _} ->
i_move_to_prev(I, Key); i_move_to_prev(I, Key);
{error, invalid_iterator} -> {error, invalid_iterator} ->
i_last(I) i_last(I)
end. end.
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_move_to_prev(I, Key) -> i_move_to_prev(I, Key) ->
case ?rocksdb:iterator_move(I, prev) of case ?rocksdb:iterator_move(I, prev) of
{ok, << ?INFO_TAG, _/binary >>, _} -> {ok, << ?INFO_TAG, _/binary >>, _} ->
'$end_of_table'; '$end_of_table';
{ok, Prev, _} when Prev < Key -> {ok, Prev, _} when Prev < Key ->
decode_key(Prev); decode_key(Prev);
{ok, _, _} -> {ok, _, _} ->
i_move_to_prev(I, Key); i_move_to_prev(I, Key);
_ -> _ ->
'$end_of_table' '$end_of_table'
end. end.
repair_continuation(Cont, _Ms) -> repair_continuation(Cont, _Ms) ->
@ -821,22 +829,22 @@ update_counter(Alias, Tab, C, Val) when is_integer(Val) ->
do_update_counter(C, Val, Ref, St) -> do_update_counter(C, Val, Ref, St) ->
Enc = encode_key(C), Enc = encode_key(C),
case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of
{ok, EncVal} -> {ok, EncVal} ->
case decode_val(EncVal) of case decode_val(EncVal) of
{_, _, Old} = Rec when is_integer(Old) -> {_, _, Old} = Rec when is_integer(Old) ->
Res = Old+Val, Res = Old+Val,
return_catch( return_catch(
fun() -> fun() ->
db_put(Ref, Enc, db_put(Ref, Enc,
encode_val( encode_val(
setelement(3, Rec, Res)), setelement(3, Rec, Res)),
[], St) [], St)
end); end);
_ -> _ ->
badarg badarg
end; end;
_ -> _ ->
badarg badarg
end. end.
%% PRIVATE %% PRIVATE
@ -893,6 +901,8 @@ init({Alias, Tab, Type, RdbOpts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
try try
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts), {ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
OWEStore = proplists:get_value(on_write_error_store, RdbOpts, ?WRITE_ERR_STORE_DEFAULT),
St = #st{ ets = Ets St = #st{ ets = Ets
, ref = Ref , ref = Ref
, alias = Alias , alias = Alias
@ -900,9 +910,10 @@ init({Alias, Tab, Type, RdbOpts}) ->
, type = Type , type = Type
, size_warnings = 0 , size_warnings = 0
, maintain_size = should_maintain_size(Tab) , maintain_size = should_maintain_size(Tab)
, on_write_error = OWE
, on_write_error_store = OWEStore
}, },
OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error), {ok, recover_size_info(St)}
{ok, recover_size_info(St#st{on_write_error = OnWriteError})}
catch catch
throw:badarg -> throw:badarg ->
{error, write_error} {error, write_error}
@ -1195,15 +1206,14 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St
do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) -> do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) ->
return_catch(fun() -> db_put(Ref, K, V, [], St) end); return_catch(fun() -> db_put(Ref, K, V, [], St) end);
do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
IsNew = IsNew = case ?rocksdb:get(Ref, K, []) of
case ?rocksdb:get(Ref, K, []) of {ok, _} ->
{ok, _} -> false;
false; _ ->
_ -> true
true end,
end,
case IsNew of case IsNew of
true -> true ->
return_catch( return_catch(
fun() -> fun() ->
NewSz = read_info(size, 0, Ets) + 1, NewSz = read_info(size, 0, Ets) + 1,
@ -1213,8 +1223,8 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
write, [Ref, L, []], St), % may throw write, [Ref, L, []], St), % may throw
ets_insert_info(Ets, size, NewSz) ets_insert_info(Ets, size, NewSz)
end); end);
false -> false ->
return_catch(fun() -> db_put(Ref, K, V, [], St) end) return_catch(fun() -> db_put(Ref, K, V, [], St) end)
end, end,
ok. ok.
@ -1222,9 +1232,9 @@ do_insert_bag(Ref, K, V, CurSz, St) ->
KSz = byte_size(K), KSz = byte_size(K),
with_iterator( with_iterator(
Ref, fun(I) -> Ref, fun(I) ->
do_insert_bag_( do_insert_bag_(
KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St)
end). end).
%% There's a potential access pattern that would force counters to %% There's a potential access pattern that would force counters to
@ -1232,21 +1242,21 @@ do_insert_bag(Ref, K, V, CurSz, St) ->
%% with compaction. TODO. %% with compaction. TODO.
do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG ->
case Res of case Res of
{ok, <<K:Sz/binary, _:?BAG_CNT>>, V} -> {ok, <<K:Sz/binary, _:?BAG_CNT>>, V} ->
%% object exists %% object exists
TSz; TSz;
{ok, <<K:Sz/binary, N:?BAG_CNT>>, _} -> {ok, <<K:Sz/binary, N:?BAG_CNT>>, _} ->
do_insert_bag_( do_insert_bag_(
Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St);
_ when TSz =:= false -> _ when TSz =:= false ->
Key = <<K/binary, (Prev+1):?BAG_CNT>>, Key = <<K/binary, (Prev+1):?BAG_CNT>>,
db_put(Ref, Key, V, [], St); db_put(Ref, Key, V, [], St);
_ -> _ ->
NewSz = TSz + 1, NewSz = TSz + 1,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
Key = <<K/binary, (Prev+1):?BAG_CNT>>, Key = <<K/binary, (Prev+1):?BAG_CNT>>,
db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St),
NewSz NewSz
end. end.
%% server-side part %% server-side part
@ -1266,7 +1276,7 @@ do_delete(Key, #st{ref = Ref, maintain_size = false} = St) ->
do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of
{ok, _} -> {ok, _} ->
return_catch( return_catch(
fun() -> fun() ->
NewSz = CurSz -1, NewSz = CurSz -1,
@ -1274,85 +1284,84 @@ do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St),
ets_insert_info(Ets, size, NewSz) ets_insert_info(Ets, size, NewSz)
end); end);
not_found -> not_found ->
false false
end. end.
do_delete_bag(Sz, Key, Ref, TSz, St) -> do_delete_bag(Sz, Key, Ref, TSz, St) ->
Found = Found = with_iterator(
with_iterator( Ref, fun(I) ->
Ref, fun(I) -> do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key),
do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key), Ref, I)
Ref, I) end),
end),
case {Found, TSz} of case {Found, TSz} of
{[], _} -> {[], _} ->
TSz; TSz;
{_, false} -> {_, false} ->
db_write(Ref, [{delete, K} || K <- Found], [], St); db_write(Ref, [{delete, K} || K <- Found], [], St);
{_, _} -> {_, _} ->
N = length(Found), N = length(Found),
NewSz = TSz - N, NewSz = TSz - N,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
db_write(Ref, [{put, Ki, Vi} | db_write(Ref, [{put, Ki, Vi} |
[{delete, K} || K <- Found]], [], St), [{delete, K} || K <- Found]], [], St),
NewSz NewSz
end. end.
do_delete_bag_(Sz, K, Res, Ref, I) -> do_delete_bag_(Sz, K, Res, Ref, I) ->
case Res of case Res of
{ok, K, _} -> {ok, K, _} ->
do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next),
Ref, I); Ref, I);
{ok, <<K:Sz/binary, _:?BAG_CNT>> = Key, _} -> {ok, <<K:Sz/binary, _:?BAG_CNT>> = Key, _} ->
[Key | [Key |
do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next),
Ref, I)]; Ref, I)];
_ -> _ ->
[] []
end. end.
do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type, do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type,
maintain_size = MaintainSize} = St) -> maintain_size = MaintainSize} = St) ->
Fun = fun(_, Key, Acc) -> [Key|Acc] end, Fun = fun(_, Key, Acc) -> [Key|Acc] end,
Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30), Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30),
case {Keys, MaintainSize} of case {Keys, MaintainSize} of
{[], _} -> {[], _} ->
ok; ok;
{_, false} -> {_, false} ->
db_write(Ref, [{delete, K} || K <- Keys], [], St), db_write(Ref, [{delete, K} || K <- Keys], [], St),
ok; ok;
{_, true} -> {_, true} ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
NewSz = max(CurSz - length(Keys), 0), NewSz = max(CurSz - length(Keys), 0),
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
db_write(Ref, [{put, Ki, Vi} | db_write(Ref, [{put, Ki, Vi} |
[{delete, K} || K <- Keys]], [], St), [{delete, K} || K <- Keys]], [], St),
ets_insert_info(Ets, size, NewSz), ets_insert_info(Ets, size, NewSz),
ok ok
end. end.
recover_size_info(#st{ ref = Ref recover_size_info(#st{ ref = Ref
, tab = Tab , tab = Tab
, type = Type , type = Type
, maintain_size = MaintainSize , maintain_size = MaintainSize
} = St) -> } = St) ->
%% TODO: shall_update_size_info is obsolete, remove %% TODO: shall_update_size_info is obsolete, remove
case shall_update_size_info(Tab) of case shall_update_size_info(Tab) of
true -> true ->
Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end, Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end,
0, [{'_',[],['$_']}], 3), 0, [{'_',[],['$_']}], 3),
write_info_(size, Sz, St); write_info_(size, Sz, St);
false -> false ->
case MaintainSize of case MaintainSize of
true -> true ->
%% info initialized by rocksdb_to_ets/2 %% info initialized by rocksdb_to_ets/2
%% TODO: if there is no stored size, recompute it %% TODO: if there is no stored size, recompute it
ignore; ignore;
false -> false ->
%% size is not maintained, ensure it's marked accordingly %% size is not maintained, ensure it's marked accordingly
delete_info_(size, St) delete_info_(size, St)
end end
end, end,
St. St.
@ -1440,15 +1449,14 @@ do_select(Ref, Tab, _Type, MS, AccKeys, Limit) when is_boolean(AccKeys) ->
i_do_select(I, #sel{keypat = Pfx, i_do_select(I, #sel{keypat = Pfx,
compiled_ms = MS, compiled_ms = MS,
limit = Limit} = Sel, AccKeys, Acc) -> limit = Limit} = Sel, AccKeys, Acc) ->
StartKey = StartKey = case Pfx of
case Pfx of <<>> ->
<<>> -> <<?DATA_START>>;
<<?DATA_START>>; _ ->
_ -> Pfx
Pfx end,
end,
select_traverse(?rocksdb:iterator_move(I, StartKey), Limit, select_traverse(?rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc). Pfx, MS, I, Sel, AccKeys, Acc).
needs_key_only([{HP,_,Body}]) -> needs_key_only([{HP,_,Body}]) ->
BodyVars = lists:flatmap(fun extract_vars/1, Body), BodyVars = lists:flatmap(fun extract_vars/1, Body),
@ -1512,18 +1520,18 @@ map_vars([], _) ->
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{tab = Tab} = Sel, select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{tab = Tab} = Sel,
AccKeys, Acc) -> AccKeys, Acc) ->
case is_prefix(Pfx, K) of case is_prefix(Pfx, K) of
true -> true ->
Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)), Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)),
case ets:match_spec_run([Rec], MS) of case ets:match_spec_run([Rec], MS) of
[] -> [] ->
select_traverse( select_traverse(
?rocksdb:iterator_move(I, next), Limit, Pfx, MS, ?rocksdb:iterator_move(I, next), Limit, Pfx, MS,
I, Sel, AccKeys, Acc); I, Sel, AccKeys, Acc);
[Match] -> [Match] ->
Acc1 = if AccKeys -> Acc1 = if AccKeys ->
[{K, Match}|Acc]; [{K, Match}|Acc];
true -> true ->
[Match|Acc] [Match|Acc]
end, end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end; end;
@ -1550,22 +1558,22 @@ decr(infinity) ->
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc), {lists:reverse(Acc),
fun() -> fun() ->
with_iterator(Ref, with_iterator(Ref,
fun(NewI) -> fun(NewI) ->
select_traverse(iterator_next(NewI, K), select_traverse(iterator_next(NewI, K),
Limit, Pfx, MS, NewI, Sel, Limit, Pfx, MS, NewI, Sel,
AccKeys, []) AccKeys, [])
end) end)
end}; end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
select_traverse(?rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). select_traverse(?rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) -> iterator_next(I, K) ->
case ?rocksdb:iterator_move(I, K) of case ?rocksdb:iterator_move(I, K) of
{ok, K, _} -> {ok, K, _} ->
?rocksdb:iterator_move(I, next); ?rocksdb:iterator_move(I, next);
Other -> Other ->
Other Other
end. end.
keypat([H|T], KeyPos) -> keypat([H|T], KeyPos) ->
@ -1612,16 +1620,37 @@ db_delete(Ref, K, Opts, St) ->
write_result(ok, _, _, _) -> write_result(ok, _, _, _) ->
ok; ok;
write_result(Res, Op, Args, #st{on_write_error = Rpt}) -> write_result(Res, Op, Args, #st{tab = Tab, on_write_error = Rpt, on_write_error_store = OWEStore}) ->
RptOp = rpt_op(Rpt), RptOp = rpt_op(Rpt),
maybe_store_error(OWEStore, Res, Tab, Op, Args, erlang:system_time(millisecond)),
mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n", mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n",
[Op | Args] ++ [Res]), [Op | Args] ++ [Res]),
if Rpt == fatal; Rpt == error -> if Rpt == fatal; Rpt == error ->
throw(badarg); throw(badarg);
true -> true ->
ok ok
end. end.
%% Record a failed write operation in the caller-provided error store
%% (the ETS table named by the `on_write_error_store` rocksdb option).
%% First argument is the store name, or `undefined` when no store is
%% configured, in which case this is a no-op.
%% For a batch `write`, every put/delete element of the batch is recorded.
%% Note: executed purely for its side effect; callers ignore the result.
maybe_store_error(undefined, _, _, _, _, _) ->
    ok;
maybe_store_error(Table, Err, IntTable, put, [_, K, _, _], Time) ->
    insert_error(Table, IntTable, K, Err, Time);
maybe_store_error(Table, Err, IntTable, delete, [_, K, _], Time) ->
    insert_error(Table, IntTable, K, Err, Time);
maybe_store_error(Table, Err, IntTable, write, [_, List, _], Time) ->
    %% Use lists:foreach/2, not lists:map/2: this loop exists only for
    %% its side effects, and foreach makes the clause return ok instead
    %% of an unused list of ets:insert/2 results.
    lists:foreach(fun
                      ({put, K, _}) ->
                          insert_error(Table, IntTable, K, Err, Time);
                      ({delete, K}) ->
                          insert_error(Table, IntTable, K, Err, Time)
                  end, List).
%% Write one error entry into the caller-owned ETS error store, keyed by
%% {TableName, Key} and stamped with the supplied time.
%% When the internal table is identified by a 3-tuple {Name, _, _}
%% (presumably an index/retainer-style table spec — confirm against the
%% callers), the stored key is encoded, so decode it back to the plain
%% term before recording.
insert_error(Store, {BaseTab, _, _}, EncKey, Err, Time) ->
    {_, PlainKey} = decode_key(EncKey),
    ets:insert(Store, {{BaseTab, PlainKey}, Err, Time});
insert_error(Store, BaseTab, Key, Err, Time) when is_atom(BaseTab) ->
    ets:insert(Store, {{BaseTab, Key}, Err, Time}).
rpt_fmt([_|T]) -> rpt_fmt([_|T]) ->
lists:append(["~p" | [", ~p" || _ <- T]]). lists:append(["~p" | [", ~p" || _ <- T]]).
@ -1636,9 +1665,9 @@ valid_mnesia_op(Op) ->
; Op==warning ; Op==warning
; Op==error ; Op==error
; Op==fatal -> ; Op==fatal ->
true; true;
true -> true ->
false false
end. end.
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------

View File

@ -29,7 +29,6 @@ groups() ->
error_handling(_Config) -> error_handling(_Config) ->
mnesia_rocksdb_error_handling:run(). mnesia_rocksdb_error_handling:run().
init_per_suite(Config) -> init_per_suite(Config) ->
Config. Config.

View File

@ -31,6 +31,14 @@ setup() ->
create_tab(Type, Level, MaintainSz) -> create_tab(Type, Level, MaintainSz) ->
TabName = tab_name(Type, Level, MaintainSz), TabName = tab_name(Type, Level, MaintainSz),
%% create error store before the table
case ets:info(?MODULE) of
undefined ->
?MODULE = ets:new(?MODULE, [bag, public, named_table]),
ok;
_ ->
ok
end,
UserProps = user_props(Level, MaintainSz), UserProps = user_props(Level, MaintainSz),
{atomic, ok} = mnesia:create_table(TabName, [{rdb, [node()]}, {atomic, ok} = mnesia:create_table(TabName, [{rdb, [node()]},
{user_properties, UserProps}]), {user_properties, UserProps}]),
@ -43,7 +51,8 @@ tab_name(Type, Level, MaintainSz) ->
user_props(Level, MaintainSz) -> user_props(Level, MaintainSz) ->
[{maintain_sz, MaintainSz}, [{maintain_sz, MaintainSz},
{rocksdb_opts, [{on_write_error, Level}]}]. {rocksdb_opts, [ {on_write_error, Level}
, {on_write_error_store, ?MODULE} ]}].
start_mnesia() -> start_mnesia() ->
mnesia_rocksdb_tlib:start_mnesia(reset), mnesia_rocksdb_tlib:start_mnesia(reset),
@ -94,7 +103,11 @@ expect_error(Level, Tab) ->
ok ok
after 1000 -> after 1000 ->
error({expected_error, [Level, Tab]}) error({expected_error, [Level, Tab]})
end.
end,
%% Also verify that an error entry has been written into the error store.
1 = ets:select_delete(?MODULE, [{{{Tab, '_'}, '_', '_'}, [], [true]}]),
ok.
rpt_tag(fatal ) -> mnesia_fatal; rpt_tag(fatal ) -> mnesia_fatal;
rpt_tag(error ) -> mnesia_error; rpt_tag(error ) -> mnesia_error;