diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 9bca483..82b50a9 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) -> i_show_table(I, Move, Limit, Ref) -> case rocksdb:iterator_move(I, Move) of {ok, EncKey, EncVal} -> - {Type,Val} = - case EncKey of - << ?INFO_TAG, K/binary >> -> - K1 = decode_key(K, Ref), - V = decode_val(EncVal, K1, Ref), - {info,V}; - _ -> - K = decode_key(EncKey, Ref), - V = decode_val(EncVal, K, Ref), - {data,V} - end, - io:fwrite("~p: ~p~n", [Type, Val]), + K = decode_key(EncKey, Ref), + Val = decode_val(EncVal, K, Ref), + io:fwrite("~p~n", [Val]), i_show_table(I, next, Limit-1, Ref); _ -> ok @@ -675,11 +666,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) -> slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 -> #{semantics := Sem} = Ref = get_ref(Tab), - Start = case Ref of - #{type := standalone, vsn := 1} -> <>; - _ -> first - end, - First = fun(I) -> rocksdb:iterator_move(I, Start) end, + First = fun(I) -> rocksdb:iterator_move(I, first) end, F = case Sem of bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end; _ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index 0256417..3e5b481 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -250,31 +250,6 @@ get_info_res(Res, Default) -> error(E) end. -%% Admin info: metadata written by the admin proc to keep track of -%% the derived status of tables (such as detected version and encoding -%% of existing standalone tables.) -%% -write_admin_info(K, V, Alias, Name) -> - mrdb:rdb_put(get_ref({admin, Alias}), - admin_info_key(K, Name), - term_to_binary(V)). - -read_admin_info(K, Alias, Name) -> - EncK = admin_info_key(K, Name), - case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of - {ok, Bin} -> - {ok, binary_to_term(Bin)}; - _ -> - error - end. 
- -delete_admin_info(K, Alias, Name) -> - EncK = admin_info_key(K, Name), - mrdb:rdb_delete(get_ref({admin, Alias}), EncK). - -admin_info_key(K, Name) -> - mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext). - %% Table metadata info maintained by users %% write_info(Alias, Tab, K, V) -> @@ -285,7 +260,6 @@ write_info_(Ref, Tab, K, V) -> write_info_encv(Ref, Tab, K, V) -> EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext), - maybe_write_standalone_info(Ref, K, V), mrdb:rdb_put(Ref, EncK, V, []). delete_info(Alias, Tab, K) -> @@ -293,30 +267,8 @@ delete_info(Alias, Tab, K) -> delete_info_(Ref, Tab, K) -> EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext), - maybe_delete_standalone_info(Ref, K), mrdb:rdb_delete(Ref, EncK, []). -maybe_write_standalone_info(Ref, K, V) -> - case Ref of - #{type := standalone, vsn := 1, db_ref := DbRef} -> - EncK = mnesia_rocksdb_lib:encode_key(K, sext), - Key = <>, - EncV = mnesia_rocksdb_lib:encode_val(V, term), - rocksdb:put(DbRef, Key, EncV, []); - _ -> - ok - end. - -maybe_delete_standalone_info(Ref, K) -> - case Ref of - #{type := standalone, vsn := 1, db_ref := DbRef} -> - EncK = mnesia_rocksdb_lib:encode_key(K, sext), - Key = <>, - rocksdb:delete(DbRef, Key, []); - _ -> - ok - end. - write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 -> call(Alias, {write_table_property, Tab, Prop}). 
@@ -813,10 +765,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec, false -> create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St); {false, MP} -> - create_table_as_standalone(Alias, Name, true, MP, TRec, St); + create_table_as_standalone(Alias, Name, MP, TRec, St); {true, MP} -> ?log(debug, "will create ~p as standalone and migrate", [Name]), - case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of + case create_table_as_standalone(Alias, Name, MP, TRec, St) of {ok, OldTRec, _} -> ?log(info, "Migrating ~p to column family", [Name]), create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St); @@ -825,8 +777,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec, end end; create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) -> - {Exists, MP} = table_exists_as_standalone(Name), - create_table_as_standalone(Alias, Name, Exists, MP, TRec, St). + {_, MP} = table_exists_as_standalone(Name), + create_table_as_standalone(Alias, Name, MP, TRec, St). create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) -> ?log(debug, "Migrate to cf (~p)", [Name]), @@ -1106,29 +1058,12 @@ table_exists_as_standalone(Name) -> end, {Exists, MP}. -create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) -> - case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of - {ok, #{type := standalone, vsn := Vsn1, - encoding := Enc1} = Cf, _St1} = Ok -> - write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1}, - Alias, Name), - case Vsn1 of - 1 -> - load_info(Alias, Name, Cf); - _ -> - skip - end, - Ok; - Other -> - Other - end. - -create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) -> +create_table_as_standalone(Alias, Name, MP, TRec, St) -> Vsn = check_version(TRec), TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)}, - do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St). + do_open_standalone(true, Alias, Name, MP, TRec1, St). 
-do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0, +do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0, #st{standalone = Ts} = St) -> Opts = rocksdb_opts_from_trec(TRec0), case open_db_(MP, Alias, Opts, [], CreateIfMissing) of @@ -1138,146 +1073,24 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0, DbRec1 = DbRec#{ cfs => CfNames, mountpoint => MP }, TRec = maps:merge(TRec0, DbRec#{type => standalone}), - TRec1 = guess_table_vsn_and_encoding(Exists, TRec), - {ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}}; + {ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}}; {error, _} = Err -> ?log(debug, "open_db error: ~p", [Err]), Err end. -%% When opening a standalone table, chances are it's a legacy table -%% where legacy encoding is already in place. We try to read the -%% first object and apply legacy encoding. If successful, we set -%% legacy encoding in the TRec. If we migrate the data to a column -%% family, we should apply the defined encoding for the cf. -%% -%% The first object can either be an info object (in the legacy case) -%% or a data object, with a sext-encoded key, and a term_to_binary- -%% encoded object as value, where the key position is set to []. -%% The info objects would be sext-encoded key + term-encoded value. -guess_table_vsn_and_encoding(false, TRec) -> - TRec; -guess_table_vsn_and_encoding(true, #{properties := #{attributes := As}, - alias := Alias, name := Name} = R) -> - case read_admin_info(standalone_vsn_and_enc, Alias, Name) of - {ok, {V, E}} -> - R#{vsn => V, encoding => E}; - error -> - R1 = set_default_guess(R), - mrdb:with_rdb_iterator( - R1, fun(I) -> - guess_table_vsn_and_encoding_( - mrdb:rdb_iterator_move(I, first), I, As, R1) - end) - end. - -set_default_guess(#{type := standalone} = R) -> - case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of - 1 -> - R#{vsn => 1, encoding => {sext, {object, term}}}; - V -> - R#{vsn => V} - end. 
- -guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) -> - Arity = length(As) + 1, - case K of - <> -> - try _ = {mnesia_rocksdb_lib:decode(EncK, sext), - mnesia_rocksdb_lib:decode(V, term)}, - %% This is a vsn 1 standalone table - R#{vsn => 1, encoding => {sext, {object, term}}} - catch - error:_ -> - R - end; - _ -> - Enc = guess_obj_encoding(K, V, Arity), - R#{encoding => Enc} - end; -guess_table_vsn_and_encoding_(_Other, _, _, R) -> - R. - -guess_obj_encoding(K, V, Arity) -> - {guess_key_encoding(K), guess_val_encoding(V, Arity)}. - -guess_encoding(Bin) -> - try {sext, sext:decode(Bin)} - catch - error:_ -> - try {term, binary_to_term(Bin)} - catch - error:_ -> raw - end - end. - -guess_key_encoding(Bin) -> - case guess_encoding(Bin) of - raw -> raw; - {Enc, _} -> Enc - end. - -guess_val_encoding(Bin, Arity) -> - case guess_encoding(Bin) of - raw -> {value, raw}; - {Enc, Term} -> - if is_tuple(Term), size(Term) == Arity, - element(2, Term) == [] -> - {object, Enc}; - true -> - {value, Enc} - end - end. - %% This is slightly different from `rocksdb:is_empty/1`, since it allows %% for the presence of some metadata, and still considers it empty if there %% is no user data. table_is_empty(#{} = DbRec) -> - Start = iterator_data_start(DbRec), mrdb:with_rdb_iterator( DbRec, fun(I) -> - case mrdb:rdb_iterator_move(I, Start) of + case mrdb:rdb_iterator_move(I, first) of {ok, _, _} -> false; _ -> true end end). -iterator_data_start(#{vsn := 1}) -> <>; -iterator_data_start(_) -> first. - -load_info(Alias, Name, Cf) -> - ARef = get_ref({admin, Alias}), - mrdb:with_rdb_iterator( - Cf, fun(I) -> - load_info_(rocksdb:iterator_move(I, first), I, ARef, Name) - end). 
- -load_info_(Res, I, ARef, Tab) -> - case Res of - {ok, << ?INFO_TAG, K/binary >>, V} -> - DecK = mnesia_rocksdb_lib:decode_key(K), - case read_info_(ARef, Tab, DecK, undefined) of - undefined -> - write_info_encv(ARef, Tab, DecK, V); - <<131,_/binary>> = Value -> - %% Due to a previous bug, info values could be double-encoded with binary_to_term() - try binary_to_term(Value) of - _DecVal -> - %% We haven't been storing erlang-term encoded data as info, - %% so assume this is double-encoded and correct - write_info_encv(ARef, Tab, DecK, Value) - catch - error:_ -> - skip - end; - _ -> - skip - end, - load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab); - _ -> - ok - end. - check_version(TRec) -> user_property(mrdb_version, TRec, ?VSN). @@ -1607,7 +1420,6 @@ close_and_delete_standalone(#{alias := Alias, case get_table_mountpoint(Alias, Name, St) of {ok, MP} -> close_and_delete(DbRef, MP), - delete_admin_info(standalone_vsn_and_enc, Alias, Name), St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)}; error -> St diff --git a/src/mrdb.erl b/src/mrdb.erl index 74025a0..b6b186d 100644 --- a/src/mrdb.erl +++ b/src/mrdb.erl @@ -61,14 +61,17 @@ , get_batch/1 , snapshot/1 , release_snapshot/1 - , first/1 , first/2 - , next/2 , next/3 - , prev/2 , prev/3 - , last/1 , last/2 - , select/2 , select/3 + , first/1 , first/2 + , next/2 , next/3 + , prev/2 , prev/3 + , last/1 , last/2 + , select/2 , select/3 + , select_reverse/2, select_reverse/3 , select/1 - , fold/3 , fold/4 , fold/5 - , rdb_fold/4 , rdb_fold/5 + , fold/3 , fold/4 , fold/5 + , fold_reverse/3 , fold_reverse/4, fold_reverse/5 + , rdb_fold/4 , rdb_fold/5 + , rdb_fold_reverse/4, rdb_fold_reverse/5 , write_info/3 , read_info/2 , read_info/1 @@ -540,7 +543,6 @@ re_throw(Cat, Err) -> throw -> throw(Err) end. - mnesia_compatible_aborts() -> mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false). 
@@ -661,10 +663,25 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) -> inherit_ctxt(Ref, R) -> maps:merge(Ref, maps:with([snapshot, activity], R)). +%% @doc Create an iterator on table `Tab' for the duration of `Fun' +%% +%% The iterator is passed to the provided fun as `Fun(Iterator)', and is +%% closed once the fun terminates. +%% @equiv with_iterator(Tab, Fun, []) +%% @end -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res. with_iterator(Tab, Fun) -> with_iterator(Tab, Fun, []). +%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun' +%% +%% The iterator is passed to the provided fun as `Fun(Iterator)', and is +%% closed once the fun terminates. +%% +%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator +%% will return `{ok, Obj}' where `Obj' is the complete decoded object. +%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}. +%% @end -spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res. with_iterator(Tab, Fun, Opts) -> R = ensure_ref(Tab), @@ -1300,6 +1317,13 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 -> true = valid_limit(Limit), mrdb_select:select(ensure_ref(Tab), Pat, Limit). +select_reverse(Tab, Pat) -> + select_reverse(Tab, Pat, infinity). + +select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 -> + true = valid_limit(Limit), + mrdb_select:select_reverse(ensure_ref(Tab), Pat, Limit). + select(Cont) -> mrdb_select:select(Cont). @@ -1354,6 +1378,16 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) -> true = valid_limit(Limit), mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit). +fold_reverse(Tab, Fun, Acc) -> + fold_reverse(Tab, Fun, Acc, [{'_', [], ['$_']}]). + +fold_reverse(Tab, Fun, Acc, MatchSpec) -> + fold_reverse(Tab, Fun, Acc, MatchSpec, infinity). 
+ +fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) -> + true = valid_limit(Limit), + mrdb_select:fold_reverse(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit). + rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3) , is_binary(Prefix) -> rdb_fold(Tab, Fun, Acc, Prefix, infinity). @@ -1363,6 +1397,15 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3) true = valid_limit(Limit), mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit). +rdb_fold_reverse(Tab, Fun, Acc, Prefix) when is_function(Fun, 3) + , is_binary(Prefix) -> + rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity). + +rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3) + , is_binary(Prefix) -> + true = valid_limit(Limit), + mrdb_select:rdb_fold_reverse(ensure_ref(Tab), Fun, Acc, Prefix, Limit). + valid_limit(L) -> case L of infinity -> @@ -1374,15 +1417,7 @@ valid_limit(L) -> end. write_info(Tab, K, V) -> - R = ensure_ref(Tab), - Alias = case R of - #{type := standalone, vsn := 1, alias := A} = TRef -> - %% Also write on legacy info format - write_info_standalone(TRef, K, V), - A; - #{alias := A} -> - A - end, + #{alias := Alias} = ensure_ref(Tab), write_info_(ensure_ref({admin, Alias}), Tab, K, V). write_info_(#{} = R, Tab, K, V) -> @@ -1399,13 +1434,8 @@ read_info(Tab, K) -> read_info(Tab, K, Default) when K==size; K==memory -> read_direct_info_(ensure_ref(Tab), K, Default); read_info(Tab, K, Default) -> - #{alias := Alias} = R = ensure_ref(Tab), - case R of - #{type := standalone, vsn := 1} = TRef -> - read_info_standalone(TRef, K, Default); - #{alias := Alias} -> - mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default) - end. + #{alias := Alias} = ensure_ref(Tab), + mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default). 
read_direct_info_(R, memory, _Def) -> get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0); @@ -1429,26 +1459,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) -> %%rocksdb_boolean(<<"1">>) -> true; %%rocksdb_boolean(<<"0">>) -> false. -write_info_standalone(#{} = R, K, V) -> - EncK = <>, - EncV = term_to_binary(V), - rdb_put(R, EncK, EncV, write_opts(R, [])). - -read_info_standalone(#{} = R, K, Default) -> - EncK = <>, - get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default). - -get_info_res(Res, Default) -> - case Res of - not_found -> - Default; - {ok, Bin} -> - %% no fancy tricks when encoding/decoding info values - binary_to_term(Bin); - {error, E} -> - error(E) - end. - %% insert_bag_v2(Ref, K, V, Opts) -> %% rdb_merge(Ref, K, {list_append, [V]} diff --git a/src/mrdb_select.erl b/src/mrdb_select.erl index 1dbcc5a..2415746 100644 --- a/src/mrdb_select.erl +++ b/src/mrdb_select.erl @@ -1,11 +1,15 @@ %% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- -module(mrdb_select). --export([ select/3 %% (Ref, MatchSpec, Limit) - , select/4 %% (Ref, MatchSpec, AccKeys, Limit) - , select/1 %% (Cont) - , fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit) - , rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit) +-export([ select/3 %% (Ref, MatchSpec, Limit) + , select/4 %% (Ref, MatchSpec, AccKeys, Limit) + , select_reverse/3 + , select_reverse/4 + , select/1 %% (Cont) + , fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit) + , fold_reverse/5 + , rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit) + , rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit) ]). -export([continuation_info/2]). @@ -25,18 +29,27 @@ , compiled_ms , limit , key_only = false % TODO: not used - , direction = forward % TODO: not used + , direction = forward }). select(Ref, MS, Limit) when is_map(Ref), is_list(MS) -> - select(Ref, MS, false, Limit). + select(Ref, MS, false, forward, Limit). 
-select(Ref, MS, AccKeys, Limit) +select(Ref, MS, AccKeys, Limit) -> + select(Ref, MS, AccKeys, forward, Limit). + +select_reverse(Ref, MS, Limit) -> + select(Ref, MS, false, reverse, Limit). + +select_reverse(Ref, MS, AccKeys, Limit) -> + select(Ref, MS, AccKeys, reverse, Limit). + +select(Ref, MS, AccKeys, Dir, Limit) when is_map(Ref), is_list(MS), is_boolean(AccKeys) -> - Sel = mk_sel(Ref, MS, Limit), + Sel = mk_sel(Ref, MS, Dir, Limit), mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end). -mk_sel(#{name := Tab} = Ref, MS, Limit) -> +mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) -> Keypat = keypat(MS, keypos(Tab), Ref), #sel{tab = Tab, ref = Ref, @@ -44,6 +57,7 @@ mk_sel(#{name := Tab} = Ref, MS, Limit) -> ms = MS, compiled_ms = ets:match_spec_compile(MS), key_only = needs_key_only(MS), + direction = Dir, limit = Limit}. select(Cont) -> @@ -64,6 +78,12 @@ continuation_info_(direction, #sel{direction = Dir}) -> Dir; continuation_info_(_, _) -> undefined. fold(Ref, Fun, Acc, MS, Limit) -> + do_fold(Ref, Fun, Acc, MS, forward, Limit). + +fold_reverse(Ref, Fun, Acc, MS, Limit) -> + do_fold(Ref, Fun, Acc, MS, reverse, Limit). + +do_fold(Ref, Fun, Acc, MS, Dir, Limit) -> {AccKeys, F} = if is_function(Fun, 3) -> {true, fun({K, Obj}, Acc1) -> @@ -74,7 +94,7 @@ fold(Ref, Fun, Acc, MS, Limit) -> true -> mrdb:abort(invalid_fold_fun) end, - fold_(select(Ref, MS, AccKeys, Limit), F, Acc). + fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc). fold_('$end_of_table', _, Acc) -> Acc; @@ -86,12 +106,43 @@ fold_({L, Cont}, Fun, Acc) -> rdb_fold(Ref, Fun, Acc, Prefix, Limit) -> mrdb:with_rdb_iterator( Ref, fun(I) -> - MovRes = rocksdb:iterator_move(I, first(Ref)), + MovRes = fwd_init_seek(I, Prefix), i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit) end). -first(#{vsn := 1}) -> <>; -first(_) -> first. 
+rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
+    mrdb:with_rdb_iterator(
+      Ref, fun(I) ->
+                   MovRes = rev_init_seek(I, Prefix),
+                   i_rdb_fold_reverse(MovRes, I, Prefix, Fun, Acc, Limit)
+           end).
+
+fwd_init_seek(I, Pfx) ->
+    rocksdb:iterator_move(I, fwd_init_seek_tgt(Pfx)).
+
+fwd_init_seek_tgt(<<>> ) -> first;   % no prefix: start at the first key
+fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
+
+rev_init_seek(I, Pfx) ->
+    rocksdb:iterator_move(I, rev_init_seek_tgt(Pfx)).
+
+rev_init_seek_tgt(<<>>) -> last;     % no prefix: start at the last key
+rev_init_seek_tgt(Prefix) ->
+    case incr_prefix(Prefix) of
+        last -> last;
+        Pfx1 when is_binary(Pfx1) ->
+            {seek_for_prev, Pfx1} % last key =< Pfx1; a plain seek fails if no key >= Pfx1
+    end.
+
+incr_prefix(<<>>) -> last;
+incr_prefix(Pfx) when is_binary(Pfx) ->
+    PfxI = binary:decode_unsigned(Pfx),
+    MaxI = (1 bsl (byte_size(Pfx) * 8)) - 1,
+    case PfxI + 1 of
+        I1 when I1 > MaxI -> last;        % every byte was 16#FF: no same-width successor
+        I1 ->
+            <<I1:(byte_size(Pfx) * 8)>>   % keep the prefix width, leading zero bytes included
+    end.
 
 i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
     case is_prefix(Pfx, K) of
@@ -104,18 +155,35 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
 i_rdb_fold(_, _, _, _, Acc, _) ->
     Acc.
 
+i_rdb_fold_reverse({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
+    case is_prefix(Pfx, K) of
+        true ->
+            i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
+                               Fun(K, V, Acc), decr(Limit));
+        false when K > Pfx ->             % still above the prefix range: keep stepping back
+            i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
+        false ->                          % dropped below the prefix range: done
+            Acc
+    end;
+i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
+    Acc.
+ i_select(I, #sel{ keypat = Pfx , compiled_ms = MS , limit = Limit - , ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) -> - StartKey = case {Pfx, Vsn, Enc} of - {<<>>, 1, {sext, _}} -> - <>; - {_, _, {term, _}} -> - <<>>; - _ -> - Pfx - end, + , direction = Dir + , ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) -> + {StartKey, Sel} = case Enc of + {term, _} -> + %% No defined ordering - do forward select + {first, Sel0#sel{direction = forward}}; + _ -> + SK = case Dir of + forward -> fwd_init_seek_tgt(Pfx); + reverse -> rev_init_seek_tgt(Pfx) + end, + {SK, Sel0} + end, select_traverse(rocksdb:iterator_move(I, StartKey), Limit, Pfx, MS, I, Sel, AccKeys, Acc). @@ -193,7 +261,7 @@ map_vars([H|T], P) -> map_vars([], _) -> []. -select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel, +select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel, AccKeys, Acc) -> case is_prefix(Pfx, K) of true -> @@ -202,7 +270,7 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel, case ets:match_spec_run([Rec], MS) of [] -> select_traverse( - rocksdb:iterator_move(I, next), Limit, Pfx, MS, + rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc); [Match] -> Acc1 = if AccKeys -> @@ -212,6 +280,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel, end, traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) end; + false when Dir == reverse, K > Pfx -> + select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS, + I, Sel, AccKeys, Acc); false when Limit == infinity -> lists:reverse(Acc); false -> @@ -220,6 +291,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel, select_traverse({error, _}, Limit, _, _, _, _, _, Acc) -> select_return(Limit, {lists:reverse(Acc), '$end_of_table'}). +next_or_prev(forward) -> next; +next_or_prev(reverse) -> prev. 
+ select_return(infinity, {L, '$end_of_table'}) -> L; select_return(_, Ret) -> @@ -239,29 +313,34 @@ decr(I) when is_integer(I) -> decr(infinity) -> infinity. -traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> +traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) -> {lists:reverse(Acc), fun(sel) -> Sel; (cont) -> mrdb:with_rdb_iterator( Ref, fun(NewI) -> - select_traverse(iterator_next(NewI, K), + select_traverse(iterator_next(NewI, K, Dir), Limit, Pfx, MS, NewI, Sel, AccKeys, []) end) end}; -traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> - select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). +traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) -> + select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc). -iterator_next(I, K) -> - case rocksdb:iterator_move(I, K) of +iterator_next(I, K, Dir) -> + case i_move(I, K, Dir) of {ok, K, _} -> - rocksdb:iterator_move(I, next); + rocksdb:iterator_move(I, next_or_prev(Dir)); Other -> Other end. +i_move(I, K, reverse) -> + rocksdb:iterator_move(I, {seek_for_prev, K}); +i_move(I, K, forward) -> + rocksdb:iterator_move(I, K). + keypat([H|T], KeyPos, Ref) -> keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)). diff --git a/test/mrdb_fold_SUITE.erl b/test/mrdb_fold_SUITE.erl new file mode 100644 index 0000000..f5f1546 --- /dev/null +++ b/test/mrdb_fold_SUITE.erl @@ -0,0 +1,226 @@ +-module(mrdb_fold_SUITE). + +-export([ + all/0 + , groups/0 + , suite/0 + , init_per_suite/1 + , end_per_suite/1 + , init_per_group/2 + , end_per_group/2 + , init_per_testcase/2 + , end_per_testcase/2 + ]). 
+
+-export([
+          no_prefix_fwd_rdb_fold/1
+        , prefixed_fwd_rdb_fold/1
+        , no_prefix_rev_rdb_fold/1
+        , prefixed_rev_rdb_fold/1
+        , prefixed_rev_rdb_fold_max/1
+        , fwd_fold/1
+        , filtered_fwd_fold/1
+        , rev_fold/1
+        , filtered_rev_fold/1
+        , fwd_select/1
+        , rev_select/1
+        , select_cont/1
+        , rev_select_cont/1
+        ]).
+
+-include_lib("common_test/include/ct.hrl").
+
+-record(r, {k, v}).
+
+suite() ->
+    [].
+
+all() ->
+    [{group, all_tests}].
+
+groups() ->
+    [
+      {all_tests, [sequence], [ {group, rdb_fold}
+                              , {group, fold}
+                              , {group, select} ]}
+    , {rdb_fold, [sequence], [ no_prefix_fwd_rdb_fold
+                             , prefixed_fwd_rdb_fold
+                             , no_prefix_rev_rdb_fold
+                             , prefixed_rev_rdb_fold
+                             , prefixed_rev_rdb_fold_max ]}
+    , {fold, [sequence], [ fwd_fold
+                         , filtered_fwd_fold
+                         , rev_fold
+                         , filtered_rev_fold ]}
+    , {select, [sequence], [ fwd_select
+                           , rev_select
+                           , select_cont
+                           , rev_select_cont ]}
+    ].
+
+init_per_suite(Config) ->
+    mnesia:stop(),
+    ok = mnesia_rocksdb_tlib:start_mnesia(reset),
+    Config.
+
+end_per_suite(_Config) ->
+    mnesia:stop(),
+    ok.
+
+init_per_group(G, Config) when G==rdb_fold; G==fold; G==select ->   % every table-reading group
+    mk_tab(G),                                                      % creates its table if missing
+    Config;
+init_per_group(_, Config) ->
+    Config.
+
+end_per_group(_, _Config) ->
+    ok.
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+mk_tab(G) ->
+    T = tab_name(G),
+    case create_tab(T) of
+        {atomic, ok} -> fill_tab(T);
+        {aborted,{already_exists,T}} -> ok   % created (and filled) by an earlier group
+    end.
+
+tab_name(fold    ) -> r;
+tab_name(select  ) -> r;
+tab_name(rdb_fold) -> t.
+
+create_tab(T) ->
+    Opts = tab_opts(T),
+    mnesia:create_table(T, [{rdb, [node()]} | Opts]).
+
+tab_opts(r) ->
+    [{attributes, [k, v]}, {type, ordered_set}];
+tab_opts(t) ->
+    [{attributes, [k, v]}, {user_properties, [{mrdb_encoding, {raw, raw}}]}].
+
+no_prefix_fwd_rdb_fold(_Config) ->
+    Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
+    Expected = lists:reverse(raw_objs()),
+    {Res, Res} = {Res, Expected}.
+ +fwd_fold(_Config) -> + Res = mrdb:fold(r, fun simple_acc/2, []), + Expected = lists:reverse(objs()), + {Res, Res} = {Res, Expected}. + +prefixed_fwd_rdb_fold(_Config) -> + Pfx = <<"aa">>, + Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity), + Expected = lists:reverse(prefixed_raw_objs(Pfx)), + {Res, Res} = {Res, Expected}. + +filtered_fwd_fold(_Config) -> + MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}], + Res = mrdb:fold(r, fun simple_acc/2, [], MS, infinity), + Expected = lists:reverse(filtered_objs(MS)), + {Res, Res} = {Res, Expected}. + +no_prefix_rev_rdb_fold(_Config) -> + Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity), + Expected = raw_objs(), + {Res, Res} = {Res, Expected}. + +rev_fold(_Config) -> + Res = mrdb:fold_reverse(r, fun simple_acc/2, []), + Expected = objs(), + {Res, Res} = {Res, Expected}. + +prefixed_rev_rdb_fold(_Config) -> + Pfx = <<"aa">>, + Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity), + Expected = prefixed_raw_objs(Pfx), + {Res, Res} = {Res, Expected}. + +filtered_rev_fold(_Config) -> + MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}], + Res = mrdb:fold_reverse(r, fun simple_acc/2, [], MS, infinity), + Expected = filtered_objs(MS), + {Res, Res} = {Res, Expected}. + +prefixed_rev_rdb_fold_max(_Config) -> + Pfx = <<255,255>>, + Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity), + Expected = prefixed_raw_objs(Pfx), + {Res, Res} = {Res, Expected}. + +fwd_select(_Config) -> + MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}], + Res = mrdb:select(r, MS), + Expected = filtered_objs(MS), + {Res, Res} = {Res, Expected}. + +rev_select(_Config) -> + MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}], + Res = mrdb:select_reverse(r, MS), + Expected = lists:reverse(filtered_objs(MS)), + {Res, Res} = {Res, Expected}. 
+
+select_cont(_Config) ->
+    MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
+    Expected = filtered_objs(MS),
+    select_cont_loop(mrdb:select(r, MS, 1), Expected).
+
+rev_select_cont(_Config) ->
+    MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
+    Expected = lists:reverse(filtered_objs(MS)),
+    select_cont_loop(mrdb:select_reverse(r, MS, 1), Expected).
+
+select_cont_loop({[Obj], Cont}, [Obj|Rest]) ->   % one object per chunk (limit is 1)
+    select_cont_loop(mrdb:select(Cont), Rest);
+select_cont_loop({[], '$end_of_table'}, []) ->
+    ok.
+
+simple_acc(#r{} = Obj, Acc) ->
+    [Obj | Acc].
+
+simple_rdb_acc(K, V, Acc) ->
+    [{K,V} | Acc].
+
+fill_tab(t = Tab) ->
+    [mrdb:insert(Tab, {Tab, K, V}) || {K,V} <- raw_objs()],
+    ok;
+fill_tab(r = Tab) ->
+    [mrdb:insert(Tab, Obj) || Obj <- objs()],
+    ok.
+
+prefixed_raw_objs(Pfx) ->
+    [Obj || {K,_} = Obj <- raw_objs(),
+            is_prefix(Pfx, K)].
+
+raw_objs() ->
+    [ {<<"aa">>     , <<"1">>}
+    , {<<"aa1">>    , <<"2">>}
+    , {<<"ab">>     , <<"3">>}
+    , {<<"ab1">>    , <<"4">>}
+    , {<<255,255>>  , <<"5">>} ].
+
+filtered_objs(MS) ->
+    MSC = ets:match_spec_compile(MS),
+    ets:match_spec_run(objs(), MSC).
+
+objs() ->
+    [ #r{k = {a,a,1}, v = 1}
+    , #r{k = {a,b,2}, v = 2}
+    , #r{k = {a,b,3}, v = 3}
+    , #r{k = {a,c,4}, v = 4}
+    , #r{k = {b,b,5}, v = 5}
+    ].
+
+%% copied from mrdb_select.erl
+is_prefix(A, B) when is_binary(A), is_binary(B) ->
+    Sa = byte_size(A),
+    case B of
+        <<A:Sa/binary, _/binary>> ->   % B starts with the Sa bytes of A
+            true;
+        _ ->
+            false
+    end.