Merge pull request 'Add mrdb_index:select() et al' (#7) from uw-index-select into master
Reviewed-on: #7
This commit is contained in:
commit
e0542f7f52
@ -9,6 +9,10 @@
|
||||
, fold/4
|
||||
, rev_fold/4
|
||||
, index_ref/2
|
||||
, select/3
|
||||
, select/4
|
||||
, select_reverse/3
|
||||
, select_reverse/4
|
||||
]).
|
||||
|
||||
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
|
||||
@ -33,6 +37,11 @@
|
||||
|
||||
-export_type([ ix_iterator/0 ]).
|
||||
|
||||
-spec index_ref(mrdb:ref_or_tab(), mrdb:index_position()) -> mrdb:db_ref().
|
||||
index_ref(Tab, Ix) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
ensure_index_ref(Ix, R).
|
||||
|
||||
-spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res.
|
||||
with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) ->
|
||||
{ok, I} = iterator(Tab, IxPos),
|
||||
@ -88,6 +97,26 @@ fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
|
||||
iter_fold(I, Start, Dir, FoldFun, Acc)
|
||||
end).
|
||||
|
||||
select(Tab, Ix, MS) ->
|
||||
select(Tab, Ix, MS, infinity).
|
||||
|
||||
select(Tab, Ix, MS, Limit) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
#{} = IxR = ensure_index_ref(Ix, R),
|
||||
MSpre = pre_match_spec(MS),
|
||||
mrdb_select:select(IxR#{ derive_obj_f => mk_derive_obj_f(R)
|
||||
, pre_ms => MSpre }, MS, Limit).
|
||||
|
||||
select_reverse(Tab, Ix, MS) ->
|
||||
select_reverse(Tab, Ix, MS, infinity).
|
||||
|
||||
select_reverse(Tab, Ix, MS, Limit) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
#{} = IxR = ensure_index_ref(Ix, R),
|
||||
MSpre = pre_match_spec(MS),
|
||||
mrdb_select:select_reverse(IxR#{ derive_obj_f => mk_derive_obj_f(R)
|
||||
, pre_ms => MSpre }, MS, Limit).
|
||||
|
||||
iter_fold(I, Start, Dir, Fun, Acc) ->
|
||||
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
|
||||
|
||||
@ -96,10 +125,6 @@ iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
|
||||
iter_fold_({error, _}, _, _, _, Acc) ->
|
||||
Acc.
|
||||
|
||||
index_ref(Tab, Pos) ->
|
||||
TRef = mrdb:ensure_ref(Tab),
|
||||
ensure_index_ref(Pos, TRef).
|
||||
|
||||
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
case mrdb:iterator_move(I, Dir) of
|
||||
{ok, {{FKey, PKey}}} ->
|
||||
@ -121,6 +146,29 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
Other
|
||||
end.
|
||||
|
||||
pre_match_spec([{{KeyPat,ObjPat}, Gs, MatchBody} | T]) ->
|
||||
[{{{KeyPat,'_'},ObjPat}, Gs, MatchBody} | pre_match_spec(T)];
|
||||
pre_match_spec([H|T]) ->
|
||||
[H | pre_match_spec(T)];
|
||||
pre_match_spec([]) ->
|
||||
[].
|
||||
|
||||
%% Used for mrdb_select:select()
|
||||
%% The select operation folds over index keys, and for matching keys,
|
||||
%% calls the `derive_obj_f/1` fun, which normally just does `Obj -> [Obj]`.
|
||||
%% In the case of indexes, it fetches the object, if it exists, and then
|
||||
%% returns `[{IndexKey, Object}]`, which is what the matchspec should operate on.
|
||||
%%
|
||||
mk_derive_obj_f(Ref) ->
|
||||
fun({{IxK, Key}}) ->
|
||||
case mrdb:read(Ref, Key, []) of
|
||||
[Obj] ->
|
||||
[{IxK, Obj}];
|
||||
[] ->
|
||||
[]
|
||||
end
|
||||
end.
|
||||
|
||||
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
|
||||
opt_read(R, Key) ->
|
||||
case mrdb:read(R, Key, []) of
|
||||
|
@ -30,6 +30,8 @@
|
||||
, limit
|
||||
, key_only = false % TODO: not used
|
||||
, direction = forward
|
||||
, pre_ms
|
||||
, derive_obj_f = fun unit_l/1
|
||||
}).
|
||||
|
||||
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
|
||||
@ -50,15 +52,18 @@ select(Ref, MS, AccKeys, Dir, Limit)
|
||||
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
|
||||
|
||||
mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
|
||||
Keypat = keypat(MS, keypos(Tab), Ref),
|
||||
MSpre = maps:get(pre_ms, Ref, MS),
|
||||
Keypat = keypat(MSpre, keypos(Tab), Ref),
|
||||
#sel{tab = Tab,
|
||||
ref = Ref,
|
||||
keypat = Keypat,
|
||||
ms = MS,
|
||||
compiled_ms = ets:match_spec_compile(MS),
|
||||
ms = MSpre,
|
||||
pre_ms = MSpre,
|
||||
compiled_ms = ms_compile(MS),
|
||||
key_only = needs_key_only(MS),
|
||||
direction = Dir,
|
||||
limit = Limit}.
|
||||
limit = Limit,
|
||||
derive_obj_f = derive_f(Ref)}.
|
||||
|
||||
select(Cont) ->
|
||||
case Cont of
|
||||
@ -73,6 +78,7 @@ continuation_info(_, _) -> undefined.
|
||||
|
||||
continuation_info_(ref, #sel{ref = Ref}) -> Ref;
|
||||
continuation_info_(ms, #sel{ms = MS }) -> MS;
|
||||
continuation_info_(pre_ms, #sel{pre_ms = MSp}) -> MSp;
|
||||
continuation_info_(limit, #sel{limit = L }) -> L;
|
||||
continuation_info_(direction, #sel{direction = Dir}) -> Dir;
|
||||
continuation_info_(_, _) -> undefined.
|
||||
@ -124,7 +130,21 @@ fwd_init_seek_tgt(<<>> ) -> first;
|
||||
fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
|
||||
|
||||
rev_init_seek(I, Pfx) ->
|
||||
rocksdb:iterator_move(I, rev_init_seek_tgt(Pfx)).
|
||||
case rev_init_seek_tgt(Pfx) of
|
||||
last ->
|
||||
i_move(I, last);
|
||||
{seek, Bin} ->
|
||||
%% An 'incremented' prefix.
|
||||
%% This will fail if we seek past the end of the table.
|
||||
%% Then, try to seek_for_prev instead (fails if table empty).
|
||||
%% This because rocksdb lacks a "seek backward to last matching prefix".
|
||||
case i_move(I, {seek, Bin}) of
|
||||
{error, invalid_iterator} ->
|
||||
i_move(I, {seek_for_prev, Bin});
|
||||
{ok, _, _} = Ok ->
|
||||
Ok
|
||||
end
|
||||
end.
|
||||
|
||||
rev_init_seek_tgt(<<>>) -> last;
|
||||
rev_init_seek_tgt(Prefix) ->
|
||||
@ -173,19 +193,19 @@ i_select(I, #sel{ keypat = Pfx
|
||||
, limit = Limit
|
||||
, direction = Dir
|
||||
, ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
|
||||
{StartKey, Sel} = case Enc of
|
||||
{term, _} ->
|
||||
{MoveRes, Sel} = case Enc of
|
||||
{term, _} ->
|
||||
%% No defined ordering - do forward select
|
||||
{first, Sel0#sel{direction = forward}};
|
||||
_ ->
|
||||
SK = case Dir of
|
||||
forward -> fwd_init_seek_tgt(Pfx);
|
||||
reverse -> rev_init_seek_tgt(Pfx)
|
||||
end,
|
||||
{SK, Sel0}
|
||||
end,
|
||||
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
|
||||
Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
{i_move(I, first), Sel0#sel{direction = forward}};
|
||||
_ ->
|
||||
case Dir of
|
||||
forward ->
|
||||
{fwd_init_seek(I, Pfx), Sel0};
|
||||
reverse ->
|
||||
{rev_init_seek(I, Pfx), Sel0}
|
||||
end
|
||||
end,
|
||||
select_traverse(MoveRes, Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
|
||||
needs_key_only([Pat]) ->
|
||||
needs_key_only_(Pat);
|
||||
@ -266,8 +286,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} =
|
||||
case is_prefix(Pfx, K) of
|
||||
true ->
|
||||
DecKey = decode_key(K, R),
|
||||
Rec = decode_val(V, DecKey, R),
|
||||
case ets:match_spec_run([Rec], MS) of
|
||||
Rec0 = decode_val(V, DecKey, R),
|
||||
RecL = derive_object(Rec0, Sel),
|
||||
case ms_run(RecL, MS) of
|
||||
[] ->
|
||||
select_traverse(
|
||||
rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
|
||||
@ -336,11 +357,17 @@ iterator_next(I, K, Dir) ->
|
||||
Other
|
||||
end.
|
||||
|
||||
i_move(I, Tgt) ->
|
||||
rocksdb:iterator_move(I, Tgt).
|
||||
|
||||
i_move(I, K, reverse) ->
|
||||
rocksdb:iterator_move(I, {seek_for_prev, K});
|
||||
i_move(I, K, forward) ->
|
||||
rocksdb:iterator_move(I, K).
|
||||
|
||||
derive_object(R, #sel{derive_obj_f = F}) ->
|
||||
F(R).
|
||||
|
||||
keypat([H|T], KeyPos, Ref) ->
|
||||
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
|
||||
|
||||
@ -362,3 +389,14 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos, #{encoding := {sext,_}}) when is_tuple(HeadP
|
||||
keypat_pfx(_, _, _) ->
|
||||
<<>>.
|
||||
|
||||
derive_f(#{derive_obj_f := F}) when is_function(F, 1) -> F;
|
||||
derive_f(_) -> fun unit_l/1.
|
||||
|
||||
unit_l(X) ->
|
||||
[X].
|
||||
|
||||
ms_compile(MS) ->
|
||||
ets:match_spec_compile(MS).
|
||||
|
||||
ms_run(RecL, MS) ->
|
||||
ets:match_spec_run(RecL, MS).
|
||||
|
@ -236,12 +236,25 @@ test_index_plugin(Config) ->
|
||||
if Type == rdb ->
|
||||
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
|
||||
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
|
||||
ok = test_select(Tab,{pfx},[{'_', [], ['$_']}]),
|
||||
MS2 = [{{<<"whi">>,{Tab,"truth",'_'}},[],['$_']}],
|
||||
[_] = mrdb_index:select(Tab, {pfx}, MS2),
|
||||
ok = test_select(Tab, {pfx}, MS2),
|
||||
[{Tab,"foobar","sentence"}] = mrdb:index_read(
|
||||
Tab, <<"foo">>, {pfx});
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
test_select(Tab, Ix, MS) ->
|
||||
Res = mrdb_index:select(Tab, Ix, MS),
|
||||
ct:log("mrdb_index:select(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, Res]),
|
||||
RevRes = mrdb_index:select_reverse(Tab, Ix, MS),
|
||||
ct:log("mrdb_index:select_reverse(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, RevRes]),
|
||||
{Res,Res} = {Res, lists:reverse(RevRes)},
|
||||
ok.
|
||||
|
||||
|
||||
ixtype(T) when T==bag;
|
||||
T==ordered ->
|
||||
{{pfx}, T};
|
||||
|
Loading…
x
Reference in New Issue
Block a user