Compare commits
67 Commits
2.0.1
...
d6a4dca5f6
| Author | SHA1 | Date | |
|---|---|---|---|
| d6a4dca5f6 | |||
| b97b727605 | |||
| e0542f7f52 | |||
| 13ccdb373a | |||
| 29d5d6f170 | |||
| 4e3c9e83c8 | |||
| f9352b6ff0 | |||
| 04e6063ce0 | |||
| 323fcea7a7 | |||
| d8b6ab788e | |||
| 318f84bbaf | |||
| 0691d8b055 | |||
| ac69c8564f | |||
| 2ca7f36eb1 | |||
| 56d78768d9 | |||
| a768d8008f | |||
| 9cea41cb9a | |||
| d42055c7ac | |||
| 0ae7ecd41d | |||
| ece9db2b09 | |||
| 0e4382d5f7 | |||
| d9d82d7ead | |||
| 34285da6d8 | |||
| cb123ee28a | |||
| 996ae82717 | |||
| d6b86524fd | |||
| 296813ef7b | |||
| 16d0533acf | |||
| 1d6ae82c6d | |||
| 71ebaf2d66 | |||
| f2b6116d31 | |||
| 33ee7929d4 | |||
| 78669bb8bf | |||
| 346decee6e | |||
| 4da0fce567 | |||
| e7d42f9500 | |||
| 699a7d5920 | |||
| 57f02078bc | |||
| 791cec41db | |||
| 6eff62df6d | |||
| 2e430fc5eb | |||
| e76a01f8c4 | |||
| 4d0a78612a | |||
| eeb6aff242 | |||
| f11e16e29f | |||
| 0aecf5ef01 | |||
| 465a220bfe | |||
| 19140c738b | |||
| bed66b2998 | |||
| 3635eac717 | |||
| 95abe4e36e | |||
| 7c729bd932 | |||
| 4489e5d743 | |||
| d1a6bf22d5 | |||
| b65e82ed71 | |||
| ee9e7eac67 | |||
| ce2be519b4 | |||
| 8073a0daa5 | |||
| ab15b7f399 | |||
| a75f6e0c43 | |||
| 1340bb2050 | |||
| d1177b6ad4 | |||
| b908998e6b | |||
| dfc0125800 | |||
| 296abb23bb | |||
| 7057f4dcbd | |||
| c4235be94a |
+23
-3
@@ -2,15 +2,35 @@ version: 2.1
|
||||
|
||||
executors:
|
||||
aebuilder:
|
||||
parameters:
|
||||
otp:
|
||||
type: string
|
||||
docker:
|
||||
- image: aeternity/builder
|
||||
- image: aeternity/builder:focal-<< parameters.otp >>
|
||||
user: builder
|
||||
environment:
|
||||
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
|
||||
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
|
||||
|
||||
|
||||
jobs:
|
||||
build:
|
||||
executor: aebuilder
|
||||
build_and_test:
|
||||
parameters:
|
||||
otp:
|
||||
type: string
|
||||
executor:
|
||||
name: aebuilder
|
||||
otp: << parameters.otp >>
|
||||
steps:
|
||||
- checkout
|
||||
- run: make test
|
||||
- store_artifacts:
|
||||
path: _build/test/logs
|
||||
|
||||
workflows:
|
||||
commit:
|
||||
jobs:
|
||||
- build_and_test:
|
||||
matrix:
|
||||
parameters:
|
||||
otp: ["otp24", "otp25", "otp26"]
|
||||
|
||||
@@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
|
||||
it yourself. If you only need the size occasionally, you may traverse the
|
||||
table to count the elements.
|
||||
|
||||
When `mrdb` transactions abort, they will return a stacktrace caught
|
||||
from within the transaction fun, giving much better debugging info.
|
||||
This is different from how mnesia does it.
|
||||
|
||||
If behavior closer to mnesia's abort returns are needed, say, for backwards
|
||||
compatibility, this can be controlled by setting the environment variable
|
||||
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
|
||||
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
|
||||
For really performance-critical transactions which may abort often, it might
|
||||
make a difference to set this option to `true`, since there is a cost involved
|
||||
in producing stacktraces.
|
||||
|
||||
|
||||
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
|
||||
|
||||
|
||||
+2
-1
@@ -2,4 +2,5 @@
|
||||
{application,mnesia_rocksdb}.
|
||||
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
|
||||
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
|
||||
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
|
||||
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
|
||||
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.
|
||||
|
||||
+10
-3
@@ -4,8 +4,8 @@
|
||||
{deps,
|
||||
[
|
||||
{sext, "1.8.0"},
|
||||
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
|
||||
{hut, "1.3.0"}
|
||||
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
|
||||
{hut, "1.4.0"}
|
||||
]}.
|
||||
|
||||
{xref_checks, [
|
||||
@@ -14,6 +14,10 @@
|
||||
deprecated_function_calls
|
||||
]}.
|
||||
|
||||
{dialyzer, [{plt_apps, all_deps},
|
||||
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
|
||||
]}.
|
||||
|
||||
{profiles,
|
||||
[
|
||||
{test,
|
||||
@@ -22,7 +26,10 @@
|
||||
, {meck, "0.9.2"}
|
||||
, {trace_runner, {git, "https://github.com/uwiger/trace_runner.git",
|
||||
{ref, "2e56677"}}}
|
||||
]}
|
||||
]},
|
||||
{dialyzer, [{plt_apps, all_deps},
|
||||
{base_plt_apps, [erts, kernel, stdlib, mnesia, runtime_tools, eunit,
|
||||
proper, trace_runner, common_test]}]}
|
||||
]},
|
||||
{edown,
|
||||
%% Use as `rebar3 as edown do edoc`
|
||||
|
||||
@@ -1,11 +1,4 @@
|
||||
%% -*- erlang-mode -*-
|
||||
case os:getenv("ERLANG_ROCKSDB_OPTS") of
|
||||
false ->
|
||||
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
|
||||
_ ->
|
||||
%% If manually set, we assume it's throught through
|
||||
skip
|
||||
end.
|
||||
case os:getenv("DEBUG") of
|
||||
"true" ->
|
||||
Opts = proplists:get_value(erl_opts, CONFIG, []),
|
||||
|
||||
+5
-5
@@ -1,15 +1,15 @@
|
||||
{"1.2.0",
|
||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
|
||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
||||
{<<"rocksdb">>,
|
||||
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
|
||||
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
|
||||
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
||||
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
|
||||
0},
|
||||
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
||||
[
|
||||
{pkg_hash,[
|
||||
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
|
||||
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
|
||||
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
|
||||
{pkg_hash_ext,[
|
||||
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
|
||||
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
|
||||
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
|
||||
].
|
||||
|
||||
+16
-27
@@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) ->
|
||||
i_show_table(I, Move, Limit, Ref) ->
|
||||
case rocksdb:iterator_move(I, Move) of
|
||||
{ok, EncKey, EncVal} ->
|
||||
{Type,Val} =
|
||||
case EncKey of
|
||||
<< ?INFO_TAG, K/binary >> ->
|
||||
K1 = decode_key(K, Ref),
|
||||
V = decode_val(EncVal, K1, Ref),
|
||||
{info,V};
|
||||
_ ->
|
||||
K = decode_key(EncKey, Ref),
|
||||
V = decode_val(EncVal, K, Ref),
|
||||
{data,V}
|
||||
end,
|
||||
io:fwrite("~p: ~p~n", [Type, Val]),
|
||||
K = decode_key(EncKey, Ref),
|
||||
Val = decode_val(EncVal, K, Ref),
|
||||
io:fwrite("~p~n", [Val]),
|
||||
i_show_table(I, next, Limit-1, Ref);
|
||||
_ ->
|
||||
ok
|
||||
@@ -346,14 +337,11 @@ semantics(_Alias, index_fun) -> fun index_f/4;
|
||||
semantics(_Alias, _) -> undefined.
|
||||
|
||||
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
|
||||
case info(Alias, Tab, {index_consistent, PosInfo}) of
|
||||
true -> true;
|
||||
_ -> false
|
||||
end.
|
||||
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
|
||||
|
||||
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
|
||||
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
|
||||
when is_boolean(Bool) ->
|
||||
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
|
||||
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
|
||||
|
||||
|
||||
%% PRIVATE FUN
|
||||
@@ -454,8 +442,13 @@ close_table(Alias, Tab) ->
|
||||
error ->
|
||||
ok;
|
||||
_ ->
|
||||
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
|
||||
close_table_(Alias, Tab)
|
||||
case get(mnesia_dumper_dets) of
|
||||
undefined ->
|
||||
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
|
||||
close_table_(Alias, Tab);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end.
|
||||
|
||||
close_table_(Alias, Tab) ->
|
||||
@@ -673,11 +666,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
|
||||
|
||||
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
|
||||
#{semantics := Sem} = Ref = get_ref(Tab),
|
||||
Start = case Ref of
|
||||
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
|
||||
_ -> first
|
||||
end,
|
||||
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
|
||||
First = fun(I) -> rocksdb:iterator_move(I, first) end,
|
||||
F = case Sem of
|
||||
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
|
||||
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end
|
||||
@@ -798,7 +787,7 @@ handle_call({create_table, Tab, Props}, _From,
|
||||
exit:{aborted, Error} ->
|
||||
{reply, {aborted, Error}, St}
|
||||
end;
|
||||
handle_call({load_table, _LoadReason, Props}, _From,
|
||||
handle_call({load_table, _LoadReason, Props}, _,
|
||||
#st{alias = Alias, tab = Tab} = St) ->
|
||||
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
|
||||
{reply, ok, St#st{status = active}};
|
||||
@@ -826,7 +815,7 @@ handle_call({delete, Key}, _From, St) ->
|
||||
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
|
||||
Res = mrdb:match_delete(get_ref(Tab), Pat),
|
||||
{reply, Res, St};
|
||||
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
||||
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
|
||||
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
|
||||
{reply, ok, St#st{status = undefined}};
|
||||
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
||||
|
||||
+58
-191
@@ -33,6 +33,7 @@
|
||||
, read_info/2 %% (Alias, Tab)
|
||||
, read_info/4 %% (Alias, Tab, Key, Default)
|
||||
, write_info/4 %% (Alias, Tab, Key, Value)
|
||||
, delete_info/3 %% (Alias, Tab, Key)
|
||||
, write_table_property/3 %% (Alias, Tab, Property)
|
||||
]).
|
||||
|
||||
@@ -77,7 +78,7 @@
|
||||
| {add_aliases, [alias()]}
|
||||
| {write_table_property, tabname(), tuple()}
|
||||
| {remove_aliases, [alias()]}
|
||||
| {migrate, [{tabname(), map()}], rpt()}
|
||||
| {migrate, [tabname() | {tabname(), map()}], rpt()}
|
||||
| {prep_close, table()}
|
||||
| {close_table, table()}
|
||||
| {clear_table, table() | cf() }.
|
||||
@@ -249,58 +250,38 @@ get_info_res(Res, Default) ->
|
||||
error(E)
|
||||
end.
|
||||
|
||||
%% Admin info: metadata written by the admin proc to keep track of
|
||||
%% the derived status of tables (such as detected version and encoding
|
||||
%% of existing standalone tables.)
|
||||
%%
|
||||
write_admin_info(K, V, Alias, Name) ->
|
||||
mrdb:rdb_put(get_ref({admin, Alias}),
|
||||
admin_info_key(K, Name),
|
||||
term_to_binary(V)).
|
||||
|
||||
read_admin_info(K, Alias, Name) ->
|
||||
EncK = admin_info_key(K, Name),
|
||||
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
|
||||
{ok, Bin} ->
|
||||
{ok, binary_to_term(Bin)};
|
||||
_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
delete_admin_info(K, Alias, Name) ->
|
||||
EncK = admin_info_key(K, Name),
|
||||
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
|
||||
|
||||
admin_info_key(K, Name) ->
|
||||
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
|
||||
|
||||
%% Table metadata info maintained by users
|
||||
%%
|
||||
write_info(Alias, Tab, K, V) ->
|
||||
write_info_(get_ref({admin, Alias}), Tab, K, V).
|
||||
|
||||
write_info_(Ref, Tab, K, V) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
|
||||
maybe_write_standalone_info(Ref, K, V),
|
||||
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
|
||||
write_info_encv(Ref, Tab, K, term_to_binary(V)).
|
||||
|
||||
maybe_write_standalone_info(Ref, K, V) ->
|
||||
case Ref of
|
||||
#{type := standalone, vsn := 1, db_ref := DbRef} ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
|
||||
Key = <<?INFO_TAG, EncK/binary>>,
|
||||
EncV = mnesia_rocksdb_lib:encode_val(V, term),
|
||||
rocksdb:put(DbRef, Key, EncV, []);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
write_info_encv(Ref, Tab, K, V) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
|
||||
mrdb:rdb_put(Ref, EncK, V, []).
|
||||
|
||||
delete_info(Alias, Tab, K) ->
|
||||
delete_info_(get_ref({admin, Alias}), Tab, K).
|
||||
|
||||
delete_info_(Ref, Tab, K) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
|
||||
mrdb:rdb_delete(Ref, EncK, []).
|
||||
|
||||
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
|
||||
call(Alias, {write_table_property, Tab, Prop}).
|
||||
|
||||
-spec migrate_standalone(alias(), Tabs) -> Res when
|
||||
Tabs :: [tabname() | {tabname(),map()}],
|
||||
Res :: [{tabname(), {ok,any()} | {error, any()}}].
|
||||
migrate_standalone(Alias, Tabs) ->
|
||||
migrate_standalone(Alias, Tabs, undefined).
|
||||
|
||||
-spec migrate_standalone(alias(), Tabs, Rpt) -> Res when
|
||||
Tabs :: [tabname() | {tabname(),map()}],
|
||||
Rpt :: 'undefined' | pid() | atom(),
|
||||
Res :: [{tabname(), {ok,any()} | {error, any()}}].
|
||||
migrate_standalone(Alias, Tabs, Rpt0) ->
|
||||
Rpt = case Rpt0 of
|
||||
undefined -> undefined;
|
||||
@@ -323,7 +304,6 @@ call(Alias, Req, Timeout) ->
|
||||
end.
|
||||
|
||||
start_link() ->
|
||||
mrdb_mutex:ensure_tab(),
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
@@ -405,12 +385,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
|
||||
%% We need to store the persistent ref explicitly here,
|
||||
%% since mnesia knows nothing of our admin table.
|
||||
AdminTab = {admin, Alias},
|
||||
Stats = mrdb_stats:new(),
|
||||
CfI = update_cf_info(AdminTab, #{ status => open
|
||||
, name => AdminTab
|
||||
, vsn => ?VSN
|
||||
, encoding => {sext,{value,term}}
|
||||
, attr_pos => #{key => 1,
|
||||
value => 2}
|
||||
, stats => Stats
|
||||
, mountpoint => MP
|
||||
, properties =>
|
||||
#{ attributes => [key, val]
|
||||
@@ -510,12 +492,17 @@ intersection(A, B) ->
|
||||
|
||||
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
|
||||
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
|
||||
case create_trec(Alias, Name, Props, Backend, St) of
|
||||
{ok, NewCf} ->
|
||||
St1 = update_cf(Alias, Name, NewCf, St),
|
||||
{reply, {ok, NewCf}, St1};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
case find_cf(Alias, Name, Backend, St) of
|
||||
{ok, TRec} ->
|
||||
{reply, {ok, TRec}, St};
|
||||
error ->
|
||||
case create_trec(Alias, Name, Props, Backend, St) of
|
||||
{ok, NewCf} ->
|
||||
St1 = update_cf(Alias, Name, NewCf, St),
|
||||
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
end
|
||||
end;
|
||||
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
|
||||
try
|
||||
@@ -628,8 +615,14 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
|
||||
case {Op, lists:member(I, Index)} of
|
||||
{delete, true} ->
|
||||
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
|
||||
delete_info(Alias, Main, {index_consistent, I}),
|
||||
maybe_update_pt(Main, CfM1),
|
||||
update_cf(Alias, Main, CfM1, St);
|
||||
{create, _} ->
|
||||
%% Due to a previous bug, this marker might linger
|
||||
%% In any case, it mustn't be there for a newly created index
|
||||
delete_info(Alias, Main, {index_consistent, I}),
|
||||
St;
|
||||
_ ->
|
||||
St
|
||||
end;
|
||||
@@ -640,6 +633,7 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
|
||||
maybe_update_main(_, _, _, St) ->
|
||||
St.
|
||||
|
||||
|
||||
%% The pt may not have been created yet. If so, don't do it here.
|
||||
maybe_update_pt(Name, Ref) ->
|
||||
case get_pt(Name, error) of
|
||||
@@ -778,10 +772,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
||||
false ->
|
||||
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
|
||||
{false, MP} ->
|
||||
create_table_as_standalone(Alias, Name, true, MP, TRec, St);
|
||||
create_table_as_standalone(Alias, Name, MP, TRec, St);
|
||||
{true, MP} ->
|
||||
?log(debug, "will create ~p as standalone and migrate", [Name]),
|
||||
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
|
||||
case create_table_as_standalone(Alias, Name, MP, TRec, St) of
|
||||
{ok, OldTRec, _} ->
|
||||
?log(info, "Migrating ~p to column family", [Name]),
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
|
||||
@@ -790,8 +784,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
||||
end
|
||||
end;
|
||||
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
|
||||
{Exists, MP} = table_exists_as_standalone(Name),
|
||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
|
||||
{_, MP} = table_exists_as_standalone(Name),
|
||||
create_table_as_standalone(Alias, Name, MP, TRec, St).
|
||||
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
|
||||
?log(debug, "Migrate to cf (~p)", [Name]),
|
||||
@@ -1071,29 +1065,12 @@ table_exists_as_standalone(Name) ->
|
||||
end,
|
||||
{Exists, MP}.
|
||||
|
||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) ->
|
||||
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
|
||||
{ok, #{type := standalone, vsn := Vsn1,
|
||||
encoding := Enc1} = Cf, _St1} = Ok ->
|
||||
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
|
||||
Alias, Name),
|
||||
case Vsn1 of
|
||||
1 ->
|
||||
load_info(Alias, Name, Cf);
|
||||
_ ->
|
||||
skip
|
||||
end,
|
||||
Ok;
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
|
||||
create_table_as_standalone(Alias, Name, MP, TRec, St) ->
|
||||
Vsn = check_version(TRec),
|
||||
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
|
||||
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St).
|
||||
do_open_standalone(true, Alias, Name, MP, TRec1, St).
|
||||
|
||||
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
|
||||
do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
|
||||
#st{standalone = Ts} = St) ->
|
||||
Opts = rocksdb_opts_from_trec(TRec0),
|
||||
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
|
||||
@@ -1103,135 +1080,24 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
|
||||
DbRec1 = DbRec#{ cfs => CfNames,
|
||||
mountpoint => MP },
|
||||
TRec = maps:merge(TRec0, DbRec#{type => standalone}),
|
||||
TRec1 = guess_table_vsn_and_encoding(Exists, TRec),
|
||||
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
|
||||
{ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
|
||||
{error, _} = Err ->
|
||||
?log(debug, "open_db error: ~p", [Err]),
|
||||
Err
|
||||
end.
|
||||
|
||||
%% When opening a standalone table, chances are it's a legacy table
|
||||
%% where legacy encoding is already in place. We try to read the
|
||||
%% first object and apply legacy encoding. If successful, we set
|
||||
%% legacy encoding in the TRec. If we migrate the data to a column
|
||||
%% family, we should apply the defined encoding for the cf.
|
||||
%%
|
||||
%% The first object can either be an info object (in the legacy case)
|
||||
%% or a data object, with a sext-encoded key, and a term_to_binary-
|
||||
%% encoded object as value, where the key position is set to [].
|
||||
%% The info objects would be sext-encoded key + term-encoded value.
|
||||
guess_table_vsn_and_encoding(false, TRec) ->
|
||||
TRec;
|
||||
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
|
||||
alias := Alias, name := Name} = R) ->
|
||||
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
|
||||
{ok, {V, E}} ->
|
||||
R#{vsn => V, encoding => E};
|
||||
error ->
|
||||
R1 = set_default_guess(R),
|
||||
mrdb:with_rdb_iterator(
|
||||
R1, fun(I) ->
|
||||
guess_table_vsn_and_encoding_(
|
||||
mrdb:rdb_iterator_move(I, first), I, As, R1)
|
||||
end)
|
||||
end.
|
||||
|
||||
set_default_guess(#{type := standalone} = R) ->
|
||||
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
|
||||
1 ->
|
||||
R#{vsn => 1, encoding => {sext, {object, term}}};
|
||||
V ->
|
||||
R#{vsn => V}
|
||||
end.
|
||||
|
||||
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
|
||||
Arity = length(As) + 1,
|
||||
case K of
|
||||
<<?INFO_TAG, EncK/binary>> ->
|
||||
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
|
||||
mnesia_rocksdb_lib:decode(V, term)},
|
||||
%% This is a vsn 1 standalone table
|
||||
R#{vsn => 1, encoding => {sext, {object, term}}}
|
||||
catch
|
||||
error:_ ->
|
||||
R
|
||||
end;
|
||||
_ ->
|
||||
Enc = guess_obj_encoding(K, V, Arity),
|
||||
R#{encoding => Enc}
|
||||
end;
|
||||
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
|
||||
R.
|
||||
|
||||
guess_obj_encoding(K, V, Arity) ->
|
||||
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
|
||||
|
||||
guess_encoding(Bin) ->
|
||||
try {sext, sext:decode(Bin)}
|
||||
catch
|
||||
error:_ ->
|
||||
try {term, binary_to_term(Bin)}
|
||||
catch
|
||||
error:_ -> raw
|
||||
end
|
||||
end.
|
||||
|
||||
guess_key_encoding(Bin) ->
|
||||
case guess_encoding(Bin) of
|
||||
raw -> raw;
|
||||
{Enc, _} -> Enc
|
||||
end.
|
||||
|
||||
guess_val_encoding(Bin, Arity) ->
|
||||
case guess_encoding(Bin) of
|
||||
raw -> {value, raw};
|
||||
{Enc, Term} ->
|
||||
if is_tuple(Term), size(Term) == Arity,
|
||||
element(2, Term) == [] ->
|
||||
{object, Enc};
|
||||
true ->
|
||||
{value, Enc}
|
||||
end
|
||||
end.
|
||||
|
||||
%% This is slightly different from `rocksdb:is_empty/1`, since it allows
|
||||
%% for the presence of some metadata, and still considers it empty if there
|
||||
%% is no user data.
|
||||
table_is_empty(#{} = DbRec) ->
|
||||
Start = iterator_data_start(DbRec),
|
||||
mrdb:with_rdb_iterator(
|
||||
DbRec, fun(I) ->
|
||||
case mrdb:rdb_iterator_move(I, Start) of
|
||||
case mrdb:rdb_iterator_move(I, first) of
|
||||
{ok, _, _} -> false;
|
||||
_ -> true
|
||||
end
|
||||
end).
|
||||
|
||||
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
|
||||
iterator_data_start(_) -> first.
|
||||
|
||||
load_info(Alias, Name, Cf) ->
|
||||
ARef = get_ref({admin, Alias}),
|
||||
mrdb:with_rdb_iterator(
|
||||
Cf, fun(I) ->
|
||||
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
|
||||
end).
|
||||
|
||||
load_info_(Res, I, ARef, Tab) ->
|
||||
case Res of
|
||||
{ok, << ?INFO_TAG, K/binary >>, V} ->
|
||||
DecK = mnesia_rocksdb_lib:decode_key(K),
|
||||
case read_info_(ARef, Tab, DecK, undefined) of
|
||||
undefined ->
|
||||
write_info_(ARef, Tab, DecK, V);
|
||||
_ ->
|
||||
skip
|
||||
end,
|
||||
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
check_version(TRec) ->
|
||||
user_property(mrdb_version, TRec, ?VSN).
|
||||
|
||||
@@ -1443,24 +1309,26 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
|
||||
%% not yet created
|
||||
CFs = cfs(CFs0),
|
||||
file:make_dir(MP),
|
||||
OpenOpts = [ {create_if_missing, true}
|
||||
, {create_missing_column_families, true}
|
||||
, {merge_operator, erlang_merge_operator}
|
||||
| Opts ],
|
||||
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
|
||||
OpenRes = rocksdb_open(MP, Opts, CFs),
|
||||
map_cfs(OpenRes, CFs, Alias, Acc0);
|
||||
false ->
|
||||
{error, enoent};
|
||||
true ->
|
||||
%% Assumption: even an old rocksdb database file will have at least "default"
|
||||
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
|
||||
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
|
||||
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
|
||||
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
|
||||
end.
|
||||
|
||||
open_opts(Opts) ->
|
||||
[ {create_if_missing, true}
|
||||
, {create_missing_column_families, true}
|
||||
, {merge_operator, erlang_merge_operator}
|
||||
| Opts ].
|
||||
|
||||
rocksdb_open(MP, Opts, CFs) ->
|
||||
%% rocksdb:open(MP, Opts, CFs),
|
||||
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
|
||||
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
|
||||
|
||||
is_open(Alias, #st{backends = Bs}) ->
|
||||
case maps:find(Alias, Bs) of
|
||||
@@ -1559,7 +1427,6 @@ close_and_delete_standalone(#{alias := Alias,
|
||||
case get_table_mountpoint(Alias, Name, St) of
|
||||
{ok, MP} ->
|
||||
close_and_delete(DbRef, MP),
|
||||
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
|
||||
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
|
||||
error ->
|
||||
St
|
||||
|
||||
@@ -42,4 +42,5 @@ start_link() ->
|
||||
%% ===================================================================
|
||||
|
||||
init([]) ->
|
||||
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
|
||||
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
|
||||
, ?CHILD(mnesia_rocksdb_params, worker)]} }.
|
||||
|
||||
+205
-95
@@ -53,6 +53,7 @@
|
||||
, delete/2 , delete/3
|
||||
, delete_object/2, delete_object/3
|
||||
, match_delete/2
|
||||
, merge/3 , merge/4
|
||||
, clear_table/1
|
||||
, batch_write/2 , batch_write/3
|
||||
, update_counter/3, update_counter/4
|
||||
@@ -60,14 +61,17 @@
|
||||
, get_batch/1
|
||||
, snapshot/1
|
||||
, release_snapshot/1
|
||||
, first/1 , first/2
|
||||
, next/2 , next/3
|
||||
, prev/2 , prev/3
|
||||
, last/1 , last/2
|
||||
, select/2 , select/3
|
||||
, first/1 , first/2
|
||||
, next/2 , next/3
|
||||
, prev/2 , prev/3
|
||||
, last/1 , last/2
|
||||
, select/2 , select/3
|
||||
, select_reverse/2, select_reverse/3
|
||||
, select/1
|
||||
, fold/3 , fold/4 , fold/5
|
||||
, rdb_fold/4 , rdb_fold/5
|
||||
, fold/3 , fold/4 , fold/5
|
||||
, fold_reverse/3 , fold_reverse/4, fold_reverse/5
|
||||
, rdb_fold/4 , rdb_fold/5
|
||||
, rdb_fold_reverse/4, rdb_fold_reverse/5
|
||||
, write_info/3
|
||||
, read_info/2
|
||||
, read_info/1
|
||||
@@ -105,6 +109,8 @@
|
||||
-include("mnesia_rocksdb.hrl").
|
||||
-include("mnesia_rocksdb_int.hrl").
|
||||
|
||||
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
|
||||
|
||||
-type tab_name() :: atom().
|
||||
-type alias() :: atom().
|
||||
-type admin_tab() :: {admin, alias()}.
|
||||
@@ -115,7 +121,9 @@
|
||||
| index()
|
||||
| retainer().
|
||||
|
||||
-type retries() :: non_neg_integer().
|
||||
-type inner() :: non_neg_integer().
|
||||
-type outer() :: non_neg_integer().
|
||||
-type retries() :: outer() | {inner(), outer()}.
|
||||
|
||||
%% activity type 'ets' makes no sense in this context
|
||||
-type mnesia_activity_type() :: transaction
|
||||
@@ -124,14 +132,15 @@
|
||||
| sync_dirty.
|
||||
|
||||
-type tx_options() :: #{ retries => retries()
|
||||
, no_snapshot => boolean() }.
|
||||
, no_snapshot => boolean()
|
||||
, mnesia_compatible => boolean() }.
|
||||
-type mrdb_activity_type() :: tx | {tx, tx_options()} | batch.
|
||||
|
||||
-type activity_type() :: mrdb_activity_type() | mnesia_activity_type().
|
||||
|
||||
-type key() :: any().
|
||||
-type obj() :: tuple().
|
||||
-type index_position() :: atom() | pos().
|
||||
-type index_position() :: atom() | pos() | plugin_ix_pos().
|
||||
|
||||
-type db_handle() :: rocksdb:db_handle().
|
||||
-type cf_handle() :: rocksdb:cf_handle().
|
||||
@@ -141,16 +150,20 @@
|
||||
|
||||
-type tx_activity() :: #{ type := 'tx'
|
||||
, handle := tx_handle()
|
||||
, attempt := non_neg_integer() }.
|
||||
, attempt := 'undefined' | retries() }.
|
||||
-type batch_activity() :: #{ type := 'batch'
|
||||
, handle := batch_handle() }.
|
||||
-type activity() :: tx_activity() | batch_activity().
|
||||
|
||||
-type pos() :: non_neg_integer().
|
||||
-type plugin_ix_pos() :: {atom()}.
|
||||
|
||||
-type propkey() :: any().
|
||||
-type propvalue() :: any().
|
||||
-type properties() :: #{ record_name := atom()
|
||||
, attributes := [atom()]
|
||||
, index := [{pos(), bag | ordered}]
|
||||
, user_properties => #{propkey() => propvalue()}
|
||||
}.
|
||||
-type semantics() :: bag | set.
|
||||
-type key_encoding() :: 'raw' | 'sext' | 'term'.
|
||||
@@ -246,7 +259,7 @@ release_snapshot(SHandle) ->
|
||||
%% <li> `{tx, TxOpts}' - A `rocksdb' transaction with sligth modifications</li>
|
||||
%% <li> `batch' - A `rocksdb' batch operation</li>
|
||||
%% </ul>
|
||||
%%
|
||||
%%
|
||||
%% By default, transactions are combined with a snapshot with 1 retry.
|
||||
%% The snapshot ensures that writes from concurrent transactions don't leak into the transaction context.
|
||||
%% A transaction will be retried if it detects that the commit set conflicts with recent changes.
|
||||
@@ -254,6 +267,14 @@ release_snapshot(SHandle) ->
|
||||
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
|
||||
%% the commit set. It will then be re-run again, until the retry count is exhausted.
|
||||
%%
|
||||
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
|
||||
%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by
|
||||
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
|
||||
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
|
||||
%% in line) if there are outer retries remaining. When all retries are exhaused, the activity aborts
|
||||
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
|
||||
%% attempt, but only on outer retries (the first retry is always an outer retry).
|
||||
%%
|
||||
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
|
||||
%%
|
||||
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
||||
@@ -269,41 +290,92 @@ activity(Type, Alias, F) ->
|
||||
#{ alias => Alias
|
||||
, db_ref => DbRef }, TxCtxt);
|
||||
batch ->
|
||||
Batch = get_batch_(DbRef),
|
||||
Batch = init_batch_ref(DbRef),
|
||||
#{ activity => #{ type => batch
|
||||
, handle => Batch }
|
||||
, alias => Alias
|
||||
, db_ref => DbRef }
|
||||
end,
|
||||
do_activity(F, Alias, Ctxt, false).
|
||||
do_activity(F, Alias, Ctxt).
|
||||
|
||||
do_activity(F, Alias, Ctxt, WithLock) ->
|
||||
try run_f(F, Ctxt, WithLock, Alias) of
|
||||
Res ->
|
||||
try commit_and_pop(Res)
|
||||
catch
|
||||
throw:{?MODULE, busy} ->
|
||||
do_activity(F, Alias, Ctxt, true)
|
||||
end
|
||||
do_activity(F, Alias, Ctxt) ->
|
||||
try try_f(F, Ctxt)
|
||||
catch
|
||||
throw:{?MODULE, busy} ->
|
||||
retry_activity(F, Alias, Ctxt)
|
||||
end.
|
||||
|
||||
try_f(F, Ctxt) ->
|
||||
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
|
||||
|
||||
try_f(false, F, Ctxt) ->
|
||||
try run_f(F, Ctxt) of
|
||||
Res ->
|
||||
commit_and_pop(Res)
|
||||
catch
|
||||
throw:Something ->
|
||||
abort_and_pop(throw, Something);
|
||||
Cat:Err:T ->
|
||||
%% Without capturing the stacktract here,
|
||||
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
||||
abort_and_pop(Cat, {Err, T})
|
||||
end;
|
||||
try_f(true, F, Ctxt) ->
|
||||
try run_f(F, Ctxt) of
|
||||
Res ->
|
||||
commit_and_pop(Res)
|
||||
catch
|
||||
throw:Something ->
|
||||
abort_and_pop(throw, Something);
|
||||
Cat:Err ->
|
||||
%% Without capturing the stacktract here,
|
||||
%% debugging gets pretty difficult
|
||||
abort_and_pop(Cat, Err)
|
||||
end.
|
||||
|
||||
run_f(F, Ctxt, false, _) ->
|
||||
push_ctxt(Ctxt),
|
||||
F();
|
||||
run_f(F, Ctxt, true, Alias) ->
|
||||
mrdb_mutex:do(
|
||||
Alias,
|
||||
fun() ->
|
||||
push_ctxt(incr_attempt(Ctxt)),
|
||||
F()
|
||||
end).
|
||||
|
||||
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
|
||||
run_f(F, Ctxt) ->
|
||||
push_ctxt(Ctxt),
|
||||
F().
|
||||
|
||||
incr_attempt(0, {_,O}) when O > 0 ->
|
||||
{outer, {0,1}};
|
||||
incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O),
|
||||
is_integer(Ri), is_integer(Ro) ->
|
||||
if I < Ri -> {inner, {I+1, O}};
|
||||
O < Ro -> {outer, {0, O+1}};
|
||||
true ->
|
||||
error
|
||||
end;
|
||||
incr_attempt(_, _) ->
|
||||
error.
|
||||
|
||||
retry_activity(F, Alias, #{activity := #{ type := Type
|
||||
, attempt := A
|
||||
, retries := R} = Act} = Ctxt) ->
|
||||
case incr_attempt(A, R) of
|
||||
{RetryCtxt, A1} ->
|
||||
Act1 = Act#{attempt := A1},
|
||||
Ctxt1 = Ctxt#{activity := Act1},
|
||||
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
|
||||
catch
|
||||
throw:{?MODULE, busy} ->
|
||||
retry_activity(F, Alias, Ctxt1)
|
||||
end;
|
||||
error ->
|
||||
return_abort(Type, error, retry_limit)
|
||||
end.
|
||||
|
||||
retry_activity_(inner, F, Alias, Ctxt) ->
|
||||
mrdb_stats:incr(Alias, inner_retries, 1),
|
||||
try_f(F, Ctxt);
|
||||
retry_activity_(outer, F, Alias, Ctxt) ->
|
||||
mrdb_stats:incr(Alias, outer_retries, 1),
|
||||
mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end).
|
||||
|
||||
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
|
||||
{ok, TxH} = rdb_transaction(DbRef, []),
|
||||
Act1 = Act#{attempt := A+1, handle := TxH},
|
||||
Act1 = Act#{handle := TxH},
|
||||
C1 = C#{ activity := Act1 },
|
||||
case maps:is_key(snapshot, C) of
|
||||
true ->
|
||||
@@ -368,7 +440,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
|
||||
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
|
||||
|
||||
check_tx_opts(Opts) ->
|
||||
case maps:without([no_snapshot, retries], Opts) of
|
||||
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
|
||||
Other when map_size(Other) > 0 ->
|
||||
abort({invalid_tx_opts, maps:keys(Other)});
|
||||
_ ->
|
||||
@@ -376,10 +448,14 @@ check_tx_opts(Opts) ->
|
||||
end.
|
||||
|
||||
check_retries(#{retries := Retries} = Opts) ->
|
||||
if is_integer(Retries), Retries >= 0 ->
|
||||
Opts;
|
||||
true ->
|
||||
error({invalid_tx_option, {retries, Retries}})
|
||||
case Retries of
|
||||
_ when is_integer(Retries), Retries >= 0 ->
|
||||
Opts#{retries := {0, Retries}};
|
||||
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
|
||||
Inner >= 0, Outer >= 0 ->
|
||||
Opts;
|
||||
_ ->
|
||||
error({invalid_tx_option, {retries, Retries}})
|
||||
end.
|
||||
|
||||
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
|
||||
@@ -394,7 +470,7 @@ create_tx(Opts, DbRef) ->
|
||||
{ok, TxH} = rdb_transaction(DbRef, []),
|
||||
Opts#{activity => maps:merge(Opts, #{ type => tx
|
||||
, handle => TxH
|
||||
, attempt => 1})}.
|
||||
, attempt => 0 })}.
|
||||
|
||||
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
|
||||
case NoSnap of
|
||||
@@ -414,8 +490,7 @@ commit_and_pop(Res) ->
|
||||
Res;
|
||||
{error, {error, "Resource busy" ++ _ = Busy}} ->
|
||||
case A of
|
||||
#{retries := Retries, attempt := Att}
|
||||
when Att =< Retries ->
|
||||
#{retries := {I,O}} when I > 0; O > 0 ->
|
||||
throw({?MODULE, busy});
|
||||
_ ->
|
||||
error({error, Busy})
|
||||
@@ -473,10 +548,14 @@ re_throw(Cat, Err) ->
|
||||
throw -> throw(Err)
|
||||
end.
|
||||
|
||||
|
||||
mnesia_compatible_aborts() ->
|
||||
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
||||
|
||||
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
|
||||
Bool;
|
||||
mnesia_compatible_aborts(_) ->
|
||||
mnesia_compatible_aborts().
|
||||
|
||||
fix_error({aborted, Err}) ->
|
||||
Err;
|
||||
fix_error(Err) ->
|
||||
@@ -486,7 +565,7 @@ rdb_transaction(DbRef, Opts) ->
|
||||
rocksdb:transaction(DbRef, Opts).
|
||||
|
||||
rdb_transaction_commit_and_pop(H) ->
|
||||
try rdb_transaction_commit(H)
|
||||
try rdb_transaction_commit(H)
|
||||
after
|
||||
pop_ctxt()
|
||||
end.
|
||||
@@ -507,10 +586,13 @@ rdb_write_batch_and_pop(BatchRef, C) ->
|
||||
pop_ctxt()
|
||||
end.
|
||||
|
||||
rdb_release_batch(?BATCH_REF_DUMMY) ->
|
||||
ok;
|
||||
rdb_release_batch(H) ->
|
||||
rocksdb:release_batch(H).
|
||||
|
||||
%% @doc Aborts an ongoing {@link activity/2}
|
||||
-spec abort(_) -> no_return().
|
||||
abort(Reason) ->
|
||||
case mnesia_compatible_aborts() of
|
||||
true ->
|
||||
@@ -529,7 +611,7 @@ new_tx(#{activity := _}, _) ->
|
||||
new_tx(Tab, Opts) ->
|
||||
#{db_ref := DbRef} = R = ensure_ref(Tab),
|
||||
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
|
||||
|
||||
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
|
||||
tx_ref(Tab, TxH) ->
|
||||
@@ -539,7 +621,7 @@ tx_ref(Tab, TxH) ->
|
||||
#{activity := #{type := tx, handle := OtherTxH}} ->
|
||||
error({tx_handle_conflict, OtherTxH});
|
||||
R ->
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
|
||||
end.
|
||||
|
||||
-spec tx_commit(tx_handle() | db_ref()) -> ok.
|
||||
@@ -586,10 +668,25 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
|
||||
inherit_ctxt(Ref, R) ->
|
||||
maps:merge(Ref, maps:with([snapshot, activity], R)).
|
||||
|
||||
%% @doc Create an iterator on table `Tab' for the duration of `Fun'
|
||||
%%
|
||||
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
|
||||
%% closed once the fun terminates.
|
||||
%% @equiv with_iterator(Tab, Fun, [])
|
||||
%% @end
|
||||
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
|
||||
with_iterator(Tab, Fun) ->
|
||||
with_iterator(Tab, Fun, []).
|
||||
|
||||
%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun'
|
||||
%%
|
||||
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
|
||||
%% closed once the fun terminates.
|
||||
%%
|
||||
%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator
|
||||
%% will return `{ok, Obj}' where `Obj' is the complete decoded object.
|
||||
%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}.
|
||||
%% @end
|
||||
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res.
|
||||
with_iterator(Tab, Fun, Opts) ->
|
||||
R = ensure_ref(Tab),
|
||||
@@ -666,6 +763,21 @@ insert(Tab, Obj0, Opts) ->
|
||||
EncVal = encode_val(Obj, Ref),
|
||||
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
|
||||
|
||||
merge(Tab, Key, MergeOp) ->
|
||||
merge(Tab, Key, MergeOp, []).
|
||||
|
||||
merge(Tab, Key, MergeOp, Opts) ->
|
||||
#{encoding := Enc} = Ref = ensure_ref(Tab),
|
||||
case Enc of
|
||||
{_, {value, term}} ->
|
||||
merge_(Ref, Key, MergeOp, Opts);
|
||||
_ ->
|
||||
abort(badarg)
|
||||
end.
|
||||
|
||||
merge_(Ref, Key, MergeOp, Opts) ->
|
||||
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
|
||||
|
||||
validate_obj(Obj, #{mode := mnesia}) ->
|
||||
Obj;
|
||||
validate_obj(Obj, #{attr_pos := AP,
|
||||
@@ -798,7 +910,7 @@ update_index_do_bag(Ixs, Name, R, Key, Obj, Opts) ->
|
||||
not_found
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
update_index_do([{_Pos,ordered} = Ix|Ixs], Name, R, Key, Obj, Rest, Opts) ->
|
||||
Tab = {Name, index, Ix},
|
||||
#{ix_vals_f := IxValsF} = IRef = ensure_ref(Tab, R),
|
||||
@@ -889,7 +1001,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
|
||||
_ when is_atom(Ix) ->
|
||||
{attr_pos(Ix, Ref), ordered};
|
||||
{_} ->
|
||||
Ix;
|
||||
{Ix, ordered};
|
||||
_ when is_integer(Ix) ->
|
||||
{Ix, ordered}
|
||||
end,
|
||||
@@ -970,7 +1082,7 @@ alias_of(Tab) ->
|
||||
%% and when releasing, all batches are released. This will not ensure
|
||||
%% atomicity, but there is no way in rocksdb to achieve atomicity
|
||||
%% across db instances. At least, data should end up where you expect.
|
||||
%%
|
||||
%%
|
||||
%% @end
|
||||
-spec as_batch(ref_or_tab(), fun( (db_ref()) -> Res )) -> Res.
|
||||
as_batch(Tab, F) ->
|
||||
@@ -1010,18 +1122,20 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
|
||||
get_batch(_) ->
|
||||
{error, badarg}.
|
||||
|
||||
get_batch_(DbRef) ->
|
||||
init_batch_ref(DbRef) ->
|
||||
Ref = make_ref(),
|
||||
{ok, Batch} = rdb_batch(),
|
||||
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
|
||||
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
|
||||
Ref.
|
||||
|
||||
get_batch_(DbRef) -> Ref = make_ref(), {ok, Batch} = rdb_batch(),
|
||||
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}), Ref.
|
||||
|
||||
get_batch_(DbRef, BatchRef) ->
|
||||
Key = batch_ref_key(BatchRef),
|
||||
case pdict_get(Key) of
|
||||
undefined ->
|
||||
error(stale_batch_ref);
|
||||
#{DbRef := Batch} ->
|
||||
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
|
||||
Batch;
|
||||
Map ->
|
||||
{ok, Batch} = rdb_batch(),
|
||||
@@ -1050,7 +1164,9 @@ write_batches(BatchRef, Opts) ->
|
||||
pdict_erase(Key),
|
||||
ret_batch_write_acc(
|
||||
maps:fold(
|
||||
fun(DbRef, Batch, Acc) ->
|
||||
fun(_, ?BATCH_REF_DUMMY, Acc) ->
|
||||
Acc;
|
||||
(DbRef, Batch, Acc) ->
|
||||
case rocksdb:write_batch(DbRef, Batch, Opts) of
|
||||
ok ->
|
||||
Acc;
|
||||
@@ -1206,6 +1322,13 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:select(ensure_ref(Tab), Pat, Limit).
|
||||
|
||||
select_reverse(Tab, Pat) ->
|
||||
select_reverse(Tab, Pat, infinity).
|
||||
|
||||
select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:select_reverse(ensure_ref(Tab), Pat, Limit).
|
||||
|
||||
select(Cont) ->
|
||||
mrdb_select:select(Cont).
|
||||
|
||||
@@ -1260,6 +1383,16 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
|
||||
|
||||
fold_reverse(Tab, Fun, Acc) ->
|
||||
fold_reverse(Tab, Fun, Acc, [{'_', [], ['$_']}]).
|
||||
|
||||
fold_reverse(Tab, Fun, Acc, MatchSpec) ->
|
||||
fold_reverse(Tab, Fun, Acc, MatchSpec, infinity).
|
||||
|
||||
fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:fold_reverse(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
|
||||
|
||||
rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
|
||||
, is_binary(Prefix) ->
|
||||
rdb_fold(Tab, Fun, Acc, Prefix, infinity).
|
||||
@@ -1269,7 +1402,16 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
||||
|
||||
valid_limit(L) ->
|
||||
rdb_fold_reverse(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
|
||||
, is_binary(Prefix) ->
|
||||
rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity).
|
||||
|
||||
rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
|
||||
, is_binary(Prefix) ->
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:rdb_fold_reverse(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
||||
|
||||
valid_limit(L) ->
|
||||
case L of
|
||||
infinity ->
|
||||
true;
|
||||
@@ -1280,15 +1422,7 @@ valid_limit(L) ->
|
||||
end.
|
||||
|
||||
write_info(Tab, K, V) ->
|
||||
R = ensure_ref(Tab),
|
||||
Alias = case R of
|
||||
#{type := standalone, vsn := 1, alias := A} = TRef ->
|
||||
%% Also write on legacy info format
|
||||
write_info_standalone(TRef, K, V),
|
||||
A;
|
||||
#{alias := A} ->
|
||||
A
|
||||
end,
|
||||
#{alias := Alias} = ensure_ref(Tab),
|
||||
write_info_(ensure_ref({admin, Alias}), Tab, K, V).
|
||||
|
||||
write_info_(#{} = R, Tab, K, V) ->
|
||||
@@ -1305,13 +1439,8 @@ read_info(Tab, K) ->
|
||||
read_info(Tab, K, Default) when K==size; K==memory ->
|
||||
read_direct_info_(ensure_ref(Tab), K, Default);
|
||||
read_info(Tab, K, Default) ->
|
||||
#{alias := Alias} = R = ensure_ref(Tab),
|
||||
case R of
|
||||
#{type := standalone, vsn := 1} = TRef ->
|
||||
read_info_standalone(TRef, K, Default);
|
||||
#{alias := Alias} ->
|
||||
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
|
||||
end.
|
||||
#{alias := Alias} = ensure_ref(Tab),
|
||||
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
|
||||
|
||||
read_direct_info_(R, memory, _Def) ->
|
||||
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
|
||||
@@ -1335,26 +1464,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
|
||||
%%rocksdb_boolean(<<"1">>) -> true;
|
||||
%%rocksdb_boolean(<<"0">>) -> false.
|
||||
|
||||
write_info_standalone(#{} = R, K, V) ->
|
||||
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
|
||||
EncV = term_to_binary(V),
|
||||
rdb_put(R, EncK, EncV, write_opts(R, [])).
|
||||
|
||||
read_info_standalone(#{} = R, K, Default) ->
|
||||
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
|
||||
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
|
||||
|
||||
get_info_res(Res, Default) ->
|
||||
case Res of
|
||||
not_found ->
|
||||
Default;
|
||||
{ok, Bin} ->
|
||||
%% no fancy tricks when encoding/decoding info values
|
||||
binary_to_term(Bin);
|
||||
{error, E} ->
|
||||
error(E)
|
||||
end.
|
||||
|
||||
%% insert_bag_v2(Ref, K, V, Opts) ->
|
||||
%% rdb_merge(Ref, K, {list_append, [V]}
|
||||
|
||||
@@ -1494,8 +1603,9 @@ rdb_iterator(R) -> rdb_iterator(R, []).
|
||||
rdb_iterator(R, Opts) ->
|
||||
rdb_iterator_(R, read_opts(R, Opts)).
|
||||
|
||||
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
|
||||
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
|
||||
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
|
||||
%% Note: versions before 1.8.0 expected DbRef as first argument
|
||||
rocksdb:transaction_iterator(TxH, CfH, ROpts);
|
||||
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
|
||||
rocksdb:iterator(DbRef, CfH, ROpts).
|
||||
|
||||
@@ -1506,11 +1616,11 @@ rdb_merge_(#{db_ref := DbRef, cf_handle := CfH}, K, Op, WOpts) ->
|
||||
rocksdb:merge(DbRef, CfH, K, Op, WOpts).
|
||||
|
||||
write_opts(#{write_opts := Os}, Opts) -> Os ++ Opts;
|
||||
write_opts(_, Opts) ->
|
||||
write_opts(_, Opts) ->
|
||||
Opts.
|
||||
|
||||
read_opts(#{read_opts := Os}, Opts) -> Os ++ Opts;
|
||||
read_opts(_, Opts) ->
|
||||
read_opts(_, Opts) ->
|
||||
Opts.
|
||||
|
||||
-define(EOT, '$end_of_table').
|
||||
|
||||
+90
-3
@@ -6,11 +6,18 @@
|
||||
, iterator_move/2
|
||||
, iterator/2
|
||||
, iterator_close/1
|
||||
, fold/4
|
||||
, rev_fold/4
|
||||
, index_ref/2
|
||||
, select/3
|
||||
, select/4
|
||||
, select_reverse/3
|
||||
, select_reverse/4
|
||||
]).
|
||||
|
||||
-record(mrdb_ix_iter, { i :: mrdb:iterator()
|
||||
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
|
||||
, type = set :: set | bag
|
||||
, sub :: mrdb:ref() | pid()
|
||||
, sub :: pid() | mrdb:db_ref()
|
||||
}).
|
||||
|
||||
-type ix_iterator() :: #mrdb_ix_iter{}.
|
||||
@@ -19,7 +26,7 @@
|
||||
|
||||
-type object() :: tuple().
|
||||
|
||||
-record(subst, { i :: mrdb:iterator()
|
||||
-record(subst, { i :: mrdb:mrdb_iterator()
|
||||
, vals_f
|
||||
, cur
|
||||
, mref }).
|
||||
@@ -30,6 +37,11 @@
|
||||
|
||||
-export_type([ ix_iterator/0 ]).
|
||||
|
||||
-spec index_ref(mrdb:ref_or_tab(), mrdb:index_position()) -> mrdb:db_ref().
|
||||
index_ref(Tab, Ix) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
ensure_index_ref(Ix, R).
|
||||
|
||||
-spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res.
|
||||
with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) ->
|
||||
{ok, I} = iterator(Tab, IxPos),
|
||||
@@ -62,6 +74,57 @@ iterator(Tab, IxPos) ->
|
||||
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
|
||||
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
|
||||
|
||||
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
|
||||
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
|
||||
when Acc :: any().
|
||||
%% Folds over the index table corresponding to Tab and IxPos.
|
||||
%% The fold fun is called with the index key and the corresponding object,
|
||||
%% or [] if there is no such object.
|
||||
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
|
||||
fold_(Tab, IxPos, first, next, FoldFun, Acc).
|
||||
|
||||
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
|
||||
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
|
||||
when Acc :: any().
|
||||
%% Like fold/4 above, but folds over the index in the reverse order.
|
||||
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
|
||||
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
|
||||
|
||||
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
|
||||
with_iterator(
|
||||
Tab, IxPos,
|
||||
fun(I) ->
|
||||
iter_fold(I, Start, Dir, FoldFun, Acc)
|
||||
end).
|
||||
|
||||
select(Tab, Ix, MS) ->
|
||||
select(Tab, Ix, MS, infinity).
|
||||
|
||||
select(Tab, Ix, MS, Limit) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
#{} = IxR = ensure_index_ref(Ix, R),
|
||||
MSpre = pre_match_spec(MS),
|
||||
mrdb_select:select(IxR#{ derive_obj_f => mk_derive_obj_f(R)
|
||||
, pre_ms => MSpre }, MS, Limit).
|
||||
|
||||
select_reverse(Tab, Ix, MS) ->
|
||||
select_reverse(Tab, Ix, MS, infinity).
|
||||
|
||||
select_reverse(Tab, Ix, MS, Limit) ->
|
||||
#{} = R = mrdb:ensure_ref(Tab),
|
||||
#{} = IxR = ensure_index_ref(Ix, R),
|
||||
MSpre = pre_match_spec(MS),
|
||||
mrdb_select:select_reverse(IxR#{ derive_obj_f => mk_derive_obj_f(R)
|
||||
, pre_ms => MSpre }, MS, Limit).
|
||||
|
||||
iter_fold(I, Start, Dir, Fun, Acc) ->
|
||||
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
|
||||
|
||||
iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
|
||||
iter_fold_(iterator_move(I, Dir), I, Dir, Fun, Fun(IxVal, Obj, Acc));
|
||||
iter_fold_({error, _}, _, _, _, Acc) ->
|
||||
Acc.
|
||||
|
||||
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
case mrdb:iterator_move(I, Dir) of
|
||||
{ok, {{FKey, PKey}}} ->
|
||||
@@ -83,6 +146,30 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
Other
|
||||
end.
|
||||
|
||||
pre_match_spec([{{KeyPat,ObjPat}, Gs, MatchBody} | T]) ->
|
||||
[{{{KeyPat,'_'},ObjPat}, Gs, MatchBody} | pre_match_spec(T)];
|
||||
pre_match_spec([H|T]) ->
|
||||
[H | pre_match_spec(T)];
|
||||
pre_match_spec([]) ->
|
||||
[].
|
||||
|
||||
%% Used for mrdb_select:select()
|
||||
%% The select operation folds over index keys, and for matching keys,
|
||||
%% calls the `derive_obj_f/1` fun, which normally just does `Obj -> [Obj]`.
|
||||
%% In the case of indexes, it fetches the object, if it exists, and then
|
||||
%% returns `[{IndexKey, Object}]`, which is what the matchspec should operate on.
|
||||
%%
|
||||
mk_derive_obj_f(Ref) ->
|
||||
fun({{IxK, Key}}) ->
|
||||
case mrdb:read(Ref, Key, []) of
|
||||
[Obj] ->
|
||||
[{IxK, Obj}];
|
||||
[] ->
|
||||
[]
|
||||
end
|
||||
end.
|
||||
|
||||
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
|
||||
opt_read(R, Key) ->
|
||||
case mrdb:read(R, Key, []) of
|
||||
[Obj] ->
|
||||
|
||||
+92
-62
@@ -3,79 +3,109 @@
|
||||
|
||||
-export([ do/2 ]).
|
||||
|
||||
-export([ ensure_tab/0 ]).
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-endif.
|
||||
|
||||
-define(LOCK_TAB, ?MODULE).
|
||||
|
||||
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
|
||||
%% The claim operation is done using an atomic list of two updates:
|
||||
%% first, incrementing with 0 - this returns the previous value
|
||||
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
|
||||
%% regardless of previous value. This means that if [0,1] is returned, the resource
|
||||
%% was not locked previously; if [1,1] is returned, it was.
|
||||
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
|
||||
%% critical section.
|
||||
%%
|
||||
%% Releasing the resource is done by deleting the resource. If we just decrement,
|
||||
%% we will end up with lingering unlocked resources, so we might as well delete.
|
||||
%% Either operation is atomic, and the claim op creates the object if it's missing.
|
||||
%% Releasing the resource is done by notifying the server.
|
||||
%%
|
||||
%% Previous implementations tested:
|
||||
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
|
||||
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
|
||||
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
|
||||
%% This is a pretty simple implementation, but it lacks ordering, and appeared
|
||||
%% to sometimes lead to starvation.
|
||||
%% * A duplicate_bag ets table: These are defined such that insertion order is
|
||||
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
|
||||
%% Unfortunately, performance plummeted when tested with > 25 concurrent
|
||||
%% processes. While this is likely a very high number, given that we're talking
|
||||
%% about contention in a system using optimistic concurrency, it's never good
|
||||
%% if performance falls off a cliff.
|
||||
|
||||
do(Rsrc, F) when is_function(F, 0) ->
|
||||
true = claim(Rsrc),
|
||||
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
|
||||
try F()
|
||||
after
|
||||
release(Rsrc)
|
||||
mrdb_mutex_serializer:done(Rsrc, Ref)
|
||||
end.
|
||||
|
||||
claim(Rsrc) ->
|
||||
case claim_(Rsrc) of
|
||||
true -> true;
|
||||
false -> busy_wait(Rsrc, 1000)
|
||||
-ifdef(TEST).
|
||||
|
||||
mutex_test_() ->
|
||||
{foreach,
|
||||
fun setup/0,
|
||||
fun cleanup/1,
|
||||
[
|
||||
{"Check that all operations complete", fun swarm_do/0}
|
||||
]}.
|
||||
|
||||
setup() ->
|
||||
case whereis(mrdb_mutex_serializer) of
|
||||
undefined ->
|
||||
{ok, Pid} = mrdb_mutex_serializer:start_link(),
|
||||
Pid;
|
||||
Pid ->
|
||||
Pid
|
||||
end.
|
||||
|
||||
claim_(Rsrc) ->
|
||||
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
|
||||
{2, 1, 1, 1}], {Rsrc, 0}) of
|
||||
[0, 1] ->
|
||||
%% have lock
|
||||
true;
|
||||
[1, 1] ->
|
||||
false
|
||||
end.
|
||||
cleanup(Pid) ->
|
||||
unlink(Pid),
|
||||
exit(Pid, kill).
|
||||
|
||||
%% The busy-wait function makes use of the fact that we can read a timer to find out
|
||||
%% if it still has time remaining. This reduces the need for selective receive, looking
|
||||
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
|
||||
%% also be necessary for the `read_timer/1` value to refresh.
|
||||
%%
|
||||
busy_wait(Rsrc, Timeout) ->
|
||||
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
|
||||
do_wait(Rsrc, Ref).
|
||||
|
||||
do_wait(Rsrc, Ref) ->
|
||||
erlang:yield(),
|
||||
case erlang:read_timer(Ref) of
|
||||
false ->
|
||||
erlang:cancel_timer(Ref),
|
||||
error(lock_wait_timeout);
|
||||
_ ->
|
||||
case claim_(Rsrc) of
|
||||
true ->
|
||||
erlang:cancel_timer(Ref),
|
||||
ok;
|
||||
false ->
|
||||
do_wait(Rsrc, Ref)
|
||||
end
|
||||
end.
|
||||
|
||||
release(Rsrc) ->
|
||||
ets:delete(?LOCK_TAB, Rsrc),
|
||||
swarm_do() ->
|
||||
Rsrc = ?LINE,
|
||||
Pid = spawn(fun() -> collect([]) end),
|
||||
L = lists:seq(1, 1000),
|
||||
Evens = [X || X <- L, is_even(X)],
|
||||
Pids = [spawn_monitor(fun() ->
|
||||
send_even(Rsrc, N, Pid)
|
||||
end) || N <- lists:seq(1,1000)],
|
||||
await_pids(Pids),
|
||||
Results = fetch(Pid),
|
||||
{incorrect_results, []} = {incorrect_results, Results -- Evens},
|
||||
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
|
||||
ok.
|
||||
|
||||
|
||||
%% Called by the process holding the ets table.
|
||||
ensure_tab() ->
|
||||
case ets:info(?LOCK_TAB, name) of
|
||||
undefined ->
|
||||
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
|
||||
_ ->
|
||||
true
|
||||
collect(Acc) ->
|
||||
receive
|
||||
{_, result, N} ->
|
||||
collect([N|Acc]);
|
||||
{From, fetch} ->
|
||||
From ! {fetch_reply, Acc},
|
||||
done
|
||||
end.
|
||||
|
||||
fetch(Pid) ->
|
||||
Pid ! {self(), fetch},
|
||||
receive
|
||||
{fetch_reply, Result} ->
|
||||
Result
|
||||
end.
|
||||
|
||||
is_even(N) ->
|
||||
(N rem 2) =:= 0.
|
||||
|
||||
await_pids([{_, MRef}|Pids]) ->
|
||||
receive
|
||||
{'DOWN', MRef, _, _, _} ->
|
||||
await_pids(Pids)
|
||||
after 10000 ->
|
||||
error(timeout)
|
||||
end;
|
||||
await_pids([]) ->
|
||||
ok.
|
||||
|
||||
send_even(Rsrc, N, Pid) ->
|
||||
do(Rsrc, fun() ->
|
||||
case is_even(N) of
|
||||
true ->
|
||||
Pid ! {self(), result, N};
|
||||
false ->
|
||||
exit(not_even)
|
||||
end
|
||||
end).
|
||||
|
||||
-endif.
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
-module(mrdb_mutex_serializer).
|
||||
|
||||
-behavior(gen_server).
|
||||
|
||||
-export([wait/1,
|
||||
done/2]).
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-record(st, { queues = #{}
|
||||
, empty_q = queue:new() }). %% perhaps silly optimization
|
||||
|
||||
wait(Rsrc) ->
|
||||
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
|
||||
|
||||
done(Rsrc, Ref) ->
|
||||
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
{ok, #st{}}.
|
||||
|
||||
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
|
||||
, empty_q = NewQ } = St) ->
|
||||
MRef = erlang:monitor(process, Pid),
|
||||
Q0 = maps:get(Rsrc, Queues, NewQ),
|
||||
WasEmpty = queue:is_empty(Q0),
|
||||
Q1 = queue:in({From, MRef}, Q0),
|
||||
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
|
||||
case WasEmpty of
|
||||
true ->
|
||||
{reply, {ok, MRef}, St1};
|
||||
false ->
|
||||
{noreply, St1}
|
||||
end.
|
||||
|
||||
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
|
||||
case maps:find(Rsrc, Queues) of
|
||||
{ok, Q} ->
|
||||
case queue:out(Q) of
|
||||
{{value, {_From, MRef}}, Q1} ->
|
||||
erlang:demonitor(MRef),
|
||||
ok = maybe_dispatch_one(Q1),
|
||||
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
|
||||
{_, _} ->
|
||||
%% Not the lock holder
|
||||
{noreply, St}
|
||||
end;
|
||||
error ->
|
||||
{noreply, St}
|
||||
end.
|
||||
|
||||
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
|
||||
Queues1 =
|
||||
maps:map(
|
||||
fun(_Rsrc, Q) ->
|
||||
drop_or_filter(Q, MRef)
|
||||
end, Queues),
|
||||
{noreply, St#st{ queues = Queues1 }};
|
||||
handle_info(_, St) ->
|
||||
{noreply, St}.
|
||||
|
||||
terminate(_, _) ->
|
||||
ok.
|
||||
|
||||
code_change(_FromVsn, St, _Extra) ->
|
||||
{ok, St}.
|
||||
|
||||
%% In this function, we don't actually pop
|
||||
maybe_dispatch_one(Q) ->
|
||||
case queue:peek(Q) of
|
||||
empty ->
|
||||
ok;
|
||||
{value, {From, MRef}} ->
|
||||
gen_server:reply(From, {ok, MRef}),
|
||||
ok
|
||||
end.
|
||||
|
||||
drop_or_filter(Q, MRef) ->
|
||||
case queue:peek(Q) of
|
||||
{value, {_, MRef}} ->
|
||||
Q1 = queue:drop(Q),
|
||||
ok = maybe_dispatch_one(Q1),
|
||||
Q1;
|
||||
{value, _Other} ->
|
||||
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
|
||||
empty ->
|
||||
Q
|
||||
end.
|
||||
+157
-40
@@ -1,11 +1,15 @@
|
||||
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
|
||||
-module(mrdb_select).
|
||||
|
||||
-export([ select/3 %% (Ref, MatchSpec, Limit)
|
||||
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
|
||||
, select/1 %% (Cont)
|
||||
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
|
||||
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
|
||||
-export([ select/3 %% (Ref, MatchSpec, Limit)
|
||||
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
|
||||
, select_reverse/3
|
||||
, select_reverse/4
|
||||
, select/1 %% (Cont)
|
||||
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
|
||||
, fold_reverse/5
|
||||
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
|
||||
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
|
||||
]).
|
||||
|
||||
-export([continuation_info/2]).
|
||||
@@ -25,26 +29,41 @@
|
||||
, compiled_ms
|
||||
, limit
|
||||
, key_only = false % TODO: not used
|
||||
, direction = forward % TODO: not used
|
||||
, direction = forward
|
||||
, pre_ms
|
||||
, derive_obj_f = fun unit_l/1
|
||||
}).
|
||||
|
||||
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
|
||||
select(Ref, MS, false, Limit).
|
||||
select(Ref, MS, false, forward, Limit).
|
||||
|
||||
select(Ref, MS, AccKeys, Limit)
|
||||
select(Ref, MS, AccKeys, Limit) ->
|
||||
select(Ref, MS, AccKeys, forward, Limit).
|
||||
|
||||
select_reverse(Ref, MS, Limit) ->
|
||||
select(Ref, MS, false, reverse, Limit).
|
||||
|
||||
select_reverse(Ref, MS, AccKeys, Limit) ->
|
||||
select(Ref, MS, AccKeys, reverse, Limit).
|
||||
|
||||
select(Ref, MS, AccKeys, Dir, Limit)
|
||||
when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
|
||||
Sel = mk_sel(Ref, MS, Limit),
|
||||
Sel = mk_sel(Ref, MS, Dir, Limit),
|
||||
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
|
||||
|
||||
mk_sel(#{name := Tab} = Ref, MS, Limit) ->
|
||||
Keypat = keypat(MS, keypos(Tab), Ref),
|
||||
mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
|
||||
MSpre = maps:get(pre_ms, Ref, MS),
|
||||
Keypat = keypat(MSpre, keypos(Tab), Ref),
|
||||
#sel{tab = Tab,
|
||||
ref = Ref,
|
||||
keypat = Keypat,
|
||||
ms = MS,
|
||||
compiled_ms = ets:match_spec_compile(MS),
|
||||
ms = MSpre,
|
||||
pre_ms = MSpre,
|
||||
compiled_ms = ms_compile(MS),
|
||||
key_only = needs_key_only(MS),
|
||||
limit = Limit}.
|
||||
direction = Dir,
|
||||
limit = Limit,
|
||||
derive_obj_f = derive_f(Ref)}.
|
||||
|
||||
select(Cont) ->
|
||||
case Cont of
|
||||
@@ -59,11 +78,18 @@ continuation_info(_, _) -> undefined.
|
||||
|
||||
continuation_info_(ref, #sel{ref = Ref}) -> Ref;
|
||||
continuation_info_(ms, #sel{ms = MS }) -> MS;
|
||||
continuation_info_(pre_ms, #sel{pre_ms = MSp}) -> MSp;
|
||||
continuation_info_(limit, #sel{limit = L }) -> L;
|
||||
continuation_info_(direction, #sel{direction = Dir}) -> Dir;
|
||||
continuation_info_(_, _) -> undefined.
|
||||
|
||||
fold(Ref, Fun, Acc, MS, Limit) ->
|
||||
do_fold(Ref, Fun, Acc, MS, forward, Limit).
|
||||
|
||||
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
|
||||
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
|
||||
|
||||
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
|
||||
{AccKeys, F} =
|
||||
if is_function(Fun, 3) ->
|
||||
{true, fun({K, Obj}, Acc1) ->
|
||||
@@ -74,7 +100,7 @@ fold(Ref, Fun, Acc, MS, Limit) ->
|
||||
true ->
|
||||
mrdb:abort(invalid_fold_fun)
|
||||
end,
|
||||
fold_(select(Ref, MS, AccKeys, Limit), F, Acc).
|
||||
fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
|
||||
|
||||
fold_('$end_of_table', _, Acc) ->
|
||||
Acc;
|
||||
@@ -86,12 +112,57 @@ fold_({L, Cont}, Fun, Acc) ->
|
||||
rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
|
||||
mrdb:with_rdb_iterator(
|
||||
Ref, fun(I) ->
|
||||
MovRes = rocksdb:iterator_move(I, first(Ref)),
|
||||
MovRes = fwd_init_seek(I, Prefix),
|
||||
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
|
||||
end).
|
||||
|
||||
first(#{vsn := 1}) -> <<?DATA_START>>;
|
||||
first(_) -> first.
|
||||
rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
|
||||
mrdb:with_rdb_iterator(
|
||||
Ref, fun(I) ->
|
||||
MovRes = rev_init_seek(I, Prefix),
|
||||
i_rdb_fold_reverse(MovRes, I, Prefix, Fun, Acc, Limit)
|
||||
end).
|
||||
|
||||
fwd_init_seek(I, Pfx) ->
|
||||
rocksdb:iterator_move(I, fwd_init_seek_tgt(Pfx)).
|
||||
|
||||
fwd_init_seek_tgt(<<>> ) -> first;
|
||||
fwd_init_seek_tgt(Prefix) -> {seek, Prefix}.
|
||||
|
||||
rev_init_seek(I, Pfx) ->
|
||||
case rev_init_seek_tgt(Pfx) of
|
||||
last ->
|
||||
i_move(I, last);
|
||||
{seek, Bin} ->
|
||||
%% An 'incremented' prefix.
|
||||
%% This will fail if we seek past the end of the table.
|
||||
%% Then, try to seek_for_prev instead (fails if table empty).
|
||||
%% This because rocksdb lacks a "seek backward to last matching prefix".
|
||||
case i_move(I, {seek, Bin}) of
|
||||
{error, invalid_iterator} ->
|
||||
i_move(I, {seek_for_prev, Bin});
|
||||
{ok, _, _} = Ok ->
|
||||
Ok
|
||||
end
|
||||
end.
|
||||
|
||||
rev_init_seek_tgt(<<>>) -> last;
|
||||
rev_init_seek_tgt(Prefix) ->
|
||||
case incr_prefix(Prefix) of
|
||||
last -> last;
|
||||
Pfx1 when is_binary(Pfx1) ->
|
||||
{seek, Pfx1}
|
||||
end.
|
||||
|
||||
incr_prefix(<<>>) -> last;
|
||||
incr_prefix(Pfx) when is_binary(Pfx) ->
|
||||
PfxI = binary:decode_unsigned(Pfx),
|
||||
MaxI = (1 bsl (byte_size(Pfx) * 8)) - 1,
|
||||
case PfxI + 1 of
|
||||
I1 when I1 >= MaxI -> last;
|
||||
I1 ->
|
||||
binary:encode_unsigned(I1)
|
||||
end.
|
||||
|
||||
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
|
||||
case is_prefix(Pfx, K) of
|
||||
@@ -104,20 +175,37 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
|
||||
i_rdb_fold(_, _, _, _, Acc, _) ->
|
||||
Acc.
|
||||
|
||||
i_rdb_fold_reverse({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
|
||||
case is_prefix(Pfx, K) of
|
||||
true ->
|
||||
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun,
|
||||
Fun(K, V, Acc), decr(Limit));
|
||||
false when K > Pfx ->
|
||||
i_rdb_fold_reverse(rocksdb:iterator_move(I, prev), I, Pfx, Fun, Acc, Limit);
|
||||
false ->
|
||||
Acc
|
||||
end;
|
||||
i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
|
||||
Acc.
|
||||
|
||||
i_select(I, #sel{ keypat = Pfx
|
||||
, compiled_ms = MS
|
||||
, limit = Limit
|
||||
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) ->
|
||||
StartKey = case {Pfx, Vsn, Enc} of
|
||||
{<<>>, 1, {sext, _}} ->
|
||||
<<?DATA_START>>;
|
||||
{_, _, {term, _}} ->
|
||||
<<>>;
|
||||
_ ->
|
||||
Pfx
|
||||
end,
|
||||
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
|
||||
Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
, direction = Dir
|
||||
, ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
|
||||
{MoveRes, Sel} = case Enc of
|
||||
{term, _} ->
|
||||
%% No defined ordering - do forward select
|
||||
{i_move(I, first), Sel0#sel{direction = forward}};
|
||||
_ ->
|
||||
case Dir of
|
||||
forward ->
|
||||
{fwd_init_seek(I, Pfx), Sel0};
|
||||
reverse ->
|
||||
{rev_init_seek(I, Pfx), Sel0}
|
||||
end
|
||||
end,
|
||||
select_traverse(MoveRes, Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
|
||||
needs_key_only([Pat]) ->
|
||||
needs_key_only_(Pat);
|
||||
@@ -193,16 +281,17 @@ map_vars([H|T], P) ->
|
||||
map_vars([], _) ->
|
||||
[].
|
||||
|
||||
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
|
||||
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
|
||||
AccKeys, Acc) ->
|
||||
case is_prefix(Pfx, K) of
|
||||
true ->
|
||||
DecKey = decode_key(K, R),
|
||||
Rec = decode_val(V, DecKey, R),
|
||||
case ets:match_spec_run([Rec], MS) of
|
||||
Rec0 = decode_val(V, DecKey, R),
|
||||
RecL = derive_object(Rec0, Sel),
|
||||
case ms_run(RecL, MS) of
|
||||
[] ->
|
||||
select_traverse(
|
||||
rocksdb:iterator_move(I, next), Limit, Pfx, MS,
|
||||
rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
|
||||
I, Sel, AccKeys, Acc);
|
||||
[Match] ->
|
||||
Acc1 = if AccKeys ->
|
||||
@@ -212,6 +301,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
|
||||
end,
|
||||
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
|
||||
end;
|
||||
false when Dir == reverse, K > Pfx ->
|
||||
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
|
||||
I, Sel, AccKeys, Acc);
|
||||
false when Limit == infinity ->
|
||||
lists:reverse(Acc);
|
||||
false ->
|
||||
@@ -220,6 +312,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
|
||||
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
|
||||
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
|
||||
|
||||
next_or_prev(forward) -> next;
|
||||
next_or_prev(reverse) -> prev.
|
||||
|
||||
select_return(infinity, {L, '$end_of_table'}) ->
|
||||
L;
|
||||
select_return(_, Ret) ->
|
||||
@@ -239,29 +334,40 @@ decr(I) when is_integer(I) ->
|
||||
decr(infinity) ->
|
||||
infinity.
|
||||
|
||||
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
|
||||
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
|
||||
{lists:reverse(Acc),
|
||||
fun(sel) -> Sel;
|
||||
(cont) ->
|
||||
mrdb:with_rdb_iterator(
|
||||
Ref,
|
||||
fun(NewI) ->
|
||||
select_traverse(iterator_next(NewI, K),
|
||||
select_traverse(iterator_next(NewI, K, Dir),
|
||||
Limit, Pfx, MS, NewI, Sel,
|
||||
AccKeys, [])
|
||||
end)
|
||||
end};
|
||||
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
|
||||
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
|
||||
select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
|
||||
|
||||
iterator_next(I, K) ->
|
||||
case rocksdb:iterator_move(I, K) of
|
||||
iterator_next(I, K, Dir) ->
|
||||
case i_move(I, K, Dir) of
|
||||
{ok, K, _} ->
|
||||
rocksdb:iterator_move(I, next);
|
||||
rocksdb:iterator_move(I, next_or_prev(Dir));
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
i_move(I, Tgt) ->
|
||||
rocksdb:iterator_move(I, Tgt).
|
||||
|
||||
i_move(I, K, reverse) ->
|
||||
rocksdb:iterator_move(I, {seek_for_prev, K});
|
||||
i_move(I, K, forward) ->
|
||||
rocksdb:iterator_move(I, K).
|
||||
|
||||
derive_object(R, #sel{derive_obj_f = F}) ->
|
||||
F(R).
|
||||
|
||||
keypat([H|T], KeyPos, Ref) ->
|
||||
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
|
||||
|
||||
@@ -283,3 +389,14 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos, #{encoding := {sext,_}}) when is_tuple(HeadP
|
||||
keypat_pfx(_, _, _) ->
|
||||
<<>>.
|
||||
|
||||
derive_f(#{derive_obj_f := F}) when is_function(F, 1) -> F;
|
||||
derive_f(_) -> fun unit_l/1.
|
||||
|
||||
unit_l(X) ->
|
||||
[X].
|
||||
|
||||
ms_compile(MS) ->
|
||||
ets:match_spec_compile(MS).
|
||||
|
||||
ms_run(RecL, MS) ->
|
||||
ets:match_spec_run(RecL, MS).
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
|
||||
%% @doc Statistics API for the mnesia_rocksdb plugin
|
||||
%%
|
||||
%% Some counters are maintained for each active alias. Currently, the following
|
||||
%% counters are supported:
|
||||
%% * inner_retries
|
||||
%% * outer_retries
|
||||
%%
|
||||
-module(mrdb_stats).
|
||||
|
||||
-export([new/0]).
|
||||
|
||||
-export([incr/3,
|
||||
get/1,
|
||||
get/2]).
|
||||
|
||||
-type alias() :: mnesia_rocksdb:alias().
|
||||
-type db_ref() :: mrdb:db_ref().
|
||||
-type counter() :: atom().
|
||||
-type increment() :: integer().
|
||||
|
||||
-type counters() :: #{ counter() := integer() }.
|
||||
|
||||
new() ->
|
||||
#{ ref => counters:new(map_size(ctr_meta()), [write_concurrency])
|
||||
, meta => ctr_meta()}.
|
||||
|
||||
ctr_meta() ->
|
||||
#{ inner_retries => 1
|
||||
, outer_retries => 2 }.
|
||||
|
||||
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
|
||||
%% @doc Increment `Ctr' counter for `Alias` with increment `N'.
|
||||
%%
|
||||
%% Note that the first argument may also be a `db_ref()' map,
|
||||
%% corresponding to `mrdb:get_ref({admin, Alias})'.
|
||||
%% @end
|
||||
incr(Alias, Ctr, N) when is_atom(Alias) ->
|
||||
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
|
||||
incr_(Ref, Meta, Ctr, N);
|
||||
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
|
||||
incr_(Ref, Meta, Ctr, N).
|
||||
|
||||
-spec get(alias() | db_ref(), counter()) -> integer().
|
||||
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
|
||||
%% @end
|
||||
get(Alias, Ctr) when is_atom(Alias) ->
|
||||
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
|
||||
get_(Ref, Meta, Ctr);
|
||||
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
|
||||
get_(Ref, Meta, Ctr).
|
||||
|
||||
-spec get(alias() | db_ref()) -> counters().
|
||||
%% @doc Fetches all known counters for `Alias', in the form of a map,
|
||||
%% `#{Counter => Value}'.
|
||||
%% @end
|
||||
get(Alias) when is_atom(Alias) ->
|
||||
get_(mrdb:get_ref({admin, Alias}));
|
||||
get(Ref) when is_map(Ref) ->
|
||||
get_(Ref).
|
||||
|
||||
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
|
||||
lists:foldl(
|
||||
fun({K, P}, M) ->
|
||||
M#{K := counters:get(Ref, P)}
|
||||
end, Meta, maps:to_list(Meta)).
|
||||
|
||||
get_(Ref, Meta, Attr) ->
|
||||
Ix = maps:get(Attr, Meta),
|
||||
counters:get(Ref, Ix).
|
||||
|
||||
incr_(Ref, Meta, Attr, N) ->
|
||||
Ix = maps:get(Attr, Meta),
|
||||
counters:add(Ref, Ix, N).
|
||||
@@ -1,64 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% basho_bench: Benchmarking Suite
|
||||
%%
|
||||
%% Copyright (c) 2009-2010 Basho Techonologies
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(basho_bench_driver_mnesia_rocksdb).
|
||||
|
||||
-export([new/1,
|
||||
run/4]).
|
||||
|
||||
-include("mnesia_rocksdb_basho_bench.hrl").
|
||||
|
||||
%% ====================================================================
|
||||
%% API
|
||||
%% ====================================================================
|
||||
|
||||
new(_Id) ->
|
||||
Type = basho_bench_config:get(backend, ram_copies),
|
||||
Tab = basho_bench_config:get(mnesia_table, t),
|
||||
ok = bootstrap_mnesia(Tab, Type),
|
||||
{ok, Tab}.
|
||||
|
||||
bootstrap_mnesia(Tab, Type) ->
|
||||
ok = mnesia:create_schema([node()],
|
||||
[{backend_types,
|
||||
[{rocksdb_copies, mnesia_rocksdb}]}]),
|
||||
ok = mnesia:start(),
|
||||
{atomic,ok} = mnesia:create_table(Tab, [{Type, [node()]}]),
|
||||
mnesia:wait_for_tables([Tab], 10000).
|
||||
|
||||
run(get, KeyGen, _ValueGen, State) ->
|
||||
Tab = State,
|
||||
Key = KeyGen(),
|
||||
case mnesia:dirty_read({Tab, Key}) of
|
||||
[] ->
|
||||
{ok, State};
|
||||
[{_, Key, _}] ->
|
||||
{ok, State}
|
||||
end;
|
||||
run(put, KeyGen, ValueGen, State) ->
|
||||
Tab = State,
|
||||
ok = mnesia:dirty_write({Tab, KeyGen(), ValueGen()}),
|
||||
{ok, State};
|
||||
run(delete, KeyGen, _ValueGen, State) ->
|
||||
Tab = State,
|
||||
ok = mnesia:dirty_delete({Tab, KeyGen()}),
|
||||
{ok, State}.
|
||||
+188
-15
@@ -24,8 +24,12 @@
|
||||
, mrdb_abort/1
|
||||
, mrdb_two_procs/1
|
||||
, mrdb_two_procs_tx_restart/1
|
||||
, mrdb_two_procs_tx_inner_restart/1
|
||||
, mrdb_two_procs_snap/1
|
||||
, mrdb_three_procs/1
|
||||
, create_counters/1
|
||||
, update_counters/1
|
||||
, restart_node/1
|
||||
]).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
@@ -41,7 +45,8 @@ all() ->
|
||||
groups() ->
|
||||
[
|
||||
{all_tests, [sequence], [ {group, checks}
|
||||
, {group, mrdb} ]}
|
||||
, {group, mrdb}
|
||||
, {group, counters}]}
|
||||
%% , error_handling ]}
|
||||
, {checks, [sequence], [ encoding_sext_attrs
|
||||
, encoding_binary_binary
|
||||
@@ -53,8 +58,13 @@ groups() ->
|
||||
, mrdb_abort
|
||||
, mrdb_two_procs
|
||||
, mrdb_two_procs_tx_restart
|
||||
, mrdb_two_procs_tx_inner_restart
|
||||
, mrdb_two_procs_snap
|
||||
, mrdb_three_procs ]}
|
||||
, {counters, [sequence], [ create_counters
|
||||
, update_counters
|
||||
, restart_node
|
||||
, update_counters ]}
|
||||
].
|
||||
|
||||
|
||||
@@ -206,6 +216,7 @@ mrdb_transactions_(Config) ->
|
||||
delete_tabs(Created),
|
||||
ok.
|
||||
|
||||
-dialyzer(no_return).
|
||||
mrdb_abort_reasons(_Config) ->
|
||||
Prev = mnesia_rocksdb_admin:set_and_cache_env(mnesia_compatible_aborts, true),
|
||||
X = some_value,
|
||||
@@ -267,7 +278,7 @@ mrdb_abort(Config) ->
|
||||
Pre = mrdb:read(tx_abort, a),
|
||||
D0 = get_dict(),
|
||||
TRes = try mrdb:activity(
|
||||
tx, rdb,
|
||||
{tx, #{mnesia_compatible => true}}, rdb,
|
||||
fun() ->
|
||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||
error(abort_here),
|
||||
@@ -287,10 +298,17 @@ mrdb_abort(Config) ->
|
||||
mrdb_two_procs(Config) ->
|
||||
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
|
||||
tr_flags(
|
||||
{self(), [call, sos, p]},
|
||||
{self(), [call, sos, p, 'receive']},
|
||||
tr_patterns(
|
||||
mrdb, [ {mrdb, insert, 2, x}
|
||||
, {mrdb, read, 2, x}
|
||||
, {mrdb, retry_activity, 3, x}
|
||||
, {mrdb, try_f, 2, x}
|
||||
, {mrdb, incr_attempt, 2, x}
|
||||
, {mrdb_mutex, do, 2, x}
|
||||
, {mrdb_mutex_serializer, do, 2, x}
|
||||
, {?MODULE, wait_for_other, 2, x}
|
||||
, {?MODULE, go_ahead_other, 1, x}
|
||||
, {mrdb, activity, x}], tr_opts()))).
|
||||
|
||||
mrdb_two_procs_(Config) ->
|
||||
@@ -314,11 +332,16 @@ mrdb_two_procs_(Config) ->
|
||||
Pre = mrdb:read(R, a),
|
||||
go_ahead_other(POther),
|
||||
await_other_down(POther, MRef, ?LINE),
|
||||
%% The test proc is still inside the transaction, and POther
|
||||
%% has terminated/committed. If we now read 'a', we should see
|
||||
%% the value that POther wrote (no isolation).
|
||||
[{R, a, 17}] = mrdb:read(R, a),
|
||||
ok = mrdb:insert(R, {R, a, 18})
|
||||
end,
|
||||
go_ahead_other(1, POther),
|
||||
go_ahead_other(0, POther),
|
||||
Do0 = get_dict(),
|
||||
%% Our transaction should fail due to the resource conflict, and because
|
||||
%% we're not using retries.
|
||||
try mrdb:activity({tx, #{no_snapshot => true,
|
||||
retries => 0}}, rdb, F1) of
|
||||
ok -> error(unexpected)
|
||||
@@ -339,6 +362,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
R = ?FUNCTION_NAME,
|
||||
Parent = self(),
|
||||
Created = create_tabs([{R, []}], Config),
|
||||
check_stats(rdb),
|
||||
mrdb:insert(R, {R, a, 1}),
|
||||
Pre = mrdb:read(R, a),
|
||||
F0 = fun() ->
|
||||
@@ -354,7 +378,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
OtherWrite = [{R, a, 17}],
|
||||
Att = get_attempt(),
|
||||
Expected = case Att of
|
||||
1 -> Pre;
|
||||
0 -> Pre;
|
||||
_ -> OtherWrite
|
||||
end,
|
||||
Expected = mrdb:read(R, a),
|
||||
@@ -363,14 +387,104 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
OtherWrite = mrdb:read(R, a),
|
||||
ok = mrdb:insert(R, {R, a, 18})
|
||||
end,
|
||||
go_ahead_other(1, POther),
|
||||
go_ahead_other(0, POther),
|
||||
Do0 = get_dict(),
|
||||
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
|
||||
dictionary_unchanged(Do0),
|
||||
check_stats(rdb),
|
||||
[{R, a, 18}] = mrdb:read(R, a),
|
||||
delete_tabs(Created),
|
||||
ok.
|
||||
|
||||
mrdb_two_procs_tx_inner_restart(Config) ->
|
||||
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
|
||||
dbg_tr_opts()).
|
||||
|
||||
mrdb_two_procs_tx_inner_restart_(Config) ->
|
||||
R = ?FUNCTION_NAME,
|
||||
Parent = self(),
|
||||
Created = create_tabs([{R, []}], Config),
|
||||
mrdb:insert(R, {R, a, 1}),
|
||||
mrdb:insert(R, {R, b, 1}),
|
||||
PreA = mrdb:read(R, a),
|
||||
PreB = mrdb:read(R, b),
|
||||
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
|
||||
F0 = fun() ->
|
||||
wait_for_other(Parent, ?LINE),
|
||||
ok = mrdb:insert(R, {R, a, 17}),
|
||||
wait_for_other(Parent, ?LINE)
|
||||
end,
|
||||
F1 = fun() ->
|
||||
wait_for_other(Parent, ?LINE),
|
||||
ok = mrdb:insert(R, {R, b, 147}),
|
||||
wait_for_other(Parent, ?LINE)
|
||||
end,
|
||||
|
||||
Spawn = fun(F) ->
|
||||
Res = spawn_opt(
|
||||
fun() ->
|
||||
ok = mrdb:activity(tx, rdb, F)
|
||||
end, [monitor]),
|
||||
Res
|
||||
end,
|
||||
{P1, MRef1} = Spawn(F0),
|
||||
{P2, MRef2} = Spawn(F1),
|
||||
F2 = fun() ->
|
||||
PostA = [{R, a, 17}], % once F0 (P1) has committed
|
||||
PostB = [{R, b, 147}], % once F1 (P2) has committed
|
||||
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
|
||||
BRes = mrdb:read(R, b),
|
||||
Att = get_attempt(),
|
||||
ct:log("Att = ~p", [Att]),
|
||||
case Att of
|
||||
0 -> % This is the first run (no retry yet)
|
||||
ARes = PreA,
|
||||
BRes = PreB,
|
||||
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
|
||||
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
|
||||
await_other_down(P1, MRef1, ?LINE),
|
||||
PostA = mrdb:read(R, a); % now, P1's write should leak through here
|
||||
{0, 1} -> % This is the first (outer) retry
|
||||
go_ahead_other(0, P2), % Let P2 write
|
||||
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
|
||||
await_other_down(P2, MRef2, ?LINE),
|
||||
PostA = mrdb:read(R, a), % now we should see writes from both P1
|
||||
%% On OTP 23, the following step might fail due to timing, even
|
||||
%% though the trace output looks as expected. Possibly some quirk
|
||||
%% with leakage propagation time in rocksdb. Let's retry to be sure.
|
||||
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
|
||||
{1, 1} ->
|
||||
PostA = mrdb:read(R, a),
|
||||
PostB = mrdb:read(R, b),
|
||||
ok
|
||||
end,
|
||||
mrdb:insert(R, {R, a, 18}),
|
||||
mrdb:insert(R, {R, b, 18})
|
||||
end,
|
||||
Do0 = get_dict(),
|
||||
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
|
||||
check_stats(rdb),
|
||||
dictionary_unchanged(Do0),
|
||||
[{R, a, 18}] = mrdb:read(R, a),
|
||||
[{R, b, 18}] = mrdb:read(R, b),
|
||||
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
|
||||
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
|
||||
delete_tabs(Created),
|
||||
ok.
|
||||
|
||||
try_until(Result, F) ->
|
||||
try_until(Result, F, 10).
|
||||
|
||||
try_until(Result, F, N) when N > 0 ->
|
||||
case F() of
|
||||
Result ->
|
||||
ok;
|
||||
_ ->
|
||||
receive after 100 -> ok end,
|
||||
try_until(Result, F, N-1)
|
||||
end;
|
||||
try_until(Result, F, _) ->
|
||||
error({badmatch, {Result, F}}).
|
||||
|
||||
%
|
||||
%% For testing purposes, we use side-effects inside the transactions
|
||||
@@ -383,7 +497,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
%% attempt, and ignore the sync ops on retries.
|
||||
%%
|
||||
-define(IF_FIRST(N, Expr),
|
||||
if N == 1 ->
|
||||
if N == 0 ->
|
||||
Expr;
|
||||
true ->
|
||||
ok
|
||||
@@ -413,8 +527,8 @@ mrdb_two_procs_snap(Config) ->
|
||||
go_ahead_other(Att, POther),
|
||||
ARes = mrdb:read(R, a),
|
||||
ARes = case Att of
|
||||
1 -> Pre;
|
||||
2 -> [{R, a, 17}]
|
||||
0 -> Pre;
|
||||
_ -> [{R, a, 17}]
|
||||
end,
|
||||
await_other_down(POther, MRef, ?LINE),
|
||||
PreB = mrdb:read(R, b),
|
||||
@@ -434,7 +548,7 @@ mrdb_two_procs_snap(Config) ->
|
||||
%% We make sure that P2 commits before finishing the other two, and P3 and the
|
||||
%% main thread sync, so as to maximize the contention for the retry lock.
|
||||
mrdb_three_procs(Config) ->
|
||||
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
|
||||
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
|
||||
|
||||
mrdb_three_procs_(Config) ->
|
||||
R = ?FUNCTION_NAME,
|
||||
@@ -452,7 +566,7 @@ mrdb_three_procs_(Config) ->
|
||||
spawn_opt(fun() ->
|
||||
D0 = get_dict(),
|
||||
do_when_p_allows(
|
||||
1, Parent, ?LINE,
|
||||
0, Parent, ?LINE,
|
||||
fun() ->
|
||||
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
|
||||
end),
|
||||
@@ -488,8 +602,8 @@ mrdb_three_procs_(Config) ->
|
||||
fun() ->
|
||||
Att = get_attempt(),
|
||||
ARes = case Att of
|
||||
1 -> [A0];
|
||||
2 -> [A1]
|
||||
0 -> [A0];
|
||||
_ -> [A1]
|
||||
end,
|
||||
%% First, ensure that P2 tx is running
|
||||
go_ahead_other(Att, P2),
|
||||
@@ -519,6 +633,7 @@ tr_opts() ->
|
||||
, {?MODULE, await_other_down, 3, x}
|
||||
, {?MODULE, do_when_p_allows, 4, x}
|
||||
, {?MODULE, allow_p, 3, x}
|
||||
, {?MODULE, check_stats, 1, x}
|
||||
]}.
|
||||
|
||||
light_tr_opts() ->
|
||||
@@ -529,6 +644,23 @@ light_tr_opts() ->
|
||||
, {mrdb, read, 2, x}
|
||||
, {mrdb, activity, x} ], tr_opts())).
|
||||
|
||||
dbg_tr_opts() ->
|
||||
tr_flags(
|
||||
{self(), [call, sos, p, 'receive']},
|
||||
tr_patterns(
|
||||
mrdb, [ {mrdb, insert, 2, x}
|
||||
, {mrdb, read, 2, x}
|
||||
, {mrdb, retry_activity, 3, x}
|
||||
, {mrdb, try_f, 2, x}
|
||||
, {mrdb, incr_attempt, 2, x}
|
||||
, {mrdb, current_context, 0, x}
|
||||
, {mrdb_mutex, do, 2, x}
|
||||
, {mrdb_mutex_serializer, do, 2, x}
|
||||
, {?MODULE, wait_for_other, 2, x}
|
||||
, {?MODULE, go_ahead_other, 1, x}
|
||||
, {?MODULE, try_until, 3, x}
|
||||
, {mrdb, activity, x} ], tr_opts())).
|
||||
|
||||
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
|
||||
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
|
||||
Opts#{patterns => Ps ++ Pats1}.
|
||||
@@ -542,12 +674,13 @@ wait_for_other(Parent, L) ->
|
||||
wait_for_other(Att, Parent, L) ->
|
||||
wait_for_other(Att, Parent, 1000, L).
|
||||
|
||||
wait_for_other(1, Parent, Timeout, L) ->
|
||||
wait_for_other(0, Parent, Timeout, L) ->
|
||||
MRef = monitor(process, Parent),
|
||||
Parent ! {self(), ready},
|
||||
receive
|
||||
{Parent, cont} ->
|
||||
demonitor(MRef),
|
||||
Parent ! {self(), cont_ack},
|
||||
ok;
|
||||
{'DOWN', MRef, _, _, Reason} ->
|
||||
ct:log("Parent died, Reason = ~p", [Reason]),
|
||||
@@ -586,7 +719,13 @@ go_ahead_other(Att, POther, Timeout) ->
|
||||
go_ahead_other_(POther, Timeout) ->
|
||||
receive
|
||||
{POther, ready} ->
|
||||
POther ! {self(), cont}
|
||||
POther ! {self(), cont},
|
||||
receive
|
||||
{POther, cont_ack} ->
|
||||
ok
|
||||
after Timeout ->
|
||||
error(cont_ack_timeout)
|
||||
end
|
||||
after Timeout ->
|
||||
error(go_ahead_timeout)
|
||||
end.
|
||||
@@ -619,6 +758,35 @@ get_attempt() ->
|
||||
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
|
||||
Attempt.
|
||||
|
||||
create_counters(_Config) ->
|
||||
create_tab(counters, []),
|
||||
mrdb:insert(counters, {counters, c0, 1}),
|
||||
mrdb:update_counter(counters, c1, 1),
|
||||
[{counters, c0, 1}] = mrdb:read(counters, c0),
|
||||
[{counters, c1, 1}] = mrdb:read(counters, c1),
|
||||
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
|
||||
ok.
|
||||
|
||||
restart_node(_Config) ->
|
||||
mnesia:stop(),
|
||||
ok = mnesia:start(),
|
||||
ct:log("mnesia restarted", []),
|
||||
ok.
|
||||
|
||||
update_counters(_Config) ->
|
||||
[{counters, c0, C0Prev}] = mrdb:read(counters, c0),
|
||||
[{counters, c1, C1Prev}] = mrdb:read(counters, c1),
|
||||
ct:log("co: ~p, c1: ~p", [C0Prev, C1Prev]),
|
||||
ok = mrdb:update_counter(counters, c0, 1),
|
||||
ok = mrdb:update_counter(counters, c1, 1),
|
||||
ct:log("Incremented c0 and c1 by 1", []),
|
||||
C0 = C0Prev + 1,
|
||||
C1 = C1Prev + 1,
|
||||
[{counters, c0, C0}] = mrdb:read(counters, c0),
|
||||
[{counters, c1, C1}] = mrdb:read(counters, c1),
|
||||
ct:log("c0: ~p, c1: ~p", [C0, C1]),
|
||||
ok.
|
||||
|
||||
create_tabs(Tabs, Config) ->
|
||||
Res = lists:map(fun create_tab/1, Tabs),
|
||||
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
|
||||
@@ -645,3 +813,8 @@ dictionary_unchanged(Old) ->
|
||||
, added := [] } = #{ deleted => Old -- New
|
||||
, added => New -- Old },
|
||||
ok.
|
||||
|
||||
check_stats(Alias) ->
|
||||
Stats = mrdb_stats:get(Alias),
|
||||
ct:log("Stats: ~p", [Stats]),
|
||||
Stats.
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
|
||||
-define(FAIL_MSG(Str, Args), ?ERROR(Str, Args), basho_bench_app:halt_or_kill()).
|
||||
-define(STD_ERR(Str, Args), io:format(standard_error, Str, Args)).
|
||||
|
||||
-define(CONSOLE(Str, Args), lager:info(Str, Args)).
|
||||
|
||||
-define(DEBUG(Str, Args), lager:debug(Str, Args)).
|
||||
-define(INFO(Str, Args), lager:info(Str, Args)).
|
||||
-define(WARN(Str, Args), lager:warning(Str, Args)).
|
||||
-define(ERROR(Str, Args), lager:error(Str, Args)).
|
||||
|
||||
-define(FMT(Str, Args), lists:flatten(io_lib:format(Str, Args))).
|
||||
|
||||
-define(VAL_GEN_BLOB_CFG, value_generator_blob_file).
|
||||
-define(VAL_GEN_SRC_SIZE, value_generator_source_size).
|
||||
@@ -1,18 +0,0 @@
|
||||
{mode, max}.
|
||||
|
||||
{duration, 10}.
|
||||
|
||||
{concurrent, 1}.
|
||||
|
||||
{driver, basho_bench_driver_mnesia_rocksdb}.
|
||||
|
||||
{key_generator, {int_to_bin,{uniform_int, 5000000}}}.
|
||||
|
||||
{value_generator, {fixed_bin, 10000}}.
|
||||
|
||||
{operations, [{get, 2}, {put, 2}, {delete, 1}]}.
|
||||
|
||||
{code_paths, []}.
|
||||
|
||||
{mnesia_table, doc}.
|
||||
{backend, disc_only_copies}.
|
||||
@@ -1,19 +0,0 @@
|
||||
{mode, max}.
|
||||
|
||||
{duration, 10}.
|
||||
|
||||
{concurrent, 1}.
|
||||
|
||||
{driver, basho_bench_driver_mnesia_rocksdb}.
|
||||
|
||||
{key_generator, {int_to_bin,{uniform_int, 5000000}}}.
|
||||
|
||||
{value_generator, {fixed_bin, 10000}}.
|
||||
|
||||
{operations, [{get, 2}, {put, 2}, {delete, 1}]}.
|
||||
|
||||
{code_paths, ["/Users/uwiger/git/rocksdb",
|
||||
"/Users/uwiger/git/mnesia_rocksdb"]}.
|
||||
|
||||
{mnesia_table, rdb}.
|
||||
{backend, rocksdb_copies}.
|
||||
@@ -22,6 +22,7 @@
|
||||
|
||||
-record(t, {k, i, v}).
|
||||
|
||||
-dialyzer({nowarn_function, run/1}).
|
||||
run(Sz) ->
|
||||
mnesia:stop(),
|
||||
init(),
|
||||
|
||||
@@ -79,20 +79,20 @@ cleanup() ->
|
||||
os:cmd("rm *.BUP").
|
||||
|
||||
mods(0) ->
|
||||
[];
|
||||
mods(1) ->
|
||||
[
|
||||
{l, mnesia_rocksdb},
|
||||
{g, rocksdb}
|
||||
];
|
||||
mods(2) ->
|
||||
[
|
||||
%% {l, mnesia_monitor},
|
||||
{g, mnesia_rocksdb},
|
||||
{l, mnesia_bup},
|
||||
{g, mnesia_lib},
|
||||
{g, mnesia_schema},
|
||||
%% {g, mnesia_loader},
|
||||
{g, mnesia_index},
|
||||
{l, mnesia_tm}
|
||||
].
|
||||
[].
|
||||
%% mods(1) ->
|
||||
%% [
|
||||
%% {l, mnesia_rocksdb},
|
||||
%% {g, rocksdb}
|
||||
%% ];
|
||||
%% mods(2) ->
|
||||
%% [
|
||||
%% %% {l, mnesia_monitor},
|
||||
%% {g, mnesia_rocksdb},
|
||||
%% {l, mnesia_bup},
|
||||
%% {g, mnesia_lib},
|
||||
%% {g, mnesia_schema},
|
||||
%% %% {g, mnesia_loader},
|
||||
%% {g, mnesia_index},
|
||||
%% {l, mnesia_tm}
|
||||
%% ].
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
-export([
|
||||
index_plugin_mgmt/1
|
||||
, add_indexes/1
|
||||
, delete_indexes/1
|
||||
, create_bag_index/1
|
||||
, create_ordered_index/1
|
||||
, test_1_ram_copies/1
|
||||
@@ -40,10 +41,12 @@
|
||||
, fail_1_disc_only/1
|
||||
, plugin_ram_copies1/1
|
||||
, plugin_ram_copies2/1
|
||||
, plugin_ram_copies3/1
|
||||
, plugin_disc_copies/1
|
||||
, fail_plugin_disc_only/1
|
||||
, plugin_disc_copies_bag/1
|
||||
, plugin_rdb_ordered/1
|
||||
, plugin_rdb_none/1
|
||||
, index_iterator/1
|
||||
]).
|
||||
|
||||
@@ -73,10 +76,12 @@ run(Config) ->
|
||||
{pfx},mnesia_rocksdb, ix_prefixes),
|
||||
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
|
||||
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
|
||||
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
|
||||
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
|
||||
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
|
||||
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
|
||||
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
|
||||
test_index_plugin(cfg([pl3, rdb, none], Config)),
|
||||
index_plugin_mgmt(Config),
|
||||
ok.
|
||||
|
||||
@@ -94,6 +99,7 @@ groups() ->
|
||||
, create_ordered_index
|
||||
, index_plugin_mgmt
|
||||
, add_indexes
|
||||
, delete_indexes
|
||||
]}
|
||||
, {access, [sequence], [
|
||||
test_1_ram_copies
|
||||
@@ -104,10 +110,12 @@ groups() ->
|
||||
, {plugin, [sequence], [
|
||||
plugin_ram_copies1
|
||||
, plugin_ram_copies2
|
||||
, plugin_ram_copies3
|
||||
, plugin_disc_copies
|
||||
, fail_plugin_disc_only
|
||||
, plugin_disc_copies_bag
|
||||
, plugin_rdb_ordered
|
||||
, plugin_rdb_none
|
||||
]}
|
||||
].
|
||||
|
||||
@@ -144,8 +152,8 @@ end_per_testcase(_, _) ->
|
||||
%% ======================================================================
|
||||
|
||||
cfg([Tab, Type, IxType], Config) ->
|
||||
[{my_config, #{tab => Tab, type => Type, ixtype => IxType}} | Config];
|
||||
cfg(Cfg, Config) when is_map(Cfg) -> [{my_config, Cfg} | Config].
|
||||
[{my_config, #{tab => Tab, type => Type, ixtype => IxType}} | Config].
|
||||
%% cfg(Cfg, Config) when is_map(Cfg) -> [{my_config, Cfg} | Config].
|
||||
|
||||
cfg(Config) -> ?config(my_config, Config).
|
||||
|
||||
@@ -177,11 +185,14 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
|
||||
|
||||
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
|
||||
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
|
||||
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
|
||||
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
|
||||
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
|
||||
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
|
||||
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
|
||||
|
||||
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
|
||||
|
||||
test(N, Type, T) ->
|
||||
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
|
||||
{attributes,[k,a,b,c]},
|
||||
@@ -204,7 +215,7 @@ add_del_indexes() ->
|
||||
test_index_plugin(Config) ->
|
||||
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
|
||||
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
|
||||
{index, [{{pfx}, IxType}]}]),
|
||||
{index, [ixtype(IxType)]}]),
|
||||
mnesia:dirty_write({Tab, "foobar", "sentence"}),
|
||||
mnesia:dirty_write({Tab, "yellow", "sensor"}),
|
||||
mnesia:dirty_write({Tab, "truth", "white"}),
|
||||
@@ -216,13 +227,40 @@ test_index_plugin(Config) ->
|
||||
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
||||
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
||||
Tab, <<"foo">>, {pfx});
|
||||
IxType == ordered ->
|
||||
IxType == ordered; IxType == none ->
|
||||
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
|
||||
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
||||
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
||||
Tab, <<"foo">>, {pfx})
|
||||
end,
|
||||
if Type == rdb ->
|
||||
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
|
||||
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
|
||||
ok = test_select(Tab,{pfx},[{'_', [], ['$_']}]),
|
||||
MS2 = [{{<<"whi">>,{Tab,"truth",'_'}},[],['$_']}],
|
||||
[_] = mrdb_index:select(Tab, {pfx}, MS2),
|
||||
ok = test_select(Tab, {pfx}, MS2),
|
||||
[{Tab,"foobar","sentence"}] = mrdb:index_read(
|
||||
Tab, <<"foo">>, {pfx});
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
test_select(Tab, Ix, MS) ->
|
||||
Res = mrdb_index:select(Tab, Ix, MS),
|
||||
ct:log("mrdb_index:select(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, Res]),
|
||||
RevRes = mrdb_index:select_reverse(Tab, Ix, MS),
|
||||
ct:log("mrdb_index:select_reverse(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, RevRes]),
|
||||
{Res,Res} = {Res, lists:reverse(RevRes)},
|
||||
ok.
|
||||
|
||||
|
||||
ixtype(T) when T==bag;
|
||||
T==ordered ->
|
||||
{{pfx}, T};
|
||||
ixtype(none) ->
|
||||
{pfx}.
|
||||
|
||||
create_bag_index(_Config) ->
|
||||
{aborted, {combine_error, _, _}} =
|
||||
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
|
||||
@@ -239,6 +277,25 @@ add_indexes(_Config) ->
|
||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||
ok.
|
||||
|
||||
delete_indexes(_Config) ->
|
||||
T = ?TAB(t1),
|
||||
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
|
||||
ok = mrdb:insert(T, {T, a, 1, 1}),
|
||||
ok = mrdb:insert(T, {T, b, 1, 2}),
|
||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
|
||||
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
|
||||
{atomic, ok} = mnesia:del_table_index(T, a),
|
||||
ok = mrdb:delete(T, b),
|
||||
ok = mrdb:insert(T, {T, c, 2, 3}),
|
||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
|
||||
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
|
||||
ok.
|
||||
|
||||
ix_fold_acc(K, V, Acc) ->
|
||||
[{K, V} | Acc].
|
||||
|
||||
index_plugin_mgmt(_Config) ->
|
||||
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
|
||||
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),
|
||||
|
||||
@@ -48,14 +48,18 @@
|
||||
-record(st, {}).
|
||||
-define(KEYS, [a,b,c]).
|
||||
|
||||
-dialyzer({no_return, [basic_test_/0, test/1, prop_seq/0]}).
|
||||
|
||||
basic_test_() ->
|
||||
{timeout, 60000, [fun() -> test(100) end]}.
|
||||
|
||||
-dialyzer({no_opaque, test/1}).
|
||||
test(N) ->
|
||||
setup_mnesia(),
|
||||
true = proper:quickcheck(?MODULE:prop_seq(), N),
|
||||
ok.
|
||||
|
||||
-dialyzer({no_opaque, prop_seq/0}).
|
||||
prop_seq() ->
|
||||
?FORALL(Cmds, proper_statem:commands(?MODULE),
|
||||
begin
|
||||
|
||||
@@ -260,10 +260,10 @@ plain_transform1(Fun, [F|Fs]) when is_atom(element(1,F)) ->
|
||||
continue ->
|
||||
[list_to_tuple(plain_transform1(Fun, tuple_to_list(F))) |
|
||||
plain_transform1(Fun, Fs)];
|
||||
{done, NewF} ->
|
||||
[NewF | Fs];
|
||||
{error, Reason} ->
|
||||
io:format("Error: ~p (~p)~n", [F,Reason]);
|
||||
%% {done, NewF} ->
|
||||
%% [NewF | Fs];
|
||||
%% {error, Reason} ->
|
||||
%% io:format("Error: ~p (~p)~n", [F,Reason]);
|
||||
NewF when is_tuple(NewF) ->
|
||||
[NewF | plain_transform1(Fun, Fs)]
|
||||
end;
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
-module(mrdb_bench).
|
||||
|
||||
-compile(export_all).
|
||||
-compile([export_all, nowarn_export_all]).
|
||||
|
||||
init() ->
|
||||
mnesia:delete_schema([node()]),
|
||||
|
||||
@@ -0,0 +1,226 @@
|
||||
-module(mrdb_fold_SUITE).
|
||||
|
||||
-export([
|
||||
all/0
|
||||
, groups/0
|
||||
, suite/0
|
||||
, init_per_suite/1
|
||||
, end_per_suite/1
|
||||
, init_per_group/2
|
||||
, end_per_group/2
|
||||
, init_per_testcase/2
|
||||
, end_per_testcase/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
no_prefix_fwd_rdb_fold/1
|
||||
, prefixed_fwd_rdb_fold/1
|
||||
, no_prefix_rev_rdb_fold/1
|
||||
, prefixed_rev_rdb_fold/1
|
||||
, prefixed_rev_rdb_fold_max/1
|
||||
, fwd_fold/1
|
||||
, filtered_fwd_fold/1
|
||||
, rev_fold/1
|
||||
, filtered_rev_fold/1
|
||||
, fwd_select/1
|
||||
, rev_select/1
|
||||
, select_cont/1
|
||||
, rev_select_cont/1
|
||||
]).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-record(r, {k, v}).
|
||||
|
||||
suite() ->
|
||||
[].
|
||||
|
||||
all() ->
|
||||
[{group, all_tests}].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{all_tests, [sequence], [ {group, rdb_fold}
|
||||
, {group, fold}
|
||||
, {group, select} ]}
|
||||
, {rdb_fold, [sequence], [ no_prefix_fwd_rdb_fold
|
||||
, prefixed_fwd_rdb_fold
|
||||
, no_prefix_rev_rdb_fold
|
||||
, prefixed_rev_rdb_fold
|
||||
, prefixed_rev_rdb_fold_max ]}
|
||||
, {fold, [sequence], [ fwd_fold
|
||||
, filtered_fwd_fold
|
||||
, rev_fold
|
||||
, filtered_rev_fold ]}
|
||||
, {select, [sequence], [ fwd_select
|
||||
, rev_select
|
||||
, select_cont
|
||||
, rev_select_cont ]}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
mnesia:stop(),
|
||||
ok = mnesia_rocksdb_tlib:start_mnesia(reset),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
mnesia:stop(),
|
||||
ok.
|
||||
|
||||
init_per_group(G, Config) when G==rdb_fold; G==fold ->
|
||||
mk_tab(G),
|
||||
Config;
|
||||
init_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(_, _Config) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
ok.
|
||||
|
||||
mk_tab(G) ->
|
||||
T = tab_name(G),
|
||||
case create_tab(T) of
|
||||
{atomic, ok} -> fill_tab(T);
|
||||
{aborted,{already_exists,T}} -> ok
|
||||
end.
|
||||
|
||||
tab_name(fold ) -> r;
|
||||
%% tab_name(select ) -> r;
|
||||
tab_name(rdb_fold) -> t.
|
||||
|
||||
create_tab(T) ->
|
||||
Opts = tab_opts(T),
|
||||
mnesia:create_table(T, [{rdb, [node()]} | Opts]).
|
||||
|
||||
tab_opts(r) ->
|
||||
[{attributes, [k, v]}, {type, ordered_set}];
|
||||
tab_opts(t) ->
|
||||
[{attributes, [k, v]}, {user_properties, [{mrdb_encoding, {raw, raw}}]}].
|
||||
|
||||
no_prefix_fwd_rdb_fold(_Config) ->
|
||||
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
|
||||
Expected = lists:reverse(raw_objs()),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
fwd_fold(_Config) ->
|
||||
Res = mrdb:fold(r, fun simple_acc/2, []),
|
||||
Expected = lists:reverse(objs()),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
prefixed_fwd_rdb_fold(_Config) ->
|
||||
Pfx = <<"aa">>,
|
||||
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
|
||||
Expected = lists:reverse(prefixed_raw_objs(Pfx)),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
filtered_fwd_fold(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:fold(r, fun simple_acc/2, [], MS, infinity),
|
||||
Expected = lists:reverse(filtered_objs(MS)),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
no_prefix_rev_rdb_fold(_Config) ->
|
||||
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity),
|
||||
Expected = raw_objs(),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
rev_fold(_Config) ->
|
||||
Res = mrdb:fold_reverse(r, fun simple_acc/2, []),
|
||||
Expected = objs(),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
prefixed_rev_rdb_fold(_Config) ->
|
||||
Pfx = <<"aa">>,
|
||||
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
|
||||
Expected = prefixed_raw_objs(Pfx),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
filtered_rev_fold(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:fold_reverse(r, fun simple_acc/2, [], MS, infinity),
|
||||
Expected = filtered_objs(MS),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
prefixed_rev_rdb_fold_max(_Config) ->
|
||||
Pfx = <<255,255>>,
|
||||
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
|
||||
Expected = prefixed_raw_objs(Pfx),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
fwd_select(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:select(r, MS),
|
||||
Expected = filtered_objs(MS),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
rev_select(_Config) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Res = mrdb:select_reverse(r, MS),
|
||||
Expected = lists:reverse(filtered_objs(MS)),
|
||||
{Res, Res} = {Res, Expected}.
|
||||
|
||||
select_cont(_Cont) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Expected = filtered_objs(MS),
|
||||
select_cont_loop(mrdb:select(r, MS, 1), Expected).
|
||||
|
||||
rev_select_cont(_Cont) ->
|
||||
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
|
||||
Expected = lists:reverse(filtered_objs(MS)),
|
||||
select_cont_loop(mrdb:select_reverse(r, MS, 1), Expected).
|
||||
|
||||
select_cont_loop({[Obj], Cont}, [Obj|Rest]) ->
|
||||
select_cont_loop(mrdb:select(Cont), Rest);
|
||||
select_cont_loop({[], '$end_of_table'}, []) ->
|
||||
ok.
|
||||
|
||||
simple_acc(#r{} = Obj, Acc) ->
|
||||
[Obj | Acc].
|
||||
|
||||
simple_rdb_acc(K, V, Acc) ->
|
||||
[{K,V} | Acc].
|
||||
|
||||
fill_tab(t = Tab) ->
|
||||
[mrdb:insert(Tab, {Tab, K, V}) || {K,V} <- raw_objs()],
|
||||
ok;
|
||||
fill_tab(r = Tab) ->
|
||||
[mrdb:insert(Tab, Obj) || Obj <- objs()],
|
||||
ok.
|
||||
|
||||
prefixed_raw_objs(Pfx) ->
|
||||
[Obj || {K,_} = Obj <- raw_objs(),
|
||||
is_prefix(Pfx, K)].
|
||||
|
||||
raw_objs() ->
|
||||
[ {<<"aa">> , <<"1">>}
|
||||
, {<<"aa1">>, <<"2">>}
|
||||
, {<<"ab">> , <<"3">>}
|
||||
, {<<"ab1">>, <<"4">>}
|
||||
, {<<255,255>>, <<"5">>} ].
|
||||
|
||||
filtered_objs(MS) ->
|
||||
MSC = ets:match_spec_compile(MS),
|
||||
ets:match_spec_run(objs(), MSC).
|
||||
|
||||
objs() ->
|
||||
[ #r{k = {a,a,1}, v = 1}
|
||||
, #r{k = {a,b,2}, v = 2}
|
||||
, #r{k = {a,b,3}, v = 3}
|
||||
, #r{k = {a,c,4}, v = 4}
|
||||
, #r{k = {b,b,5}, v = 5}
|
||||
].
|
||||
|
||||
%% copied from mrdb_select.erl
|
||||
is_prefix(A, B) when is_binary(A), is_binary(B) ->
|
||||
Sa = byte_size(A),
|
||||
case B of
|
||||
<<A:Sa/binary, _/binary>> ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
Reference in New Issue
Block a user