Compare commits
No commits in common. "master" and "uw-stacktraces" have entirely different histories.
master
...
uw-stacktr
@ -262,9 +262,18 @@ i_show_table(_, _, 0, _) ->
|
||||
i_show_table(I, Move, Limit, Ref) ->
|
||||
case rocksdb:iterator_move(I, Move) of
|
||||
{ok, EncKey, EncVal} ->
|
||||
K = decode_key(EncKey, Ref),
|
||||
Val = decode_val(EncVal, K, Ref),
|
||||
io:fwrite("~p~n", [Val]),
|
||||
{Type,Val} =
|
||||
case EncKey of
|
||||
<< ?INFO_TAG, K/binary >> ->
|
||||
K1 = decode_key(K, Ref),
|
||||
V = decode_val(EncVal, K1, Ref),
|
||||
{info,V};
|
||||
_ ->
|
||||
K = decode_key(EncKey, Ref),
|
||||
V = decode_val(EncVal, K, Ref),
|
||||
{data,V}
|
||||
end,
|
||||
io:fwrite("~p: ~p~n", [Type, Val]),
|
||||
i_show_table(I, next, Limit-1, Ref);
|
||||
_ ->
|
||||
ok
|
||||
@ -666,7 +675,11 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
|
||||
|
||||
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
|
||||
#{semantics := Sem} = Ref = get_ref(Tab),
|
||||
First = fun(I) -> rocksdb:iterator_move(I, first) end,
|
||||
Start = case Ref of
|
||||
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
|
||||
_ -> first
|
||||
end,
|
||||
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
|
||||
F = case Sem of
|
||||
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
|
||||
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end
|
||||
|
@ -250,6 +250,31 @@ get_info_res(Res, Default) ->
|
||||
error(E)
|
||||
end.
|
||||
|
||||
%% Admin info: metadata written by the admin proc to keep track of
|
||||
%% the derived status of tables (such as detected version and encoding
|
||||
%% of existing standalone tables.)
|
||||
%%
|
||||
write_admin_info(K, V, Alias, Name) ->
|
||||
mrdb:rdb_put(get_ref({admin, Alias}),
|
||||
admin_info_key(K, Name),
|
||||
term_to_binary(V)).
|
||||
|
||||
read_admin_info(K, Alias, Name) ->
|
||||
EncK = admin_info_key(K, Name),
|
||||
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
|
||||
{ok, Bin} ->
|
||||
{ok, binary_to_term(Bin)};
|
||||
_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
delete_admin_info(K, Alias, Name) ->
|
||||
EncK = admin_info_key(K, Name),
|
||||
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
|
||||
|
||||
admin_info_key(K, Name) ->
|
||||
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
|
||||
|
||||
%% Table metadata info maintained by users
|
||||
%%
|
||||
write_info(Alias, Tab, K, V) ->
|
||||
@ -260,6 +285,7 @@ write_info_(Ref, Tab, K, V) ->
|
||||
|
||||
write_info_encv(Ref, Tab, K, V) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
|
||||
maybe_write_standalone_info(Ref, K, V),
|
||||
mrdb:rdb_put(Ref, EncK, V, []).
|
||||
|
||||
delete_info(Alias, Tab, K) ->
|
||||
@ -267,8 +293,30 @@ delete_info(Alias, Tab, K) ->
|
||||
|
||||
delete_info_(Ref, Tab, K) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
|
||||
maybe_delete_standalone_info(Ref, K),
|
||||
mrdb:rdb_delete(Ref, EncK, []).
|
||||
|
||||
maybe_write_standalone_info(Ref, K, V) ->
|
||||
case Ref of
|
||||
#{type := standalone, vsn := 1, db_ref := DbRef} ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
|
||||
Key = <<?INFO_TAG, EncK/binary>>,
|
||||
EncV = mnesia_rocksdb_lib:encode_val(V, term),
|
||||
rocksdb:put(DbRef, Key, EncV, []);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
maybe_delete_standalone_info(Ref, K) ->
|
||||
case Ref of
|
||||
#{type := standalone, vsn := 1, db_ref := DbRef} ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
|
||||
Key = <<?INFO_TAG, EncK/binary>>,
|
||||
rocksdb:delete(DbRef, Key, []);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
|
||||
call(Alias, {write_table_property, Tab, Prop}).
|
||||
|
||||
@ -765,10 +813,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
||||
false ->
|
||||
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
|
||||
{false, MP} ->
|
||||
create_table_as_standalone(Alias, Name, MP, TRec, St);
|
||||
create_table_as_standalone(Alias, Name, true, MP, TRec, St);
|
||||
{true, MP} ->
|
||||
?log(debug, "will create ~p as standalone and migrate", [Name]),
|
||||
case create_table_as_standalone(Alias, Name, MP, TRec, St) of
|
||||
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
|
||||
{ok, OldTRec, _} ->
|
||||
?log(info, "Migrating ~p to column family", [Name]),
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
|
||||
@ -777,8 +825,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
||||
end
|
||||
end;
|
||||
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
|
||||
{_, MP} = table_exists_as_standalone(Name),
|
||||
create_table_as_standalone(Alias, Name, MP, TRec, St).
|
||||
{Exists, MP} = table_exists_as_standalone(Name),
|
||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
|
||||
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
|
||||
?log(debug, "Migrate to cf (~p)", [Name]),
|
||||
@ -1058,12 +1106,29 @@ table_exists_as_standalone(Name) ->
|
||||
end,
|
||||
{Exists, MP}.
|
||||
|
||||
create_table_as_standalone(Alias, Name, MP, TRec, St) ->
|
||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) ->
|
||||
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
|
||||
{ok, #{type := standalone, vsn := Vsn1,
|
||||
encoding := Enc1} = Cf, _St1} = Ok ->
|
||||
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
|
||||
Alias, Name),
|
||||
case Vsn1 of
|
||||
1 ->
|
||||
load_info(Alias, Name, Cf);
|
||||
_ ->
|
||||
skip
|
||||
end,
|
||||
Ok;
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
|
||||
Vsn = check_version(TRec),
|
||||
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
|
||||
do_open_standalone(true, Alias, Name, MP, TRec1, St).
|
||||
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St).
|
||||
|
||||
do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
|
||||
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
|
||||
#st{standalone = Ts} = St) ->
|
||||
Opts = rocksdb_opts_from_trec(TRec0),
|
||||
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
|
||||
@ -1073,24 +1138,146 @@ do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
|
||||
DbRec1 = DbRec#{ cfs => CfNames,
|
||||
mountpoint => MP },
|
||||
TRec = maps:merge(TRec0, DbRec#{type => standalone}),
|
||||
{ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
|
||||
TRec1 = guess_table_vsn_and_encoding(Exists, TRec),
|
||||
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
|
||||
{error, _} = Err ->
|
||||
?log(debug, "open_db error: ~p", [Err]),
|
||||
Err
|
||||
end.
|
||||
|
||||
%% When opening a standalone table, chances are it's a legacy table
|
||||
%% where legacy encoding is already in place. We try to read the
|
||||
%% first object and apply legacy encoding. If successful, we set
|
||||
%% legacy encoding in the TRec. If we migrate the data to a column
|
||||
%% family, we should apply the defined encoding for the cf.
|
||||
%%
|
||||
%% The first object can either be an info object (in the legacy case)
|
||||
%% or a data object, with a sext-encoded key, and a term_to_binary-
|
||||
%% encoded object as value, where the key position is set to [].
|
||||
%% The info objects would be sext-encoded key + term-encoded value.
|
||||
guess_table_vsn_and_encoding(false, TRec) ->
|
||||
TRec;
|
||||
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
|
||||
alias := Alias, name := Name} = R) ->
|
||||
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
|
||||
{ok, {V, E}} ->
|
||||
R#{vsn => V, encoding => E};
|
||||
error ->
|
||||
R1 = set_default_guess(R),
|
||||
mrdb:with_rdb_iterator(
|
||||
R1, fun(I) ->
|
||||
guess_table_vsn_and_encoding_(
|
||||
mrdb:rdb_iterator_move(I, first), I, As, R1)
|
||||
end)
|
||||
end.
|
||||
|
||||
set_default_guess(#{type := standalone} = R) ->
|
||||
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
|
||||
1 ->
|
||||
R#{vsn => 1, encoding => {sext, {object, term}}};
|
||||
V ->
|
||||
R#{vsn => V}
|
||||
end.
|
||||
|
||||
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
|
||||
Arity = length(As) + 1,
|
||||
case K of
|
||||
<<?INFO_TAG, EncK/binary>> ->
|
||||
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
|
||||
mnesia_rocksdb_lib:decode(V, term)},
|
||||
%% This is a vsn 1 standalone table
|
||||
R#{vsn => 1, encoding => {sext, {object, term}}}
|
||||
catch
|
||||
error:_ ->
|
||||
R
|
||||
end;
|
||||
_ ->
|
||||
Enc = guess_obj_encoding(K, V, Arity),
|
||||
R#{encoding => Enc}
|
||||
end;
|
||||
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
|
||||
R.
|
||||
|
||||
guess_obj_encoding(K, V, Arity) ->
|
||||
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
|
||||
|
||||
guess_encoding(Bin) ->
|
||||
try {sext, sext:decode(Bin)}
|
||||
catch
|
||||
error:_ ->
|
||||
try {term, binary_to_term(Bin)}
|
||||
catch
|
||||
error:_ -> raw
|
||||
end
|
||||
end.
|
||||
|
||||
guess_key_encoding(Bin) ->
|
||||
case guess_encoding(Bin) of
|
||||
raw -> raw;
|
||||
{Enc, _} -> Enc
|
||||
end.
|
||||
|
||||
guess_val_encoding(Bin, Arity) ->
|
||||
case guess_encoding(Bin) of
|
||||
raw -> {value, raw};
|
||||
{Enc, Term} ->
|
||||
if is_tuple(Term), size(Term) == Arity,
|
||||
element(2, Term) == [] ->
|
||||
{object, Enc};
|
||||
true ->
|
||||
{value, Enc}
|
||||
end
|
||||
end.
|
||||
|
||||
%% This is slightly different from `rocksdb:is_empty/1`, since it allows
|
||||
%% for the presence of some metadata, and still considers it empty if there
|
||||
%% is no user data.
|
||||
table_is_empty(#{} = DbRec) ->
|
||||
Start = iterator_data_start(DbRec),
|
||||
mrdb:with_rdb_iterator(
|
||||
DbRec, fun(I) ->
|
||||
case mrdb:rdb_iterator_move(I, first) of
|
||||
case mrdb:rdb_iterator_move(I, Start) of
|
||||
{ok, _, _} -> false;
|
||||
_ -> true
|
||||
end
|
||||
end).
|
||||
|
||||
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
|
||||
iterator_data_start(_) -> first.
|
||||
|
||||
load_info(Alias, Name, Cf) ->
|
||||
ARef = get_ref({admin, Alias}),
|
||||
mrdb:with_rdb_iterator(
|
||||
Cf, fun(I) ->
|
||||
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
|
||||
end).
|
||||
|
||||
load_info_(Res, I, ARef, Tab) ->
|
||||
case Res of
|
||||
{ok, << ?INFO_TAG, K/binary >>, V} ->
|
||||
DecK = mnesia_rocksdb_lib:decode_key(K),
|
||||
case read_info_(ARef, Tab, DecK, undefined) of
|
||||
undefined ->
|
||||
write_info_encv(ARef, Tab, DecK, V);
|
||||
<<131,_/binary>> = Value ->
|
||||
%% Due to a previous bug, info values could be double-encoded with binary_to_term()
|
||||
try binary_to_term(Value) of
|
||||
_DecVal ->
|
||||
%% We haven't been storing erlang-term encoded data as info,
|
||||
%% so assume this is double-encoded and correct
|
||||
write_info_encv(ARef, Tab, DecK, Value)
|
||||
catch
|
||||
error:_ ->
|
||||
skip
|
||||
end;
|
||||
_ ->
|
||||
skip
|
||||
end,
|
||||
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
check_version(TRec) ->
|
||||
user_property(mrdb_version, TRec, ?VSN).
|
||||
|
||||
@ -1302,26 +1489,24 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
|
||||
%% not yet created
|
||||
CFs = cfs(CFs0),
|
||||
file:make_dir(MP),
|
||||
OpenRes = rocksdb_open(MP, Opts, CFs),
|
||||
OpenOpts = [ {create_if_missing, true}
|
||||
, {create_missing_column_families, true}
|
||||
, {merge_operator, erlang_merge_operator}
|
||||
| Opts ],
|
||||
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
|
||||
map_cfs(OpenRes, CFs, Alias, Acc0);
|
||||
false ->
|
||||
{error, enoent};
|
||||
true ->
|
||||
%% Assumption: even an old rocksdb database file will have at least "default"
|
||||
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
|
||||
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
|
||||
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
|
||||
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
|
||||
end.
|
||||
|
||||
open_opts(Opts) ->
|
||||
[ {create_if_missing, true}
|
||||
, {create_missing_column_families, true}
|
||||
, {merge_operator, erlang_merge_operator}
|
||||
| Opts ].
|
||||
|
||||
rocksdb_open(MP, Opts, CFs) ->
|
||||
%% rocksdb:open(MP, Opts, CFs),
|
||||
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
|
||||
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
|
||||
|
||||
is_open(Alias, #st{backends = Bs}) ->
|
||||
case maps:find(Alias, Bs) of
|
||||
@ -1420,6 +1605,7 @@ close_and_delete_standalone(#{alias := Alias,
|
||||
case get_table_mountpoint(Alias, Name, St) of
|
||||
{ok, MP} ->
|
||||
close_and_delete(DbRef, MP),
|
||||
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
|
||||
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
|
||||
error ->
|
||||
St
|
||||
|
114
src/mrdb.erl
114
src/mrdb.erl
@ -53,7 +53,6 @@
|
||||
, delete/2 , delete/3
|
||||
, delete_object/2, delete_object/3
|
||||
, match_delete/2
|
||||
, merge/3 , merge/4
|
||||
, clear_table/1
|
||||
, batch_write/2 , batch_write/3
|
||||
, update_counter/3, update_counter/4
|
||||
@ -61,17 +60,14 @@
|
||||
, get_batch/1
|
||||
, snapshot/1
|
||||
, release_snapshot/1
|
||||
, first/1 , first/2
|
||||
, next/2 , next/3
|
||||
, prev/2 , prev/3
|
||||
, last/1 , last/2
|
||||
, select/2 , select/3
|
||||
, select_reverse/2, select_reverse/3
|
||||
, first/1 , first/2
|
||||
, next/2 , next/3
|
||||
, prev/2 , prev/3
|
||||
, last/1 , last/2
|
||||
, select/2 , select/3
|
||||
, select/1
|
||||
, fold/3 , fold/4 , fold/5
|
||||
, fold_reverse/3 , fold_reverse/4, fold_reverse/5
|
||||
, rdb_fold/4 , rdb_fold/5
|
||||
, rdb_fold_reverse/4, rdb_fold_reverse/5
|
||||
, fold/3 , fold/4 , fold/5
|
||||
, rdb_fold/4 , rdb_fold/5
|
||||
, write_info/3
|
||||
, read_info/2
|
||||
, read_info/1
|
||||
@ -543,6 +539,7 @@ re_throw(Cat, Err) ->
|
||||
throw -> throw(Err)
|
||||
end.
|
||||
|
||||
|
||||
mnesia_compatible_aborts() ->
|
||||
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
||||
|
||||
@ -663,25 +660,10 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
|
||||
inherit_ctxt(Ref, R) ->
|
||||
maps:merge(Ref, maps:with([snapshot, activity], R)).
|
||||
|
||||
%% @doc Create an iterator on table `Tab' for the duration of `Fun'
|
||||
%%
|
||||
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
|
||||
%% closed once the fun terminates.
|
||||
%% @equiv with_iterator(Tab, Fun, [])
|
||||
%% @end
|
||||
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
|
||||
with_iterator(Tab, Fun) ->
|
||||
with_iterator(Tab, Fun, []).
|
||||
|
||||
%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun'
|
||||
%%
|
||||
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
|
||||
%% closed once the fun terminates.
|
||||
%%
|
||||
%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator
|
||||
%% will return `{ok, Obj}' where `Obj' is the complete decoded object.
|
||||
%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}.
|
||||
%% @end
|
||||
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res.
|
||||
with_iterator(Tab, Fun, Opts) ->
|
||||
R = ensure_ref(Tab),
|
||||
@ -758,21 +740,6 @@ insert(Tab, Obj0, Opts) ->
|
||||
EncVal = encode_val(Obj, Ref),
|
||||
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
|
||||
|
||||
merge(Tab, Key, MergeOp) ->
|
||||
merge(Tab, Key, MergeOp, []).
|
||||
|
||||
merge(Tab, Key, MergeOp, Opts) ->
|
||||
#{encoding := Enc} = Ref = ensure_ref(Tab),
|
||||
case Enc of
|
||||
{_, {value, term}} ->
|
||||
merge_(Ref, Key, MergeOp, Opts);
|
||||
_ ->
|
||||
abort(badarg)
|
||||
end.
|
||||
|
||||
merge_(Ref, Key, MergeOp, Opts) ->
|
||||
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
|
||||
|
||||
validate_obj(Obj, #{mode := mnesia}) ->
|
||||
Obj;
|
||||
validate_obj(Obj, #{attr_pos := AP,
|
||||
@ -1317,13 +1284,6 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:select(ensure_ref(Tab), Pat, Limit).
|
||||
|
||||
select_reverse(Tab, Pat) ->
|
||||
select_reverse(Tab, Pat, infinity).
|
||||
|
||||
select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:select_reverse(ensure_ref(Tab), Pat, Limit).
|
||||
|
||||
select(Cont) ->
|
||||
mrdb_select:select(Cont).
|
||||
|
||||
@ -1378,16 +1338,6 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
|
||||
|
||||
fold_reverse(Tab, Fun, Acc) ->
|
||||
fold_reverse(Tab, Fun, Acc, [{'_', [], ['$_']}]).
|
||||
|
||||
fold_reverse(Tab, Fun, Acc, MatchSpec) ->
|
||||
fold_reverse(Tab, Fun, Acc, MatchSpec, infinity).
|
||||
|
||||
fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:fold_reverse(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
|
||||
|
||||
rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
|
||||
, is_binary(Prefix) ->
|
||||
rdb_fold(Tab, Fun, Acc, Prefix, infinity).
|
||||
@ -1397,15 +1347,6 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
||||
|
||||
rdb_fold_reverse(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
|
||||
, is_binary(Prefix) ->
|
||||
rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity).
|
||||
|
||||
rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
|
||||
, is_binary(Prefix) ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:rdb_fold_reverse(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
||||
|
||||
valid_limit(L) ->
|
||||
case L of
|
||||
infinity ->
|
||||
@ -1417,7 +1358,15 @@ valid_limit(L) ->
|
||||
end.
|
||||
|
||||
write_info(Tab, K, V) ->
|
||||
#{alias := Alias} = ensure_ref(Tab),
|
||||
R = ensure_ref(Tab),
|
||||
Alias = case R of
|
||||
#{type := standalone, vsn := 1, alias := A} = TRef ->
|
||||
%% Also write on legacy info format
|
||||
write_info_standalone(TRef, K, V),
|
||||
A;
|
||||
#{alias := A} ->
|
||||
A
|
||||
end,
|
||||
write_info_(ensure_ref({admin, Alias}), Tab, K, V).
|
||||
|
||||
write_info_(#{} = R, Tab, K, V) ->
|
||||
@ -1434,8 +1383,13 @@ read_info(Tab, K) ->
|
||||
read_info(Tab, K, Default) when K==size; K==memory ->
|
||||
read_direct_info_(ensure_ref(Tab), K, Default);
|
||||
read_info(Tab, K, Default) ->
|
||||
#{alias := Alias} = ensure_ref(Tab),
|
||||
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
|
||||
#{alias := Alias} = R = ensure_ref(Tab),
|
||||
case R of
|
||||
#{type := standalone, vsn := 1} = TRef ->
|
||||
read_info_standalone(TRef, K, Default);
|
||||
#{alias := Alias} ->
|
||||
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
|
||||
end.
|
||||
|
||||
read_direct_info_(R, memory, _Def) ->
|
||||
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
|
||||
@ -1459,6 +1413,26 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
|
||||
%%rocksdb_boolean(<<"1">>) -> true;
|
||||
%%rocksdb_boolean(<<"0">>) -> false.
|
||||
|
||||
write_info_standalone(#{} = R, K, V) ->
|
||||
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
|
||||
EncV = term_to_binary(V),
|
||||
rdb_put(R, EncK, EncV, write_opts(R, [])).
|
||||
|
||||
read_info_standalone(#{} = R, K, Default) ->
|
||||
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
|
||||
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
|
||||
|
||||
get_info_res(Res, Default) ->
|
||||
case Res of
|
||||
not_found ->
|
||||
Default;
|
||||
{ok, Bin} ->
|
||||
%% no fancy tricks when encoding/decoding info values
|
||||
binary_to_term(Bin);
|
||||
{error, E} ->
|
||||
error(E)
|
||||
end.
|
||||
|
||||
%% insert_bag_v2(Ref, K, V, Opts) ->
|
||||
%% rdb_merge(Ref, K, {list_append, [V]}
|
||||
|
||||
|
@ -9,10 +9,6 @@
|
||||
, fold/4
|
||||
, rev_fold/4
|
||||
, index_ref/2
|
||||
, select/3
|
||||
, select/4
|
||||
, select_reverse/3
|
||||
, select_reverse/4
|
||||
]).
|
||||
|
||||
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
|
||||
@ -37,11 +33,6 @@
|
||||
|
||||
-export_type([ ix_iterator/0 ]).
|
||||
|
||||
-spec index_ref(mrdb:ref_or_tab(), mrdb:index_position()) -> mrdb:db_ref().
|
||||
index_ref(Tab, Ix) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
ensure_index_ref(Ix, R).
|
||||
|
||||
-spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res.
|
||||
with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) ->
|
||||
{ok, I} = iterator(Tab, IxPos),
|
||||
@ -97,26 +88,6 @@ fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
|
||||
iter_fold(I, Start, Dir, FoldFun, Acc)
|
||||
end).
|
||||
|
||||
select(Tab, Ix, MS) ->
|
||||
select(Tab, Ix, MS, infinity).
|
||||
|
||||
select(Tab, Ix, MS, Limit) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
#{} = IxR = ensure_index_ref(Ix, R),
|
||||
MSpre = pre_match_spec(MS),
|
||||
mrdb_select:select(IxR#{ derive_obj_f => mk_derive_obj_f(R)
|
||||
, pre_ms => MSpre }, MS, Limit).
|
||||
|
||||
select_reverse(Tab, Ix, MS) ->
|
||||
select_reverse(Tab, Ix, MS, infinity).
|
||||
|
||||
select_reverse(Tab, Ix, MS, Limit) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
#{} = IxR = ensure_index_ref(Ix, R),
|
||||
MSpre = pre_match_spec(MS),
|
||||
mrdb_select:select_reverse(IxR#{ derive_obj_f => mk_derive_obj_f(R)
|
||||
, pre_ms => MSpre }, MS, Limit).
|
||||
|
||||
iter_fold(I, Start, Dir, Fun, Acc) ->
|
||||
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
|
||||
|
||||
@ -125,6 +96,10 @@ iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
|
||||
iter_fold_({error, _}, _, _, _, Acc) ->
|
||||
Acc.
|
||||
|
||||
index_ref(Tab, Pos) ->
|
||||
TRef = mrdb:ensure_ref(Tab),
|
||||
ensure_index_ref(Pos, TRef).
|
||||
|
||||
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
case mrdb:iterator_move(I, Dir) of
|
||||
{ok, {{FKey, PKey}}} ->
|
||||
@ -146,29 +121,6 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
Other
|
||||
end.
|
||||
|
||||
pre_match_spec([{{KeyPat,ObjPat}, Gs, MatchBody} | T]) ->
|
||||
[{{{KeyPat,'_'},ObjPat}, Gs, MatchBody} | pre_match_spec(T)];
|
||||
pre_match_spec([H|T]) ->
|
||||
[H | pre_match_spec(T)];
|
||||
pre_match_spec([]) ->
|
||||
[].
|
||||
|
||||
%% Used for mrdb_select:select()
|
||||
%% The select operation folds over index keys, and for matching keys,
|
||||
%% calls the `derive_obj_f/1` fun, which normally just does `Obj -> [Obj]`.
|
||||
%% In the case of indexes, it fetches the object, if it exists, and then
|
||||
%% returns `[{IndexKey, Object}]`, which is what the matchspec should operate on.
|
||||
%%
|
||||
mk_derive_obj_f(Ref) ->
|
||||
fun({{IxK, Key}}) ->
|
||||
case mrdb:read(Ref, Key, []) of
|
||||
[Obj] ->
|
||||
[{IxK, Obj}];
|
||||
[] ->
|
||||
[]
|
||||
end
|
||||
end.
|
||||
|
||||
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
|
||||
opt_read(R, Key) ->
|
||||
case mrdb:read(R, Key, []) of
|
||||
|
@ -1,15 +1,11 @@
|
||||
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
|
||||
-module(mrdb_select).
|
||||
|
||||
-export([ select/3 %% (Ref, MatchSpec, Limit)
|
||||
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
|
||||
, select_reverse/3
|
||||
, select_reverse/4
|
||||
, select/1 %% (Cont)
|
||||
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
|
||||
, fold_reverse/5
|
||||
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
|
||||
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
|
||||
-export([ select/3 %% (Ref, MatchSpec, Limit)
|
||||
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
|
||||
, select/1 %% (Cont)
|
||||
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
|
||||
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
|
||||
]).
|
||||
|
||||
-export([continuation_info/2]).
|
||||
@ -29,41 +25,26 @@
|
||||
, compiled_ms
|
||||
, limit
|
||||
, key_only = false % TODO: not used
|
||||
, direction = forward
|
||||
, pre_ms
|
||||
, derive_obj_f = fun unit_l/1
|
||||
, direction = forward % TODO: not used
|
||||
}).
|
||||
|
||||
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
|
||||
select(Ref, MS, false, forward, Limit).
|
||||
select(Ref, MS, false, Limit).
|
||||
|
||||
select(Ref, MS, AccKeys, Limit) ->
|
||||
select(Ref, MS, AccKeys, forward, Limit).
|
||||
|
||||
select_reverse(Ref, MS, Limit) ->
|
||||
select(Ref, MS, false, reverse, Limit).
|
||||
|
||||
select_reverse(Ref, MS, AccKeys, Limit) ->
|
||||
select(Ref, MS, AccKeys, reverse, Limit).
|
||||
|
||||
select(Ref, MS, AccKeys, Dir, Limit)
|
||||
select(Ref, MS, AccKeys, Limit)
|
||||
when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
|
||||
Sel = mk_sel(Ref, MS, Dir, Limit),
|
||||
Sel = mk_sel(Ref, MS, Limit),
|
||||
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
|
||||
|
||||
mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
|
||||
MSpre = maps:get(pre_ms, Ref, MS),
|
||||
Keypat = keypat(MSpre, keypos(Tab), Ref),
|
||||
mk_sel(#{name := Tab} = Ref, MS, Limit) ->
|
||||
Keypat = keypat(MS, keypos(Tab), Ref),
|
||||
#sel{tab = Tab,
|
||||
ref = Ref,
|
||||
keypat = Keypat,
|
||||
ms = MSpre,
|
||||
pre_ms = MSpre,
|
||||
compiled_ms = ms_compile(MS),
|
||||
ms = MS,
|
||||
compiled_ms = ets:match_spec_compile(MS),
|
||||
key_only = needs_key_only(MS),
|
||||
direction = Dir,
|
||||
limit = Limit,
|
||||
derive_obj_f = derive_f(Ref)}.
|
||||
limit = Limit}.
|
||||
|
||||
select(Cont) ->
|
||||
case Cont of
|
||||
@ -78,18 +59,11 @@ continuation_info(_, _) -> undefined.
|
||||
|
||||
continuation_info_(ref, #sel{ref = Ref}) -> Ref;
|
||||
continuation_info_(ms, #sel{ms = MS }) -> MS;
|
||||
continuation_info_(pre_ms, #sel{pre_ms = MSp}) -> MSp;
|
||||
continuation_info_(limit, #sel{limit = L }) -> L;
|
||||
continuation_info_(direction, #sel{direction = Dir}) -> Dir;
|
||||
continuation_info_(_, _) -> undefined.
|
||||
|
||||
fold(Ref, Fun, Acc, MS, Limit) ->
|
||||
do_fold(Ref, Fun, Acc, MS, forward, Limit).
|
||||
|
||||
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
|
||||
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
|
||||
|
||||
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
|
||||
{AccKeys, F} =
|
||||
if is_function(Fun, 3) ->
|
||||
{true, fun({K, Obj}, Acc1) ->
|
||||
@ -100,7 +74,7 @@ do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
|
||||
true ->
|
||||
mrdb:abort(invalid_fold_fun)
|
||||
end,
|
||||
fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
|
||||
fold_(select(Ref, MS, AccKeys, Limit), F, Acc).
|
||||
|
||||
fold_('$end_of_table', _, Acc) ->
|
||||
Acc;
|
||||
@ -112,57 +86,12 @@ fold_({L, Cont}, Fun, Acc) ->
|
||||
rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
|
||||
mrdb:with_rdb_iterator(
|
||||
Ref, fun(I) ->
|
||||
MovRes = fwd_init_seek(I, Prefix),
|
||||
MovRes = rocksdb:iterator_move(I, first(Ref)),
|
||||
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
|
||||
end).
|
||||
|
||||
rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
|
||||
mrdb:with_rdb_iterator(
|
||||
Ref, fun(I) ->
|
||||
MovRes = rev_init_seek(I, Prefix),
|
||||
i_rdb_fold_reverse(MovRes, I, Prefix, Fun, Acc, Limit)
|
||||
end).
|
||||
|
||||
fwd_init_seek(I, Pfx) ->
|
||||
rocksdb:iterator_move(I, fwd_init_seek_tgt(Pfx)).
|
||||
|
||||
fwd_init_seek_tgt(<<>> ) -> first;
|
||||
fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
|
||||
|
||||
rev_init_seek(I, Pfx) ->
|
||||
case rev_init_seek_tgt(Pfx) of
|
||||
last ->
|
||||
i_move(I, last);
|
||||
{seek, Bin} ->
|
||||
%% An 'incremented' prefix.
|
||||
%% This will fail if we seek past the end of the table.
|
||||
%% Then, try to seek_for_prev instead (fails if table empty).
|
||||
%% This because rocksdb lacks a "seek backward to last matching prefix".
|
||||
case i_move(I, {seek, Bin}) of
|
||||
{error, invalid_iterator} ->
|
||||
i_move(I, {seek_for_prev, Bin});
|
||||
{ok, _, _} = Ok ->
|
||||
Ok
|
||||
end
|
||||
end.
|
||||
|
||||
rev_init_seek_tgt(<<>>) -> last;
|
||||
rev_init_seek_tgt(Prefix) ->
|
||||
case incr_prefix(Prefix) of
|
||||
last -> last;
|
||||
Pfx1 when is_binary(Pfx1) ->
|
||||
{seek, Pfx1}
|
||||
end.
|
||||
|
||||
incr_prefix(<<>>) -> last;
|
||||
incr_prefix(Pfx) when is_binary(Pfx) ->
|
||||
PfxI = binary:decode_unsigned(Pfx),
|
||||
MaxI = (1 bsl (byte_size(Pfx) * 8)) - 1,
|
||||
case PfxI + 1 of
|
||||
I1 when I1 >= MaxI -> last;
|
||||
I1 ->
|
||||
binary:encode_unsigned(I1)
|
||||
end.
|
||||
first(#{vsn := 1}) -> <<?DATA_START>>;
|
||||
first(_) -> first.
|
||||
|
||||
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
|
||||
case is_prefix(Pfx, K) of
|
||||
@ -175,37 +104,20 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
|
||||
i_rdb_fold(_, _, _, _, Acc, _) ->
|
||||
Acc.
|
||||
|
||||
i_rdb_fold_reverse({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
|
||||
case is_prefix(Pfx, K) of
|
||||
true ->
|
||||
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
|
||||
Fun(K, V, Acc), decr(Limit));
|
||||
false when K > Pfx ->
|
||||
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
|
||||
false ->
|
||||
Acc
|
||||
end;
|
||||
i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
|
||||
Acc.
|
||||
|
||||
i_select(I, #sel{ keypat = Pfx
|
||||
, compiled_ms = MS
|
||||
, limit = Limit
|
||||
, direction = Dir
|
||||
, ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
|
||||
{MoveRes, Sel} = case Enc of
|
||||
{term, _} ->
|
||||
%% No defined ordering - do forward select
|
||||
{i_move(I, first), Sel0#sel{direction = forward}};
|
||||
_ ->
|
||||
case Dir of
|
||||
forward ->
|
||||
{fwd_init_seek(I, Pfx), Sel0};
|
||||
reverse ->
|
||||
{rev_init_seek(I, Pfx), Sel0}
|
||||
end
|
||||
end,
|
||||
select_traverse(MoveRes, Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) ->
|
||||
StartKey = case {Pfx, Vsn, Enc} of
|
||||
{<<>>, 1, {sext, _}} ->
|
||||
<<?DATA_START>>;
|
||||
{_, _, {term, _}} ->
|
||||
<<>>;
|
||||
_ ->
|
||||
Pfx
|
||||
end,
|
||||
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
|
||||
Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
|
||||
needs_key_only([Pat]) ->
|
||||
needs_key_only_(Pat);
|
||||
@ -281,17 +193,16 @@ map_vars([H|T], P) ->
|
||||
map_vars([], _) ->
|
||||
[].
|
||||
|
||||
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
|
||||
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
|
||||
AccKeys, Acc) ->
|
||||
case is_prefix(Pfx, K) of
|
||||
true ->
|
||||
DecKey = decode_key(K, R),
|
||||
Rec0 = decode_val(V, DecKey, R),
|
||||
RecL = derive_object(Rec0, Sel),
|
||||
case ms_run(RecL, MS) of
|
||||
Rec = decode_val(V, DecKey, R),
|
||||
case ets:match_spec_run([Rec], MS) of
|
||||
[] ->
|
||||
select_traverse(
|
||||
rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
|
||||
rocksdb:iterator_move(I, next), Limit, Pfx, MS,
|
||||
I, Sel, AccKeys, Acc);
|
||||
[Match] ->
|
||||
Acc1 = if AccKeys ->
|
||||
@ -301,9 +212,6 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} =
|
||||
end,
|
||||
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
|
||||
end;
|
||||
false when Dir == reverse, K > Pfx ->
|
||||
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
|
||||
I, Sel, AccKeys, Acc);
|
||||
false when Limit == infinity ->
|
||||
lists:reverse(Acc);
|
||||
false ->
|
||||
@ -312,9 +220,6 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} =
|
||||
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
|
||||
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
|
||||
|
||||
next_or_prev(forward) -> next;
|
||||
next_or_prev(reverse) -> prev.
|
||||
|
||||
select_return(infinity, {L, '$end_of_table'}) ->
|
||||
L;
|
||||
select_return(_, Ret) ->
|
||||
@ -334,40 +239,29 @@ decr(I) when is_integer(I) ->
|
||||
decr(infinity) ->
|
||||
infinity.
|
||||
|
||||
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
|
||||
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
|
||||
{lists:reverse(Acc),
|
||||
fun(sel) -> Sel;
|
||||
(cont) ->
|
||||
mrdb:with_rdb_iterator(
|
||||
Ref,
|
||||
fun(NewI) ->
|
||||
select_traverse(iterator_next(NewI, K, Dir),
|
||||
select_traverse(iterator_next(NewI, K),
|
||||
Limit, Pfx, MS, NewI, Sel,
|
||||
AccKeys, [])
|
||||
end)
|
||||
end};
|
||||
traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
|
||||
select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
|
||||
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
|
||||
iterator_next(I, K, Dir) ->
|
||||
case i_move(I, K, Dir) of
|
||||
iterator_next(I, K) ->
|
||||
case rocksdb:iterator_move(I, K) of
|
||||
{ok, K, _} ->
|
||||
rocksdb:iterator_move(I, next_or_prev(Dir));
|
||||
rocksdb:iterator_move(I, next);
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
i_move(I, Tgt) ->
|
||||
rocksdb:iterator_move(I, Tgt).
|
||||
|
||||
i_move(I, K, reverse) ->
|
||||
rocksdb:iterator_move(I, {seek_for_prev, K});
|
||||
i_move(I, K, forward) ->
|
||||
rocksdb:iterator_move(I, K).
|
||||
|
||||
derive_object(R, #sel{derive_obj_f = F}) ->
|
||||
F(R).
|
||||
|
||||
keypat([H|T], KeyPos, Ref) ->
|
||||
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
|
||||
|
||||
@ -389,14 +283,3 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos, #{encoding := {sext,_}}) when is_tuple(HeadP
|
||||
keypat_pfx(_, _, _) ->
|
||||
<<>>.
|
||||
|
||||
derive_f(#{derive_obj_f := F}) when is_function(F, 1) -> F;
|
||||
derive_f(_) -> fun unit_l/1.
|
||||
|
||||
unit_l(X) ->
|
||||
[X].
|
||||
|
||||
ms_compile(MS) ->
|
||||
ets:match_spec_compile(MS).
|
||||
|
||||
ms_run(RecL, MS) ->
|
||||
ets:match_spec_run(RecL, MS).
|
||||
|
@ -27,9 +27,6 @@
|
||||
, mrdb_two_procs_tx_inner_restart/1
|
||||
, mrdb_two_procs_snap/1
|
||||
, mrdb_three_procs/1
|
||||
, create_counters/1
|
||||
, update_counters/1
|
||||
, restart_node/1
|
||||
]).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
@ -45,8 +42,7 @@ all() ->
|
||||
groups() ->
|
||||
[
|
||||
{all_tests, [sequence], [ {group, checks}
|
||||
, {group, mrdb}
|
||||
, {group, counters}]}
|
||||
, {group, mrdb} ]}
|
||||
%% , error_handling ]}
|
||||
, {checks, [sequence], [ encoding_sext_attrs
|
||||
, encoding_binary_binary
|
||||
@ -61,10 +57,6 @@ groups() ->
|
||||
, mrdb_two_procs_tx_inner_restart
|
||||
, mrdb_two_procs_snap
|
||||
, mrdb_three_procs ]}
|
||||
, {counters, [sequence], [ create_counters
|
||||
, update_counters
|
||||
, restart_node
|
||||
, update_counters ]}
|
||||
].
|
||||
|
||||
|
||||
@ -757,35 +749,6 @@ get_attempt() ->
|
||||
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
|
||||
Attempt.
|
||||
|
||||
create_counters(_Config) ->
|
||||
create_tab(counters, []),
|
||||
mrdb:insert(counters, {counters, c0, 1}),
|
||||
mrdb:update_counter(counters, c1, 1),
|
||||
[{counters, c0, 1}] = mrdb:read(counters, c0),
|
||||
[{counters, c1, 1}] = mrdb:read(counters, c1),
|
||||
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
|
||||
ok.
|
||||
|
||||
restart_node(_Config) ->
|
||||
mnesia:stop(),
|
||||
ok = mnesia:start(),
|
||||
ct:log("mnesia restarted", []),
|
||||
ok.
|
||||
|
||||
update_counters(_Config) ->
|
||||
[{counters, c0, C0Prev}] = mrdb:read(counters, c0),
|
||||
[{counters, c1, C1Prev}] = mrdb:read(counters, c1),
|
||||
ct:log("co: ~p, c1: ~p", [C0Prev, C1Prev]),
|
||||
ok = mrdb:update_counter(counters, c0, 1),
|
||||
ok = mrdb:update_counter(counters, c1, 1),
|
||||
ct:log("Incremented c0 and c1 by 1", []),
|
||||
C0 = C0Prev + 1,
|
||||
C1 = C1Prev + 1,
|
||||
[{counters, c0, C0}] = mrdb:read(counters, c0),
|
||||
[{counters, c1, C1}] = mrdb:read(counters, c1),
|
||||
ct:log("c0: ~p, c1: ~p", [C0, C1]),
|
||||
ok.
|
||||
|
||||
create_tabs(Tabs, Config) ->
|
||||
Res = lists:map(fun create_tab/1, Tabs),
|
||||
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
|
||||
|
@ -236,25 +236,12 @@ test_index_plugin(Config) ->
|
||||
if Type == rdb ->
|
||||
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
|
||||
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
|
||||
ok = test_select(Tab,{pfx},[{'_', [], ['$_']}]),
|
||||
MS2 = [{{<<"whi">>,{Tab,"truth",'_'}},[],['$_']}],
|
||||
[_] = mrdb_index:select(Tab, {pfx}, MS2),
|
||||
ok = test_select(Tab, {pfx}, MS2),
|
||||
[{Tab,"foobar","sentence"}] = mrdb:index_read(
|
||||
Tab, <<"foo">>, {pfx});
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
test_select(Tab, Ix, MS) ->
|
||||
Res = mrdb_index:select(Tab, Ix, MS),
|
||||
ct:log("mrdb_index:select(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, Res]),
|
||||
RevRes = mrdb_index:select_reverse(Tab, Ix, MS),
|
||||
ct:log("mrdb_index:select_reverse(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, RevRes]),
|
||||
{Res,Res} = {Res, lists:reverse(RevRes)},
|
||||
ok.
|
||||
|
||||
|
||||
ixtype(T) when T==bag;
|
||||
T==ordered ->
|
||||
{{pfx}, T};
|
||||
|
@ -1,226 +0,0 @@
|
||||
-module(mrdb_fold_SUITE).
|
||||
|
||||
-export([
|
||||
all/0
|
||||
, groups/0
|
||||
, suite/0
|
||||
, init_per_suite/1
|
||||
, end_per_suite/1
|
||||
, init_per_group/2
|
||||
, end_per_group/2
|
||||
, init_per_testcase/2
|
||||
, end_per_testcase/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
no_prefix_fwd_rdb_fold/1
|
||||
, prefixed_fwd_rdb_fold/1
|
||||
, no_prefix_rev_rdb_fold/1
|
||||
, prefixed_rev_rdb_fold/1
|
||||
, prefixed_rev_rdb_fold_max/1
|
||||
, fwd_fold/1
|
||||
, filtered_fwd_fold/1
|
||||
, rev_fold/1
|
||||
, filtered_rev_fold/1
|
||||
, fwd_select/1
|
||||
, rev_select/1
|
||||
, select_cont/1
|
||||
, rev_select_cont/1
|
||||
]).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-record(r, {k, v}).
|
||||
|
||||
suite() ->
|
||||
[].
|
||||
|
||||
all() ->
|
||||
[{group, all_tests}].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{all_tests, [sequence], [ {group, rdb_fold}
|
||||
, {group, fold}
|
||||
, {group, select} ]}
|
||||
, {rdb_fold, [sequence], [ no_prefix_fwd_rdb_fold
|
||||
, prefixed_fwd_rdb_fold
|
||||
, no_prefix_rev_rdb_fold
|
||||
, prefixed_rev_rdb_fold
|
||||
, prefixed_rev_rdb_fold_max ]}
|
||||
, {fold, [sequence], [ fwd_fold
|
||||
, filtered_fwd_fold
|
||||
, rev_fold
|
||||
, filtered_rev_fold ]}
|
||||
, {select, [sequence], [ fwd_select
|
||||
, rev_select
|
||||
, select_cont
|
||||
, rev_select_cont ]}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
mnesia:stop(),
|
||||
ok = mnesia_rocksdb_tlib:start_mnesia(reset),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
mnesia:stop(),
|
||||
ok.
|
||||
|
||||
init_per_group(G, Config) when G==rdb_fold; G==fold ->
|
||||
mk_tab(G),
|
||||
Config;
|
||||
init_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(_, _Config) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
ok.
|
||||
|
||||
mk_tab(G) ->
|
||||
T = tab_name(G),
|
||||
case create_tab(T) of
|
||||
{atomic, ok} -> fill_tab(T);
|
||||
{aborted,{already_exists,T}} -> ok
|
||||
end.
|
||||
|
||||
tab_name(fold ) -> r;
|
||||
tab_name(select ) -> r;
|
||||
tab_name(rdb_fold) -> t.
|
||||
|
||||
create_tab(T) ->
|
||||
Opts = tab_opts(T),
|
||||
mnesia:create_table(T, [{rdb, [node()]} | Opts]).
|
||||
|
||||
tab_opts(r) ->
|
||||
[{attributes, [k, v]}, {type, ordered_set}];
|
||||
tab_opts(t) ->
|
||||
[{attributes, [k, v]}, {user_properties, [{mrdb_encoding, {raw, raw}}]}].
|
||||
|
||||
no_prefix_fwd_rdb_fold(_Config) ->
|
||||
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
|
||||
Expected = lists:reverse(raw_objs()),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
fwd_fold(_Config) ->
|
||||
Res = mrdb:fold(r, fun simple_acc/2, []),
|
||||
Expected = lists:reverse(objs()),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
prefixed_fwd_rdb_fold(_Config) ->
|
||||
Pfx = <<"aa">>,
|
||||
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
|
||||
Expected = lists:reverse(prefixed_raw_objs(Pfx)),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
filtered_fwd_fold(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:fold(r, fun simple_acc/2, [], MS, infinity),
|
||||
Expected = lists:reverse(filtered_objs(MS)),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
no_prefix_rev_rdb_fold(_Config) ->
|
||||
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity),
|
||||
Expected = raw_objs(),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
rev_fold(_Config) ->
|
||||
Res = mrdb:fold_reverse(r, fun simple_acc/2, []),
|
||||
Expected = objs(),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
prefixed_rev_rdb_fold(_Config) ->
|
||||
Pfx = <<"aa">>,
|
||||
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
|
||||
Expected = prefixed_raw_objs(Pfx),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
filtered_rev_fold(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:fold_reverse(r, fun simple_acc/2, [], MS, infinity),
|
||||
Expected = filtered_objs(MS),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
prefixed_rev_rdb_fold_max(_Config) ->
|
||||
Pfx = <<255,255>>,
|
||||
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
|
||||
Expected = prefixed_raw_objs(Pfx),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
fwd_select(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:select(r, MS),
|
||||
Expected = filtered_objs(MS),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
rev_select(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:select_reverse(r, MS),
|
||||
Expected = lists:reverse(filtered_objs(MS)),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
select_cont(_Cont) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Expected = filtered_objs(MS),
|
||||
select_cont_loop(mrdb:select(r, MS, 1), Expected).
|
||||
|
||||
rev_select_cont(_Cont) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Expected = lists:reverse(filtered_objs(MS)),
|
||||
select_cont_loop(mrdb:select_reverse(r, MS, 1), Expected).
|
||||
|
||||
select_cont_loop({[Obj], Cont}, [Obj|Rest]) ->
|
||||
select_cont_loop(mrdb:select(Cont), Rest);
|
||||
select_cont_loop({[], '$end_of_table'}, []) ->
|
||||
ok.
|
||||
|
||||
simple_acc(#r{} = Obj, Acc) ->
|
||||
[Obj | Acc].
|
||||
|
||||
simple_rdb_acc(K, V, Acc) ->
|
||||
[{K,V} | Acc].
|
||||
|
||||
fill_tab(t = Tab) ->
|
||||
[mrdb:insert(Tab, {Tab, K, V}) || {K,V} <- raw_objs()],
|
||||
ok;
|
||||
fill_tab(r = Tab) ->
|
||||
[mrdb:insert(Tab, Obj) || Obj <- objs()],
|
||||
ok.
|
||||
|
||||
prefixed_raw_objs(Pfx) ->
|
||||
[Obj || {K,_} = Obj <- raw_objs(),
|
||||
is_prefix(Pfx, K)].
|
||||
|
||||
raw_objs() ->
|
||||
[ {<<"aa">> , <<"1">>}
|
||||
, {<<"aa1">>, <<"2">>}
|
||||
, {<<"ab">> , <<"3">>}
|
||||
, {<<"ab1">>, <<"4">>}
|
||||
, {<<255,255>>, <<"5">>} ].
|
||||
|
||||
filtered_objs(MS) ->
|
||||
MSC = ets:match_spec_compile(MS),
|
||||
ets:match_spec_run(objs(), MSC).
|
||||
|
||||
objs() ->
|
||||
[ #r{k = {a,a,1}, v = 1}
|
||||
, #r{k = {a,b,2}, v = 2}
|
||||
, #r{k = {a,b,3}, v = 3}
|
||||
, #r{k = {a,c,4}, v = 4}
|
||||
, #r{k = {b,b,5}, v = 5}
|
||||
].
|
||||
|
||||
%% copied from mrdb_select.erl
|
||||
is_prefix(A, B) when is_binary(A), is_binary(B) ->
|
||||
Sa = byte_size(A),
|
||||
case B of
|
||||
<<A:Sa/binary, _/binary>> ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end.
|
Loading…
x
Reference in New Issue
Block a user