Merge pull request #13 from aeternity/GH12-check-return-values
Check return values on update operations.
This commit is contained in:
commit
49ff8bcca0
6
Makefile
6
Makefile
@ -12,9 +12,13 @@ docs:
|
||||
check:
|
||||
$(REBAR3) dialyzer
|
||||
|
||||
test:
|
||||
eunit:
|
||||
$(REBAR3) eunit $(suite)
|
||||
|
||||
ct:
|
||||
$(REBAR3) ct $(suite)
|
||||
|
||||
test: eunit ct
|
||||
|
||||
conf_clean:
|
||||
@:
|
||||
|
62
README.md
62
README.md
@ -33,6 +33,68 @@ This means that a prefix key identifies the start of the sequence of
|
||||
entries whose keys match the prefix. The backend uses this to optimize
|
||||
selects on prefix keys.
|
||||
|
||||
## Customization
|
||||
|
||||
RocksDB supports a number of customization options. These can be specified
|
||||
by providing a `{Key, Value}` list named `rocksdb_opts` under `user_properties`,
|
||||
for example:
|
||||
|
||||
```erlang
|
||||
mnesia:create_table(foo, [{rocksdb_copies, [node()]},
|
||||
...
|
||||
{user_properties,
|
||||
[{rocksdb_opts, [{max_open_files, 1024}]}]
|
||||
}])
|
||||
```
|
||||
|
||||
Consult the [RocksDB documentation](https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning)
|
||||
for information on configuration parameters. Also see the section below on handling write errors.
|
||||
|
||||
The default configuration for tables in `mnesia_rocksdb` is:
|
||||
```
|
||||
default_open_opts() ->
|
||||
[ {create_if_missing, true}
|
||||
, {cache_size,
|
||||
list_to_integer(get_env_default("ROCKSDB_CACHE_SIZE", "32212254"))}
|
||||
, {block_size, 1024}
|
||||
, {max_open_files, 100}
|
||||
, {write_buffer_size,
|
||||
list_to_integer(get_env_default(
|
||||
"ROCKSDB_WRITE_BUFFER_SIZE", "4194304"))}
|
||||
, {compression,
|
||||
list_to_atom(get_env_default("ROCKSDB_COMPRESSION", "true"))}
|
||||
, {use_bloomfilter, true}
|
||||
].
|
||||
```
|
||||
|
||||
It is also possible, for larger databases, to produce a tuning parameter file.
|
||||
This is experimental, and mostly copied from `mnesia_leveldb`. Consult the
|
||||
source code in `mnesia_rocksdb_tuning.erl` and `mnesia_rocksdb_params.erl`.
|
||||
Contributions are welcome.
|
||||
|
||||
## Handling of errors in write operations
|
||||
|
||||
The RocksDB update operations return either `ok` or `{error, any()}`.
|
||||
Since the actual updates are performed after the 'point-of-no-return',
|
||||
returning an `error` result will cause mnesia to behave unpredictably, since
|
||||
the operations are expected to simply work.
|
||||
|
||||
An `on_write_error` option can be provided, per-table, in the `rocksdb_opts`
|
||||
user property (see [Customization](#customization) above). Supported values indicate at which level an error
|
||||
indication should be reported. Mnesia may save reported events in RAM, and may
|
||||
also print them, depending on the debug level (controlled with `mnesia:set_debug_level/1`).
|
||||
|
||||
Mnesia debug levels are, in increasing detail, `none | verbose | debug | trace`
|
||||
The supported values for `on_write_error` are:
|
||||
|
||||
| Value | Saved at debug level | Printed at debug level | Action |
|
||||
| ------- | -------------------- | ---------------------- | --------- |
|
||||
| debug | unless none | verbose, debug, trace | ignore |
|
||||
| verbose | unless none | verbose, debug, trace | ignore |
|
||||
| warning | always | always | ignore |
|
||||
| error | always | always | exception |
|
||||
| fatal | always | always | core dump |
|
||||
|
||||
## Caveats
|
||||
|
||||
Avoid placing `bag` tables in RocksDB. Although they work, each write
|
||||
|
@ -9,6 +9,7 @@
|
||||
[
|
||||
{test,
|
||||
[
|
||||
{deps, [{proper, "1.2.0"}]}
|
||||
{deps, [ {proper, "1.2.0"}
|
||||
, {meck, "0.8.13"}]}
|
||||
]}
|
||||
]}.
|
||||
|
@ -165,24 +165,28 @@
|
||||
%% RECORDS
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
||||
-record(sel, {alias, % TODO: not used
|
||||
tab,
|
||||
ref,
|
||||
keypat,
|
||||
ms, % TODO: not used
|
||||
compiled_ms,
|
||||
limit,
|
||||
key_only = false, % TODO: not used
|
||||
direction = forward}). % TODO: not used
|
||||
-record(sel, { alias % TODO: not used
|
||||
, tab
|
||||
, ref
|
||||
, keypat
|
||||
, ms % TODO: not used
|
||||
, compiled_ms
|
||||
, limit
|
||||
, key_only = false % TODO: not used
|
||||
, direction = forward}). % TODO: not used
|
||||
|
||||
-type on_write_error() :: debug | verbose | warning | error | fatal.
|
||||
-define(WRITE_ERR_DEFAULT, verbose).
|
||||
|
||||
-record(st, { ets
|
||||
, ref
|
||||
, alias
|
||||
, tab
|
||||
, type
|
||||
, size_warnings % integer()
|
||||
, maintain_size % boolean()
|
||||
}).
|
||||
, ref
|
||||
, alias
|
||||
, tab
|
||||
, type
|
||||
, size_warnings % integer()
|
||||
, maintain_size % boolean()
|
||||
, on_write_error = ?WRITE_ERR_DEFAULT :: on_write_error()
|
||||
}).
|
||||
|
||||
%% ----------------------------------------------------------------------------
|
||||
%% CONVENIENCE API
|
||||
@ -338,21 +342,32 @@ prefixes(_) ->
|
||||
%% set is OK as ordered_set is a kind of set.
|
||||
check_definition(Alias, Tab, Nodes, Props) ->
|
||||
Id = {Alias, Nodes},
|
||||
Props1 = lists:map(
|
||||
fun({type, T} = P) ->
|
||||
if T==set; T==ordered_set; T==bag ->
|
||||
P;
|
||||
true ->
|
||||
mnesia:abort({combine_error,
|
||||
Tab,
|
||||
[Id, {type,T}]})
|
||||
end;
|
||||
({user_properties, _} = P) ->
|
||||
%% should perhaps verify rocksdb options...
|
||||
P;
|
||||
(P) -> P
|
||||
end, Props),
|
||||
{ok, Props1}.
|
||||
try lists:map(
|
||||
fun({type, T} = P) ->
|
||||
if T==set; T==ordered_set; T==bag ->
|
||||
P;
|
||||
true ->
|
||||
mnesia:abort({combine_error,
|
||||
Tab,
|
||||
[Id, {type,T}]})
|
||||
end;
|
||||
({user_properties, UPs} = P) ->
|
||||
RdbOpts = proplists:get_value(rocksdb_opts, UPs, []),
|
||||
OWE = proplists:get_value(on_write_error, RdbOpts, ?WRITE_ERR_DEFAULT),
|
||||
case valid_mnesia_op(OWE) of
|
||||
true ->
|
||||
P;
|
||||
false ->
|
||||
throw({error, {invalid, {on_write_error, OWE}}})
|
||||
end;
|
||||
(P) -> P
|
||||
end, Props) of
|
||||
Props1 ->
|
||||
{ok, Props1}
|
||||
catch
|
||||
throw:Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
%% -> ok | {error, exists}
|
||||
create_table(_Alias, Tab, _Props) ->
|
||||
@ -400,7 +415,9 @@ load_table_(Alias, Tab, Type, RdbOpts) ->
|
||||
%% transform_table on a rocksdb_table that has indexing.
|
||||
?dbg("ERR: table:~p already loaded pid:~p stack:~p~n",
|
||||
[Tab, _Pid, _Stack]),
|
||||
ok
|
||||
ok;
|
||||
{error, Other} ->
|
||||
mnesia:abort(Other)
|
||||
end.
|
||||
|
||||
close_table(Alias, Tab) ->
|
||||
@ -798,26 +815,23 @@ slot_iter_set(Res, _, _, _) when element(1, Res) =/= ok ->
|
||||
'$end_of_table'.
|
||||
|
||||
update_counter(Alias, Tab, C, Val) when is_integer(Val) ->
|
||||
case call(Alias, Tab, {update_counter, C, Val}) of
|
||||
badarg ->
|
||||
mnesia:abort(badarg);
|
||||
Res ->
|
||||
Res
|
||||
end.
|
||||
call(Alias, Tab, {update_counter, C, Val}).
|
||||
|
||||
%% server-side part
|
||||
do_update_counter(C, Val, Ref) ->
|
||||
do_update_counter(C, Val, Ref, St) ->
|
||||
Enc = encode_key(C),
|
||||
case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of
|
||||
{ok, EncVal} ->
|
||||
case decode_val(EncVal) of
|
||||
{_, _, Old} = Rec when is_integer(Old) ->
|
||||
Res = Old+Val,
|
||||
?rocksdb:put(Ref, Enc,
|
||||
encode_val(
|
||||
setelement(3, Rec, Res)),
|
||||
[]),
|
||||
Res;
|
||||
return_catch(
|
||||
fun() ->
|
||||
db_put(Ref, Enc,
|
||||
encode_val(
|
||||
setelement(3, Rec, Res)),
|
||||
[], St)
|
||||
end);
|
||||
_ ->
|
||||
badarg
|
||||
end;
|
||||
@ -877,16 +891,22 @@ start_proc(Alias, Tab, Type, RdbOpts) ->
|
||||
|
||||
init({Alias, Tab, Type, RdbOpts}) ->
|
||||
process_flag(trap_exit, true),
|
||||
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
|
||||
St = #st{ ets = Ets
|
||||
, ref = Ref
|
||||
, alias = Alias
|
||||
, tab = Tab
|
||||
, type = Type
|
||||
, size_warnings = 0
|
||||
, maintain_size = should_maintain_size(Tab)
|
||||
},
|
||||
{ok, recover_size_info(St)}.
|
||||
try
|
||||
{ok, Ref, Ets} = do_load_table(Tab, RdbOpts),
|
||||
St = #st{ ets = Ets
|
||||
, ref = Ref
|
||||
, alias = Alias
|
||||
, tab = Tab
|
||||
, type = Type
|
||||
, size_warnings = 0
|
||||
, maintain_size = should_maintain_size(Tab)
|
||||
},
|
||||
OnWriteError = proplists:get_value(on_write_error, RdbOpts, St#st.on_write_error),
|
||||
{ok, recover_size_info(St#st{on_write_error = OnWriteError})}
|
||||
catch
|
||||
throw:badarg ->
|
||||
{error, write_error}
|
||||
end.
|
||||
|
||||
do_load_table(Tab, RdbOpts) ->
|
||||
MPd = data_mountpoint(Tab),
|
||||
@ -906,13 +926,13 @@ handle_call({write_info, Key, Value}, _From, #st{} = St) ->
|
||||
_ = write_info_(Key, Value, St),
|
||||
{reply, ok, St};
|
||||
handle_call({update_counter, C, Incr}, _From, #st{ref = Ref} = St) ->
|
||||
{reply, do_update_counter(C, Incr, Ref), St};
|
||||
{reply, do_update_counter(C, Incr, Ref, St), St};
|
||||
handle_call({insert, Key, Val}, _From, St) ->
|
||||
do_insert(Key, Val, St),
|
||||
{reply, ok, St};
|
||||
Res = do_insert(Key, Val, St),
|
||||
{reply, Res, St};
|
||||
handle_call({delete, Key}, _From, St) ->
|
||||
do_delete(Key, St),
|
||||
{reply, ok, St};
|
||||
Res = do_delete(Key, St),
|
||||
{reply, Res, St};
|
||||
handle_call(clear_table, _From, #st{ets = Ets, tab = Tab, ref = Ref} = St) ->
|
||||
MPd = data_mountpoint(Tab),
|
||||
?dbg("Attempting clear_table(~p)~n", [Tab]),
|
||||
@ -1162,16 +1182,19 @@ size_warning(Alias, Tab) ->
|
||||
gen_server:cast(ProcName, size_warning).
|
||||
|
||||
%% server-side end of insert/3.
|
||||
do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false}) ->
|
||||
do_insert_bag(Ref, K, V, false);
|
||||
do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true}) ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_insert_bag(Ref, K, V, CurSz),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok;
|
||||
do_insert(K, V, #st{ref = Ref, maintain_size = false}) ->
|
||||
?rocksdb:put(Ref, K, V, []);
|
||||
do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true}) ->
|
||||
do_insert(K, V, #st{ref = Ref, type = bag, maintain_size = false} = St) ->
|
||||
return_catch(fun() -> do_insert_bag(Ref, K, V, false, St) end);
|
||||
do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) ->
|
||||
return_catch(
|
||||
fun() ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_insert_bag(Ref, K, V, CurSz, St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok
|
||||
end);
|
||||
do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) ->
|
||||
return_catch(fun() -> db_put(Ref, K, V, [], St) end);
|
||||
do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
|
||||
IsNew =
|
||||
case ?rocksdb:get(Ref, K, []) of
|
||||
{ok, _} ->
|
||||
@ -1181,70 +1204,81 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true}) ->
|
||||
end,
|
||||
case IsNew of
|
||||
true ->
|
||||
NewSz = read_info(size, 0, Ets) + 1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
?rocksdb:write(Ref, [{put, Ki, Vi}, {put, K, V}], []),
|
||||
ets_insert_info(Ets, size, NewSz);
|
||||
return_catch(
|
||||
fun() ->
|
||||
NewSz = read_info(size, 0, Ets) + 1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
L = [{put, Ki, Vi}, {put, K, V}],
|
||||
write_result(mnesia_rocksdb_lib:write(Ref, L, []),
|
||||
write, [Ref, L, []], St), % may throw
|
||||
ets_insert_info(Ets, size, NewSz)
|
||||
end);
|
||||
false ->
|
||||
?rocksdb:put(Ref, K, V, [])
|
||||
return_catch(fun() -> db_put(Ref, K, V, [], St) end)
|
||||
end,
|
||||
ok.
|
||||
|
||||
do_insert_bag(Ref, K, V, CurSz) ->
|
||||
do_insert_bag(Ref, K, V, CurSz, St) ->
|
||||
KSz = byte_size(K),
|
||||
with_iterator(
|
||||
Ref, fun(I) ->
|
||||
do_insert_bag_(
|
||||
KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz)
|
||||
KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St)
|
||||
end).
|
||||
|
||||
|
||||
%% There's a potential access pattern that would force counters to
|
||||
%% creep upwards and eventually hit the limit. This could be addressed,
|
||||
%% with compaction. TODO.
|
||||
do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz) when Prev < ?MAX_BAG ->
|
||||
do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG ->
|
||||
case Res of
|
||||
{ok, <<K:Sz/binary, _:?BAG_CNT>>, V} ->
|
||||
%% object exists
|
||||
TSz;
|
||||
{ok, <<K:Sz/binary, N:?BAG_CNT>>, _} ->
|
||||
do_insert_bag_(
|
||||
Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz);
|
||||
Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St);
|
||||
_ when TSz =:= false ->
|
||||
Key = <<K/binary, (Prev+1):?BAG_CNT>>,
|
||||
?rocksdb:put(Ref, Key, V, []);
|
||||
db_put(Ref, Key, V, [], St);
|
||||
_ ->
|
||||
NewSz = TSz + 1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
Key = <<K/binary, (Prev+1):?BAG_CNT>>,
|
||||
?rocksdb:write(Ref, [{put, Ki, Vi}, {put, Key, V}], []),
|
||||
db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St),
|
||||
NewSz
|
||||
end.
|
||||
|
||||
%% server-side part
|
||||
do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false}) ->
|
||||
do_delete_bag(byte_size(Key), Key, Ref, false);
|
||||
do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true}) ->
|
||||
Sz = byte_size(Key),
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_delete_bag(Sz, Key, Ref, CurSz),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok;
|
||||
do_delete(Key, #st{ref = Ref, maintain_size = false}) ->
|
||||
?rocksdb:delete(Ref, Key, []);
|
||||
do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true}) ->
|
||||
do_delete(Key, #st{ref = Ref, type = bag, maintain_size = false} = St) ->
|
||||
return_catch(fun() -> do_delete_bag(byte_size(Key), Key, Ref, false, St) end);
|
||||
do_delete(Key, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St) ->
|
||||
return_catch(
|
||||
fun() ->
|
||||
Sz = byte_size(Key),
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = do_delete_bag(Sz, Key, Ref, CurSz, St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok
|
||||
end);
|
||||
do_delete(Key, #st{ref = Ref, maintain_size = false} = St) ->
|
||||
return_catch(fun() -> db_delete(Ref, Key, [], St) end);
|
||||
do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of
|
||||
{ok, _} ->
|
||||
NewSz = CurSz -1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
ok = ?rocksdb:write(Ref, [{delete, Key}, {put, Ki, Vi}], []),
|
||||
ets_insert_info(Ets, size, NewSz);
|
||||
return_catch(
|
||||
fun() ->
|
||||
NewSz = CurSz -1,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St),
|
||||
ets_insert_info(Ets, size, NewSz)
|
||||
end);
|
||||
not_found ->
|
||||
false
|
||||
end.
|
||||
|
||||
do_delete_bag(Sz, Key, Ref, TSz) ->
|
||||
do_delete_bag(Sz, Key, Ref, TSz, St) ->
|
||||
Found =
|
||||
with_iterator(
|
||||
Ref, fun(I) ->
|
||||
@ -1255,13 +1289,13 @@ do_delete_bag(Sz, Key, Ref, TSz) ->
|
||||
{[], _} ->
|
||||
TSz;
|
||||
{_, false} ->
|
||||
?rocksdb:write(Ref, [{delete, K} || K <- Found], []);
|
||||
db_write(Ref, [{delete, K} || K <- Found], [], St);
|
||||
{_, _} ->
|
||||
N = length(Found),
|
||||
NewSz = TSz - N,
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
?rocksdb:write(Ref, [{put, Ki, Vi} |
|
||||
[{delete, K} || K <- Found]], []),
|
||||
db_write(Ref, [{put, Ki, Vi} |
|
||||
[{delete, K} || K <- Found]], [], St),
|
||||
NewSz
|
||||
end.
|
||||
|
||||
@ -1279,21 +1313,21 @@ do_delete_bag_(Sz, K, Res, Ref, I) ->
|
||||
end.
|
||||
|
||||
do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type,
|
||||
maintain_size = MaintainSize}) ->
|
||||
maintain_size = MaintainSize} = St) ->
|
||||
Fun = fun(_, Key, Acc) -> [Key|Acc] end,
|
||||
Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30),
|
||||
case {Keys, MaintainSize} of
|
||||
{[], _} ->
|
||||
ok;
|
||||
{_, false} ->
|
||||
?rocksdb:write(Ref, [{delete, K} || K <- Keys], []),
|
||||
db_write(Ref, [{delete, K} || K <- Keys], [], St),
|
||||
ok;
|
||||
{_, true} ->
|
||||
CurSz = read_info(size, 0, Ets),
|
||||
NewSz = max(CurSz - length(Keys), 0),
|
||||
{Ki, Vi} = info_obj(size, NewSz),
|
||||
?rocksdb:write(Ref, [{put, Ki, Vi} |
|
||||
[{delete, K} || K <- Keys]], []),
|
||||
db_write(Ref, [{put, Ki, Vi} |
|
||||
[{delete, K} || K <- Keys]], [], St),
|
||||
ets_insert_info(Ets, size, NewSz),
|
||||
ok
|
||||
end.
|
||||
@ -1339,8 +1373,8 @@ property(Tab, Prop, Default) ->
|
||||
exit:_ -> Default
|
||||
end.
|
||||
|
||||
write_info_(Item, Val, #st{ets = Ets, ref = Ref}) ->
|
||||
rocksdb_insert_info(Ref, Item, Val),
|
||||
write_info_(Item, Val, #st{ets = Ets, ref = Ref} = St) ->
|
||||
rocksdb_insert_info(Ref, Item, Val, St),
|
||||
ets_insert_info(Ets, Item, Val).
|
||||
|
||||
ets_insert_info(Ets, Item, Val) ->
|
||||
@ -1349,14 +1383,14 @@ ets_insert_info(Ets, Item, Val) ->
|
||||
ets_delete_info(Ets, Item) ->
|
||||
ets:delete(Ets, {info, Item}).
|
||||
|
||||
rocksdb_insert_info(Ref, Item, Val) ->
|
||||
rocksdb_insert_info(Ref, Item, Val, St) ->
|
||||
EncKey = info_key(Item),
|
||||
EncVal = encode_val(Val),
|
||||
?rocksdb:put(Ref, EncKey, EncVal, []).
|
||||
db_put(Ref, EncKey, EncVal, [], St).
|
||||
|
||||
rocksdb_delete_info(Ref, Item) ->
|
||||
rocksdb_delete_info(Ref, Item, St) ->
|
||||
EncKey = info_key(Item),
|
||||
?rocksdb:delete(Ref, EncKey, []).
|
||||
db_delete(Ref, EncKey, [], St).
|
||||
|
||||
info_obj(Item, Val) ->
|
||||
{info_key(Item), encode_val(Val)}.
|
||||
@ -1364,8 +1398,8 @@ info_obj(Item, Val) ->
|
||||
info_key(Item) ->
|
||||
<<?INFO_TAG, (encode_key(Item))/binary>>.
|
||||
|
||||
delete_info_(Item, #st{ets = Ets, ref = Ref}) ->
|
||||
rocksdb_delete_info(Ref, Item),
|
||||
delete_info_(Item, #st{ets = Ets, ref = Ref} = St) ->
|
||||
rocksdb_delete_info(Ref, Item, St),
|
||||
ets_delete_info(Ets, Item).
|
||||
|
||||
read_info(Item, Default, Ets) ->
|
||||
@ -1555,6 +1589,58 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos) when is_tuple(HeadPat) ->
|
||||
keypat_pfx(_, _) ->
|
||||
<<>>.
|
||||
|
||||
|
||||
%% ----------------------------------------------------------------------------
|
||||
%% Db wrappers
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
||||
return_catch(F) when is_function(F, 0) ->
|
||||
try F()
|
||||
catch
|
||||
throw:badarg ->
|
||||
badarg
|
||||
end.
|
||||
|
||||
db_put(Ref, K, V, Opts, St) ->
|
||||
write_result(mnesia_rocksdb_lib:put(Ref, K, V, Opts), put, [Ref, K, V, Opts], St).
|
||||
|
||||
db_write(Ref, List, Opts, St) ->
|
||||
write_result(mnesia_rocksdb_lib:write(Ref, List, Opts), write, [Ref, List, Opts], St).
|
||||
|
||||
db_delete(Ref, K, Opts, St) ->
|
||||
write_result(mnesia_rocksdb_lib:delete(Ref, K, Opts), delete, [Ref, K, Opts], St).
|
||||
|
||||
write_result(ok, _, _, _) ->
|
||||
ok;
|
||||
write_result(Res, Op, Args, #st{on_write_error = Rpt}) ->
|
||||
RptOp = rpt_op(Rpt),
|
||||
mnesia_lib:RptOp("FAILED rocksdb:~p(" ++ rpt_fmt(Args) ++ ") -> ~p~n",
|
||||
[Op | Args] ++ [Res]),
|
||||
if Rpt == fatal; Rpt == error ->
|
||||
throw(badarg);
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
rpt_fmt([_|T]) ->
|
||||
lists:append(["~p" | [", ~p" || _ <- T]]).
|
||||
|
||||
rpt_op(debug) ->
|
||||
dbg_out;
|
||||
rpt_op(Op) ->
|
||||
Op.
|
||||
|
||||
valid_mnesia_op(Op) ->
|
||||
if Op==debug
|
||||
; Op==verbose
|
||||
; Op==warning
|
||||
; Op==error
|
||||
; Op==fatal ->
|
||||
true;
|
||||
true ->
|
||||
false
|
||||
end.
|
||||
|
||||
%% ----------------------------------------------------------------------------
|
||||
%% COMMON PRIVATE
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
17
src/mnesia_rocksdb_lib.erl
Normal file
17
src/mnesia_rocksdb_lib.erl
Normal file
@ -0,0 +1,17 @@
|
||||
%%% @doc RocksDB update wrappers, in separate module for easy tracing and mocking.
|
||||
%%%
|
||||
-module(mnesia_rocksdb_lib).
|
||||
|
||||
-export([put/4,
|
||||
write/3,
|
||||
delete/3]).
|
||||
|
||||
|
||||
put(Ref, K, V, Opts) ->
|
||||
rocksdb:put(Ref, K, V, Opts).
|
||||
|
||||
write(Ref, L, Opts) ->
|
||||
rocksdb:write(Ref, L, Opts).
|
||||
|
||||
delete(Ref, K, Opts) ->
|
||||
rocksdb:delete(Ref, K, Opts).
|
49
test/mnesia_rocksdb_SUITE.erl
Normal file
49
test/mnesia_rocksdb_SUITE.erl
Normal file
@ -0,0 +1,49 @@
|
||||
-module(mnesia_rocksdb_SUITE).
|
||||
|
||||
-export([
|
||||
all/0
|
||||
, groups/0
|
||||
, suite/0
|
||||
, init_per_suite/1
|
||||
, end_per_suite/1
|
||||
, init_per_group/2
|
||||
, end_per_group/2
|
||||
, init_per_testcase/2
|
||||
, end_per_testcase/2
|
||||
]).
|
||||
|
||||
-export([error_handling/1]).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
suite() ->
|
||||
[].
|
||||
|
||||
all() ->
|
||||
[{group, all_tests}].
|
||||
|
||||
groups() ->
|
||||
[{all_tests, [sequence], [error_handling]}].
|
||||
|
||||
|
||||
error_handling(_Config) ->
|
||||
mnesia_rocksdb_error_handling:run().
|
||||
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
init_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(_, _Config) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
ok.
|
103
test/mnesia_rocksdb_error_handling.erl
Normal file
103
test/mnesia_rocksdb_error_handling.erl
Normal file
@ -0,0 +1,103 @@
|
||||
-module(mnesia_rocksdb_error_handling).
|
||||
|
||||
-export([run/0,
|
||||
run/4]).
|
||||
|
||||
|
||||
run() ->
|
||||
setup(),
|
||||
%% run only one test for 'fatal', to save time.
|
||||
[run(Type, Op, L, MaintainSz) || MaintainSz <- [false, true],
|
||||
Type <- [set, bag],
|
||||
Op <- [insert, update, delete],
|
||||
L <- levels()]
|
||||
++ [run(set, insert, fatal, false)].
|
||||
|
||||
run(Type, Op, Level, MaintainSz) ->
|
||||
setup(),
|
||||
{ok, Tab} = create_tab(Type, Level, MaintainSz),
|
||||
mnesia:dirty_write({Tab, a, 1}), % pre-existing data
|
||||
with_mock(Level, Op, Tab, fun() ->
|
||||
try_write(Op, Type, Tab),
|
||||
expect_error(Level, Tab)
|
||||
end).
|
||||
|
||||
levels() ->
|
||||
[debug, verbose, warning, error].
|
||||
|
||||
setup() ->
|
||||
mnesia:stop(),
|
||||
start_mnesia().
|
||||
|
||||
create_tab(Type, Level, MaintainSz) ->
|
||||
TabName = tab_name(Type, Level, MaintainSz),
|
||||
UserProps = user_props(Level, MaintainSz),
|
||||
{atomic, ok} = mnesia:create_table(TabName, [{rdb, [node()]},
|
||||
{user_properties, UserProps}]),
|
||||
{ok, TabName}.
|
||||
|
||||
tab_name(Type, Level, MaintainSz) ->
|
||||
binary_to_atom(iolist_to_binary(
|
||||
["t" | [["_", atom_to_list(A)]
|
||||
|| A <- [?MODULE, Type, Level, MaintainSz]]]), utf8).
|
||||
|
||||
user_props(Level, MaintainSz) ->
|
||||
[{maintain_sz, MaintainSz},
|
||||
{rocksdb_opts, [{on_write_error, Level}]}].
|
||||
|
||||
start_mnesia() ->
|
||||
mnesia_rocksdb_tlib:start_mnesia(reset),
|
||||
ok.
|
||||
|
||||
with_mock(Level, Op, Tab, F) ->
|
||||
mnesia:subscribe(system),
|
||||
mnesia:set_debug_level(debug),
|
||||
meck:new(mnesia_rocksdb_lib, [passthrough]),
|
||||
meck:expect(mnesia_rocksdb_lib, put, 4, {error, some_put_error}),
|
||||
meck:expect(mnesia_rocksdb_lib, write, 3, {error, some_write_error}),
|
||||
meck:expect(mnesia_rocksdb_lib, delete, 3, {error,some_delete_error}),
|
||||
try {Level, Op, Tab, F()} of
|
||||
{_, _, _, ok} ->
|
||||
ok;
|
||||
Other ->
|
||||
io:fwrite("OTHER: ~p~n", [Other]),
|
||||
ok
|
||||
catch
|
||||
exit:{{aborted,_},_} ->
|
||||
Level = error,
|
||||
ok
|
||||
after
|
||||
mnesia:set_debug_level(none),
|
||||
mnesia:unsubscribe(system),
|
||||
meck:unload(mnesia_rocksdb_lib)
|
||||
end.
|
||||
|
||||
try_write(insert, set, Tab) ->
|
||||
mnesia:dirty_write({Tab, b, 2});
|
||||
try_write(insert, bag, Tab) ->
|
||||
mnesia:dirty_write({Tab, a, 2});
|
||||
try_write(update, _, Tab) ->
|
||||
mnesia:dirty_write({Tab, a, 1});
|
||||
try_write(delete, _, Tab) ->
|
||||
mnesia:dirty_delete({Tab, a}).
|
||||
|
||||
|
||||
expect_error(Level, Tab) ->
|
||||
Tag = rpt_tag(Level),
|
||||
receive
|
||||
{mnesia_system_event, {mnesia_fatal, Fmt, Args, _Core}} ->
|
||||
Tag = mnesia_fatal,
|
||||
io:fwrite("EVENT(~p, ~p):~n ~s", [Tag, Tab, io_lib:fwrite(Fmt, Args)]),
|
||||
ok;
|
||||
{mnesia_system_event, {Tag, Fmt, Args}} ->
|
||||
io:fwrite("EVENT(~p, ~p):~n ~s", [Tag, Tab, io_lib:fwrite(Fmt, Args)]),
|
||||
ok
|
||||
after 1000 ->
|
||||
error({expected_error, [Level, Tab]})
|
||||
end.
|
||||
|
||||
rpt_tag(fatal ) -> mnesia_fatal;
|
||||
rpt_tag(error ) -> mnesia_error;
|
||||
rpt_tag(warning) -> mnesia_warning;
|
||||
rpt_tag(verbose) -> mnesia_info;
|
||||
rpt_tag(debug ) -> mnesia_info.
|
Loading…
x
Reference in New Issue
Block a user