Merge pull request 'Add mrdb:fold_reverse(), mrdb:select_reverse() etc. Remove vsn 1 support.' (#6) from uw-fold_reverse into master

Reviewed-on: #6
This commit is contained in:
Ulf Wiger 2025-06-25 03:13:52 +09:00
commit 4e3c9e83c8
5 changed files with 404 additions and 290 deletions

View File

@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) ->
i_show_table(I, Move, Limit, Ref) -> i_show_table(I, Move, Limit, Ref) ->
case rocksdb:iterator_move(I, Move) of case rocksdb:iterator_move(I, Move) of
{ok, EncKey, EncVal} -> {ok, EncKey, EncVal} ->
{Type,Val} =
case EncKey of
<< ?INFO_TAG, K/binary >> ->
K1 = decode_key(K, Ref),
V = decode_val(EncVal, K1, Ref),
{info,V};
_ ->
K = decode_key(EncKey, Ref), K = decode_key(EncKey, Ref),
V = decode_val(EncVal, K, Ref), Val = decode_val(EncVal, K, Ref),
{data,V} io:fwrite("~p~n", [Val]),
end,
io:fwrite("~p: ~p~n", [Type, Val]),
i_show_table(I, next, Limit-1, Ref); i_show_table(I, next, Limit-1, Ref);
_ -> _ ->
ok ok
@ -675,11 +666,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 -> slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
#{semantics := Sem} = Ref = get_ref(Tab), #{semantics := Sem} = Ref = get_ref(Tab),
Start = case Ref of First = fun(I) -> rocksdb:iterator_move(I, first) end,
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
_ -> first
end,
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
F = case Sem of F = case Sem of
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end; bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end _ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end

View File

@ -250,31 +250,6 @@ get_info_res(Res, Default) ->
error(E) error(E)
end. end.
%% Admin info: metadata written by the admin proc to keep track of
%% the derived status of tables (such as detected version and encoding
%% of existing standalone tables.)
%%
write_admin_info(K, V, Alias, Name) ->
mrdb:rdb_put(get_ref({admin, Alias}),
admin_info_key(K, Name),
term_to_binary(V)).
read_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
{ok, Bin} ->
{ok, binary_to_term(Bin)};
_ ->
error
end.
delete_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
admin_info_key(K, Name) ->
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
%% Table metadata info maintained by users %% Table metadata info maintained by users
%% %%
write_info(Alias, Tab, K, V) -> write_info(Alias, Tab, K, V) ->
@ -285,7 +260,6 @@ write_info_(Ref, Tab, K, V) ->
write_info_encv(Ref, Tab, K, V) -> write_info_encv(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext), EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, V, []). mrdb:rdb_put(Ref, EncK, V, []).
delete_info(Alias, Tab, K) -> delete_info(Alias, Tab, K) ->
@ -293,30 +267,8 @@ delete_info(Alias, Tab, K) ->
delete_info_(Ref, Tab, K) -> delete_info_(Ref, Tab, K) ->
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext), EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
maybe_delete_standalone_info(Ref, K),
mrdb:rdb_delete(Ref, EncK, []). mrdb:rdb_delete(Ref, EncK, []).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
EncV = mnesia_rocksdb_lib:encode_val(V, term),
rocksdb:put(DbRef, Key, EncV, []);
_ ->
ok
end.
maybe_delete_standalone_info(Ref, K) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
rocksdb:delete(DbRef, Key, []);
_ ->
ok
end.
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 -> write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}). call(Alias, {write_table_property, Tab, Prop}).
@ -813,10 +765,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
false -> false ->
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St); create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
{false, MP} -> {false, MP} ->
create_table_as_standalone(Alias, Name, true, MP, TRec, St); create_table_as_standalone(Alias, Name, MP, TRec, St);
{true, MP} -> {true, MP} ->
?log(debug, "will create ~p as standalone and migrate", [Name]), ?log(debug, "will create ~p as standalone and migrate", [Name]),
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of case create_table_as_standalone(Alias, Name, MP, TRec, St) of
{ok, OldTRec, _} -> {ok, OldTRec, _} ->
?log(info, "Migrating ~p to column family", [Name]), ?log(info, "Migrating ~p to column family", [Name]),
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St); create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
@ -825,8 +777,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
end end
end; end;
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) -> create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
{Exists, MP} = table_exists_as_standalone(Name), {_, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St). create_table_as_standalone(Alias, Name, MP, TRec, St).
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) -> create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
?log(debug, "Migrate to cf (~p)", [Name]), ?log(debug, "Migrate to cf (~p)", [Name]),
@ -1106,29 +1058,12 @@ table_exists_as_standalone(Name) ->
end, end,
{Exists, MP}. {Exists, MP}.
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) -> create_table_as_standalone(Alias, Name, MP, TRec, St) ->
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
{ok, #{type := standalone, vsn := Vsn1,
encoding := Enc1} = Cf, _St1} = Ok ->
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
Alias, Name),
case Vsn1 of
1 ->
load_info(Alias, Name, Cf);
_ ->
skip
end,
Ok;
Other ->
Other
end.
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
Vsn = check_version(TRec), Vsn = check_version(TRec),
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)}, TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St). do_open_standalone(true, Alias, Name, MP, TRec1, St).
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0, do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
#st{standalone = Ts} = St) -> #st{standalone = Ts} = St) ->
Opts = rocksdb_opts_from_trec(TRec0), Opts = rocksdb_opts_from_trec(TRec0),
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
@ -1138,146 +1073,24 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
DbRec1 = DbRec#{ cfs => CfNames, DbRec1 = DbRec#{ cfs => CfNames,
mountpoint => MP }, mountpoint => MP },
TRec = maps:merge(TRec0, DbRec#{type => standalone}), TRec = maps:merge(TRec0, DbRec#{type => standalone}),
TRec1 = guess_table_vsn_and_encoding(Exists, TRec), {ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{error, _} = Err -> {error, _} = Err ->
?log(debug, "open_db error: ~p", [Err]), ?log(debug, "open_db error: ~p", [Err]),
Err Err
end. end.
%% When opening a standalone table, chances are it's a legacy table
%% where legacy encoding is already in place. We try to read the
%% first object and apply legacy encoding. If successful, we set
%% legacy encoding in the TRec. If we migrate the data to a column
%% family, we should apply the defined encoding for the cf.
%%
%% The first object can either be an info object (in the legacy case)
%% or a data object, with a sext-encoded key, and a term_to_binary-
%% encoded object as value, where the key position is set to [].
%% The info objects would be sext-encoded key + term-encoded value.
guess_table_vsn_and_encoding(false, TRec) ->
TRec;
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
alias := Alias, name := Name} = R) ->
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
{ok, {V, E}} ->
R#{vsn => V, encoding => E};
error ->
R1 = set_default_guess(R),
mrdb:with_rdb_iterator(
R1, fun(I) ->
guess_table_vsn_and_encoding_(
mrdb:rdb_iterator_move(I, first), I, As, R1)
end)
end.
set_default_guess(#{type := standalone} = R) ->
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
1 ->
R#{vsn => 1, encoding => {sext, {object, term}}};
V ->
R#{vsn => V}
end.
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
Arity = length(As) + 1,
case K of
<<?INFO_TAG, EncK/binary>> ->
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
mnesia_rocksdb_lib:decode(V, term)},
%% This is a vsn 1 standalone table
R#{vsn => 1, encoding => {sext, {object, term}}}
catch
error:_ ->
R
end;
_ ->
Enc = guess_obj_encoding(K, V, Arity),
R#{encoding => Enc}
end;
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
R.
guess_obj_encoding(K, V, Arity) ->
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
guess_encoding(Bin) ->
try {sext, sext:decode(Bin)}
catch
error:_ ->
try {term, binary_to_term(Bin)}
catch
error:_ -> raw
end
end.
guess_key_encoding(Bin) ->
case guess_encoding(Bin) of
raw -> raw;
{Enc, _} -> Enc
end.
guess_val_encoding(Bin, Arity) ->
case guess_encoding(Bin) of
raw -> {value, raw};
{Enc, Term} ->
if is_tuple(Term), size(Term) == Arity,
element(2, Term) == [] ->
{object, Enc};
true ->
{value, Enc}
end
end.
%% This is slightly different from `rocksdb:is_empty/1`, since it allows %% This is slightly different from `rocksdb:is_empty/1`, since it allows
%% for the presence of some metadata, and still considers it empty if there %% for the presence of some metadata, and still considers it empty if there
%% is no user data. %% is no user data.
table_is_empty(#{} = DbRec) -> table_is_empty(#{} = DbRec) ->
Start = iterator_data_start(DbRec),
mrdb:with_rdb_iterator( mrdb:with_rdb_iterator(
DbRec, fun(I) -> DbRec, fun(I) ->
case mrdb:rdb_iterator_move(I, Start) of case mrdb:rdb_iterator_move(I, first) of
{ok, _, _} -> false; {ok, _, _} -> false;
_ -> true _ -> true
end end
end). end).
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
iterator_data_start(_) -> first.
load_info(Alias, Name, Cf) ->
ARef = get_ref({admin, Alias}),
mrdb:with_rdb_iterator(
Cf, fun(I) ->
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
end).
load_info_(Res, I, ARef, Tab) ->
case Res of
{ok, << ?INFO_TAG, K/binary >>, V} ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_encv(ARef, Tab, DecK, V);
<<131,_/binary>> = Value ->
%% Due to a previous bug, info values could be double-encoded with binary_to_term()
try binary_to_term(Value) of
_DecVal ->
%% We haven't been storing erlang-term encoded data as info,
%% so assume this is double-encoded and correct
write_info_encv(ARef, Tab, DecK, Value)
catch
error:_ ->
skip
end;
_ ->
skip
end,
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
_ ->
ok
end.
check_version(TRec) -> check_version(TRec) ->
user_property(mrdb_version, TRec, ?VSN). user_property(mrdb_version, TRec, ?VSN).
@ -1607,7 +1420,6 @@ close_and_delete_standalone(#{alias := Alias,
case get_table_mountpoint(Alias, Name, St) of case get_table_mountpoint(Alias, Name, St) of
{ok, MP} -> {ok, MP} ->
close_and_delete(DbRef, MP), close_and_delete(DbRef, MP),
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)}; St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
error -> error ->
St St

View File

@ -66,9 +66,12 @@
, prev/2 , prev/3 , prev/2 , prev/3
, last/1 , last/2 , last/1 , last/2
, select/2 , select/3 , select/2 , select/3
, select_reverse/2, select_reverse/3
, select/1 , select/1
, fold/3 , fold/4 , fold/5 , fold/3 , fold/4 , fold/5
, fold_reverse/3 , fold_reverse/4, fold_reverse/5
, rdb_fold/4 , rdb_fold/5 , rdb_fold/4 , rdb_fold/5
, rdb_fold_reverse/4, rdb_fold_reverse/5
, write_info/3 , write_info/3
, read_info/2 , read_info/2
, read_info/1 , read_info/1
@ -540,7 +543,6 @@ re_throw(Cat, Err) ->
throw -> throw(Err) throw -> throw(Err)
end. end.
mnesia_compatible_aborts() -> mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false). mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
@ -661,10 +663,25 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
inherit_ctxt(Ref, R) -> inherit_ctxt(Ref, R) ->
maps:merge(Ref, maps:with([snapshot, activity], R)). maps:merge(Ref, maps:with([snapshot, activity], R)).
%% @doc Create an iterator on table `Tab' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%% @equiv with_iterator(Tab, Fun, [])
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res. -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
with_iterator(Tab, Fun) -> with_iterator(Tab, Fun) ->
with_iterator(Tab, Fun, []). with_iterator(Tab, Fun, []).
%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%%
%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator
%% will return `{ok, Obj}' where `Obj' is the complete decoded object.
%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}.
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res. -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res.
with_iterator(Tab, Fun, Opts) -> with_iterator(Tab, Fun, Opts) ->
R = ensure_ref(Tab), R = ensure_ref(Tab),
@ -1300,6 +1317,13 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit), true = valid_limit(Limit),
mrdb_select:select(ensure_ref(Tab), Pat, Limit). mrdb_select:select(ensure_ref(Tab), Pat, Limit).
select_reverse(Tab, Pat) ->
select_reverse(Tab, Pat, infinity).
select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit),
mrdb_select:select_reverse(ensure_ref(Tab), Pat, Limit).
select(Cont) -> select(Cont) ->
mrdb_select:select(Cont). mrdb_select:select(Cont).
@ -1354,6 +1378,16 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit), true = valid_limit(Limit),
mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit). mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
fold_reverse(Tab, Fun, Acc) ->
fold_reverse(Tab, Fun, Acc, [{'_', [], ['$_']}]).
fold_reverse(Tab, Fun, Acc, MatchSpec) ->
fold_reverse(Tab, Fun, Acc, MatchSpec, infinity).
fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit),
mrdb_select:fold_reverse(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3) rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) -> , is_binary(Prefix) ->
rdb_fold(Tab, Fun, Acc, Prefix, infinity). rdb_fold(Tab, Fun, Acc, Prefix, infinity).
@ -1363,6 +1397,15 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
true = valid_limit(Limit), true = valid_limit(Limit),
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit). mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
rdb_fold_reverse(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) ->
rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity).
rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
, is_binary(Prefix) ->
true = valid_limit(Limit),
mrdb_select:rdb_fold_reverse(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
valid_limit(L) -> valid_limit(L) ->
case L of case L of
infinity -> infinity ->
@ -1374,15 +1417,7 @@ valid_limit(L) ->
end. end.
write_info(Tab, K, V) -> write_info(Tab, K, V) ->
R = ensure_ref(Tab), #{alias := Alias} = ensure_ref(Tab),
Alias = case R of
#{type := standalone, vsn := 1, alias := A} = TRef ->
%% Also write on legacy info format
write_info_standalone(TRef, K, V),
A;
#{alias := A} ->
A
end,
write_info_(ensure_ref({admin, Alias}), Tab, K, V). write_info_(ensure_ref({admin, Alias}), Tab, K, V).
write_info_(#{} = R, Tab, K, V) -> write_info_(#{} = R, Tab, K, V) ->
@ -1399,13 +1434,8 @@ read_info(Tab, K) ->
read_info(Tab, K, Default) when K==size; K==memory -> read_info(Tab, K, Default) when K==size; K==memory ->
read_direct_info_(ensure_ref(Tab), K, Default); read_direct_info_(ensure_ref(Tab), K, Default);
read_info(Tab, K, Default) -> read_info(Tab, K, Default) ->
#{alias := Alias} = R = ensure_ref(Tab), #{alias := Alias} = ensure_ref(Tab),
case R of mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
#{type := standalone, vsn := 1} = TRef ->
read_info_standalone(TRef, K, Default);
#{alias := Alias} ->
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
end.
read_direct_info_(R, memory, _Def) -> read_direct_info_(R, memory, _Def) ->
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0); get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
@ -1429,26 +1459,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
%%rocksdb_boolean(<<"1">>) -> true; %%rocksdb_boolean(<<"1">>) -> true;
%%rocksdb_boolean(<<"0">>) -> false. %%rocksdb_boolean(<<"0">>) -> false.
write_info_standalone(#{} = R, K, V) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
EncV = term_to_binary(V),
rdb_put(R, EncK, EncV, write_opts(R, [])).
read_info_standalone(#{} = R, K, Default) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
get_info_res(Res, Default) ->
case Res of
not_found ->
Default;
{ok, Bin} ->
%% no fancy tricks when encoding/decoding info values
binary_to_term(Bin);
{error, E} ->
error(E)
end.
%% insert_bag_v2(Ref, K, V, Opts) -> %% insert_bag_v2(Ref, K, V, Opts) ->
%% rdb_merge(Ref, K, {list_append, [V]} %% rdb_merge(Ref, K, {list_append, [V]}

View File

@ -3,9 +3,13 @@
-export([ select/3 %% (Ref, MatchSpec, Limit) -export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit) , select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select_reverse/3
, select_reverse/4
, select/1 %% (Cont) , select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit) , fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, fold_reverse/5
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit) , rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
]). ]).
-export([continuation_info/2]). -export([continuation_info/2]).
@ -25,18 +29,27 @@
, compiled_ms , compiled_ms
, limit , limit
, key_only = false % TODO: not used , key_only = false % TODO: not used
, direction = forward % TODO: not used , direction = forward
}). }).
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) -> select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
select(Ref, MS, false, Limit). select(Ref, MS, false, forward, Limit).
select(Ref, MS, AccKeys, Limit) select(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, forward, Limit).
select_reverse(Ref, MS, Limit) ->
select(Ref, MS, false, reverse, Limit).
select_reverse(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, reverse, Limit).
select(Ref, MS, AccKeys, Dir, Limit)
when is_map(Ref), is_list(MS), is_boolean(AccKeys) -> when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
Sel = mk_sel(Ref, MS, Limit), Sel = mk_sel(Ref, MS, Dir, Limit),
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end). mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
mk_sel(#{name := Tab} = Ref, MS, Limit) -> mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
Keypat = keypat(MS, keypos(Tab), Ref), Keypat = keypat(MS, keypos(Tab), Ref),
#sel{tab = Tab, #sel{tab = Tab,
ref = Ref, ref = Ref,
@ -44,6 +57,7 @@ mk_sel(#{name := Tab} = Ref, MS, Limit) ->
ms = MS, ms = MS,
compiled_ms = ets:match_spec_compile(MS), compiled_ms = ets:match_spec_compile(MS),
key_only = needs_key_only(MS), key_only = needs_key_only(MS),
direction = Dir,
limit = Limit}. limit = Limit}.
select(Cont) -> select(Cont) ->
@ -64,6 +78,12 @@ continuation_info_(direction, #sel{direction = Dir}) -> Dir;
continuation_info_(_, _) -> undefined. continuation_info_(_, _) -> undefined.
fold(Ref, Fun, Acc, MS, Limit) -> fold(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, forward, Limit).
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
{AccKeys, F} = {AccKeys, F} =
if is_function(Fun, 3) -> if is_function(Fun, 3) ->
{true, fun({K, Obj}, Acc1) -> {true, fun({K, Obj}, Acc1) ->
@ -74,7 +94,7 @@ fold(Ref, Fun, Acc, MS, Limit) ->
true -> true ->
mrdb:abort(invalid_fold_fun) mrdb:abort(invalid_fold_fun)
end, end,
fold_(select(Ref, MS, AccKeys, Limit), F, Acc). fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
fold_('$end_of_table', _, Acc) -> fold_('$end_of_table', _, Acc) ->
Acc; Acc;
@ -86,12 +106,43 @@ fold_({L, Cont}, Fun, Acc) ->
rdb_fold(Ref, Fun, Acc, Prefix, Limit) -> rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator( mrdb:with_rdb_iterator(
Ref, fun(I) -> Ref, fun(I) ->
MovRes = rocksdb:iterator_move(I, first(Ref)), MovRes = fwd_init_seek(I, Prefix),
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit) i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end). end).
first(#{vsn := 1}) -> <<?DATA_START>>; rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
first(_) -> first. mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rev_init_seek(I, Prefix),
i_rdb_fold_reverse(MovRes, I, Prefix, Fun, Acc, Limit)
end).
fwd_init_seek(I, Pfx) ->
rocksdb:iterator_move(I, fwd_init_seek_tgt(Pfx)).
fwd_init_seek_tgt(<<>> ) -> first;
fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
rev_init_seek(I, Pfx) ->
rocksdb:iterator_move(I, rev_init_seek_tgt(Pfx)).
rev_init_seek_tgt(<<>>) -> last;
rev_init_seek_tgt(Prefix) ->
case incr_prefix(Prefix) of
last -> last;
Pfx1 when is_binary(Pfx1) ->
{seek, Pfx1}
end.
incr_prefix(<<>>) -> last;
incr_prefix(Pfx) when is_binary(Pfx) ->
PfxI = binary:decode_unsigned(Pfx),
MaxI = (1 bsl (byte_size(Pfx) * 8)) - 1,
case PfxI + 1 of
I1 when I1 >= MaxI -> last;
I1 ->
binary:encode_unsigned(I1)
end.
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 -> i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of case is_prefix(Pfx, K) of
@ -104,17 +155,34 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold(_, _, _, _, Acc, _) -> i_rdb_fold(_, _, _, _, Acc, _) ->
Acc. Acc.
i_rdb_fold_reverse({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
true ->
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
Fun(K, V, Acc), decr(Limit));
false when K > Pfx ->
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
false ->
Acc
end;
i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
Acc.
i_select(I, #sel{ keypat = Pfx i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS , compiled_ms = MS
, limit = Limit , limit = Limit
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) -> , direction = Dir
StartKey = case {Pfx, Vsn, Enc} of , ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
{<<>>, 1, {sext, _}} -> {StartKey, Sel} = case Enc of
<<?DATA_START>>; {term, _} ->
{_, _, {term, _}} -> %% No defined ordering - do forward select
<<>>; {first, Sel0#sel{direction = forward}};
_ -> _ ->
Pfx SK = case Dir of
forward -> fwd_init_seek_tgt(Pfx);
reverse -> rev_init_seek_tgt(Pfx)
end,
{SK, Sel0}
end, end,
select_traverse(rocksdb:iterator_move(I, StartKey), Limit, select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc). Pfx, MS, I, Sel, AccKeys, Acc).
@ -193,7 +261,7 @@ map_vars([H|T], P) ->
map_vars([], _) -> map_vars([], _) ->
[]. [].
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel, select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
AccKeys, Acc) -> AccKeys, Acc) ->
case is_prefix(Pfx, K) of case is_prefix(Pfx, K) of
true -> true ->
@ -202,7 +270,7 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
case ets:match_spec_run([Rec], MS) of case ets:match_spec_run([Rec], MS) of
[] -> [] ->
select_traverse( select_traverse(
rocksdb:iterator_move(I, next), Limit, Pfx, MS, rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
I, Sel, AccKeys, Acc); I, Sel, AccKeys, Acc);
[Match] -> [Match] ->
Acc1 = if AccKeys -> Acc1 = if AccKeys ->
@ -212,6 +280,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
end, end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end; end;
false when Dir == reverse, K > Pfx ->
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
false when Limit == infinity -> false when Limit == infinity ->
lists:reverse(Acc); lists:reverse(Acc);
false -> false ->
@ -220,6 +291,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) -> select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}). select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
next_or_prev(forward) -> next;
next_or_prev(reverse) -> prev.
select_return(infinity, {L, '$end_of_table'}) -> select_return(infinity, {L, '$end_of_table'}) ->
L; L;
select_return(_, Ret) -> select_return(_, Ret) ->
@ -239,29 +313,34 @@ decr(I) when is_integer(I) ->
decr(infinity) -> decr(infinity) ->
infinity. infinity.
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc), {lists:reverse(Acc),
fun(sel) -> Sel; fun(sel) -> Sel;
(cont) -> (cont) ->
mrdb:with_rdb_iterator( mrdb:with_rdb_iterator(
Ref, Ref,
fun(NewI) -> fun(NewI) ->
select_traverse(iterator_next(NewI, K), select_traverse(iterator_next(NewI, K, Dir),
Limit, Pfx, MS, NewI, Sel, Limit, Pfx, MS, NewI, Sel,
AccKeys, []) AccKeys, [])
end) end)
end}; end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) -> iterator_next(I, K, Dir) ->
case rocksdb:iterator_move(I, K) of case i_move(I, K, Dir) of
{ok, K, _} -> {ok, K, _} ->
rocksdb:iterator_move(I, next); rocksdb:iterator_move(I, next_or_prev(Dir));
Other -> Other ->
Other Other
end. end.
i_move(I, K, reverse) ->
rocksdb:iterator_move(I, {seek_for_prev, K});
i_move(I, K, forward) ->
rocksdb:iterator_move(I, K).
keypat([H|T], KeyPos, Ref) -> keypat([H|T], KeyPos, Ref) ->
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)). keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).

226
test/mrdb_fold_SUITE.erl Normal file
View File

@ -0,0 +1,226 @@
-module(mrdb_fold_SUITE).
-export([
all/0
, groups/0
, suite/0
, init_per_suite/1
, end_per_suite/1
, init_per_group/2
, end_per_group/2
, init_per_testcase/2
, end_per_testcase/2
]).
-export([
no_prefix_fwd_rdb_fold/1
, prefixed_fwd_rdb_fold/1
, no_prefix_rev_rdb_fold/1
, prefixed_rev_rdb_fold/1
, prefixed_rev_rdb_fold_max/1
, fwd_fold/1
, filtered_fwd_fold/1
, rev_fold/1
, filtered_rev_fold/1
, fwd_select/1
, rev_select/1
, select_cont/1
, rev_select_cont/1
]).
-include_lib("common_test/include/ct.hrl").
-record(r, {k, v}).
suite() ->
[].
all() ->
[{group, all_tests}].
groups() ->
[
{all_tests, [sequence], [ {group, rdb_fold}
, {group, fold}
, {group, select} ]}
, {rdb_fold, [sequence], [ no_prefix_fwd_rdb_fold
, prefixed_fwd_rdb_fold
, no_prefix_rev_rdb_fold
, prefixed_rev_rdb_fold
, prefixed_rev_rdb_fold_max ]}
, {fold, [sequence], [ fwd_fold
, filtered_fwd_fold
, rev_fold
, filtered_rev_fold ]}
, {select, [sequence], [ fwd_select
, rev_select
, select_cont
, rev_select_cont ]}
].
init_per_suite(Config) ->
mnesia:stop(),
ok = mnesia_rocksdb_tlib:start_mnesia(reset),
Config.
end_per_suite(_Config) ->
mnesia:stop(),
ok.
init_per_group(G, Config) when G==rdb_fold; G==fold ->
mk_tab(G),
Config;
init_per_group(_, Config) ->
Config.
end_per_group(_, _Config) ->
ok.
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
mk_tab(G) ->
T = tab_name(G),
case create_tab(T) of
{atomic, ok} -> fill_tab(T);
{aborted,{already_exists,T}} -> ok
end.
tab_name(fold ) -> r;
tab_name(select ) -> r;
tab_name(rdb_fold) -> t.
create_tab(T) ->
Opts = tab_opts(T),
mnesia:create_table(T, [{rdb, [node()]} | Opts]).
tab_opts(r) ->
[{attributes, [k, v]}, {type, ordered_set}];
tab_opts(t) ->
[{attributes, [k, v]}, {user_properties, [{mrdb_encoding, {raw, raw}}]}].
no_prefix_fwd_rdb_fold(_Config) ->
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = lists:reverse(raw_objs()),
{Res, Res} = {Res, Expected}.
fwd_fold(_Config) ->
Res = mrdb:fold(r, fun simple_acc/2, []),
Expected = lists:reverse(objs()),
{Res, Res} = {Res, Expected}.
prefixed_fwd_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = lists:reverse(prefixed_raw_objs(Pfx)),
{Res, Res} = {Res, Expected}.
filtered_fwd_fold(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:fold(r, fun simple_acc/2, [], MS, infinity),
Expected = lists:reverse(filtered_objs(MS)),
{Res, Res} = {Res, Expected}.
no_prefix_rev_rdb_fold(_Config) ->
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = raw_objs(),
{Res, Res} = {Res, Expected}.
rev_fold(_Config) ->
Res = mrdb:fold_reverse(r, fun simple_acc/2, []),
Expected = objs(),
{Res, Res} = {Res, Expected}.
prefixed_rev_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_raw_objs(Pfx),
{Res, Res} = {Res, Expected}.
filtered_rev_fold(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:fold_reverse(r, fun simple_acc/2, [], MS, infinity),
Expected = filtered_objs(MS),
{Res, Res} = {Res, Expected}.
prefixed_rev_rdb_fold_max(_Config) ->
Pfx = <<255,255>>,
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_raw_objs(Pfx),
{Res, Res} = {Res, Expected}.
fwd_select(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:select(r, MS),
Expected = filtered_objs(MS),
{Res, Res} = {Res, Expected}.
rev_select(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:select_reverse(r, MS),
Expected = lists:reverse(filtered_objs(MS)),
{Res, Res} = {Res, Expected}.
select_cont(_Cont) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Expected = filtered_objs(MS),
select_cont_loop(mrdb:select(r, MS, 1), Expected).
rev_select_cont(_Cont) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Expected = lists:reverse(filtered_objs(MS)),
select_cont_loop(mrdb:select_reverse(r, MS, 1), Expected).
select_cont_loop({[Obj], Cont}, [Obj|Rest]) ->
select_cont_loop(mrdb:select(Cont), Rest);
select_cont_loop({[], '$end_of_table'}, []) ->
ok.
simple_acc(#r{} = Obj, Acc) ->
[Obj | Acc].
simple_rdb_acc(K, V, Acc) ->
[{K,V} | Acc].
fill_tab(t = Tab) ->
[mrdb:insert(Tab, {Tab, K, V}) || {K,V} <- raw_objs()],
ok;
fill_tab(r = Tab) ->
[mrdb:insert(Tab, Obj) || Obj <- objs()],
ok.
prefixed_raw_objs(Pfx) ->
[Obj || {K,_} = Obj <- raw_objs(),
is_prefix(Pfx, K)].
raw_objs() ->
[ {<<"aa">> , <<"1">>}
, {<<"aa1">>, <<"2">>}
, {<<"ab">> , <<"3">>}
, {<<"ab1">>, <<"4">>}
, {<<255,255>>, <<"5">>} ].
filtered_objs(MS) ->
MSC = ets:match_spec_compile(MS),
ets:match_spec_run(objs(), MSC).
objs() ->
[ #r{k = {a,a,1}, v = 1}
, #r{k = {a,b,2}, v = 2}
, #r{k = {a,b,3}, v = 3}
, #r{k = {a,c,4}, v = 4}
, #r{k = {b,b,5}, v = 5}
].
%% copied from mrdb_select.erl
is_prefix(A, B) when is_binary(A), is_binary(B) ->
Sa = byte_size(A),
case B of
<<A:Sa/binary, _/binary>> ->
true;
_ ->
false
end.