Tests for fold, fold_reverse, select_reverse, etc.

This commit is contained in:
Ulf Wiger
2025-06-19 20:12:00 +02:00
parent 323fcea7a7
commit 04e6063ce0
3 changed files with 258 additions and 90 deletions
+80 -44
View File
@@ -3,10 +3,13 @@
-export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select_reverse/3
, select_reverse/4
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, fold_reverse/5
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_rev_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
]).
-export([continuation_info/2]).
@@ -26,18 +29,27 @@
, compiled_ms
, limit
, key_only = false % TODO: not used
, direction = forward % TODO: not used
, direction = forward
}).
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
select(Ref, MS, false, Limit).
select(Ref, MS, false, forward, Limit).
select(Ref, MS, AccKeys, Limit)
select(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, forward, Limit).
select_reverse(Ref, MS, Limit) ->
select(Ref, MS, false, reverse, Limit).
select_reverse(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, reverse, Limit).
select(Ref, MS, AccKeys, Dir, Limit)
when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
Sel = mk_sel(Ref, MS, Limit),
Sel = mk_sel(Ref, MS, Dir, Limit),
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
mk_sel(#{name := Tab} = Ref, MS, Limit) ->
mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
Keypat = keypat(MS, keypos(Tab), Ref),
#sel{tab = Tab,
ref = Ref,
@@ -45,6 +57,7 @@ mk_sel(#{name := Tab} = Ref, MS, Limit) ->
ms = MS,
compiled_ms = ets:match_spec_compile(MS),
key_only = needs_key_only(MS),
direction = Dir,
limit = Limit}.
select(Cont) ->
@@ -65,6 +78,12 @@ continuation_info_(direction, #sel{direction = Dir}) -> Dir;
continuation_info_(_, _) -> undefined.
fold(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, forward, Limit).
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
{AccKeys, F} =
if is_function(Fun, 3) ->
{true, fun({K, Obj}, Acc1) ->
@@ -75,7 +94,7 @@ fold(Ref, Fun, Acc, MS, Limit) ->
true ->
mrdb:abort(invalid_fold_fun)
end,
fold_(select(Ref, MS, AccKeys, Limit), F, Acc).
fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
fold_('$end_of_table', _, Acc) ->
Acc;
@@ -91,27 +110,29 @@ rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end).
rdb_rev_fold(Ref, Fun, Acc, Prefix, Limit) ->
rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rev_init_seek(I, Prefix),
i_rdb_rev_fold(MovRes, I, Prefix, Fun, Acc, Limit)
i_rdb_fold_reverse(MovRes, I, Prefix, Fun, Acc, Limit)
end).
fwd_init_seek(I, <<>>) ->
rocksdb:iterator_move(I, first);
fwd_init_seek(I, Prefix) ->
rocksdb:iterator_move(I, {seek, Prefix}).
fwd_init_seek(I, Pfx) ->
rocksdb:iterator_move(I, fwd_init_seek_tgt(Pfx)).
rev_init_seek(I, <<>>) ->
rocksdb:iterator_move(I, last);
rev_init_seek(I, Prefix) ->
Tgt = case incr_prefix(Prefix) of
last -> last;
Pfx1 when is_binary(Pfx1) ->
{seek, Pfx1}
end,
rocksdb:iterator_move(I, Tgt).
fwd_init_seek_tgt(<<>> ) -> first;
fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
rev_init_seek(I, Pfx) ->
rocksdb:iterator_move(I, rev_init_seek_tgt(Pfx)).
rev_init_seek_tgt(<<>>) -> last;
rev_init_seek_tgt(Prefix) ->
case incr_prefix(Prefix) of
last -> last;
Pfx1 when is_binary(Pfx1) ->
{seek, Pfx1}
end.
incr_prefix(<<>>) -> last;
incr_prefix(Pfx) when is_binary(Pfx) ->
@@ -134,31 +155,35 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold(_, _, _, _, Acc, _) ->
Acc.
i_rdb_rev_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold_reverse({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
true ->
i_rdb_rev_fold(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
Fun(K, V, Acc), decr(Limit));
false when K > Pfx ->
i_rdb_rev_fold(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
false ->
Acc
end;
i_rdb_rev_fold(_, _, _, _, Acc, _) ->
i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
Acc.
i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS
, limit = Limit
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) ->
StartKey = case {Pfx, Vsn, Enc} of
{<<>>, 1, {sext, _}} ->
<<?DATA_START>>;
{_, _, {term, _}} ->
<<>>;
_ ->
Pfx
end,
, direction = Dir
, ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
{StartKey, Sel} = case Enc of
{term, _} ->
%% No defined ordering - do forward select
{first, Sel0#sel{direction = forward}};
_ ->
SK = case Dir of
forward -> fwd_init_seek_tgt(Pfx);
reverse -> rev_init_seek_tgt(Pfx)
end,
{SK, Sel0}
end,
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc).
@@ -236,7 +261,7 @@ map_vars([H|T], P) ->
map_vars([], _) ->
[].
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
AccKeys, Acc) ->
case is_prefix(Pfx, K) of
true ->
@@ -245,7 +270,7 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
case ets:match_spec_run([Rec], MS) of
[] ->
select_traverse(
rocksdb:iterator_move(I, next), Limit, Pfx, MS,
rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
[Match] ->
Acc1 = if AccKeys ->
@@ -255,6 +280,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end;
false when Dir == reverse, K > Pfx ->
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
false when Limit == infinity ->
lists:reverse(Acc);
false ->
@@ -263,6 +291,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
next_or_prev(forward) -> next;
next_or_prev(reverse) -> prev.
select_return(infinity, {L, '$end_of_table'}) ->
L;
select_return(_, Ret) ->
@@ -282,29 +313,34 @@ decr(I) when is_integer(I) ->
decr(infinity) ->
infinity.
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc),
fun(sel) -> Sel;
(cont) ->
mrdb:with_rdb_iterator(
Ref,
fun(NewI) ->
select_traverse(iterator_next(NewI, K),
select_traverse(iterator_next(NewI, K, Dir),
Limit, Pfx, MS, NewI, Sel,
AccKeys, [])
end)
end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) ->
case rocksdb:iterator_move(I, K) of
iterator_next(I, K, Dir) ->
case i_move(I, K, Dir) of
{ok, K, _} ->
rocksdb:iterator_move(I, next);
rocksdb:iterator_move(I, next_or_prev(Dir));
Other ->
Other
end.
i_move(I, K, reverse) ->
rocksdb:iterator_move(I, {seek_for_prev, K});
i_move(I, K, forward) ->
rocksdb:iterator_move(I, K).
keypat([H|T], KeyPos, Ref) ->
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).