Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ce0c09aec6 | |||
| 6c1a343c82 | |||
| 6246f3f2e1 | |||
| c6969238c7 | |||
| d6a4dca5f6 |
+1
-1
@@ -4,7 +4,7 @@
|
||||
{deps,
|
||||
[
|
||||
{sext, "1.8.0"},
|
||||
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
|
||||
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref, "09df04e"}}},
|
||||
{hut, "1.4.0"}
|
||||
]}.
|
||||
|
||||
|
||||
+1
-1
@@ -2,7 +2,7 @@
|
||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
||||
{<<"rocksdb">>,
|
||||
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
||||
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
|
||||
{ref,"09df04e02f250a5075bb93cafdd3b637cab81d96"}},
|
||||
0},
|
||||
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
||||
[
|
||||
|
||||
+93
-16
@@ -68,6 +68,7 @@
|
||||
, select/2 , select/3
|
||||
, select_reverse/2, select_reverse/3
|
||||
, select/1
|
||||
, ms/2
|
||||
, fold/3 , fold/4 , fold/5
|
||||
, fold_reverse/3 , fold_reverse/4, fold_reverse/5
|
||||
, rdb_fold/4 , rdb_fold/5
|
||||
@@ -98,12 +99,18 @@
|
||||
, encode_val/2
|
||||
, decode_val/3 ]).
|
||||
|
||||
%% TODO: OTP 28.3 dialyzer is quite pesky about opaque types. Find a way
|
||||
%% to structure this, and firm up the type specs.
|
||||
-dialyzer([no_opaque, no_return]).
|
||||
-dialyzer([{nowarn_function, [activity/3]}]).
|
||||
|
||||
-export_type( [ mrdb_iterator/0
|
||||
, itr_handle/0
|
||||
, iterator_action/0
|
||||
, db_ref/0
|
||||
, ref_or_tab/0
|
||||
, index_position/0
|
||||
, match_pattern/0
|
||||
]).
|
||||
|
||||
-include("mnesia_rocksdb.hrl").
|
||||
@@ -125,6 +132,9 @@
|
||||
-type outer() :: non_neg_integer().
|
||||
-type retries() :: outer() | {inner(), outer()}.
|
||||
|
||||
-type matchpat_map() :: #{atom() => _}.
|
||||
-type match_pattern() :: matchpat_map() | ets:match_pattern().
|
||||
|
||||
%% activity type 'ets' makes no sense in this context
|
||||
-type mnesia_activity_type() :: transaction
|
||||
| sync_transaction
|
||||
@@ -174,7 +184,7 @@
|
||||
-type attr_pos() :: #{atom() := pos()}.
|
||||
|
||||
-type db_ref() :: #{ name => table()
|
||||
, alias => atom()
|
||||
, alias => alias()
|
||||
, vsn => non_neg_integer()
|
||||
, db_ref := db_handle()
|
||||
, cf_handle := cf_handle()
|
||||
@@ -189,6 +199,11 @@
|
||||
, activity => activity()
|
||||
, _ => _}.
|
||||
|
||||
-type context() :: #{ 'activity' => activity()
|
||||
, 'alias' => alias()
|
||||
, 'retries' => retries()
|
||||
, 'db_ref' => db_ref() }.
|
||||
|
||||
-type error() :: {error, any()}.
|
||||
|
||||
-type ref_or_tab() :: table() | db_ref().
|
||||
@@ -280,7 +295,7 @@ release_snapshot(SHandle) ->
|
||||
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
||||
%% `batch | async_dirty | sync_dirty' are synonyms.
|
||||
%% @end
|
||||
-spec activity(activity_type(), alias(), fun( () -> Res )) -> Res.
|
||||
-spec activity(activity_type(), alias(), fun( () -> any() )) -> any().
|
||||
activity(Type, Alias, F) ->
|
||||
#{db_ref := DbRef} = ensure_ref({admin, Alias}),
|
||||
Ctxt = case tx_type(Type) of
|
||||
@@ -298,6 +313,7 @@ activity(Type, Alias, F) ->
|
||||
end,
|
||||
do_activity(F, Alias, Ctxt).
|
||||
|
||||
-spec do_activity(fun(() -> any()), alias(), context()) -> any().
|
||||
do_activity(F, Alias, Ctxt) ->
|
||||
try try_f(F, Ctxt)
|
||||
catch
|
||||
@@ -314,11 +330,11 @@ try_f(false, F, Ctxt) ->
|
||||
commit_and_pop(Res)
|
||||
catch
|
||||
throw:Something ->
|
||||
abort_and_pop(throw, Something);
|
||||
abort_and_pop(false, throw, Something);
|
||||
Cat:Err:T ->
|
||||
%% Without capturing the stacktract here,
|
||||
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
||||
abort_and_pop(Cat, {Err, T})
|
||||
abort_and_pop(false, Cat, {Err, T})
|
||||
end;
|
||||
try_f(true, F, Ctxt) ->
|
||||
try run_f(F, Ctxt) of
|
||||
@@ -326,11 +342,11 @@ try_f(true, F, Ctxt) ->
|
||||
commit_and_pop(Res)
|
||||
catch
|
||||
throw:Something ->
|
||||
abort_and_pop(throw, Something);
|
||||
abort_and_pop(true, throw, Something);
|
||||
Cat:Err ->
|
||||
%% Without capturing the stacktract here,
|
||||
%% debugging gets pretty difficult
|
||||
abort_and_pop(Cat, Err)
|
||||
abort_and_pop(true, Cat, Err)
|
||||
end.
|
||||
|
||||
|
||||
@@ -363,7 +379,7 @@ retry_activity(F, Alias, #{activity := #{ type := Type
|
||||
retry_activity(F, Alias, Ctxt1)
|
||||
end;
|
||||
error ->
|
||||
return_abort(Type, error, retry_limit)
|
||||
return_abort(mnesia_compatible_aborts(Ctxt), Type, error, retry_limit)
|
||||
end.
|
||||
|
||||
retry_activity_(inner, F, Alias, Ctxt) ->
|
||||
@@ -420,6 +436,7 @@ current_context() ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
-spec tx_type(activity_type()) -> {'tx', map()} | 'batch'.
|
||||
tx_type(T) ->
|
||||
case T of
|
||||
_ when T==batch;
|
||||
@@ -506,8 +523,8 @@ commit_and_pop(Res) ->
|
||||
end
|
||||
end.
|
||||
|
||||
-spec abort_and_pop(atom(), any()) -> no_return().
|
||||
abort_and_pop(Cat, Err) ->
|
||||
-spec abort_and_pop(boolean(), atom(), any()) -> no_return().
|
||||
abort_and_pop(Compat, Cat, Err) ->
|
||||
%% We can pop the context right away, since there is no
|
||||
%% complex failure handling (like retry-on-busy) for rollback.
|
||||
#{activity := #{type := Type, handle := H}} = pop_ctxt(),
|
||||
@@ -515,23 +532,29 @@ abort_and_pop(Cat, Err) ->
|
||||
tx -> ok = rdb_transaction_rollback(H);
|
||||
batch -> ok = release_batches(H)
|
||||
end,
|
||||
return_abort(Type, Cat, Err).
|
||||
return_abort(Compat, Type, Cat, Err).
|
||||
|
||||
-spec return_abort(batch | tx, atom(), any()) -> no_return().
|
||||
return_abort(batch, Cat, Err) ->
|
||||
-spec return_abort(boolean(), batch | tx, atom(), any()) -> no_return().
|
||||
return_abort(_, batch, Cat, Err) ->
|
||||
re_throw(Cat, Err);
|
||||
return_abort(tx, Cat, Err) ->
|
||||
case mnesia_compatible_aborts() of
|
||||
return_abort(Compat, tx, Cat, Err) ->
|
||||
case Compat of
|
||||
true ->
|
||||
%% Mnesia always captures stack traces, but this could actually become a
|
||||
%% performance issue in some cases (generally, it's better not to lazily
|
||||
%% produce stack traces.) Since we want to pushe the option checking for
|
||||
%% produce stack traces.) Since we want to push the option checking for
|
||||
%% mnesia-abort-style compatibility to AFTER detecting an abort, we don't
|
||||
%% order a stack trace initially, and instead insert an empty list.
|
||||
%% (The exact stack trace wouldn't be the same anyway.)
|
||||
%% NOTE: This behavior changed in mnesia-4.23.5.1, so if we really want
|
||||
%% to stay compatible, we have to special-case things.
|
||||
Err1 =
|
||||
case Cat of
|
||||
error -> {fix_error(Err), []};
|
||||
error ->
|
||||
case newer_mnesia() of
|
||||
true -> fix_error(Err);
|
||||
false -> {fix_error(Err), []}
|
||||
end;
|
||||
exit -> fix_error(Err);
|
||||
throw -> {throw, Err}
|
||||
end,
|
||||
@@ -540,6 +563,11 @@ return_abort(tx, Cat, Err) ->
|
||||
re_throw(Cat, Err)
|
||||
end.
|
||||
|
||||
%% Some rollback return values changed in mnesia in vsn 4.23.5.1
|
||||
newer_mnesia() ->
|
||||
{ok, Vsn} = application:get_key(mnesia, vsn),
|
||||
("4.23.5" < Vsn).
|
||||
|
||||
-spec re_throw(atom(), any()) -> no_return().
|
||||
re_throw(Cat, Err) ->
|
||||
case Cat of
|
||||
@@ -1332,6 +1360,55 @@ select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit
|
||||
select(Cont) ->
|
||||
mrdb_select:select(Cont).
|
||||
|
||||
%% @doc Produce a match specification for select(), supporting map-based match patterns
|
||||
%%
|
||||
%% Using record syntax in match patterns tends to conflict with type checking. This
|
||||
%% function offers an alternative approach, drawing on the fact that mnesia_rocksdb
|
||||
%% keeps the record name and attribute names readily available as persistent terms.
|
||||
%%
|
||||
%% When using the map-based representation, the match pattern is built by matching
|
||||
%% attribute names to map elements; any attribute not found in the map gets set to '_'.
|
||||
%% Thus, ```[{#balance{key = {Acct,'$1'},_='_'},[{'>=','$1',Height}],['$_']}]''' can be
|
||||
%% created as ```ms(balance,[{#{key => {Acct,'$1'}},[{'>=','$1',Height}],['$_']}])'''.
|
||||
%%
|
||||
%% This has the advantage over `ms_transform' that it can handle bound variables
|
||||
%% in the match pattern. The function works on all mnesia table types.
|
||||
%% @end
|
||||
-spec ms(ref_or_tab(), [{match_pattern(), [_], [_]}]) -> ets:match_spec().
|
||||
ms(Tab, Pat) ->
|
||||
#{ attributes := Attrs
|
||||
, record_name := RecName } = any_tab_props(Tab),
|
||||
[{headpat(RecName, Attrs, Hd), Gs, Body}
|
||||
|| {Hd, Gs, Body} <- Pat].
|
||||
|
||||
any_tab_props(Tab) ->
|
||||
try mrdb_props(Tab)
|
||||
catch
|
||||
exit:{aborted,{bad_type,_}} ->
|
||||
mnesia_props(Tab)
|
||||
end.
|
||||
|
||||
mrdb_props(Tab) ->
|
||||
#{properties := Props} = get_ref(Tab),
|
||||
Props.
|
||||
|
||||
mnesia_props(Tab) ->
|
||||
try mnesia_props_(Tab)
|
||||
catch
|
||||
exit:{aborted, _} ->
|
||||
mnesia:abort({bad_type, Tab})
|
||||
end.
|
||||
|
||||
mnesia_props_(Tab) ->
|
||||
#{ record_name => mnesia:table_info(Tab, record_name)
|
||||
, attributes => mnesia:table_info(Tab, attributes) }.
|
||||
|
||||
headpat(RecName, Attrs, Hd) when is_map(Hd) ->
|
||||
list_to_tuple([RecName | [maps:get(A, Hd, '_')
|
||||
|| A <- Attrs]]);
|
||||
headpat(_, _, Hd) when is_tuple(Hd); is_atom(Hd) ->
|
||||
Hd.
|
||||
|
||||
clear_table(Tab) ->
|
||||
match_delete(Tab, '_').
|
||||
|
||||
|
||||
@@ -235,6 +235,7 @@ compare_txs(Type, F) ->
|
||||
ct:log("Mrdb = ~p/~p", [Type, EMr]),
|
||||
case {Type, EMn, EMr} of
|
||||
{error, {some_value, [_|_]}, {some_value, []}} -> ok;
|
||||
{error, E, E} -> ok;
|
||||
{throw, {throw, some_value}, {throw, some_value}} -> ok;
|
||||
{exit, some_value, some_value} -> ok;
|
||||
{abort, some_value, some_value} -> ok
|
||||
@@ -277,21 +278,30 @@ mrdb_abort(Config) ->
|
||||
mrdb:insert(tx_abort, {tx_abort, a, 1}),
|
||||
Pre = mrdb:read(tx_abort, a),
|
||||
D0 = get_dict(),
|
||||
ActivityF = fun() ->
|
||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||
error(abort_here),
|
||||
ok = mrdb:insert(tx_abort, [{tx_abort, a, N+1}]),
|
||||
noooo
|
||||
end,
|
||||
TRes = try mrdb:activity(
|
||||
{tx, #{mnesia_compatible => true}}, rdb,
|
||||
fun() ->
|
||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||
error(abort_here),
|
||||
ok = mrdb:insert(tx_abort, [{tx_abort, a, N+1}]),
|
||||
noooo
|
||||
end)
|
||||
ActivityF)
|
||||
catch
|
||||
error:abort_here ->
|
||||
exit:{aborted, {abort_here, []}} ->
|
||||
ok
|
||||
end,
|
||||
dictionary_unchanged(D0),
|
||||
ok = TRes,
|
||||
Pre = mrdb:read(tx_abort, a),
|
||||
TRes1 = try mrdb:activity(tx, rdb, ActivityF)
|
||||
catch
|
||||
error:{abort_here, [_|_]} ->
|
||||
ok
|
||||
end,
|
||||
dictionary_unchanged(D0),
|
||||
ok = TRes1,
|
||||
Pre = mrdb:read(tx_abort, a),
|
||||
delete_tabs(Created),
|
||||
ok.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user