Refactor to support column families, direct rocksdb access

Expose low-level helpers, fix dialyzer warnings

WIP column families and mrdb API

Basic functionality in place

started adding documentation

remove doc/ from .gitignore

add doc/* files

recognize pre-existing tabs at startup

wip: most of the functionality in place (not yet merge ops)

wip: adding transaction support

wip: add transaction test case (currently dumps core)

First draft, mnesia plugin user guide

Fix note formatting

WIP working on indexing

Index iterators, dialyzer, xref fixes

open db with optimistic transactions

Use rocksdb-1.7.0

Use seanhinde rocksdb patch, enable rollback

Call the right transaction_get() function

WIP add 'snap_tx' activity type

tx restart using mrdb_mutex

Fix test suite sync bugs

WIP instrumented for debugging

WIP working on migration test case

Add migration test suite

Migration works, subscribe to schema changes

WIP fix batch handling

Manage separate batches per db_ref

Add mrdb:fold/3

Add some docs, erlang_ls config

Use seanhinde's rocksdb vsn
This commit is contained in:
Ulf Wiger
2020-12-22 10:36:24 +01:00
parent c0ce3afe39
commit d5dafb5b7e
41 changed files with 8688 additions and 1571 deletions
+393 -1342
View File
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+17
View File
@@ -0,0 +1,17 @@
-include_lib("hut/include/hut.hrl").
%% enable debugging messages through mnesia:set_debug_level(debug)
-ifndef(MNESIA_ROCKSDB_NO_DBG).
-define(dbg(Fmt, Args), ?log(debug, Fmt, Args)).
%% -define(dbg(Fmt, Args),
%% %% avoid evaluating Args if the message will be dropped anyway
%% case mnesia_monitor:get_env(debug) of
%% none -> ok;
%% verbose -> ok;
%% _ -> mnesia_lib:dbg_out("~p:~p: "++(Fmt),[?MODULE,?LINE|Args])
%% end).
-else.
-define(dbg(Fmt, Args), ok).
-endif.
-define(DEFAULT_RETRIES, 1).
+330 -6
View File
@@ -1,17 +1,341 @@
%%% @doc RocksDB update wrappers, in separate module for easy tracing and mocking.
%%% @doc RocksDB update wrappers, in separate module for easy tracing and mocking.
%%%
-module(mnesia_rocksdb_lib).
-export([put/4,
write/3,
delete/3]).
-export([ put/4
, write/3
, delete/3
]).
-export([ open_rocksdb/3
, data_mountpoint/1
, create_mountpoint/1
, tabname/1
]).
-export([ default_encoding/3
, check_encoding/2
, valid_obj_type/2
, valid_key_type/2 ]).
-export([ keypos/1
, encode_key/1, encode_key/2
, decode_key/1, decode_key/2
, encode_val/1, encode_val/2
, decode_val/1, decode_val/3
, encode/2
, decode/2
]).
-include("mnesia_rocksdb.hrl").
-include_lib("hut/include/hut.hrl").
put(#{db := Ref, cf := CF}, K, V, Opts) ->
rocksdb:put(Ref, CF, K, V, Opts);
put(Ref, K, V, Opts) ->
rocksdb:put(Ref, K, V, Opts).
write(Ref, L, Opts) ->
rocksdb:write(Ref, L, Opts).
write(#{db := Ref, cf := CF}, L, Opts) ->
write_as_batch(L, Ref, CF, Opts).
delete(Ref, K, Opts) ->
rocksdb:delete(Ref, K, Opts).
write_as_batch(L, Ref, CF, Opts) ->
{ok, Batch} = rocksdb:batch(),
lists:foreach(
fun({put, K, V}) ->
ok = rocksdb:batch_put(Batch, CF, K, V);
({delete, K}) ->
ok = rocksdb:batch_delete(Batch, CF, K)
end, L),
rocksdb:write_batch(Ref, Batch, Opts).
create_mountpoint(Tab) ->
MPd = data_mountpoint(Tab),
case filelib:is_dir(MPd) of
false ->
file:make_dir(MPd),
ok;
true ->
Dir = mnesia_lib:dir(),
case lists:prefix(Dir, MPd) of
true ->
ok;
false ->
{error, exists}
end
end.
data_mountpoint(Tab) ->
Dir = mnesia_monitor:get_env(dir),
filename:join(Dir, tabname(Tab) ++ ".extrdb").
tabname({admin, Alias}) ->
"mnesia_rocksdb-" ++ atom_to_list(Alias) ++ "-_db";
tabname({Tab, index, {{Pos},_}}) ->
atom_to_list(Tab) ++ "-=" ++ atom_to_list(Pos) ++ "=-_ix";
tabname({Tab, index, {Pos,_}}) ->
atom_to_list(Tab) ++ "-" ++ integer_to_list(Pos) ++ "-_ix";
tabname({Tab, retainer, Name}) ->
atom_to_list(Tab) ++ "-" ++ retainername(Name) ++ "-_RET";
tabname(Tab) when is_atom(Tab) ->
atom_to_list(Tab) ++ "-_tab".
default_encoding({_, index, _}, _, _) ->
{sext, {value, raw}};
default_encoding({_, retainer, _}, _, _) ->
{term, {value, term}};
default_encoding(_, Type, As) ->
KeyEnc = case Type of
ordered_set -> sext;
set -> term;
bag -> sext
end,
ValEnc = case As of
[_, _] ->
{value, term};
[_, _ | _] ->
{object, term}
end,
{KeyEnc, ValEnc}.
check_encoding(Encoding, Attributes) ->
try check_encoding_(Encoding, Attributes)
catch
throw:Error ->
Error
end.
check_encoding_({Key, Val}, As) ->
Key1 = check_key_encoding(Key),
Val1 = check_value_encoding(Val, As),
{ok, {Key1, Val1}};
check_encoding_(E, _) ->
throw({error, {invalid_encoding, E}}).
check_key_encoding(E) when E==sext; E==term; E==raw ->
E;
check_key_encoding(E) ->
throw({error, {invalid_key_encoding, E}}).
check_value_encoding(raw, [_, _]) -> {value, raw};
check_value_encoding({value, E} = V, [_, _]) when E==term; E==raw; E==sext -> V;
check_value_encoding({object, E} = V, _) when E==term; E==raw; E==sext -> V;
check_value_encoding(term, As) -> {val_encoding_type(As), term};
check_value_encoding(sext, As) -> {val_encoding_type(As), sext};
check_value_encoding(E, _) ->
throw({error, {invalid_value_encoding, E}}).
val_encoding_type(Attrs) ->
case Attrs of
[_, _] -> value;
[_, _|_] -> object
end.
valid_obj_type(#{encoding := Enc}, Obj) ->
case {Enc, Obj} of
{{binary, {value, binary}}, {_, K, V}} ->
is_binary(K) andalso is_binary(V);
{{binary, _}, _} ->
is_binary(element(2, Obj));
{{_, {value, binary}}, {_, _, V}} ->
is_binary(V);
_ ->
%% No restrictions on object type
%% unless key and/or value typed to binary
true
end.
valid_key_type(#{encoding := Enc}, Key) ->
case Enc of
{binary, _} when is_binary(Key) ->
true;
{binary, _} ->
false;
_ ->
true
end.
-spec encode_key(any()) -> binary().
encode_key(Key) ->
encode(Key, sext).
encode(Value, sext) ->
sext:encode(Value);
encode(Value, raw) when is_binary(Value) ->
Value;
encode(Value, term) ->
term_to_binary(Value).
encode_key(Key, #{encoding := {Enc,_}}) ->
encode(Key, Enc);
encode_key(Key, _) ->
encode(Key, sext).
-spec decode_key(binary()) -> any().
decode_key(CodedKey) ->
decode(CodedKey, sext).
decode_key(CodedKey, #{encoding := {Enc, _}}) ->
decode(CodedKey, Enc);
decode_key(CodedKey, Enc) ->
decode(CodedKey, Enc).
decode(Val, sext) ->
case sext:partial_decode(Val) of
{full, Result, _} ->
Result;
_ ->
error(badarg, Val)
end;
decode(Val, raw) ->
Val;
decode(Val, term) ->
binary_to_term(Val).
-spec encode_val(any()) -> binary().
encode_val(Val) ->
encode(Val, term).
encode_val(Val, Enc) when is_atom(Enc) ->
encode(Val, Enc);
encode_val(_, #{name := {_,index,_}}) ->
<<>>;
encode_val(Val, #{encoding := {_, Enc0}, attr_pos := AP}) ->
{Type, Enc} = enc_type(Enc0),
case {map_size(AP), Type} of
{2, value} ->
encode(element(3, Val), Enc);
{_, object} ->
encode(setelement(2, Val, []), Enc)
end.
enc_type({T, _} = E) when T==value; T==object ->
E;
enc_type(E) when is_atom(E) ->
{object, E}.
-spec decode_val(binary()) -> any().
decode_val(CodedVal) ->
binary_to_term(CodedVal).
decode_val(<<>>, K, #{name := {_,index,_}}) ->
{K};
decode_val(CodedVal, Key, Ref) ->
{Type, Enc} = value_encoding(Ref),
case Type of
object ->
setelement(2, decode(CodedVal, Enc), Key);
value ->
make_rec(Key, decode(CodedVal, Enc), Ref)
end.
make_rec(Key, _Val, #{name := {_, index, {_,ordered}}}) ->
{Key};
make_rec(Key, Val, #{properties := #{record_name := Tag}}) ->
{Tag, Key, Val};
make_rec(Key, Val, #{attr_pos := AP}) ->
%% no record name
case AP of
#{key := 1} -> {Key, Val};
#{key := 2} -> {Val, Key} %% Yeah, right, but people are weird
end.
value_encoding(#{encoding := {_, Enc}}) ->
enc_type(Enc);
value_encoding(#{}) ->
{object, term};
value_encoding({Type, Enc} = E) when is_atom(Type), is_atom(Enc) ->
E.
keypos({admin, _}) ->
1;
keypos({_, index, _}) ->
1;
keypos({_, retainer, _}) ->
2;
keypos(Tab) when is_atom(Tab) ->
2.
%% ======================================================================
%% Private functions
%% ======================================================================
retainername(Name) when is_atom(Name) ->
atom_to_list(Name);
retainername(Name) when is_list(Name) ->
try binary_to_list(list_to_binary(Name))
catch
error:_ ->
lists:flatten(io_lib:write(Name))
end;
retainername(Name) ->
lists:flatten(io_lib:write(Name)).
open_rocksdb(MPd, RdbOpts, CFs) ->
open_rocksdb(MPd, rocksdb_open_opts_(RdbOpts), CFs, get_retries()).
%% Code adapted from basho/riak_kv_eleveldb_backend.erl
open_rocksdb(MPd, Opts, CFs, Retries) ->
open_db(MPd, Opts, CFs, max(1, Retries), undefined).
open_db(_, _, _, 0, LastError) ->
{error, LastError};
open_db(MPd, Opts, CFs, RetriesLeft, _) ->
case rocksdb:open_optimistic_transaction_db(MPd, Opts, CFs) of
{ok, _Ref, _CFRefs} = Ok ->
?log(debug, "Open - Rocksdb: ~s (~p) -> ~p", [MPd, Opts, Ok]),
Ok;
%% Check specifically for lock error, this can be caused if
%% a crashed mnesia takes some time to flush rocksdb information
%% out to disk. The process is gone, but the NIF resource cleanup
%% may not have completed.
{error, {db_open, OpenErr}=Reason} ->
case lists:prefix("IO error: lock ", OpenErr) of
true ->
SleepFor = get_retry_delay(),
?log(debug, ("Open - Rocksdb backend retrying ~p in ~p ms"
" after error ~s"), [MPd, SleepFor, OpenErr]),
timer:sleep(SleepFor),
open_db(MPd, Opts, CFs, RetriesLeft - 1, Reason);
false ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
get_retries() -> 30.
get_retry_delay() -> 100.
rocksdb_open_opts_(RdbOpts) ->
lists:foldl(
fun({K,_} = Item, Acc) ->
lists:keystore(K, 1, Acc, Item)
end, default_open_opts(), RdbOpts).
default_open_opts() ->
[ {create_if_missing, true}
, {cache_size,
list_to_integer(get_env_default("ROCKSDB_CACHE_SIZE", "32212254"))}
, {block_size, 1024}
, {max_open_files, 30}
, {write_buffer_size,
list_to_integer(get_env_default(
"ROCKSDB_WRITE_BUFFER_SIZE", "4194304"))}
, {compression,
list_to_atom(get_env_default("ROCKSDB_COMPRESSION", "true"))}
, {use_bloomfilter, true}
].
get_env_default(Key, Default) ->
case os:getenv(Key) of
false ->
Default;
Value ->
Value
end.
+6 -11
View File
@@ -33,17 +33,12 @@
code_change/3]).
-include("mnesia_rocksdb_tuning.hrl").
-include("mnesia_rocksdb_int.hrl").
-define(KB, 1024).
-define(MB, 1024 * 1024).
-define(GB, 1024 * 1024 * 1024).
-ifdef(DEBUG).
-define(dbg(Fmt, Args), io:fwrite(user,"~p:~p: "++(Fmt),[?MODULE,?LINE|Args])).
-else.
-define(dbg(Fmt, Args), ok).
-endif.
lookup(Tab, Default) ->
try ets:lookup(?MODULE, Tab) of
[{_, Params}] ->
@@ -113,21 +108,21 @@ store_params(Params) ->
NTabs = length(Params),
Env0= mnesia_rocksdb_tuning:describe_env(),
Env = Env0#tuning{n_tabs = NTabs},
?dbg("Env = ~p~n", [Env]),
?log(debug, "Env = ~p~n", [Env]),
TotalFiles = lists:sum([mnesia_rocksdb_tuning:max_files(Sz) ||
{_, Sz} <- Params]),
?dbg("TotalFiles = ~p~n", [TotalFiles]),
?log(debug, "TotalFiles = ~p~n", [TotalFiles]),
MaxFs = Env#tuning.max_files,
?dbg("MaxFs = ~p~n", [MaxFs]),
?log(debug, "MaxFs = ~p~n", [MaxFs]),
FsHeadroom = MaxFs * 0.6,
?dbg("FsHeadroom = ~p~n", [FsHeadroom]),
?log(debug, "FsHeadroom = ~p~n", [FsHeadroom]),
FilesFactor = if TotalFiles =< FsHeadroom ->
1; % don't have to scale down
true ->
FsHeadroom / TotalFiles
end,
Env1 = Env#tuning{files_factor = FilesFactor},
?dbg("Env1 = ~p~n", [Env1]),
?log(debug, "Env1 = ~p~n", [Env1]),
lists:foreach(
fun({Tab, Sz}) when is_atom(Tab);
is_atom(element(1,Tab)),
+1452
View File
File diff suppressed because it is too large Load Diff
+191
View File
@@ -0,0 +1,191 @@
-module(mrdb_index).
-export([
with_iterator/3
, iterator_move/2
, iterator/2
, iterator_close/1
]).
-record(mrdb_ix_iter, { i :: mrdb:iterator()
, type = set :: set | bag
, sub :: mrdb:ref() | pid()
}).
-type ix_iterator() :: #mrdb_ix_iter{}.
-type index_value() :: any().
-type iterator_action() :: mrdb:iterator_action().
-type object() :: tuple().
-record(subst, { i :: mrdb:iterator()
, vals_f
, cur
, mref }).
-define(TIMEOUT, 5000).
-import(mnesia_rocksdb_lib, [ encode_key/2 ]).
-export_type([ ix_iterator/0 ]).
-spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res.
with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) ->
{ok, I} = iterator(Tab, IxPos),
try Fun(I)
after
iterator_close(I)
end.
-spec iterator(mrdb:ref_or_tab(), mrdb:index_position()) -> {ok, ix_iterator()}
| {error, _}.
iterator(Tab, IxPos) ->
#{semantics := Sem} = R = mrdb:ensure_ref(Tab),
#{ix_vals_f := IxValsF} = IxR = ensure_index_ref(IxPos, R),
case mrdb:iterator(IxR, []) of
{ok, I} ->
case Sem of
bag ->
P = sub_new(R, IxValsF),
{ok, #mrdb_ix_iter{ i = I
, sub = P }};
_ ->
{ok, #mrdb_ix_iter{i = I, sub = R}}
end;
Err ->
Err
end.
-spec iterator_move(ix_iterator(), iterator_action()) -> {ok, index_value(), object()}
| {error, _}.
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case mrdb:iterator_move(I, Dir) of
{ok, {{FKey, PKey}}} ->
{ok, FKey, opt_read(Sub, PKey)};
Other ->
Other
end.
iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case call_sub(Sub, {move_rel, Dir}) of
not_found ->
case mrdb:iterator_move(I, Dir) of
{ok, {FKey, PKey}} ->
call_sub(Sub, {move_abs, FKey, PKey});
Other ->
Other
end;
Other ->
Other
end.
opt_read(R, Key) ->
case mrdb:read(R, Key, []) of
[Obj] ->
Obj;
[] ->
[]
end.
sub_new(R, ValsF) when is_function(ValsF, 1) ->
Me = self(),
{Pid, MRef} = spawn_monitor(
fun() ->
MRef = monitor(process, Me),
case mrdb:iterator(R) of
{ok, I} ->
Me ! {self(), ok},
sub_loop(#subst{ mref = MRef
, i = I
, vals_f = ValsF
, cur = undefined});
Error ->
Me ! {self(), Error}
end
end),
receive
{'DOWN', MRef, _, _, Crash} ->
mrdb:abort({error, Crash});
{Pid, ok} ->
demonitor(MRef),
Pid;
{Pid, Error} ->
demonitor(MRef),
mrdb:abort(Error)
end.
sub_loop(#subst{i = I, mref = MRef} = St) ->
receive
{'DOWN', MRef, _, _, _} ->
mrdb:iterator_close(I);
{Pid, Ref, close} ->
mrdb:iterator_close(I),
Pid ! {Ref, ok};
{Pid, Ref, cur} ->
Pid ! {Ref, St#subst.cur},
sub_loop(St);
{Pid, Ref, {move, Cur, Dir}} when is_binary(Dir) ->
{Res, St1} = sub_abs_move(Cur, Dir, St),
Pid ! {Ref, Res},
sub_loop(St1);
{Pid, Ref, {move_rel, Dir}} ->
{Res, St1} = sub_rel_move(Dir, St),
Pid ! {Ref, Res},
sub_loop(St1)
end.
sub_abs_move(Cur, Dir, #subst{i = I} = St) ->
case mrdb:iterator_move(I, Dir) of
{ok, _} = Ok ->
{Ok, St#subst{cur = Cur}};
Other ->
{Other, St#subst{cur = undefined}}
end.
sub_rel_move(Dir, #subst{i = I, vals_f = VF, cur = Prev} = St) ->
case mrdb:iterator_move(I, Dir) of
{ok, Obj} = Ok ->
case lists:member(Prev, VF(Obj)) of
true ->
{Ok, St};
false ->
{not_found, St#subst{cur = undefined}}
end;
Other ->
{Other, St#subst{cur = undefined}}
end.
call_sub(Pid, Req) ->
MRef = monitor(process, Pid),
Pid ! {self(), MRef, Req},
receive
{MRef, Reply} ->
demonitor(MRef),
Reply;
{'DOWN', MRef, _, _, Reason} ->
error(Reason)
after ?TIMEOUT ->
error(timeout)
end.
-spec iterator_close(ix_iterator()) -> ok.
iterator_close(#mrdb_ix_iter{i = I, sub = Sub}) ->
mrdb:iterator_close(I),
iterator_close_sub(Sub).
iterator_close_sub(P) when is_pid(P) ->
call_sub(P, close);
iterator_close_sub(_) ->
ok.
ensure_index_ref(IxPos, #{name := Name, attr_pos := AP, properties := #{index := Ixs}}) ->
{_,ordered} = Ix = lists:keyfind(index_pos(IxPos, AP), 1, Ixs),
mrdb:get_ref({Name, index, Ix}).
index_pos(P, AP) when is_atom(P) ->
maps:get(P, AP);
index_pos(P, _) ->
P.
+80
View File
@@ -0,0 +1,80 @@
-module(mrdb_mutex).
-export([ do/2 ]).
-export([ ensure_tab/0 ]).
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
%%
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
do(Rsrc, F) when is_function(F, 0) ->
true = claim(Rsrc),
try F()
after
release(Rsrc)
end.
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
end.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
false
end.
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
_ ->
true
end.
+270
View File
@@ -0,0 +1,270 @@
-module(mrdb_select).
-export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
]).
-import(mnesia_rocksdb_lib, [ keypos/1
, decode_key/2
, decode_val/3
]).
-include("mnesia_rocksdb.hrl").
-record(sel, { alias % TODO: not used
, tab
, ref
, keypat
, ms % TODO: not used
, compiled_ms
, limit
, key_only = false % TODO: not used
, direction = forward % TODO: not used
}).
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
select(Ref, MS, false, Limit).
select(Ref, MS, AccKeys, Limit)
when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
Sel = mk_sel(Ref, MS, Limit),
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
mk_sel(#{name := Tab} = Ref, MS, Limit) ->
Keypat = keypat(MS, keypos(Tab), Ref),
#sel{tab = Tab,
ref = Ref,
keypat = Keypat,
ms = MS,
compiled_ms = ets:match_spec_compile(MS),
key_only = needs_key_only(MS),
limit = Limit}.
select(Cont) ->
case Cont of
'$end_of_table' -> '$end_of_table';
_ -> Cont()
end.
fold(Ref, Fun, Acc, MS, Limit) ->
{AccKeys, F} =
if is_function(Fun, 3) ->
{true, fun({K, Obj}, Acc1) ->
Fun(Obj, K, Acc1)
end};
is_function(Fun, 2) ->
{false, Fun};
true ->
mrdb:abort(invalid_fold_fun)
end,
fold_(select(Ref, MS, AccKeys, Limit), F, Acc).
fold_('$end_of_table', _, Acc) ->
Acc;
fold_(L, Fun, Acc) when is_list(L) ->
lists:foldl(Fun, Acc, L);
fold_({L, Cont}, Fun, Acc) ->
fold_(select(Cont), Fun, lists:foldl(Fun, Acc, L)).
rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rocksdb:iterator_move(I, first(Ref)),
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end).
first(#{vsn := 1}) -> <<?DATA_START>>;
first(_) -> first.
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
true ->
i_rdb_fold(rocksdb:iterator_move(I, next), I, Pfx, Fun,
Fun(K, V, Acc), decr(Limit));
false ->
Acc
end;
i_rdb_fold(_, _, _, _, Acc, _) ->
Acc.
i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS
, limit = Limit
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) ->
StartKey = case {Pfx, Vsn, Enc} of
{<<>>, 1, {sext, _}} ->
<<?DATA_START>>;
{_, _, {term, _}} ->
<<>>;
_ ->
Pfx
end,
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc).
needs_key_only([Pat]) ->
needs_key_only_(Pat);
needs_key_only([_|_] = Pats) ->
lists:all(fun needs_key_only_/1, Pats).
needs_key_only_({HP, _, Body}) ->
BodyVars = lists:flatmap(fun extract_vars/1, Body),
%% Note that we express the conditions for "needs more than key" and negate.
not(wild_in_body(BodyVars) orelse
case bound_in_headpat(HP) of
{all,V} -> lists:member(V, BodyVars);
Vars when is_list(Vars) -> any_in_body(lists:keydelete(2,1,Vars), BodyVars)
end).
extract_vars([H|T]) ->
extract_vars(H) ++ extract_vars(T);
extract_vars(T) when is_tuple(T) ->
extract_vars(tuple_to_list(T));
extract_vars(T) when T=='$$'; T=='$_' ->
[T];
extract_vars(T) when is_atom(T) ->
case is_wild(T) of
true ->
[T];
false ->
[]
end;
extract_vars(_) ->
[].
any_in_body(Vars, BodyVars) ->
lists:any(fun({_,Vs}) ->
intersection(Vs, BodyVars) =/= []
end, Vars).
intersection(A,B) when is_list(A), is_list(B) ->
A -- (A -- B).
is_wild('_') ->
true;
is_wild(A) when is_atom(A) ->
case atom_to_list(A) of
"\$" ++ S ->
try begin
_ = list_to_integer(S),
true
end
catch
error:_ ->
false
end;
_ ->
false
end.
wild_in_body(BodyVars) ->
intersection(BodyVars, ['$$','$_']) =/= [].
bound_in_headpat(HP) when is_atom(HP) ->
{all, HP};
bound_in_headpat(HP) when is_tuple(HP) ->
[_|T] = tuple_to_list(HP),
map_vars(T, 2).
map_vars([H|T], P) ->
case extract_vars(H) of
[] ->
map_vars(T, P+1);
Vs ->
[{P, Vs}|map_vars(T, P+1)]
end;
map_vars([], _) ->
[].
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
AccKeys, Acc) ->
case is_prefix(Pfx, K) of
true ->
DecKey = decode_key(K, R),
Rec = decode_val(V, DecKey, R),
case ets:match_spec_run([Rec], MS) of
[] ->
select_traverse(
rocksdb:iterator_move(I, next), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
[Match] ->
Acc1 = if AccKeys ->
[{K, Match}|Acc];
true ->
[Match|Acc]
end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end;
false when Limit == infinity ->
lists:reverse(Acc);
false ->
{lists:reverse(Acc), '$end_of_table'}
end;
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
select_return(infinity, {L, '$end_of_table'}) ->
L;
select_return(_, Ret) ->
Ret.
is_prefix(A, B) when is_binary(A), is_binary(B) ->
Sa = byte_size(A),
case B of
<<A:Sa/binary, _/binary>> ->
true;
_ ->
false
end.
decr(I) when is_integer(I) ->
I-1;
decr(infinity) ->
infinity.
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc),
fun() ->
mrdb:with_rdb_iterator(
Ref,
fun(NewI) ->
select_traverse(iterator_next(NewI, K),
Limit, Pfx, MS, NewI, Sel,
AccKeys, [])
end)
end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) ->
case rocksdb:iterator_move(I, K) of
{ok, K, _} ->
rocksdb:iterator_move(I, next);
Other ->
Other
end.
keypat([H|T], KeyPos, Ref) ->
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
keypat(_, _, _, <<>>) -> <<>>;
keypat([H|T], KeyPos, Ref, Pfx0) ->
Pfx = keypat_pfx(H, KeyPos, Ref),
keypat(T, KeyPos, Ref, common_prefix(Pfx, Pfx0));
keypat([], _, _, Pfx) ->
Pfx.
common_prefix(<<H, T/binary>>, <<H, T1/binary>>) ->
<<H, (common_prefix(T, T1))/binary>>;
common_prefix(_, _) ->
<<>>.
keypat_pfx({HeadPat,_Gs,_}, KeyPos, #{encoding := {sext,_}}) when is_tuple(HeadPat) ->
KP = element(KeyPos, HeadPat),
sext:prefix(KP);
keypat_pfx(_, _, _) ->
<<>>.