Compare commits


No commits in common. "master" and "2.0.1" have entirely different histories.

18 changed files with 177 additions and 810 deletions

View File

@ -2,35 +2,15 @@ version: 2.1
executors:
aebuilder:
parameters:
otp:
type: string
docker:
- image: aeternity/builder:focal-<< parameters.otp >>
- image: aeternity/builder
user: builder
environment:
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
jobs:
build_and_test:
parameters:
otp:
type: string
executor:
name: aebuilder
otp: << parameters.otp >>
build:
executor: aebuilder
steps:
- checkout
- run: make test
- store_artifacts:
path: _build/test/logs
workflows:
commit:
jobs:
- build_and_test:
matrix:
parameters:
otp: ["otp24", "otp25", "otp26"]

View File

@ -131,18 +131,6 @@ depend on having an up to date size count at all times, you need to maintain
it yourself. If you only need the size occasionally, you may traverse the
table to count the elements.
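For occasional counting, such a traversal might look like the following sketch (using the standard `mnesia` API; `count_records/1` is a hypothetical helper):

```erlang
count_records(Tab) ->
    mnesia:activity(transaction,
                    fun() ->
                            %% Fold over every record, counting as we go.
                            mnesia:foldl(fun(_Rec, N) -> N + 1 end, 0, Tab)
                    end).
```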
When `mrdb` transactions abort, they will return a stacktrace caught
from within the transaction fun, giving much better debugging info.
This is different from how mnesia does it.
If behavior closer to mnesia's abort returns is needed, say, for backwards
compatibility, this can be controlled by setting the environment variable
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
For really performance-critical transactions which may abort often, it might
make a difference to set this option to `true`, since there is a cost involved
in producing stacktraces.
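For illustration, a per-transaction override could look like this sketch (the `rdb` alias and the `my_tab` table are placeholders):

```erlang
%% Aborts inside this activity mimic mnesia's abort values instead of
%% carrying a captured stacktrace.
mrdb:activity({tx, #{mnesia_compatible => true}}, rdb,
              fun() ->
                      [{my_tab, k, V}] = mrdb:read(my_tab, k),
                      mrdb:insert(my_tab, {my_tab, k, V + 1})
              end).
```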
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###

View File

@ -2,5 +2,4 @@
{application,mnesia_rocksdb}.
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.

View File

@ -4,8 +4,8 @@
{deps,
[
{sext, "1.8.0"},
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
{hut, "1.4.0"}
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
{hut, "1.3.0"}
]}.
{xref_checks, [
@ -14,10 +14,6 @@
deprecated_function_calls
]}.
{dialyzer, [{plt_apps, all_deps},
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
]}.
{profiles,
[
{test,

View File

@ -1,4 +1,11 @@
%% -*- erlang-mode -*-
case os:getenv("ERLANG_ROCKSDB_OPTS") of
false ->
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
_ ->
%% If manually set, we assume it's thought through
skip
end.
case os:getenv("DEBUG") of
"true" ->
Opts = proplists:get_value(erl_opts, CONFIG, []),

View File

@ -1,15 +1,15 @@
{"1.2.0",
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
{<<"rocksdb">>,
{git,"https://github.com/emqx/erlang-rocksdb.git",
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
0},
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
[
{pkg_hash,[
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
{pkg_hash_ext,[
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
].

BIN
rebar3

Binary file not shown.

View File

@ -346,11 +346,14 @@ semantics(_Alias, index_fun) -> fun index_f/4;
semantics(_Alias, _) -> undefined.
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
case info(Alias, Tab, {index_consistent, PosInfo}) of
true -> true;
_ -> false
end.
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
when is_boolean(Bool) ->
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
%% PRIVATE FUN
@ -451,13 +454,8 @@ close_table(Alias, Tab) ->
error ->
ok;
_ ->
case get(mnesia_dumper_dets) of
undefined ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab);
_ ->
ok
end
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab)
end.
close_table_(Alias, Tab) ->
@ -800,7 +798,7 @@ handle_call({create_table, Tab, Props}, _From,
exit:{aborted, Error} ->
{reply, {aborted, Error}, St}
end;
handle_call({load_table, _LoadReason, Props}, _,
handle_call({load_table, _LoadReason, Props}, _From,
#st{alias = Alias, tab = Tab} = St) ->
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St#st{status = active}};
@ -828,7 +826,7 @@ handle_call({delete, Key}, _From, St) ->
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
Res = mrdb:match_delete(get_ref(Tab), Pat),
{reply, Res, St};
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
{reply, ok, St#st{status = undefined}};
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->

View File

@ -33,7 +33,6 @@
, read_info/2 %% (Alias, Tab)
, read_info/4 %% (Alias, Tab, Key, Default)
, write_info/4 %% (Alias, Tab, Key, Value)
, delete_info/3 %% (Alias, Tab, Key)
, write_table_property/3 %% (Alias, Tab, Property)
]).
@ -281,20 +280,9 @@ write_info(Alias, Tab, K, V) ->
write_info_(get_ref({admin, Alias}), Tab, K, V).
write_info_(Ref, Tab, K, V) ->
write_info_encv(Ref, Tab, K, term_to_binary(V)).
write_info_encv(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, V, []).
delete_info(Alias, Tab, K) ->
delete_info_(get_ref({admin, Alias}), Tab, K).
delete_info_(Ref, Tab, K) ->
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
maybe_delete_standalone_info(Ref, K),
mrdb:rdb_delete(Ref, EncK, []).
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
@ -307,16 +295,6 @@ maybe_write_standalone_info(Ref, K, V) ->
ok
end.
maybe_delete_standalone_info(Ref, K) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
rocksdb:delete(DbRef, Key, []);
_ ->
ok
end.
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}).
@ -345,6 +323,7 @@ call(Alias, Req, Timeout) ->
end.
start_link() ->
mrdb_mutex:ensure_tab(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
@ -426,14 +405,12 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
%% We need to store the persistent ref explicitly here,
%% since mnesia knows nothing of our admin table.
AdminTab = {admin, Alias},
Stats = mrdb_stats:new(),
CfI = update_cf_info(AdminTab, #{ status => open
, name => AdminTab
, vsn => ?VSN
, encoding => {sext,{value,term}}
, attr_pos => #{key => 1,
value => 2}
, stats => Stats
, mountpoint => MP
, properties =>
#{ attributes => [key, val]
@ -533,17 +510,12 @@ intersection(A, B) ->
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
case find_cf(Alias, Name, Backend, St) of
{ok, TRec} ->
{reply, {ok, TRec}, St};
error ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
{error, _} = Error ->
{reply, Error, St}
end
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, St1};
{error, _} = Error ->
{reply, Error, St}
end;
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
try
@ -656,14 +628,8 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
case {Op, lists:member(I, Index)} of
{delete, true} ->
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
delete_info(Alias, Main, {index_consistent, I}),
maybe_update_pt(Main, CfM1),
update_cf(Alias, Main, CfM1, St);
{create, _} ->
%% Due to a previous bug, this marker might linger
%% In any case, it mustn't be there for a newly created index
delete_info(Alias, Main, {index_consistent, I}),
St;
_ ->
St
end;
@ -674,7 +640,6 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
maybe_update_main(_, _, _, St) ->
St.
%% The pt may not have been created yet. If so, don't do it here.
maybe_update_pt(Name, Ref) ->
case get_pt(Name, error) of
@ -1258,18 +1223,7 @@ load_info_(Res, I, ARef, Tab) ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_encv(ARef, Tab, DecK, V);
<<131,_/binary>> = Value ->
%% Due to a previous bug, info values could be double-encoded with term_to_binary()
try binary_to_term(Value) of
_DecVal ->
%% We haven't been storing erlang-term encoded data as info,
%% so assume this is double-encoded and correct
write_info_encv(ARef, Tab, DecK, Value)
catch
error:_ ->
skip
end;
write_info_(ARef, Tab, DecK, V);
_ ->
skip
end,
@ -1489,26 +1443,24 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
%% not yet created
CFs = cfs(CFs0),
file:make_dir(MP),
OpenRes = rocksdb_open(MP, Opts, CFs),
OpenOpts = [ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ],
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
map_cfs(OpenRes, CFs, Alias, Acc0);
false ->
{error, enoent};
true ->
%% Assumption: even an old rocksdb database file will have at least "default"
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
end.
open_opts(Opts) ->
[ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ].
rocksdb_open(MP, Opts, CFs) ->
%% rocksdb:open(MP, Opts, CFs),
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
is_open(Alias, #st{backends = Bs}) ->
case maps:find(Alias, Bs) of

View File

@ -42,5 +42,4 @@ start_link() ->
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
, ?CHILD(mnesia_rocksdb_params, worker)]} }.
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.

View File

@ -53,7 +53,6 @@
, delete/2 , delete/3
, delete_object/2, delete_object/3
, match_delete/2
, merge/3 , merge/4
, clear_table/1
, batch_write/2 , batch_write/3
, update_counter/3, update_counter/4
@ -106,8 +105,6 @@
-include("mnesia_rocksdb.hrl").
-include("mnesia_rocksdb_int.hrl").
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
-type tab_name() :: atom().
-type alias() :: atom().
-type admin_tab() :: {admin, alias()}.
@ -118,9 +115,7 @@
| index()
| retainer().
-type inner() :: non_neg_integer().
-type outer() :: non_neg_integer().
-type retries() :: outer() | {inner(), outer()}.
-type retries() :: non_neg_integer().
%% activity type 'ets' makes no sense in this context
-type mnesia_activity_type() :: transaction
@ -146,7 +141,7 @@
-type tx_activity() :: #{ type := 'tx'
, handle := tx_handle()
, attempt := 'undefined' | retries() }.
, attempt := non_neg_integer() }.
-type batch_activity() :: #{ type := 'batch'
, handle := batch_handle() }.
-type activity() :: tx_activity() | batch_activity().
@ -259,14 +254,6 @@ release_snapshot(SHandle) ->
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
%% the commit set. It will then be re-run again, until the retry count is exhausted.
%%
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
%% in line) if there are outer retries remaining. When all retries are exhausted, the activity aborts
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
%% attempt, but only on outer retries (the first retry is always an outer retry).
%%
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
%%
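%% As an illustration (a sketch, not part of this module): with the `rdb' alias and a
%% hypothetical table `my_tab', one inner retry per lock acquisition and two outer
%% retries could be requested as
%%
%%     mrdb:activity({tx, #{retries => {1, 2}, no_snapshot => true}}, rdb,
%%                   fun() ->
%%                           [{my_tab, k, N}] = mrdb:read(my_tab, k),
%%                           mrdb:insert(my_tab, {my_tab, k, N + 1})
%%                   end).
%%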
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
@ -282,92 +269,41 @@ activity(Type, Alias, F) ->
#{ alias => Alias
, db_ref => DbRef }, TxCtxt);
batch ->
Batch = init_batch_ref(DbRef),
Batch = get_batch_(DbRef),
#{ activity => #{ type => batch
, handle => Batch }
, alias => Alias
, db_ref => DbRef }
end,
do_activity(F, Alias, Ctxt).
do_activity(F, Alias, Ctxt, false).
do_activity(F, Alias, Ctxt) ->
try try_f(F, Ctxt)
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt)
end.
try_f(F, Ctxt) ->
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
try_f(false, F, Ctxt) ->
try run_f(F, Ctxt) of
do_activity(F, Alias, Ctxt, WithLock) ->
try run_f(F, Ctxt, WithLock, Alias) of
Res ->
commit_and_pop(Res)
try commit_and_pop(Res)
catch
throw:{?MODULE, busy} ->
do_activity(F, Alias, Ctxt, true)
end
catch
throw:Something ->
abort_and_pop(throw, Something);
Cat:Err:T ->
%% Without capturing the stacktrace here,
%% debugging gets pretty difficult. Incompatible with mnesia, though.
abort_and_pop(Cat, {Err, T})
end;
try_f(true, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(throw, Something);
Cat:Err ->
%% Without capturing the stacktrace here,
%% debugging gets pretty difficult
abort_and_pop(Cat, Err)
end.
run_f(F, Ctxt) ->
run_f(F, Ctxt, false, _) ->
push_ctxt(Ctxt),
F().
F();
run_f(F, Ctxt, true, Alias) ->
mrdb_mutex:do(
Alias,
fun() ->
push_ctxt(incr_attempt(Ctxt)),
F()
end).
incr_attempt(0, {_,O}) when O > 0 ->
{outer, {0,1}};
incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O),
is_integer(Ri), is_integer(Ro) ->
if I < Ri -> {inner, {I+1, O}};
O < Ro -> {outer, {0, O+1}};
true ->
error
end;
incr_attempt(_, _) ->
error.
retry_activity(F, Alias, #{activity := #{ type := Type
, attempt := A
, retries := R} = Act} = Ctxt) ->
case incr_attempt(A, R) of
{RetryCtxt, A1} ->
Act1 = Act#{attempt := A1},
Ctxt1 = Ctxt#{activity := Act1},
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt1)
end;
error ->
return_abort(Type, error, retry_limit)
end.
retry_activity_(inner, F, Alias, Ctxt) ->
mrdb_stats:incr(Alias, inner_retries, 1),
try_f(F, Ctxt);
retry_activity_(outer, F, Alias, Ctxt) ->
mrdb_stats:incr(Alias, outer_retries, 1),
mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end).
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Act1 = Act#{handle := TxH},
Act1 = Act#{attempt := A+1, handle := TxH},
C1 = C#{ activity := Act1 },
case maps:is_key(snapshot, C) of
true ->
@ -432,7 +368,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
check_tx_opts(Opts) ->
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
case maps:without([no_snapshot, retries], Opts) of
Other when map_size(Other) > 0 ->
abort({invalid_tx_opts, maps:keys(Other)});
_ ->
@ -440,14 +376,10 @@ check_tx_opts(Opts) ->
end.
check_retries(#{retries := Retries} = Opts) ->
case Retries of
_ when is_integer(Retries), Retries >= 0 ->
Opts#{retries := {0, Retries}};
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
Inner >= 0, Outer >= 0 ->
Opts;
_ ->
error({invalid_tx_option, {retries, Retries}})
if is_integer(Retries), Retries >= 0 ->
Opts;
true ->
error({invalid_tx_option, {retries, Retries}})
end.
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
@ -462,7 +394,7 @@ create_tx(Opts, DbRef) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Opts#{activity => maps:merge(Opts, #{ type => tx
, handle => TxH
, attempt => 0 })}.
, attempt => 1})}.
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
case NoSnap of
@ -482,7 +414,8 @@ commit_and_pop(Res) ->
Res;
{error, {error, "Resource busy" ++ _ = Busy}} ->
case A of
#{retries := {I,O}} when I > 0; O > 0 ->
#{retries := Retries, attempt := Att}
when Att =< Retries ->
throw({?MODULE, busy});
_ ->
error({error, Busy})
@ -544,11 +477,6 @@ re_throw(Cat, Err) ->
mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
Bool;
mnesia_compatible_aborts(_) ->
mnesia_compatible_aborts().
fix_error({aborted, Err}) ->
Err;
fix_error(Err) ->
@ -579,13 +507,10 @@ rdb_write_batch_and_pop(BatchRef, C) ->
pop_ctxt()
end.
rdb_release_batch(?BATCH_REF_DUMMY) ->
ok;
rdb_release_batch(H) ->
rocksdb:release_batch(H).
%% @doc Aborts an ongoing {@link activity/2}
-spec abort(_) -> no_return().
abort(Reason) ->
case mnesia_compatible_aborts() of
true ->
@ -604,7 +529,7 @@ new_tx(#{activity := _}, _) ->
new_tx(Tab, Opts) ->
#{db_ref := DbRef} = R = ensure_ref(Tab),
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
tx_ref(Tab, TxH) ->
@ -614,7 +539,7 @@ tx_ref(Tab, TxH) ->
#{activity := #{type := tx, handle := OtherTxH}} ->
error({tx_handle_conflict, OtherTxH});
R ->
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
end.
-spec tx_commit(tx_handle() | db_ref()) -> ok.
@ -741,21 +666,6 @@ insert(Tab, Obj0, Opts) ->
EncVal = encode_val(Obj, Ref),
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
merge(Tab, Key, MergeOp) ->
merge(Tab, Key, MergeOp, []).
merge(Tab, Key, MergeOp, Opts) ->
#{encoding := Enc} = Ref = ensure_ref(Tab),
case Enc of
{_, {value, term}} ->
merge_(Ref, Key, MergeOp, Opts);
_ ->
abort(badarg)
end.
merge_(Ref, Key, MergeOp, Opts) ->
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
validate_obj(Obj, #{mode := mnesia}) ->
Obj;
validate_obj(Obj, #{attr_pos := AP,
@ -979,7 +889,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
_ when is_atom(Ix) ->
{attr_pos(Ix, Ref), ordered};
{_} ->
{Ix, ordered};
Ix;
_ when is_integer(Ix) ->
{Ix, ordered}
end,
@ -1100,20 +1010,18 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
get_batch(_) ->
{error, badarg}.
init_batch_ref(DbRef) ->
get_batch_(DbRef) ->
Ref = make_ref(),
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
{ok, Batch} = rdb_batch(),
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
Ref.
get_batch_(DbRef) ->
    Ref = make_ref(),
    {ok, Batch} = rdb_batch(),
    pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
    Ref.
get_batch_(DbRef, BatchRef) ->
Key = batch_ref_key(BatchRef),
case pdict_get(Key) of
undefined ->
error(stale_batch_ref);
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
#{DbRef := Batch} ->
Batch;
Map ->
{ok, Batch} = rdb_batch(),
@ -1142,9 +1050,7 @@ write_batches(BatchRef, Opts) ->
pdict_erase(Key),
ret_batch_write_acc(
maps:fold(
fun(_, ?BATCH_REF_DUMMY, Acc) ->
Acc;
(DbRef, Batch, Acc) ->
fun(DbRef, Batch, Acc) ->
case rocksdb:write_batch(DbRef, Batch, Opts) of
ok ->
Acc;
@ -1588,9 +1494,8 @@ rdb_iterator(R) -> rdb_iterator(R, []).
rdb_iterator(R, Opts) ->
rdb_iterator_(R, read_opts(R, Opts)).
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
%% Note: versions before 1.8.0 expected DbRef as first argument
rocksdb:transaction_iterator(TxH, CfH, ROpts);
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
rocksdb:iterator(DbRef, CfH, ROpts).

View File

@ -6,14 +6,11 @@
, iterator_move/2
, iterator/2
, iterator_close/1
, fold/4
, rev_fold/4
, index_ref/2
]).
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
-record(mrdb_ix_iter, { i :: mrdb:iterator()
, type = set :: set | bag
, sub :: pid() | mrdb:db_ref()
, sub :: mrdb:ref() | pid()
}).
-type ix_iterator() :: #mrdb_ix_iter{}.
@ -22,7 +19,7 @@
-type object() :: tuple().
-record(subst, { i :: mrdb:mrdb_iterator()
-record(subst, { i :: mrdb:iterator()
, vals_f
, cur
, mref }).
@ -65,41 +62,6 @@ iterator(Tab, IxPos) ->
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Folds over the index table corresponding to Tab and IxPos.
%% The fold fun is called with the index key and the corresponding object,
%% or [] if there is no such object.
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, first, next, FoldFun, Acc).
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Like fold/4 above, but folds over the index in the reverse order.
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
with_iterator(
Tab, IxPos,
fun(I) ->
iter_fold(I, Start, Dir, FoldFun, Acc)
end).
iter_fold(I, Start, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Dir), I, Dir, Fun, Fun(IxVal, Obj, Acc));
iter_fold_({error, _}, _, _, _, Acc) ->
Acc.
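%% Usage sketch (not part of this module): fold over a hypothetical table `t'
%% indexed on attribute `a', collecting {IxVal, Object} pairs in index order:
%%
%%     Pairs = mrdb_index:fold(t, a,
%%                             fun(IxVal, Obj, Acc) -> [{IxVal, Obj} | Acc] end,
%%                             []).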
index_ref(Tab, Pos) ->
TRef = mrdb:ensure_ref(Tab),
ensure_index_ref(Pos, TRef).
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case mrdb:iterator_move(I, Dir) of
{ok, {{FKey, PKey}}} ->
@ -121,7 +83,6 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
Other
end.
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
opt_read(R, Key) ->
case mrdb:read(R, Key, []) of
[Obj] ->

View File

@ -3,109 +3,79 @@
-export([ do/2 ]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-export([ ensure_tab/0 ]).
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
%% critical section.
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphore.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
%%
%% Releasing the resource is done by notifying the server.
%%
%% Previous implementations tested:
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
%% This is a pretty simple implementation, but it lacks ordering, and appeared
%% to sometimes lead to starvation.
%% * A duplicate_bag ets table: These are defined such that insertion order is
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
%% Unfortunately, performance plummeted when tested with > 25 concurrent
%% processes. While this is likely a very high number, given that we're talking
%% about contention in a system using optimistic concurrency, it's never good
%% if performance falls off a cliff.
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
do(Rsrc, F) when is_function(F, 0) ->
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
true = claim(Rsrc),
try F()
after
mrdb_mutex_serializer:done(Rsrc, Ref)
release(Rsrc)
end.
-ifdef(TEST).
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
end.
mutex_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Check that all operations complete", fun swarm_do/0}
]}.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
false
end.
setup() ->
case whereis(mrdb_mutex_serializer) of
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
{ok, Pid} = mrdb_mutex_serializer:start_link(),
Pid;
Pid ->
Pid
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
_ ->
true
end.
cleanup(Pid) ->
unlink(Pid),
exit(Pid, kill).
swarm_do() ->
Rsrc = ?LINE,
Pid = spawn(fun() -> collect([]) end),
L = lists:seq(1, 1000),
Evens = [X || X <- L, is_even(X)],
Pids = [spawn_monitor(fun() ->
send_even(Rsrc, N, Pid)
end) || N <- lists:seq(1,1000)],
await_pids(Pids),
Results = fetch(Pid),
{incorrect_results, []} = {incorrect_results, Results -- Evens},
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
ok.
collect(Acc) ->
receive
{_, result, N} ->
collect([N|Acc]);
{From, fetch} ->
From ! {fetch_reply, Acc},
done
end.
fetch(Pid) ->
Pid ! {self(), fetch},
receive
{fetch_reply, Result} ->
Result
end.
is_even(N) ->
(N rem 2) =:= 0.
await_pids([{_, MRef}|Pids]) ->
receive
{'DOWN', MRef, _, _, _} ->
await_pids(Pids)
after 10000 ->
error(timeout)
end;
await_pids([]) ->
ok.
send_even(Rsrc, N, Pid) ->
do(Rsrc, fun() ->
case is_even(N) of
true ->
Pid ! {self(), result, N};
false ->
exit(not_even)
end
end).
-endif.

View File

@ -1,98 +0,0 @@
-module(mrdb_mutex_serializer).
-behavior(gen_server).
-export([wait/1,
done/2]).
-export([start_link/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(st, { queues = #{}
, empty_q = queue:new() }). %% perhaps silly optimization
wait(Rsrc) ->
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
done(Rsrc, Ref) ->
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
, empty_q = NewQ } = St) ->
MRef = erlang:monitor(process, Pid),
Q0 = maps:get(Rsrc, Queues, NewQ),
WasEmpty = queue:is_empty(Q0),
Q1 = queue:in({From, MRef}, Q0),
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
case WasEmpty of
true ->
{reply, {ok, MRef}, St1};
false ->
{noreply, St1}
end.
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
case maps:find(Rsrc, Queues) of
{ok, Q} ->
case queue:out(Q) of
{{value, {_From, MRef}}, Q1} ->
erlang:demonitor(MRef),
ok = maybe_dispatch_one(Q1),
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
{_, _} ->
%% Not the lock holder
{noreply, St}
end;
error ->
{noreply, St}
end.
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
Queues1 =
maps:map(
fun(_Rsrc, Q) ->
drop_or_filter(Q, MRef)
end, Queues),
{noreply, St#st{ queues = Queues1 }};
handle_info(_, St) ->
{noreply, St}.
terminate(_, _) ->
ok.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
%% In this function, we don't actually pop
maybe_dispatch_one(Q) ->
case queue:peek(Q) of
empty ->
ok;
{value, {From, MRef}} ->
gen_server:reply(From, {ok, MRef}),
ok
end.
drop_or_filter(Q, MRef) ->
case queue:peek(Q) of
{value, {_, MRef}} ->
Q1 = queue:drop(Q),
ok = maybe_dispatch_one(Q1),
Q1;
{value, _Other} ->
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
empty ->
Q
end.
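%% Usage sketch (not part of this module): this is how mrdb_mutex:do/2 drives the
%% serializer - wait for the resource, run the fun, then signal completion:
%%
%%     {ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
%%     try F()
%%     after
%%         mrdb_mutex_serializer:done(Rsrc, Ref)
%%     end.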

View File

@ -1,74 +0,0 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%% @doc Statistics API for the mnesia_rocksdb plugin
%%
%% Some counters are maintained for each active alias. Currently, the following
%% counters are supported:
%% * inner_retries
%% * outer_retries
%%
-module(mrdb_stats).
-export([new/0]).
-export([incr/3,
get/1,
get/2]).
-type alias() :: mnesia_rocksdb:alias().
-type db_ref() :: mrdb:db_ref().
-type counter() :: atom().
-type increment() :: integer().
-type counters() :: #{ counter() := integer() }.
new() ->
#{ ref => counters:new(map_size(ctr_meta()), [write_concurrency])
, meta => ctr_meta()}.
ctr_meta() ->
#{ inner_retries => 1
, outer_retries => 2 }.
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
%% @doc Increment `Ctr' counter for `Alias` with increment `N'.
%%
%% Note that the first argument may also be a `db_ref()' map,
%% corresponding to `mrdb:get_ref({admin, Alias})'.
%% @end
incr(Alias, Ctr, N) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
incr_(Ref, Meta, Ctr, N);
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
incr_(Ref, Meta, Ctr, N).
-spec get(alias() | db_ref(), counter()) -> integer().
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
%% @end
get(Alias, Ctr) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
get_(Ref, Meta, Ctr);
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
get_(Ref, Meta, Ctr).
-spec get(alias() | db_ref()) -> counters().
%% @doc Fetches all known counters for `Alias', in the form of a map,
%% `#{Counter => Value}'.
%% @end
get(Alias) when is_atom(Alias) ->
get_(mrdb:get_ref({admin, Alias}));
get(Ref) when is_map(Ref) ->
get_(Ref).
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
lists:foldl(
fun({K, P}, M) ->
M#{K := counters:get(Ref, P)}
end, Meta, maps:to_list(Meta)).
get_(Ref, Meta, Attr) ->
Ix = maps:get(Attr, Meta),
counters:get(Ref, Ix).
incr_(Ref, Meta, Attr, N) ->
Ix = maps:get(Attr, Meta),
counters:add(Ref, Ix, N).
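%% Usage sketch (not part of this module): read the retry counters for the `rdb'
%% alias, much like the test suite's check_stats/1 helper does:
%%
%%     #{inner_retries := I, outer_retries := O} = mrdb_stats:get(rdb),
%%     io:format("retries: inner=~p outer=~p~n", [I, O]).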

View File

@ -24,12 +24,8 @@
, mrdb_abort/1
, mrdb_two_procs/1
, mrdb_two_procs_tx_restart/1
, mrdb_two_procs_tx_inner_restart/1
, mrdb_two_procs_snap/1
, mrdb_three_procs/1
, create_counters/1
, update_counters/1
, restart_node/1
]).
-include_lib("common_test/include/ct.hrl").
@ -45,8 +41,7 @@ all() ->
groups() ->
[
{all_tests, [sequence], [ {group, checks}
, {group, mrdb}
, {group, counters}]}
, {group, mrdb} ]}
%% , error_handling ]}
, {checks, [sequence], [ encoding_sext_attrs
, encoding_binary_binary
@ -58,13 +53,8 @@ groups() ->
, mrdb_abort
, mrdb_two_procs
, mrdb_two_procs_tx_restart
, mrdb_two_procs_tx_inner_restart
, mrdb_two_procs_snap
, mrdb_three_procs ]}
, {counters, [sequence], [ create_counters
, update_counters
, restart_node
, update_counters ]}
].
@ -277,7 +267,7 @@ mrdb_abort(Config) ->
Pre = mrdb:read(tx_abort, a),
D0 = get_dict(),
TRes = try mrdb:activity(
{tx, #{mnesia_compatible => true}}, rdb,
tx, rdb,
fun() ->
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
error(abort_here),
@ -297,17 +287,10 @@ mrdb_abort(Config) ->
mrdb_two_procs(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
tr_flags(
{self(), [call, sos, p, 'receive']},
{self(), [call, sos, p]},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {mrdb, activity, x}], tr_opts()))).
mrdb_two_procs_(Config) ->
@ -331,16 +314,11 @@ mrdb_two_procs_(Config) ->
Pre = mrdb:read(R, a),
go_ahead_other(POther),
await_other_down(POther, MRef, ?LINE),
%% The test proc is still inside the transaction, and POther
%% has terminated/committed. If we now read 'a', we should see
%% the value that POther wrote (no isolation).
[{R, a, 17}] = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(0, POther),
go_ahead_other(1, POther),
Do0 = get_dict(),
%% Our transaction should fail due to the resource conflict, and because
%% we're not using retries.
try mrdb:activity({tx, #{no_snapshot => true,
retries => 0}}, rdb, F1) of
ok -> error(unexpected)
@ -361,7 +339,6 @@ mrdb_two_procs_tx_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
check_stats(rdb),
mrdb:insert(R, {R, a, 1}),
Pre = mrdb:read(R, a),
F0 = fun() ->
@ -377,7 +354,7 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = [{R, a, 17}],
Att = get_attempt(),
Expected = case Att of
0 -> Pre;
1 -> Pre;
_ -> OtherWrite
end,
Expected = mrdb:read(R, a),
@ -386,104 +363,14 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(0, POther),
go_ahead_other(1, POther),
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
dictionary_unchanged(Do0),
check_stats(rdb),
[{R, a, 18}] = mrdb:read(R, a),
delete_tabs(Created),
ok.
mrdb_two_procs_tx_inner_restart(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
dbg_tr_opts()).
mrdb_two_procs_tx_inner_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
mrdb:insert(R, {R, a, 1}),
mrdb:insert(R, {R, b, 1}),
PreA = mrdb:read(R, a),
PreB = mrdb:read(R, b),
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
F0 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, a, 17}),
wait_for_other(Parent, ?LINE)
end,
F1 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, b, 147}),
wait_for_other(Parent, ?LINE)
end,
Spawn = fun(F) ->
Res = spawn_opt(
fun() ->
ok = mrdb:activity(tx, rdb, F)
end, [monitor]),
Res
end,
{P1, MRef1} = Spawn(F0),
{P2, MRef2} = Spawn(F1),
F2 = fun() ->
PostA = [{R, a, 17}], % once F0 (P1) has committed
PostB = [{R, b, 147}], % once F1 (P2) has committed
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
BRes = mrdb:read(R, b),
Att = get_attempt(),
ct:log("Att = ~p", [Att]),
case Att of
0 -> % This is the first run (no retry yet)
ARes = PreA,
BRes = PreB,
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
await_other_down(P1, MRef1, ?LINE),
PostA = mrdb:read(R, a); % now, P1's write should leak through here
{0, 1} -> % This is the first (outer) retry
go_ahead_other(0, P2), % Let P2 write
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
await_other_down(P2, MRef2, ?LINE),
PostA = mrdb:read(R, a), % now we should see writes from both P1
%% On OTP 23, the following step might fail due to timing, even
%% though the trace output looks as expected. Possibly some quirk
%% with leakage propagation time in rocksdb. Let's retry to be sure.
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
{1, 1} ->
PostA = mrdb:read(R, a),
PostB = mrdb:read(R, b),
ok
end,
mrdb:insert(R, {R, a, 18}),
mrdb:insert(R, {R, b, 18})
end,
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
check_stats(rdb),
dictionary_unchanged(Do0),
[{R, a, 18}] = mrdb:read(R, a),
[{R, b, 18}] = mrdb:read(R, b),
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
delete_tabs(Created),
ok.
try_until(Result, F) ->
try_until(Result, F, 10).
try_until(Result, F, N) when N > 0 ->
case F() of
Result ->
ok;
_ ->
receive after 100 -> ok end,
try_until(Result, F, N-1)
end;
try_until(Result, F, _) ->
error({badmatch, {Result, F}}).
%
%% For testing purposes, we use side-effects inside the transactions
@ -496,7 +383,7 @@ try_until(Result, F, _) ->
%% attempt, and ignore the sync ops on retries.
%%
-define(IF_FIRST(N, Expr),
if N == 0 ->
if N == 1 ->
Expr;
true ->
ok
@ -526,8 +413,8 @@ mrdb_two_procs_snap(Config) ->
go_ahead_other(Att, POther),
ARes = mrdb:read(R, a),
ARes = case Att of
0 -> Pre;
_ -> [{R, a, 17}]
1 -> Pre;
2 -> [{R, a, 17}]
end,
await_other_down(POther, MRef, ?LINE),
PreB = mrdb:read(R, b),
@ -547,7 +434,7 @@ mrdb_two_procs_snap(Config) ->
%% We make sure that P2 commits before finishing the other two, and P3 and the
%% main thread sync, so as to maximize the contention for the retry lock.
mrdb_three_procs(Config) ->
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
mrdb_three_procs_(Config) ->
R = ?FUNCTION_NAME,
@ -565,7 +452,7 @@ mrdb_three_procs_(Config) ->
spawn_opt(fun() ->
D0 = get_dict(),
do_when_p_allows(
0, Parent, ?LINE,
1, Parent, ?LINE,
fun() ->
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
end),
@ -601,8 +488,8 @@ mrdb_three_procs_(Config) ->
fun() ->
Att = get_attempt(),
ARes = case Att of
0 -> [A0];
_ -> [A1]
1 -> [A0];
2 -> [A1]
end,
%% First, ensure that P2 tx is running
go_ahead_other(Att, P2),
@ -632,7 +519,6 @@ tr_opts() ->
, {?MODULE, await_other_down, 3, x}
, {?MODULE, do_when_p_allows, 4, x}
, {?MODULE, allow_p, 3, x}
, {?MODULE, check_stats, 1, x}
]}.
light_tr_opts() ->
@ -643,23 +529,6 @@ light_tr_opts() ->
, {mrdb, read, 2, x}
, {mrdb, activity, x} ], tr_opts())).
dbg_tr_opts() ->
tr_flags(
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb, current_context, 0, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {?MODULE, try_until, 3, x}
, {mrdb, activity, x} ], tr_opts())).
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
Opts#{patterns => Ps ++ Pats1}.
@ -673,13 +542,12 @@ wait_for_other(Parent, L) ->
wait_for_other(Att, Parent, L) ->
wait_for_other(Att, Parent, 1000, L).
wait_for_other(0, Parent, Timeout, L) ->
wait_for_other(1, Parent, Timeout, L) ->
MRef = monitor(process, Parent),
Parent ! {self(), ready},
receive
{Parent, cont} ->
demonitor(MRef),
Parent ! {self(), cont_ack},
ok;
{'DOWN', MRef, _, _, Reason} ->
ct:log("Parent died, Reason = ~p", [Reason]),
@ -718,13 +586,7 @@ go_ahead_other(Att, POther, Timeout) ->
go_ahead_other_(POther, Timeout) ->
receive
{POther, ready} ->
POther ! {self(), cont},
receive
{POther, cont_ack} ->
ok
after Timeout ->
error(cont_ack_timeout)
end
POther ! {self(), cont}
after Timeout ->
error(go_ahead_timeout)
end.
@ -757,35 +619,6 @@ get_attempt() ->
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
Attempt.
create_counters(_Config) ->
create_tab(counters, []),
mrdb:insert(counters, {counters, c0, 1}),
mrdb:update_counter(counters, c1, 1),
[{counters, c0, 1}] = mrdb:read(counters, c0),
[{counters, c1, 1}] = mrdb:read(counters, c1),
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
ok.
restart_node(_Config) ->
mnesia:stop(),
ok = mnesia:start(),
ct:log("mnesia restarted", []),
ok.
update_counters(_Config) ->
[{counters, c0, C0Prev}] = mrdb:read(counters, c0),
[{counters, c1, C1Prev}] = mrdb:read(counters, c1),
ct:log("co: ~p, c1: ~p", [C0Prev, C1Prev]),
ok = mrdb:update_counter(counters, c0, 1),
ok = mrdb:update_counter(counters, c1, 1),
ct:log("Incremented c0 and c1 by 1", []),
C0 = C0Prev + 1,
C1 = C1Prev + 1,
[{counters, c0, C0}] = mrdb:read(counters, c0),
[{counters, c1, C1}] = mrdb:read(counters, c1),
ct:log("c0: ~p, c1: ~p", [C0, C1]),
ok.
create_tabs(Tabs, Config) ->
Res = lists:map(fun create_tab/1, Tabs),
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
@ -812,8 +645,3 @@ dictionary_unchanged(Old) ->
, added := [] } = #{ deleted => Old -- New
, added => New -- Old },
ok.
check_stats(Alias) ->
Stats = mrdb_stats:get(Alias),
ct:log("Stats: ~p", [Stats]),
Stats.

View File

@ -33,7 +33,6 @@
-export([
index_plugin_mgmt/1
, add_indexes/1
, delete_indexes/1
, create_bag_index/1
, create_ordered_index/1
, test_1_ram_copies/1
@ -41,12 +40,10 @@
, fail_1_disc_only/1
, plugin_ram_copies1/1
, plugin_ram_copies2/1
, plugin_ram_copies3/1
, plugin_disc_copies/1
, fail_plugin_disc_only/1
, plugin_disc_copies_bag/1
, plugin_rdb_ordered/1
, plugin_rdb_none/1
, index_iterator/1
]).
@ -76,12 +73,10 @@ run(Config) ->
{pfx},mnesia_rocksdb, ix_prefixes),
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
test_index_plugin(cfg([pl3, rdb, none], Config)),
index_plugin_mgmt(Config),
ok.
@ -99,7 +94,6 @@ groups() ->
, create_ordered_index
, index_plugin_mgmt
, add_indexes
, delete_indexes
]}
, {access, [sequence], [
test_1_ram_copies
@ -110,12 +104,10 @@ groups() ->
, {plugin, [sequence], [
plugin_ram_copies1
, plugin_ram_copies2
, plugin_ram_copies3
, plugin_disc_copies
, fail_plugin_disc_only
, plugin_disc_copies_bag
, plugin_rdb_ordered
, plugin_rdb_none
]}
].
@ -185,14 +177,11 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
test(N, Type, T) ->
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
{attributes,[k,a,b,c]},
@ -215,7 +204,7 @@ add_del_indexes() ->
test_index_plugin(Config) ->
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
{index, [ixtype(IxType)]}]),
{index, [{{pfx}, IxType}]}]),
mnesia:dirty_write({Tab, "foobar", "sentence"}),
mnesia:dirty_write({Tab, "yellow", "sensor"}),
mnesia:dirty_write({Tab, "truth", "white"}),
@ -227,27 +216,13 @@ test_index_plugin(Config) ->
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx});
IxType == ordered; IxType == none ->
IxType == ordered ->
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx})
end,
if Type == rdb ->
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mrdb:index_read(
Tab, <<"foo">>, {pfx});
true ->
ok
end.
ixtype(T) when T==bag;
T==ordered ->
{{pfx}, T};
ixtype(none) ->
{pfx}.
create_bag_index(_Config) ->
{aborted, {combine_error, _, _}} =
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
@ -264,25 +239,6 @@ add_indexes(_Config) ->
{atomic, ok} = mnesia:add_table_index(T, a),
ok.
delete_indexes(_Config) ->
T = ?TAB(t1),
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
ok = mrdb:insert(T, {T, a, 1, 1}),
ok = mrdb:insert(T, {T, b, 1, 2}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
{atomic, ok} = mnesia:del_table_index(T, a),
ok = mrdb:delete(T, b),
ok = mrdb:insert(T, {T, c, 2, 3}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
ok.
ix_fold_acc(K, V, Acc) ->
[{K, V} | Acc].
index_plugin_mgmt(_Config) ->
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),

View File

@ -1,6 +1,6 @@
-module(mrdb_bench).
-compile([export_all, nowarn_export_all]).
-compile(export_all).
init() ->
mnesia:delete_schema([node()]),