Consistent support for merge ops #4
12
README.md
12
README.md
@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
|
|||||||
it yourself. If you only need the size occasionally, you may traverse the
|
it yourself. If you only need the size occasionally, you may traverse the
|
||||||
table to count the elements.
|
table to count the elements.
|
||||||
|
|
||||||
|
When `mrdb` transactions abort, they will return a stacktrace caught
|
||||||
|
from within the transaction fun, giving much better debugging info.
|
||||||
|
This is different from how mnesia does it.
|
||||||
|
|
||||||
|
If behavior closer to mnesia's abort returns are needed, say, for backwards
|
||||||
|
compatibility, this can be controlled by setting the environment variable
|
||||||
|
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
|
||||||
|
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
|
||||||
|
For really performance-critical transactions which may abort often, it might
|
||||||
|
make a difference to set this option to `true`, since there is a cost involved
|
||||||
|
in producing stacktraces.
|
||||||
|
|
||||||
|
|
||||||
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
|
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
{deps,
|
{deps,
|
||||||
[
|
[
|
||||||
{sext, "1.8.0"},
|
{sext, "1.8.0"},
|
||||||
{rocksdb, {git, "https://gitlab.com/barrel-db/erlang-rocksdb.git", {tag,"1.8.0"}}},
|
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
|
||||||
{hut, "1.4.0"}
|
{hut, "1.4.0"}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
{"1.2.0",
|
{"1.2.0",
|
||||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
||||||
{<<"rocksdb">>,
|
{<<"rocksdb">>,
|
||||||
{git,"https://gitlab.com/barrel-db/erlang-rocksdb.git",
|
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
||||||
{ref,"fced5f637de7991c5948e28414ba3790b0476c4b"}},
|
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
|
||||||
0},
|
0},
|
||||||
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
||||||
[
|
[
|
||||||
|
@ -1489,11 +1489,7 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
|
|||||||
%% not yet created
|
%% not yet created
|
||||||
CFs = cfs(CFs0),
|
CFs = cfs(CFs0),
|
||||||
file:make_dir(MP),
|
file:make_dir(MP),
|
||||||
OpenOpts = [ {create_if_missing, true}
|
OpenRes = rocksdb_open(MP, Opts, CFs),
|
||||||
, {create_missing_column_families, true}
|
|
||||||
, {merge_operator, erlang_merge_operator}
|
|
||||||
| Opts ],
|
|
||||||
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
|
|
||||||
map_cfs(OpenRes, CFs, Alias, Acc0);
|
map_cfs(OpenRes, CFs, Alias, Acc0);
|
||||||
false ->
|
false ->
|
||||||
{error, enoent};
|
{error, enoent};
|
||||||
@ -1504,9 +1500,15 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
|
|||||||
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
|
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
open_opts(Opts) ->
|
||||||
|
[ {create_if_missing, true}
|
||||||
|
, {create_missing_column_families, true}
|
||||||
|
, {merge_operator, erlang_merge_operator}
|
||||||
|
| Opts ].
|
||||||
|
|
||||||
rocksdb_open(MP, Opts, CFs) ->
|
rocksdb_open(MP, Opts, CFs) ->
|
||||||
%% rocksdb:open(MP, Opts, CFs),
|
%% rocksdb:open(MP, Opts, CFs),
|
||||||
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
|
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
|
||||||
|
|
||||||
is_open(Alias, #st{backends = Bs}) ->
|
is_open(Alias, #st{backends = Bs}) ->
|
||||||
case maps:find(Alias, Bs) of
|
case maps:find(Alias, Bs) of
|
||||||
|
43
src/mrdb.erl
43
src/mrdb.erl
@ -53,6 +53,7 @@
|
|||||||
, delete/2 , delete/3
|
, delete/2 , delete/3
|
||||||
, delete_object/2, delete_object/3
|
, delete_object/2, delete_object/3
|
||||||
, match_delete/2
|
, match_delete/2
|
||||||
|
, merge/3 , merge/4
|
||||||
, clear_table/1
|
, clear_table/1
|
||||||
, batch_write/2 , batch_write/3
|
, batch_write/2 , batch_write/3
|
||||||
, update_counter/3, update_counter/4
|
, update_counter/3, update_counter/4
|
||||||
@ -297,14 +298,34 @@ do_activity(F, Alias, Ctxt) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
try_f(F, Ctxt) ->
|
try_f(F, Ctxt) ->
|
||||||
|
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
|
||||||
|
|
||||||
|
try_f(false, F, Ctxt) ->
|
||||||
try run_f(F, Ctxt) of
|
try run_f(F, Ctxt) of
|
||||||
Res ->
|
Res ->
|
||||||
commit_and_pop(Res)
|
commit_and_pop(Res)
|
||||||
catch
|
catch
|
||||||
|
throw:Something ->
|
||||||
|
abort_and_pop(throw, Something);
|
||||||
|
Cat:Err:T ->
|
||||||
|
%% Without capturing the stacktract here,
|
||||||
|
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
||||||
|
abort_and_pop(Cat, {Err, T})
|
||||||
|
end;
|
||||||
|
try_f(true, F, Ctxt) ->
|
||||||
|
try run_f(F, Ctxt) of
|
||||||
|
Res ->
|
||||||
|
commit_and_pop(Res)
|
||||||
|
catch
|
||||||
|
throw:Something ->
|
||||||
|
abort_and_pop(throw, Something);
|
||||||
Cat:Err ->
|
Cat:Err ->
|
||||||
|
%% Without capturing the stacktract here,
|
||||||
|
%% debugging gets pretty difficult
|
||||||
abort_and_pop(Cat, Err)
|
abort_and_pop(Cat, Err)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
run_f(F, Ctxt) ->
|
run_f(F, Ctxt) ->
|
||||||
push_ctxt(Ctxt),
|
push_ctxt(Ctxt),
|
||||||
F().
|
F().
|
||||||
@ -411,7 +432,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
|
|||||||
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
|
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
|
||||||
|
|
||||||
check_tx_opts(Opts) ->
|
check_tx_opts(Opts) ->
|
||||||
case maps:without([no_snapshot, retries], Opts) of
|
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
|
||||||
Other when map_size(Other) > 0 ->
|
Other when map_size(Other) > 0 ->
|
||||||
abort({invalid_tx_opts, maps:keys(Other)});
|
abort({invalid_tx_opts, maps:keys(Other)});
|
||||||
_ ->
|
_ ->
|
||||||
@ -523,6 +544,11 @@ re_throw(Cat, Err) ->
|
|||||||
mnesia_compatible_aborts() ->
|
mnesia_compatible_aborts() ->
|
||||||
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
||||||
|
|
||||||
|
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
|
||||||
|
Bool;
|
||||||
|
mnesia_compatible_aborts(_) ->
|
||||||
|
mnesia_compatible_aborts().
|
||||||
|
|
||||||
fix_error({aborted, Err}) ->
|
fix_error({aborted, Err}) ->
|
||||||
Err;
|
Err;
|
||||||
fix_error(Err) ->
|
fix_error(Err) ->
|
||||||
@ -715,6 +741,21 @@ insert(Tab, Obj0, Opts) ->
|
|||||||
EncVal = encode_val(Obj, Ref),
|
EncVal = encode_val(Obj, Ref),
|
||||||
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
|
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
|
||||||
|
|
||||||
|
merge(Tab, Key, MergeOp) ->
|
||||||
|
merge(Tab, Key, MergeOp, []).
|
||||||
|
|
||||||
|
merge(Tab, Key, MergeOp, Opts) ->
|
||||||
|
#{encoding := Enc} = Ref = ensure_ref(Tab),
|
||||||
|
case Enc of
|
||||||
|
{_, {value, term}} ->
|
||||||
|
merge_(Ref, Key, MergeOp, Opts);
|
||||||
|
_ ->
|
||||||
|
abort(badarg)
|
||||||
|
end.
|
||||||
|
|
||||||
|
merge_(Ref, Key, MergeOp, Opts) ->
|
||||||
|
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
|
||||||
|
|
||||||
validate_obj(Obj, #{mode := mnesia}) ->
|
validate_obj(Obj, #{mode := mnesia}) ->
|
||||||
Obj;
|
Obj;
|
||||||
validate_obj(Obj, #{attr_pos := AP,
|
validate_obj(Obj, #{attr_pos := AP,
|
||||||
|
@ -269,7 +269,7 @@ mrdb_abort(Config) ->
|
|||||||
Pre = mrdb:read(tx_abort, a),
|
Pre = mrdb:read(tx_abort, a),
|
||||||
D0 = get_dict(),
|
D0 = get_dict(),
|
||||||
TRes = try mrdb:activity(
|
TRes = try mrdb:activity(
|
||||||
tx, rdb,
|
{tx, #{mnesia_compatible => true}}, rdb,
|
||||||
fun() ->
|
fun() ->
|
||||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||||
error(abort_here),
|
error(abort_here),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user