Compare commits
No commits in common. "uw-erlang-rocksdb-dist" and "master" have entirely different histories.
uw-erlang-
...
master
@ -4,7 +4,7 @@
|
|||||||
{deps,
|
{deps,
|
||||||
[
|
[
|
||||||
{sext, "1.8.0"},
|
{sext, "1.8.0"},
|
||||||
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref, "09df04e"}}},
|
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
|
||||||
{hut, "1.4.0"}
|
{hut, "1.4.0"}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
||||||
{<<"rocksdb">>,
|
{<<"rocksdb">>,
|
||||||
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
||||||
{ref,"09df04e02f250a5075bb93cafdd3b637cab81d96"}},
|
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
|
||||||
0},
|
0},
|
||||||
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
||||||
[
|
[
|
||||||
|
|||||||
55
src/mrdb.erl
55
src/mrdb.erl
@ -99,11 +99,6 @@
|
|||||||
, encode_val/2
|
, encode_val/2
|
||||||
, decode_val/3 ]).
|
, decode_val/3 ]).
|
||||||
|
|
||||||
%% TODO: OTP 28.3 dialyzer is quite pesky about opaque types. Find a way
|
|
||||||
%% to structure this, and firm up the type specs.
|
|
||||||
-dialyzer([no_opaque, no_return]).
|
|
||||||
-dialyzer([{nowarn_function, [activity/3]}]).
|
|
||||||
|
|
||||||
-export_type( [ mrdb_iterator/0
|
-export_type( [ mrdb_iterator/0
|
||||||
, itr_handle/0
|
, itr_handle/0
|
||||||
, iterator_action/0
|
, iterator_action/0
|
||||||
@ -184,7 +179,7 @@
|
|||||||
-type attr_pos() :: #{atom() := pos()}.
|
-type attr_pos() :: #{atom() := pos()}.
|
||||||
|
|
||||||
-type db_ref() :: #{ name => table()
|
-type db_ref() :: #{ name => table()
|
||||||
, alias => alias()
|
, alias => atom()
|
||||||
, vsn => non_neg_integer()
|
, vsn => non_neg_integer()
|
||||||
, db_ref := db_handle()
|
, db_ref := db_handle()
|
||||||
, cf_handle := cf_handle()
|
, cf_handle := cf_handle()
|
||||||
@ -199,11 +194,6 @@
|
|||||||
, activity => activity()
|
, activity => activity()
|
||||||
, _ => _}.
|
, _ => _}.
|
||||||
|
|
||||||
-type context() :: #{ 'activity' => activity()
|
|
||||||
, 'alias' => alias()
|
|
||||||
, 'retries' => retries()
|
|
||||||
, 'db_ref' => db_ref() }.
|
|
||||||
|
|
||||||
-type error() :: {error, any()}.
|
-type error() :: {error, any()}.
|
||||||
|
|
||||||
-type ref_or_tab() :: table() | db_ref().
|
-type ref_or_tab() :: table() | db_ref().
|
||||||
@ -295,7 +285,7 @@ release_snapshot(SHandle) ->
|
|||||||
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
||||||
%% `batch | async_dirty | sync_dirty' are synonyms.
|
%% `batch | async_dirty | sync_dirty' are synonyms.
|
||||||
%% @end
|
%% @end
|
||||||
-spec activity(activity_type(), alias(), fun( () -> any() )) -> any().
|
-spec activity(activity_type(), alias(), fun( () -> Res )) -> Res.
|
||||||
activity(Type, Alias, F) ->
|
activity(Type, Alias, F) ->
|
||||||
#{db_ref := DbRef} = ensure_ref({admin, Alias}),
|
#{db_ref := DbRef} = ensure_ref({admin, Alias}),
|
||||||
Ctxt = case tx_type(Type) of
|
Ctxt = case tx_type(Type) of
|
||||||
@ -313,7 +303,6 @@ activity(Type, Alias, F) ->
|
|||||||
end,
|
end,
|
||||||
do_activity(F, Alias, Ctxt).
|
do_activity(F, Alias, Ctxt).
|
||||||
|
|
||||||
-spec do_activity(fun(() -> any()), alias(), context()) -> any().
|
|
||||||
do_activity(F, Alias, Ctxt) ->
|
do_activity(F, Alias, Ctxt) ->
|
||||||
try try_f(F, Ctxt)
|
try try_f(F, Ctxt)
|
||||||
catch
|
catch
|
||||||
@ -330,11 +319,11 @@ try_f(false, F, Ctxt) ->
|
|||||||
commit_and_pop(Res)
|
commit_and_pop(Res)
|
||||||
catch
|
catch
|
||||||
throw:Something ->
|
throw:Something ->
|
||||||
abort_and_pop(false, throw, Something);
|
abort_and_pop(throw, Something);
|
||||||
Cat:Err:T ->
|
Cat:Err:T ->
|
||||||
%% Without capturing the stacktract here,
|
%% Without capturing the stacktract here,
|
||||||
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
||||||
abort_and_pop(false, Cat, {Err, T})
|
abort_and_pop(Cat, {Err, T})
|
||||||
end;
|
end;
|
||||||
try_f(true, F, Ctxt) ->
|
try_f(true, F, Ctxt) ->
|
||||||
try run_f(F, Ctxt) of
|
try run_f(F, Ctxt) of
|
||||||
@ -342,11 +331,11 @@ try_f(true, F, Ctxt) ->
|
|||||||
commit_and_pop(Res)
|
commit_and_pop(Res)
|
||||||
catch
|
catch
|
||||||
throw:Something ->
|
throw:Something ->
|
||||||
abort_and_pop(true, throw, Something);
|
abort_and_pop(throw, Something);
|
||||||
Cat:Err ->
|
Cat:Err ->
|
||||||
%% Without capturing the stacktract here,
|
%% Without capturing the stacktract here,
|
||||||
%% debugging gets pretty difficult
|
%% debugging gets pretty difficult
|
||||||
abort_and_pop(true, Cat, Err)
|
abort_and_pop(Cat, Err)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
@ -379,7 +368,7 @@ retry_activity(F, Alias, #{activity := #{ type := Type
|
|||||||
retry_activity(F, Alias, Ctxt1)
|
retry_activity(F, Alias, Ctxt1)
|
||||||
end;
|
end;
|
||||||
error ->
|
error ->
|
||||||
return_abort(mnesia_compatible_aborts(Ctxt), Type, error, retry_limit)
|
return_abort(Type, error, retry_limit)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_activity_(inner, F, Alias, Ctxt) ->
|
retry_activity_(inner, F, Alias, Ctxt) ->
|
||||||
@ -436,7 +425,6 @@ current_context() ->
|
|||||||
undefined
|
undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec tx_type(activity_type()) -> {'tx', map()} | 'batch'.
|
|
||||||
tx_type(T) ->
|
tx_type(T) ->
|
||||||
case T of
|
case T of
|
||||||
_ when T==batch;
|
_ when T==batch;
|
||||||
@ -523,8 +511,8 @@ commit_and_pop(Res) ->
|
|||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec abort_and_pop(boolean(), atom(), any()) -> no_return().
|
-spec abort_and_pop(atom(), any()) -> no_return().
|
||||||
abort_and_pop(Compat, Cat, Err) ->
|
abort_and_pop(Cat, Err) ->
|
||||||
%% We can pop the context right away, since there is no
|
%% We can pop the context right away, since there is no
|
||||||
%% complex failure handling (like retry-on-busy) for rollback.
|
%% complex failure handling (like retry-on-busy) for rollback.
|
||||||
#{activity := #{type := Type, handle := H}} = pop_ctxt(),
|
#{activity := #{type := Type, handle := H}} = pop_ctxt(),
|
||||||
@ -532,29 +520,23 @@ abort_and_pop(Compat, Cat, Err) ->
|
|||||||
tx -> ok = rdb_transaction_rollback(H);
|
tx -> ok = rdb_transaction_rollback(H);
|
||||||
batch -> ok = release_batches(H)
|
batch -> ok = release_batches(H)
|
||||||
end,
|
end,
|
||||||
return_abort(Compat, Type, Cat, Err).
|
return_abort(Type, Cat, Err).
|
||||||
|
|
||||||
-spec return_abort(boolean(), batch | tx, atom(), any()) -> no_return().
|
-spec return_abort(batch | tx, atom(), any()) -> no_return().
|
||||||
return_abort(_, batch, Cat, Err) ->
|
return_abort(batch, Cat, Err) ->
|
||||||
re_throw(Cat, Err);
|
re_throw(Cat, Err);
|
||||||
return_abort(Compat, tx, Cat, Err) ->
|
return_abort(tx, Cat, Err) ->
|
||||||
case Compat of
|
case mnesia_compatible_aborts() of
|
||||||
true ->
|
true ->
|
||||||
%% Mnesia always captures stack traces, but this could actually become a
|
%% Mnesia always captures stack traces, but this could actually become a
|
||||||
%% performance issue in some cases (generally, it's better not to lazily
|
%% performance issue in some cases (generally, it's better not to lazily
|
||||||
%% produce stack traces.) Since we want to push the option checking for
|
%% produce stack traces.) Since we want to pushe the option checking for
|
||||||
%% mnesia-abort-style compatibility to AFTER detecting an abort, we don't
|
%% mnesia-abort-style compatibility to AFTER detecting an abort, we don't
|
||||||
%% order a stack trace initially, and instead insert an empty list.
|
%% order a stack trace initially, and instead insert an empty list.
|
||||||
%% (The exact stack trace wouldn't be the same anyway.)
|
%% (The exact stack trace wouldn't be the same anyway.)
|
||||||
%% NOTE: This behavior changed in mnesia-4.23.5.1, so if we really want
|
|
||||||
%% to stay compatible, we have to special-case things.
|
|
||||||
Err1 =
|
Err1 =
|
||||||
case Cat of
|
case Cat of
|
||||||
error ->
|
error -> {fix_error(Err), []};
|
||||||
case newer_mnesia() of
|
|
||||||
true -> fix_error(Err);
|
|
||||||
false -> {fix_error(Err), []}
|
|
||||||
end;
|
|
||||||
exit -> fix_error(Err);
|
exit -> fix_error(Err);
|
||||||
throw -> {throw, Err}
|
throw -> {throw, Err}
|
||||||
end,
|
end,
|
||||||
@ -563,11 +545,6 @@ return_abort(Compat, tx, Cat, Err) ->
|
|||||||
re_throw(Cat, Err)
|
re_throw(Cat, Err)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Some rollback return values changed in mnesia in vsn 4.23.5.1
|
|
||||||
newer_mnesia() ->
|
|
||||||
{ok, Vsn} = application:get_key(mnesia, vsn),
|
|
||||||
("4.23.5" < Vsn).
|
|
||||||
|
|
||||||
-spec re_throw(atom(), any()) -> no_return().
|
-spec re_throw(atom(), any()) -> no_return().
|
||||||
re_throw(Cat, Err) ->
|
re_throw(Cat, Err) ->
|
||||||
case Cat of
|
case Cat of
|
||||||
|
|||||||
@ -235,7 +235,6 @@ compare_txs(Type, F) ->
|
|||||||
ct:log("Mrdb = ~p/~p", [Type, EMr]),
|
ct:log("Mrdb = ~p/~p", [Type, EMr]),
|
||||||
case {Type, EMn, EMr} of
|
case {Type, EMn, EMr} of
|
||||||
{error, {some_value, [_|_]}, {some_value, []}} -> ok;
|
{error, {some_value, [_|_]}, {some_value, []}} -> ok;
|
||||||
{error, E, E} -> ok;
|
|
||||||
{throw, {throw, some_value}, {throw, some_value}} -> ok;
|
{throw, {throw, some_value}, {throw, some_value}} -> ok;
|
||||||
{exit, some_value, some_value} -> ok;
|
{exit, some_value, some_value} -> ok;
|
||||||
{abort, some_value, some_value} -> ok
|
{abort, some_value, some_value} -> ok
|
||||||
@ -278,30 +277,21 @@ mrdb_abort(Config) ->
|
|||||||
mrdb:insert(tx_abort, {tx_abort, a, 1}),
|
mrdb:insert(tx_abort, {tx_abort, a, 1}),
|
||||||
Pre = mrdb:read(tx_abort, a),
|
Pre = mrdb:read(tx_abort, a),
|
||||||
D0 = get_dict(),
|
D0 = get_dict(),
|
||||||
ActivityF = fun() ->
|
|
||||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
|
||||||
error(abort_here),
|
|
||||||
ok = mrdb:insert(tx_abort, [{tx_abort, a, N+1}]),
|
|
||||||
noooo
|
|
||||||
end,
|
|
||||||
TRes = try mrdb:activity(
|
TRes = try mrdb:activity(
|
||||||
{tx, #{mnesia_compatible => true}}, rdb,
|
{tx, #{mnesia_compatible => true}}, rdb,
|
||||||
ActivityF)
|
fun() ->
|
||||||
|
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||||
|
error(abort_here),
|
||||||
|
ok = mrdb:insert(tx_abort, [{tx_abort, a, N+1}]),
|
||||||
|
noooo
|
||||||
|
end)
|
||||||
catch
|
catch
|
||||||
exit:{aborted, {abort_here, []}} ->
|
error:abort_here ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
dictionary_unchanged(D0),
|
dictionary_unchanged(D0),
|
||||||
ok = TRes,
|
ok = TRes,
|
||||||
Pre = mrdb:read(tx_abort, a),
|
Pre = mrdb:read(tx_abort, a),
|
||||||
TRes1 = try mrdb:activity(tx, rdb, ActivityF)
|
|
||||||
catch
|
|
||||||
error:{abort_here, [_|_]} ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
dictionary_unchanged(D0),
|
|
||||||
ok = TRes1,
|
|
||||||
Pre = mrdb:read(tx_abort, a),
|
|
||||||
delete_tabs(Created),
|
delete_tabs(Created),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user