Compare commits

...

12 Commits

8 changed files with 578 additions and 310 deletions

View File

@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) ->
i_show_table(I, Move, Limit, Ref) -> i_show_table(I, Move, Limit, Ref) ->
case rocksdb:iterator_move(I, Move) of case rocksdb:iterator_move(I, Move) of
{ok, EncKey, EncVal} -> {ok, EncKey, EncVal} ->
{Type,Val} = K = decode_key(EncKey, Ref),
case EncKey of Val = decode_val(EncVal, K, Ref),
<< ?INFO_TAG, K/binary >> -> io:fwrite("~p~n", [Val]),
K1 = decode_key(K, Ref),
V = decode_val(EncVal, K1, Ref),
{info,V};
_ ->
K = decode_key(EncKey, Ref),
V = decode_val(EncVal, K, Ref),
{data,V}
end,
io:fwrite("~p: ~p~n", [Type, Val]),
i_show_table(I, next, Limit-1, Ref); i_show_table(I, next, Limit-1, Ref);
_ -> _ ->
ok ok
@ -675,11 +666,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 -> slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
#{semantics := Sem} = Ref = get_ref(Tab), #{semantics := Sem} = Ref = get_ref(Tab),
Start = case Ref of First = fun(I) -> rocksdb:iterator_move(I, first) end,
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
_ -> first
end,
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
F = case Sem of F = case Sem of
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end; bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end _ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end

View File

@ -250,31 +250,6 @@ get_info_res(Res, Default) ->
error(E) error(E)
end. end.
%% Admin info: metadata written by the admin proc to keep track of
%% the derived status of tables (such as detected version and encoding
%% of existing standalone tables.)
%%
write_admin_info(K, V, Alias, Name) ->
mrdb:rdb_put(get_ref({admin, Alias}),
admin_info_key(K, Name),
term_to_binary(V)).
read_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
{ok, Bin} ->
{ok, binary_to_term(Bin)};
_ ->
error
end.
delete_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
admin_info_key(K, Name) ->
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
%% Table metadata info maintained by users %% Table metadata info maintained by users
%% %%
write_info(Alias, Tab, K, V) -> write_info(Alias, Tab, K, V) ->
@ -285,7 +260,6 @@ write_info_(Ref, Tab, K, V) ->
write_info_encv(Ref, Tab, K, V) -> write_info_encv(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext), EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, V, []). mrdb:rdb_put(Ref, EncK, V, []).
delete_info(Alias, Tab, K) -> delete_info(Alias, Tab, K) ->
@ -293,30 +267,8 @@ delete_info(Alias, Tab, K) ->
delete_info_(Ref, Tab, K) -> delete_info_(Ref, Tab, K) ->
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext), EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
maybe_delete_standalone_info(Ref, K),
mrdb:rdb_delete(Ref, EncK, []). mrdb:rdb_delete(Ref, EncK, []).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
EncV = mnesia_rocksdb_lib:encode_val(V, term),
rocksdb:put(DbRef, Key, EncV, []);
_ ->
ok
end.
maybe_delete_standalone_info(Ref, K) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
rocksdb:delete(DbRef, Key, []);
_ ->
ok
end.
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 -> write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}). call(Alias, {write_table_property, Tab, Prop}).
@ -813,10 +765,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
false -> false ->
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St); create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
{false, MP} -> {false, MP} ->
create_table_as_standalone(Alias, Name, true, MP, TRec, St); create_table_as_standalone(Alias, Name, MP, TRec, St);
{true, MP} -> {true, MP} ->
?log(debug, "will create ~p as standalone and migrate", [Name]), ?log(debug, "will create ~p as standalone and migrate", [Name]),
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of case create_table_as_standalone(Alias, Name, MP, TRec, St) of
{ok, OldTRec, _} -> {ok, OldTRec, _} ->
?log(info, "Migrating ~p to column family", [Name]), ?log(info, "Migrating ~p to column family", [Name]),
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St); create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
@ -825,8 +777,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
end end
end; end;
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) -> create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
{Exists, MP} = table_exists_as_standalone(Name), {_, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St). create_table_as_standalone(Alias, Name, MP, TRec, St).
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) -> create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
?log(debug, "Migrate to cf (~p)", [Name]), ?log(debug, "Migrate to cf (~p)", [Name]),
@ -1106,29 +1058,12 @@ table_exists_as_standalone(Name) ->
end, end,
{Exists, MP}. {Exists, MP}.
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) -> create_table_as_standalone(Alias, Name, MP, TRec, St) ->
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
{ok, #{type := standalone, vsn := Vsn1,
encoding := Enc1} = Cf, _St1} = Ok ->
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
Alias, Name),
case Vsn1 of
1 ->
load_info(Alias, Name, Cf);
_ ->
skip
end,
Ok;
Other ->
Other
end.
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
Vsn = check_version(TRec), Vsn = check_version(TRec),
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)}, TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St). do_open_standalone(true, Alias, Name, MP, TRec1, St).
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0, do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
#st{standalone = Ts} = St) -> #st{standalone = Ts} = St) ->
Opts = rocksdb_opts_from_trec(TRec0), Opts = rocksdb_opts_from_trec(TRec0),
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
@ -1138,146 +1073,24 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
DbRec1 = DbRec#{ cfs => CfNames, DbRec1 = DbRec#{ cfs => CfNames,
mountpoint => MP }, mountpoint => MP },
TRec = maps:merge(TRec0, DbRec#{type => standalone}), TRec = maps:merge(TRec0, DbRec#{type => standalone}),
TRec1 = guess_table_vsn_and_encoding(Exists, TRec), {ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{error, _} = Err -> {error, _} = Err ->
?log(debug, "open_db error: ~p", [Err]), ?log(debug, "open_db error: ~p", [Err]),
Err Err
end. end.
%% When opening a standalone table, chances are it's a legacy table
%% where legacy encoding is already in place. We try to read the
%% first object and apply legacy encoding. If successful, we set
%% legacy encoding in the TRec. If we migrate the data to a column
%% family, we should apply the defined encoding for the cf.
%%
%% The first object can either be an info object (in the legacy case)
%% or a data object, with a sext-encoded key, and a term_to_binary-
%% encoded object as value, where the key position is set to [].
%% The info objects would be sext-encoded key + term-encoded value.
guess_table_vsn_and_encoding(false, TRec) ->
TRec;
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
alias := Alias, name := Name} = R) ->
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
{ok, {V, E}} ->
R#{vsn => V, encoding => E};
error ->
R1 = set_default_guess(R),
mrdb:with_rdb_iterator(
R1, fun(I) ->
guess_table_vsn_and_encoding_(
mrdb:rdb_iterator_move(I, first), I, As, R1)
end)
end.
set_default_guess(#{type := standalone} = R) ->
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
1 ->
R#{vsn => 1, encoding => {sext, {object, term}}};
V ->
R#{vsn => V}
end.
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
Arity = length(As) + 1,
case K of
<<?INFO_TAG, EncK/binary>> ->
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
mnesia_rocksdb_lib:decode(V, term)},
%% This is a vsn 1 standalone table
R#{vsn => 1, encoding => {sext, {object, term}}}
catch
error:_ ->
R
end;
_ ->
Enc = guess_obj_encoding(K, V, Arity),
R#{encoding => Enc}
end;
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
R.
guess_obj_encoding(K, V, Arity) ->
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
guess_encoding(Bin) ->
try {sext, sext:decode(Bin)}
catch
error:_ ->
try {term, binary_to_term(Bin)}
catch
error:_ -> raw
end
end.
guess_key_encoding(Bin) ->
case guess_encoding(Bin) of
raw -> raw;
{Enc, _} -> Enc
end.
guess_val_encoding(Bin, Arity) ->
case guess_encoding(Bin) of
raw -> {value, raw};
{Enc, Term} ->
if is_tuple(Term), size(Term) == Arity,
element(2, Term) == [] ->
{object, Enc};
true ->
{value, Enc}
end
end.
%% This is slightly different from `rocksdb:is_empty/1`, since it allows %% This is slightly different from `rocksdb:is_empty/1`, since it allows
%% for the presence of some metadata, and still considers it empty if there %% for the presence of some metadata, and still considers it empty if there
%% is no user data. %% is no user data.
table_is_empty(#{} = DbRec) -> table_is_empty(#{} = DbRec) ->
Start = iterator_data_start(DbRec),
mrdb:with_rdb_iterator( mrdb:with_rdb_iterator(
DbRec, fun(I) -> DbRec, fun(I) ->
case mrdb:rdb_iterator_move(I, Start) of case mrdb:rdb_iterator_move(I, first) of
{ok, _, _} -> false; {ok, _, _} -> false;
_ -> true _ -> true
end end
end). end).
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
iterator_data_start(_) -> first.
load_info(Alias, Name, Cf) ->
ARef = get_ref({admin, Alias}),
mrdb:with_rdb_iterator(
Cf, fun(I) ->
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
end).
load_info_(Res, I, ARef, Tab) ->
case Res of
{ok, << ?INFO_TAG, K/binary >>, V} ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_encv(ARef, Tab, DecK, V);
<<131,_/binary>> = Value ->
%% Due to a previous bug, info values could be double-encoded with binary_to_term()
try binary_to_term(Value) of
_DecVal ->
%% We haven't been storing erlang-term encoded data as info,
%% so assume this is double-encoded and correct
write_info_encv(ARef, Tab, DecK, Value)
catch
error:_ ->
skip
end;
_ ->
skip
end,
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
_ ->
ok
end.
check_version(TRec) -> check_version(TRec) ->
user_property(mrdb_version, TRec, ?VSN). user_property(mrdb_version, TRec, ?VSN).
@ -1489,24 +1302,26 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
%% not yet created %% not yet created
CFs = cfs(CFs0), CFs = cfs(CFs0),
file:make_dir(MP), file:make_dir(MP),
OpenOpts = [ {create_if_missing, true} OpenRes = rocksdb_open(MP, Opts, CFs),
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ],
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
map_cfs(OpenRes, CFs, Alias, Acc0); map_cfs(OpenRes, CFs, Alias, Acc0);
false -> false ->
{error, enoent}; {error, enoent};
true -> true ->
%% Assumption: even an old rocksdb database file will have at least "default" %% Assumption: even an old rocksdb database file will have at least "default"
{ok,CFs} = rocksdb:list_column_families(MP, Opts), {ok,CFs} = rocksdb:list_column_families(MP, Opts),
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0) map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
end. end.
open_opts(Opts) ->
[ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ].
rocksdb_open(MP, Opts, CFs) -> rocksdb_open(MP, Opts, CFs) ->
%% rocksdb:open(MP, Opts, CFs), %% rocksdb:open(MP, Opts, CFs),
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs). mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
is_open(Alias, #st{backends = Bs}) -> is_open(Alias, #st{backends = Bs}) ->
case maps:find(Alias, Bs) of case maps:find(Alias, Bs) of
@ -1605,7 +1420,6 @@ close_and_delete_standalone(#{alias := Alias,
case get_table_mountpoint(Alias, Name, St) of case get_table_mountpoint(Alias, Name, St) of
{ok, MP} -> {ok, MP} ->
close_and_delete(DbRef, MP), close_and_delete(DbRef, MP),
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)}; St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
error -> error ->
St St

View File

@ -53,6 +53,7 @@
, delete/2 , delete/3 , delete/2 , delete/3
, delete_object/2, delete_object/3 , delete_object/2, delete_object/3
, match_delete/2 , match_delete/2
, merge/3 , merge/4
, clear_table/1 , clear_table/1
, batch_write/2 , batch_write/3 , batch_write/2 , batch_write/3
, update_counter/3, update_counter/4 , update_counter/3, update_counter/4
@ -60,14 +61,17 @@
, get_batch/1 , get_batch/1
, snapshot/1 , snapshot/1
, release_snapshot/1 , release_snapshot/1
, first/1 , first/2 , first/1 , first/2
, next/2 , next/3 , next/2 , next/3
, prev/2 , prev/3 , prev/2 , prev/3
, last/1 , last/2 , last/1 , last/2
, select/2 , select/3 , select/2 , select/3
, select_reverse/2, select_reverse/3
, select/1 , select/1
, fold/3 , fold/4 , fold/5 , fold/3 , fold/4 , fold/5
, rdb_fold/4 , rdb_fold/5 , fold_reverse/3 , fold_reverse/4, fold_reverse/5
, rdb_fold/4 , rdb_fold/5
, rdb_fold_reverse/4, rdb_fold_reverse/5
, write_info/3 , write_info/3
, read_info/2 , read_info/2
, read_info/1 , read_info/1
@ -539,7 +543,6 @@ re_throw(Cat, Err) ->
throw -> throw(Err) throw -> throw(Err)
end. end.
mnesia_compatible_aborts() -> mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false). mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
@ -660,10 +663,25 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
inherit_ctxt(Ref, R) -> inherit_ctxt(Ref, R) ->
maps:merge(Ref, maps:with([snapshot, activity], R)). maps:merge(Ref, maps:with([snapshot, activity], R)).
%% @doc Create an iterator on table `Tab' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%% @equiv with_iterator(Tab, Fun, [])
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res. -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
with_iterator(Tab, Fun) -> with_iterator(Tab, Fun) ->
with_iterator(Tab, Fun, []). with_iterator(Tab, Fun, []).
%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%%
%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator
%% will return `{ok, Obj}' where `Obj' is the complete decoded object.
%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}.
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res. -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res.
with_iterator(Tab, Fun, Opts) -> with_iterator(Tab, Fun, Opts) ->
R = ensure_ref(Tab), R = ensure_ref(Tab),
@ -740,6 +758,21 @@ insert(Tab, Obj0, Opts) ->
EncVal = encode_val(Obj, Ref), EncVal = encode_val(Obj, Ref),
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts). insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
merge(Tab, Key, MergeOp) ->
merge(Tab, Key, MergeOp, []).
merge(Tab, Key, MergeOp, Opts) ->
#{encoding := Enc} = Ref = ensure_ref(Tab),
case Enc of
{_, {value, term}} ->
merge_(Ref, Key, MergeOp, Opts);
_ ->
abort(badarg)
end.
merge_(Ref, Key, MergeOp, Opts) ->
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
validate_obj(Obj, #{mode := mnesia}) -> validate_obj(Obj, #{mode := mnesia}) ->
Obj; Obj;
validate_obj(Obj, #{attr_pos := AP, validate_obj(Obj, #{attr_pos := AP,
@ -1284,6 +1317,13 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit), true = valid_limit(Limit),
mrdb_select:select(ensure_ref(Tab), Pat, Limit). mrdb_select:select(ensure_ref(Tab), Pat, Limit).
select_reverse(Tab, Pat) ->
select_reverse(Tab, Pat, infinity).
select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit),
mrdb_select:select_reverse(ensure_ref(Tab), Pat, Limit).
select(Cont) -> select(Cont) ->
mrdb_select:select(Cont). mrdb_select:select(Cont).
@ -1338,6 +1378,16 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit), true = valid_limit(Limit),
mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit). mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
fold_reverse(Tab, Fun, Acc) ->
fold_reverse(Tab, Fun, Acc, [{'_', [], ['$_']}]).
fold_reverse(Tab, Fun, Acc, MatchSpec) ->
fold_reverse(Tab, Fun, Acc, MatchSpec, infinity).
fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit),
mrdb_select:fold_reverse(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3) rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) -> , is_binary(Prefix) ->
rdb_fold(Tab, Fun, Acc, Prefix, infinity). rdb_fold(Tab, Fun, Acc, Prefix, infinity).
@ -1347,6 +1397,15 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
true = valid_limit(Limit), true = valid_limit(Limit),
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit). mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
rdb_fold_reverse(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) ->
rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity).
rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
, is_binary(Prefix) ->
true = valid_limit(Limit),
mrdb_select:rdb_fold_reverse(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
valid_limit(L) -> valid_limit(L) ->
case L of case L of
infinity -> infinity ->
@ -1358,15 +1417,7 @@ valid_limit(L) ->
end. end.
write_info(Tab, K, V) -> write_info(Tab, K, V) ->
R = ensure_ref(Tab), #{alias := Alias} = ensure_ref(Tab),
Alias = case R of
#{type := standalone, vsn := 1, alias := A} = TRef ->
%% Also write on legacy info format
write_info_standalone(TRef, K, V),
A;
#{alias := A} ->
A
end,
write_info_(ensure_ref({admin, Alias}), Tab, K, V). write_info_(ensure_ref({admin, Alias}), Tab, K, V).
write_info_(#{} = R, Tab, K, V) -> write_info_(#{} = R, Tab, K, V) ->
@ -1383,13 +1434,8 @@ read_info(Tab, K) ->
read_info(Tab, K, Default) when K==size; K==memory -> read_info(Tab, K, Default) when K==size; K==memory ->
read_direct_info_(ensure_ref(Tab), K, Default); read_direct_info_(ensure_ref(Tab), K, Default);
read_info(Tab, K, Default) -> read_info(Tab, K, Default) ->
#{alias := Alias} = R = ensure_ref(Tab), #{alias := Alias} = ensure_ref(Tab),
case R of mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
#{type := standalone, vsn := 1} = TRef ->
read_info_standalone(TRef, K, Default);
#{alias := Alias} ->
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
end.
read_direct_info_(R, memory, _Def) -> read_direct_info_(R, memory, _Def) ->
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0); get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
@ -1413,26 +1459,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
%%rocksdb_boolean(<<"1">>) -> true; %%rocksdb_boolean(<<"1">>) -> true;
%%rocksdb_boolean(<<"0">>) -> false. %%rocksdb_boolean(<<"0">>) -> false.
write_info_standalone(#{} = R, K, V) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
EncV = term_to_binary(V),
rdb_put(R, EncK, EncV, write_opts(R, [])).
read_info_standalone(#{} = R, K, Default) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
get_info_res(Res, Default) ->
case Res of
not_found ->
Default;
{ok, Bin} ->
%% no fancy tricks when encoding/decoding info values
binary_to_term(Bin);
{error, E} ->
error(E)
end.
%% insert_bag_v2(Ref, K, V, Opts) -> %% insert_bag_v2(Ref, K, V, Opts) ->
%% rdb_merge(Ref, K, {list_append, [V]} %% rdb_merge(Ref, K, {list_append, [V]}

View File

@ -9,6 +9,10 @@
, fold/4 , fold/4
, rev_fold/4 , rev_fold/4
, index_ref/2 , index_ref/2
, select/3
, select/4
, select_reverse/3
, select_reverse/4
]). ]).
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator() -record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
@ -33,6 +37,11 @@
-export_type([ ix_iterator/0 ]). -export_type([ ix_iterator/0 ]).
-spec index_ref(mrdb:ref_or_tab(), mrdb:index_position()) -> mrdb:db_ref().
index_ref(Tab, Ix) ->
#{} = R = mrdb:ensure_ref(Tab),
ensure_index_ref(Ix, R).
-spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res. -spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res.
with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) -> with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) ->
{ok, I} = iterator(Tab, IxPos), {ok, I} = iterator(Tab, IxPos),
@ -88,6 +97,26 @@ fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
iter_fold(I, Start, Dir, FoldFun, Acc) iter_fold(I, Start, Dir, FoldFun, Acc)
end). end).
select(Tab, Ix, MS) ->
select(Tab, Ix, MS, infinity).
select(Tab, Ix, MS, Limit) ->
#{} = R = mrdb:ensure_ref(Tab),
#{} = IxR = ensure_index_ref(Ix, R),
MSpre = pre_match_spec(MS),
mrdb_select:select(IxR#{ derive_obj_f => mk_derive_obj_f(R)
, pre_ms => MSpre }, MS, Limit).
select_reverse(Tab, Ix, MS) ->
select_reverse(Tab, Ix, MS, infinity).
select_reverse(Tab, Ix, MS, Limit) ->
#{} = R = mrdb:ensure_ref(Tab),
#{} = IxR = ensure_index_ref(Ix, R),
MSpre = pre_match_spec(MS),
mrdb_select:select_reverse(IxR#{ derive_obj_f => mk_derive_obj_f(R)
, pre_ms => MSpre }, MS, Limit).
iter_fold(I, Start, Dir, Fun, Acc) -> iter_fold(I, Start, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc). iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
@ -96,10 +125,6 @@ iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
iter_fold_({error, _}, _, _, _, Acc) -> iter_fold_({error, _}, _, _, _, Acc) ->
Acc. Acc.
index_ref(Tab, Pos) ->
TRef = mrdb:ensure_ref(Tab),
ensure_index_ref(Pos, TRef).
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) -> iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case mrdb:iterator_move(I, Dir) of case mrdb:iterator_move(I, Dir) of
{ok, {{FKey, PKey}}} -> {ok, {{FKey, PKey}}} ->
@ -121,6 +146,29 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
Other Other
end. end.
pre_match_spec([{{KeyPat,ObjPat}, Gs, MatchBody} | T]) ->
[{{{KeyPat,'_'},ObjPat}, Gs, MatchBody} | pre_match_spec(T)];
pre_match_spec([H|T]) ->
[H | pre_match_spec(T)];
pre_match_spec([]) ->
[].
%% Used for mrdb_select:select()
%% The select operation folds over index keys, and for matching keys,
%% calls the `derive_obj_f/1` fun, which normally just does `Obj -> [Obj]`.
%% In the case of indexes, it fetches the object, if it exists, and then
%% returns `[{IndexKey, Object}]`, which is what the matchspec should operate on.
%%
mk_derive_obj_f(Ref) ->
fun({{IxK, Key}}) ->
case mrdb:read(Ref, Key, []) of
[Obj] ->
[{IxK, Obj}];
[] ->
[]
end
end.
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any(). -spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
opt_read(R, Key) -> opt_read(R, Key) ->
case mrdb:read(R, Key, []) of case mrdb:read(R, Key, []) of

View File

@ -1,11 +1,15 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mrdb_select). -module(mrdb_select).
-export([ select/3 %% (Ref, MatchSpec, Limit) -export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit) , select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select/1 %% (Cont) , select_reverse/3
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit) , select_reverse/4
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit) , select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, fold_reverse/5
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
]). ]).
-export([continuation_info/2]). -export([continuation_info/2]).
@ -25,26 +29,41 @@
, compiled_ms , compiled_ms
, limit , limit
, key_only = false % TODO: not used , key_only = false % TODO: not used
, direction = forward % TODO: not used , direction = forward
, pre_ms
, derive_obj_f = fun unit_l/1
}). }).
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) -> select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
select(Ref, MS, false, Limit). select(Ref, MS, false, forward, Limit).
select(Ref, MS, AccKeys, Limit) select(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, forward, Limit).
select_reverse(Ref, MS, Limit) ->
select(Ref, MS, false, reverse, Limit).
select_reverse(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, reverse, Limit).
select(Ref, MS, AccKeys, Dir, Limit)
when is_map(Ref), is_list(MS), is_boolean(AccKeys) -> when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
Sel = mk_sel(Ref, MS, Limit), Sel = mk_sel(Ref, MS, Dir, Limit),
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end). mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
mk_sel(#{name := Tab} = Ref, MS, Limit) -> mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
Keypat = keypat(MS, keypos(Tab), Ref), MSpre = maps:get(pre_ms, Ref, MS),
Keypat = keypat(MSpre, keypos(Tab), Ref),
#sel{tab = Tab, #sel{tab = Tab,
ref = Ref, ref = Ref,
keypat = Keypat, keypat = Keypat,
ms = MS, ms = MSpre,
compiled_ms = ets:match_spec_compile(MS), pre_ms = MSpre,
compiled_ms = ms_compile(MS),
key_only = needs_key_only(MS), key_only = needs_key_only(MS),
limit = Limit}. direction = Dir,
limit = Limit,
derive_obj_f = derive_f(Ref)}.
select(Cont) -> select(Cont) ->
case Cont of case Cont of
@ -59,11 +78,18 @@ continuation_info(_, _) -> undefined.
continuation_info_(ref, #sel{ref = Ref}) -> Ref; continuation_info_(ref, #sel{ref = Ref}) -> Ref;
continuation_info_(ms, #sel{ms = MS }) -> MS; continuation_info_(ms, #sel{ms = MS }) -> MS;
continuation_info_(pre_ms, #sel{pre_ms = MSp}) -> MSp;
continuation_info_(limit, #sel{limit = L }) -> L; continuation_info_(limit, #sel{limit = L }) -> L;
continuation_info_(direction, #sel{direction = Dir}) -> Dir; continuation_info_(direction, #sel{direction = Dir}) -> Dir;
continuation_info_(_, _) -> undefined. continuation_info_(_, _) -> undefined.
fold(Ref, Fun, Acc, MS, Limit) -> fold(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, forward, Limit).
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
{AccKeys, F} = {AccKeys, F} =
if is_function(Fun, 3) -> if is_function(Fun, 3) ->
{true, fun({K, Obj}, Acc1) -> {true, fun({K, Obj}, Acc1) ->
@ -74,7 +100,7 @@ fold(Ref, Fun, Acc, MS, Limit) ->
true -> true ->
mrdb:abort(invalid_fold_fun) mrdb:abort(invalid_fold_fun)
end, end,
fold_(select(Ref, MS, AccKeys, Limit), F, Acc). fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
fold_('$end_of_table', _, Acc) -> fold_('$end_of_table', _, Acc) ->
Acc; Acc;
@ -86,12 +112,57 @@ fold_({L, Cont}, Fun, Acc) ->
rdb_fold(Ref, Fun, Acc, Prefix, Limit) -> rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator( mrdb:with_rdb_iterator(
Ref, fun(I) -> Ref, fun(I) ->
MovRes = rocksdb:iterator_move(I, first(Ref)), MovRes = fwd_init_seek(I, Prefix),
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit) i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end). end).
first(#{vsn := 1}) -> <<?DATA_START>>; rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
first(_) -> first. mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rev_init_seek(I, Prefix),
i_rdb_fold_reverse(MovRes, I, Prefix, Fun, Acc, Limit)
end).
fwd_init_seek(I, Pfx) ->
rocksdb:iterator_move(I, fwd_init_seek_tgt(Pfx)).
fwd_init_seek_tgt(<<>> ) -> first;
fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
rev_init_seek(I, Pfx) ->
case rev_init_seek_tgt(Pfx) of
last ->
i_move(I, last);
{seek, Bin} ->
%% An 'incremented' prefix.
%% This will fail if we seek past the end of the table.
%% Then, try to seek_for_prev instead (fails if table empty).
%% This because rocksdb lacks a "seek backward to last matching prefix".
case i_move(I, {seek, Bin}) of
{error, invalid_iterator} ->
i_move(I, {seek_for_prev, Bin});
{ok, _, _} = Ok ->
Ok
end
end.
rev_init_seek_tgt(<<>>) -> last;
rev_init_seek_tgt(Prefix) ->
case incr_prefix(Prefix) of
last -> last;
Pfx1 when is_binary(Pfx1) ->
{seek, Pfx1}
end.
incr_prefix(<<>>) -> last;
incr_prefix(Pfx) when is_binary(Pfx) ->
PfxI = binary:decode_unsigned(Pfx),
MaxI = (1 bsl (byte_size(Pfx) * 8)) - 1,
case PfxI + 1 of
I1 when I1 >= MaxI -> last;
I1 ->
binary:encode_unsigned(I1)
end.
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 -> i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of case is_prefix(Pfx, K) of
@ -104,20 +175,37 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold(_, _, _, _, Acc, _) -> i_rdb_fold(_, _, _, _, Acc, _) ->
Acc. Acc.
i_rdb_fold_reverse({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
true ->
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
Fun(K, V, Acc), decr(Limit));
false when K > Pfx ->
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
false ->
Acc
end;
i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
Acc.
i_select(I, #sel{ keypat = Pfx i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS , compiled_ms = MS
, limit = Limit , limit = Limit
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) -> , direction = Dir
StartKey = case {Pfx, Vsn, Enc} of , ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
{<<>>, 1, {sext, _}} -> {MoveRes, Sel} = case Enc of
<<?DATA_START>>; {term, _} ->
{_, _, {term, _}} -> %% No defined ordering - do forward select
<<>>; {i_move(I, first), Sel0#sel{direction = forward}};
_ -> _ ->
Pfx case Dir of
end, forward ->
select_traverse(rocksdb:iterator_move(I, StartKey), Limit, {fwd_init_seek(I, Pfx), Sel0};
Pfx, MS, I, Sel, AccKeys, Acc). reverse ->
{rev_init_seek(I, Pfx), Sel0}
end
end,
select_traverse(MoveRes, Limit, Pfx, MS, I, Sel, AccKeys, Acc).
needs_key_only([Pat]) -> needs_key_only([Pat]) ->
needs_key_only_(Pat); needs_key_only_(Pat);
@ -193,16 +281,17 @@ map_vars([H|T], P) ->
map_vars([], _) -> map_vars([], _) ->
[]. [].
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel, select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
AccKeys, Acc) -> AccKeys, Acc) ->
case is_prefix(Pfx, K) of case is_prefix(Pfx, K) of
true -> true ->
DecKey = decode_key(K, R), DecKey = decode_key(K, R),
Rec = decode_val(V, DecKey, R), Rec0 = decode_val(V, DecKey, R),
case ets:match_spec_run([Rec], MS) of RecL = derive_object(Rec0, Sel),
case ms_run(RecL, MS) of
[] -> [] ->
select_traverse( select_traverse(
rocksdb:iterator_move(I, next), Limit, Pfx, MS, rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
I, Sel, AccKeys, Acc); I, Sel, AccKeys, Acc);
[Match] -> [Match] ->
Acc1 = if AccKeys -> Acc1 = if AccKeys ->
@ -212,6 +301,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
end, end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end; end;
false when Dir == reverse, K > Pfx ->
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
false when Limit == infinity -> false when Limit == infinity ->
lists:reverse(Acc); lists:reverse(Acc);
false -> false ->
@ -220,6 +312,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) -> select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}). select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
next_or_prev(forward) -> next;
next_or_prev(reverse) -> prev.
select_return(infinity, {L, '$end_of_table'}) -> select_return(infinity, {L, '$end_of_table'}) ->
L; L;
select_return(_, Ret) -> select_return(_, Ret) ->
@ -239,29 +334,40 @@ decr(I) when is_integer(I) ->
decr(infinity) -> decr(infinity) ->
infinity. infinity.
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc), {lists:reverse(Acc),
fun(sel) -> Sel; fun(sel) -> Sel;
(cont) -> (cont) ->
mrdb:with_rdb_iterator( mrdb:with_rdb_iterator(
Ref, Ref,
fun(NewI) -> fun(NewI) ->
select_traverse(iterator_next(NewI, K), select_traverse(iterator_next(NewI, K, Dir),
Limit, Pfx, MS, NewI, Sel, Limit, Pfx, MS, NewI, Sel,
AccKeys, []) AccKeys, [])
end) end)
end}; end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) -> iterator_next(I, K, Dir) ->
case rocksdb:iterator_move(I, K) of case i_move(I, K, Dir) of
{ok, K, _} -> {ok, K, _} ->
rocksdb:iterator_move(I, next); rocksdb:iterator_move(I, next_or_prev(Dir));
Other -> Other ->
Other Other
end. end.
i_move(I, Tgt) ->
rocksdb:iterator_move(I, Tgt).
i_move(I, K, reverse) ->
rocksdb:iterator_move(I, {seek_for_prev, K});
i_move(I, K, forward) ->
rocksdb:iterator_move(I, K).
derive_object(R, #sel{derive_obj_f = F}) ->
F(R).
keypat([H|T], KeyPos, Ref) -> keypat([H|T], KeyPos, Ref) ->
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)). keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
@ -283,3 +389,14 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos, #{encoding := {sext,_}}) when is_tuple(HeadP
keypat_pfx(_, _, _) -> keypat_pfx(_, _, _) ->
<<>>. <<>>.
derive_f(#{derive_obj_f := F}) when is_function(F, 1) -> F;
derive_f(_) -> fun unit_l/1.
unit_l(X) ->
[X].
ms_compile(MS) ->
ets:match_spec_compile(MS).
ms_run(RecL, MS) ->
ets:match_spec_run(RecL, MS).

View File

@ -27,6 +27,9 @@
, mrdb_two_procs_tx_inner_restart/1 , mrdb_two_procs_tx_inner_restart/1
, mrdb_two_procs_snap/1 , mrdb_two_procs_snap/1
, mrdb_three_procs/1 , mrdb_three_procs/1
, create_counters/1
, update_counters/1
, restart_node/1
]). ]).
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -42,7 +45,8 @@ all() ->
groups() -> groups() ->
[ [
{all_tests, [sequence], [ {group, checks} {all_tests, [sequence], [ {group, checks}
, {group, mrdb} ]} , {group, mrdb}
, {group, counters}]}
%% , error_handling ]} %% , error_handling ]}
, {checks, [sequence], [ encoding_sext_attrs , {checks, [sequence], [ encoding_sext_attrs
, encoding_binary_binary , encoding_binary_binary
@ -57,6 +61,10 @@ groups() ->
, mrdb_two_procs_tx_inner_restart , mrdb_two_procs_tx_inner_restart
, mrdb_two_procs_snap , mrdb_two_procs_snap
, mrdb_three_procs ]} , mrdb_three_procs ]}
, {counters, [sequence], [ create_counters
, update_counters
, restart_node
, update_counters ]}
]. ].
@ -749,6 +757,35 @@ get_attempt() ->
#{activity := #{attempt := Attempt}} = mrdb:current_context(), #{activity := #{attempt := Attempt}} = mrdb:current_context(),
Attempt. Attempt.
create_counters(_Config) ->
create_tab(counters, []),
mrdb:insert(counters, {counters, c0, 1}),
mrdb:update_counter(counters, c1, 1),
[{counters, c0, 1}] = mrdb:read(counters, c0),
[{counters, c1, 1}] = mrdb:read(counters, c1),
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
ok.
restart_node(_Config) ->
mnesia:stop(),
ok = mnesia:start(),
ct:log("mnesia restarted", []),
ok.
update_counters(_Config) ->
[{counters, c0, C0Prev}] = mrdb:read(counters, c0),
[{counters, c1, C1Prev}] = mrdb:read(counters, c1),
ct:log("co: ~p, c1: ~p", [C0Prev, C1Prev]),
ok = mrdb:update_counter(counters, c0, 1),
ok = mrdb:update_counter(counters, c1, 1),
ct:log("Incremented c0 and c1 by 1", []),
C0 = C0Prev + 1,
C1 = C1Prev + 1,
[{counters, c0, C0}] = mrdb:read(counters, c0),
[{counters, c1, C1}] = mrdb:read(counters, c1),
ct:log("c0: ~p, c1: ~p", [C0, C1]),
ok.
create_tabs(Tabs, Config) -> create_tabs(Tabs, Config) ->
Res = lists:map(fun create_tab/1, Tabs), Res = lists:map(fun create_tab/1, Tabs),
tr_ct:trace_checkpoint(?TABS_CREATED, Config), tr_ct:trace_checkpoint(?TABS_CREATED, Config),

View File

@ -236,12 +236,25 @@ test_index_plugin(Config) ->
if Type == rdb -> if Type == rdb ->
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})), Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})), Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
ok = test_select(Tab,{pfx},[{'_', [], ['$_']}]),
MS2 = [{{<<"whi">>,{Tab,"truth",'_'}},[],['$_']}],
[_] = mrdb_index:select(Tab, {pfx}, MS2),
ok = test_select(Tab, {pfx}, MS2),
[{Tab,"foobar","sentence"}] = mrdb:index_read( [{Tab,"foobar","sentence"}] = mrdb:index_read(
Tab, <<"foo">>, {pfx}); Tab, <<"foo">>, {pfx});
true -> true ->
ok ok
end. end.
test_select(Tab, Ix, MS) ->
Res = mrdb_index:select(Tab, Ix, MS),
ct:log("mrdb_index:select(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, Res]),
RevRes = mrdb_index:select_reverse(Tab, Ix, MS),
ct:log("mrdb_index:select_reverse(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, RevRes]),
{Res,Res} = {Res, lists:reverse(RevRes)},
ok.
ixtype(T) when T==bag; ixtype(T) when T==bag;
T==ordered -> T==ordered ->
{{pfx}, T}; {{pfx}, T};

226
test/mrdb_fold_SUITE.erl Normal file
View File

@ -0,0 +1,226 @@
-module(mrdb_fold_SUITE).
-export([
all/0
, groups/0
, suite/0
, init_per_suite/1
, end_per_suite/1
, init_per_group/2
, end_per_group/2
, init_per_testcase/2
, end_per_testcase/2
]).
-export([
no_prefix_fwd_rdb_fold/1
, prefixed_fwd_rdb_fold/1
, no_prefix_rev_rdb_fold/1
, prefixed_rev_rdb_fold/1
, prefixed_rev_rdb_fold_max/1
, fwd_fold/1
, filtered_fwd_fold/1
, rev_fold/1
, filtered_rev_fold/1
, fwd_select/1
, rev_select/1
, select_cont/1
, rev_select_cont/1
]).
-include_lib("common_test/include/ct.hrl").
-record(r, {k, v}).
suite() ->
[].
all() ->
[{group, all_tests}].
groups() ->
[
{all_tests, [sequence], [ {group, rdb_fold}
, {group, fold}
, {group, select} ]}
, {rdb_fold, [sequence], [ no_prefix_fwd_rdb_fold
, prefixed_fwd_rdb_fold
, no_prefix_rev_rdb_fold
, prefixed_rev_rdb_fold
, prefixed_rev_rdb_fold_max ]}
, {fold, [sequence], [ fwd_fold
, filtered_fwd_fold
, rev_fold
, filtered_rev_fold ]}
, {select, [sequence], [ fwd_select
, rev_select
, select_cont
, rev_select_cont ]}
].
init_per_suite(Config) ->
mnesia:stop(),
ok = mnesia_rocksdb_tlib:start_mnesia(reset),
Config.
end_per_suite(_Config) ->
mnesia:stop(),
ok.
init_per_group(G, Config) when G==rdb_fold; G==fold ->
mk_tab(G),
Config;
init_per_group(_, Config) ->
Config.
end_per_group(_, _Config) ->
ok.
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
mk_tab(G) ->
T = tab_name(G),
case create_tab(T) of
{atomic, ok} -> fill_tab(T);
{aborted,{already_exists,T}} -> ok
end.
tab_name(fold ) -> r;
tab_name(select ) -> r;
tab_name(rdb_fold) -> t.
create_tab(T) ->
Opts = tab_opts(T),
mnesia:create_table(T, [{rdb, [node()]} | Opts]).
tab_opts(r) ->
[{attributes, [k, v]}, {type, ordered_set}];
tab_opts(t) ->
[{attributes, [k, v]}, {user_properties, [{mrdb_encoding, {raw, raw}}]}].
no_prefix_fwd_rdb_fold(_Config) ->
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = lists:reverse(raw_objs()),
{Res, Res} = {Res, Expected}.
fwd_fold(_Config) ->
Res = mrdb:fold(r, fun simple_acc/2, []),
Expected = lists:reverse(objs()),
{Res, Res} = {Res, Expected}.
prefixed_fwd_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = lists:reverse(prefixed_raw_objs(Pfx)),
{Res, Res} = {Res, Expected}.
filtered_fwd_fold(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:fold(r, fun simple_acc/2, [], MS, infinity),
Expected = lists:reverse(filtered_objs(MS)),
{Res, Res} = {Res, Expected}.
no_prefix_rev_rdb_fold(_Config) ->
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = raw_objs(),
{Res, Res} = {Res, Expected}.
rev_fold(_Config) ->
Res = mrdb:fold_reverse(r, fun simple_acc/2, []),
Expected = objs(),
{Res, Res} = {Res, Expected}.
prefixed_rev_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_raw_objs(Pfx),
{Res, Res} = {Res, Expected}.
filtered_rev_fold(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:fold_reverse(r, fun simple_acc/2, [], MS, infinity),
Expected = filtered_objs(MS),
{Res, Res} = {Res, Expected}.
prefixed_rev_rdb_fold_max(_Config) ->
Pfx = <<255,255>>,
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_raw_objs(Pfx),
{Res, Res} = {Res, Expected}.
fwd_select(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:select(r, MS),
Expected = filtered_objs(MS),
{Res, Res} = {Res, Expected}.
rev_select(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:select_reverse(r, MS),
Expected = lists:reverse(filtered_objs(MS)),
{Res, Res} = {Res, Expected}.
select_cont(_Cont) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Expected = filtered_objs(MS),
select_cont_loop(mrdb:select(r, MS, 1), Expected).
rev_select_cont(_Cont) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Expected = lists:reverse(filtered_objs(MS)),
select_cont_loop(mrdb:select_reverse(r, MS, 1), Expected).
select_cont_loop({[Obj], Cont}, [Obj|Rest]) ->
select_cont_loop(mrdb:select(Cont), Rest);
select_cont_loop({[], '$end_of_table'}, []) ->
ok.
simple_acc(#r{} = Obj, Acc) ->
[Obj | Acc].
simple_rdb_acc(K, V, Acc) ->
[{K,V} | Acc].
fill_tab(t = Tab) ->
[mrdb:insert(Tab, {Tab, K, V}) || {K,V} <- raw_objs()],
ok;
fill_tab(r = Tab) ->
[mrdb:insert(Tab, Obj) || Obj <- objs()],
ok.
prefixed_raw_objs(Pfx) ->
[Obj || {K,_} = Obj <- raw_objs(),
is_prefix(Pfx, K)].
raw_objs() ->
[ {<<"aa">> , <<"1">>}
, {<<"aa1">>, <<"2">>}
, {<<"ab">> , <<"3">>}
, {<<"ab1">>, <<"4">>}
, {<<255,255>>, <<"5">>} ].
filtered_objs(MS) ->
MSC = ets:match_spec_compile(MS),
ets:match_spec_run(objs(), MSC).
objs() ->
[ #r{k = {a,a,1}, v = 1}
, #r{k = {a,b,2}, v = 2}
, #r{k = {a,b,3}, v = 3}
, #r{k = {a,c,4}, v = 4}
, #r{k = {b,b,5}, v = 5}
].
%% copied from mrdb_select.erl
is_prefix(A, B) when is_binary(A), is_binary(B) ->
Sa = byte_size(A),
case B of
<<A:Sa/binary, _/binary>> ->
true;
_ ->
false
end.