Fix prefixed rdb_fold, add mrdb:rdb_rev_fold()

This commit is contained in:
Ulf Wiger 2025-06-19 14:00:47 +02:00
parent 318f84bbaf
commit d8b6ab788e
5 changed files with 196 additions and 85 deletions

View File

@ -675,11 +675,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
#{semantics := Sem} = Ref = get_ref(Tab),
Start = case Ref of
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
_ -> first
end,
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
First = fun(I) -> rocksdb:iterator_move(I, first) end,
F = case Sem of
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end

View File

@ -285,7 +285,6 @@ write_info_(Ref, Tab, K, V) ->
write_info_encv(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, V, []).
delete_info(Alias, Tab, K) ->
@ -293,30 +292,8 @@ delete_info(Alias, Tab, K) ->
delete_info_(Ref, Tab, K) ->
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
maybe_delete_standalone_info(Ref, K),
mrdb:rdb_delete(Ref, EncK, []).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
EncV = mnesia_rocksdb_lib:encode_val(V, term),
rocksdb:put(DbRef, Key, EncV, []);
_ ->
ok
end.
maybe_delete_standalone_info(Ref, K) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
rocksdb:delete(DbRef, Key, []);
_ ->
ok
end.
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}).
@ -1108,16 +1085,11 @@ table_exists_as_standalone(Name) ->
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) ->
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
{ok, #{type := standalone, vsn := Vsn1,
encoding := Enc1} = Cf, _St1} = Ok ->
{ok, #{ type := standalone, vsn := Vsn1
, encoding := Enc1}
, _St1} = Ok ->
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
Alias, Name),
case Vsn1 of
1 ->
load_info(Alias, Name, Cf);
_ ->
skip
end,
Ok;
Other ->
Other
@ -1174,7 +1146,7 @@ guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
set_default_guess(#{type := standalone} = R) ->
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
1 ->
R#{vsn => 1, encoding => {sext, {object, term}}};
error({unsupported_vsn, 1});
V ->
R#{vsn => V}
end.
@ -1233,51 +1205,14 @@ guess_val_encoding(Bin, Arity) ->
%% for the presence of some metadata, and still considers it empty if there
%% is no user data.
table_is_empty(#{} = DbRec) ->
Start = iterator_data_start(DbRec),
mrdb:with_rdb_iterator(
DbRec, fun(I) ->
case mrdb:rdb_iterator_move(I, Start) of
case mrdb:rdb_iterator_move(I, first) of
{ok, _, _} -> false;
_ -> true
end
end).
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
iterator_data_start(_) -> first.
load_info(Alias, Name, Cf) ->
ARef = get_ref({admin, Alias}),
mrdb:with_rdb_iterator(
Cf, fun(I) ->
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
end).
load_info_(Res, I, ARef, Tab) ->
case Res of
{ok, << ?INFO_TAG, K/binary >>, V} ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_encv(ARef, Tab, DecK, V);
<<131,_/binary>> = Value ->
%% Due to a previous bug, info values could be double-encoded with binary_to_term()
try binary_to_term(Value) of
_DecVal ->
%% We haven't been storing erlang-term encoded data as info,
%% so assume this is double-encoded and correct
write_info_encv(ARef, Tab, DecK, Value)
catch
error:_ ->
skip
end;
_ ->
skip
end,
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
_ ->
ok
end.
check_version(TRec) ->
user_property(mrdb_version, TRec, ?VSN).

View File

@ -69,6 +69,7 @@
, select/1
, fold/3 , fold/4 , fold/5
, rdb_fold/4 , rdb_fold/5
, rdb_rev_fold/4 , rdb_rev_fold/5
, write_info/3
, read_info/2
, read_info/1
@ -1363,6 +1364,15 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
true = valid_limit(Limit),
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
rdb_rev_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) ->
rdb_rev_fold(Tab, Fun, Acc, Prefix, infinity).
rdb_rev_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
, is_binary(Prefix) ->
true = valid_limit(Limit),
mrdb_select:rdb_rev_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
valid_limit(L) ->
case L of
infinity ->

View File

@ -6,6 +6,7 @@
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_rev_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
]).
-export([continuation_info/2]).
@ -86,12 +87,41 @@ fold_({L, Cont}, Fun, Acc) ->
rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rocksdb:iterator_move(I, first(Ref)),
MovRes = fwd_init_seek(I, Prefix),
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end).
first(#{vsn := 1}) -> <<?DATA_START>>;
first(_) -> first.
rdb_rev_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rev_init_seek(I, Prefix),
i_rdb_rev_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end).
fwd_init_seek(I, <<>>) ->
rocksdb:iterator_move(I, first);
fwd_init_seek(I, Prefix) ->
rocksdb:iterator_move(I, {seek, Prefix}).
rev_init_seek(I, <<>>) ->
rocksdb:iterator_move(I, last);
rev_init_seek(I, Prefix) ->
Tgt = case incr_prefix(Prefix) of
last -> last;
Pfx1 when is_binary(Pfx1) ->
{seek, Pfx1}
end,
rocksdb:iterator_move(I, Tgt).
incr_prefix(<<>>) -> last;
incr_prefix(Pfx) when is_binary(Pfx) ->
PfxI = binary:decode_unsigned(Pfx),
MaxI = (1 bsl (byte_size(Pfx) * 8)) - 1,
case PfxI + 1 of
I1 when I1 >= MaxI -> last;
I1 ->
binary:encode_unsigned(I1)
end.
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
@ -104,6 +134,19 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold(_, _, _, _, Acc, _) ->
Acc.
i_rdb_rev_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
true ->
i_rdb_rev_fold(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
Fun(K, V, Acc), decr(Limit));
false when K > Pfx ->
i_rdb_rev_fold(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
false ->
Acc
end;
i_rdb_rev_fold(_, _, _, _, Acc, _) ->
Acc.
i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS
, limit = Limit

127
test/mrdb_fold_SUITE.erl Normal file
View File

@ -0,0 +1,127 @@
-module(mrdb_fold_SUITE).
-export([
all/0
, groups/0
, suite/0
, init_per_suite/1
, end_per_suite/1
, init_per_group/2
, end_per_group/2
, init_per_testcase/2
, end_per_testcase/2
]).
-export([ no_prefix_fwd_rdb_fold/1
, prefixed_fwd_rdb_fold/1
, no_prefix_rev_rdb_fold/1
, prefixed_rev_rdb_fold/1
, prefixed_rev_rdb_fold_max/1
]).
-include_lib("common_test/include/ct.hrl").
suite() ->
[].
all() ->
[{group, all_tests}].
groups() ->
[
{all_tests, [sequence], [ {group, rdb_fwd_fold}
, {group, rdb_rev_fold} ]}
, {rdb_fwd_fold, [sequence], [ no_prefix_fwd_rdb_fold
, prefixed_fwd_rdb_fold ]}
, {rdb_rev_fold, [sequence], [ no_prefix_rev_rdb_fold
, prefixed_rev_rdb_fold
, prefixed_rev_rdb_fold_max ]}
].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(G, Config) ->
mnesia:stop(),
ok = mnesia_rocksdb_tlib:start_mnesia(reset),
create_tab(t, G, [{type, ordered_set}]),
fill_tab(t),
Config.
end_per_group(_, _Config) ->
ok.
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
create_tab(Tab, Grp, Opts) ->
GOpts = case Grp of
_ when Grp==rdb_fwd_fold; Grp==rdb_rev_fold ->
[{user_properties,
[{mrdb_encoding, {raw, raw}}]}];
_ -> []
end,
{atomic, ok} = mnesia:create_table(Tab, [{rdb, [node()]} | GOpts ++ Opts]),
ok.
no_prefix_fwd_rdb_fold(_Config) ->
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = lists:reverse(objs()),
{Res, Res} = {Res, Expected}.
prefixed_fwd_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = lists:reverse(prefixed_objs(Pfx)),
{Res, Res} = {Res, Expected}.
no_prefix_rev_rdb_fold(_Config) ->
Res = mrdb:rdb_rev_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = objs(),
{Res, Res} = {Res, Expected}.
prefixed_rev_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_rev_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_objs(Pfx),
{Res, Res} = {Res, Expected}.
prefixed_rev_rdb_fold_max(_Config) ->
Pfx = <<255,255>>,
Res = mrdb:rdb_rev_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_objs(Pfx),
{Res, Res} = {Res, Expected}.
simple_rdb_acc(K, V, Acc) ->
[{K,V} | Acc].
fill_tab(Tab) ->
[mrdb:insert(Tab, {Tab, K, V}) || {K,V} <- objs()],
ok.
prefixed_objs(Pfx) ->
[Obj || {K,_} = Obj <- objs(),
is_prefix(Pfx, K)].
objs() ->
[ {<<"aa">> , <<"1">>}
, {<<"aa1">>, <<"2">>}
, {<<"ab">> , <<"3">>}
, {<<"ab1">>, <<"4">>}
, {<<255,255>>, <<"5">>} ].
%% copied from mrdb_select.erl
is_prefix(A, B) when is_binary(A), is_binary(B) ->
Sa = byte_size(A),
case B of
<<A:Sa/binary, _/binary>> ->
true;
_ ->
false
end.