Compare commits
57 Commits
Author | SHA1 | Date | |
---|---|---|---|
318f84bbaf | |||
![]() |
0691d8b055 | ||
ac69c8564f | |||
![]() |
2ca7f36eb1 | ||
![]() |
56d78768d9 | ||
a768d8008f | |||
![]() |
9cea41cb9a | ||
![]() |
d42055c7ac | ||
![]() |
0ae7ecd41d | ||
![]() |
ece9db2b09 | ||
![]() |
0e4382d5f7 | ||
![]() |
d9d82d7ead | ||
![]() |
34285da6d8 | ||
![]() |
cb123ee28a | ||
![]() |
996ae82717 | ||
![]() |
d6b86524fd | ||
![]() |
296813ef7b | ||
![]() |
16d0533acf | ||
![]() |
1d6ae82c6d | ||
![]() |
71ebaf2d66 | ||
![]() |
f2b6116d31 | ||
![]() |
33ee7929d4 | ||
![]() |
78669bb8bf | ||
![]() |
346decee6e | ||
![]() |
4da0fce567 | ||
![]() |
e7d42f9500 | ||
![]() |
699a7d5920 | ||
![]() |
57f02078bc | ||
![]() |
791cec41db | ||
![]() |
6eff62df6d | ||
![]() |
2e430fc5eb | ||
![]() |
e76a01f8c4 | ||
![]() |
4d0a78612a | ||
![]() |
eeb6aff242 | ||
![]() |
f11e16e29f | ||
![]() |
0aecf5ef01 | ||
![]() |
465a220bfe | ||
![]() |
19140c738b | ||
![]() |
bed66b2998 | ||
![]() |
3635eac717 | ||
![]() |
95abe4e36e | ||
![]() |
7c729bd932 | ||
![]() |
4489e5d743 | ||
![]() |
d1a6bf22d5 | ||
![]() |
b65e82ed71 | ||
![]() |
ee9e7eac67 | ||
![]() |
ce2be519b4 | ||
![]() |
8073a0daa5 | ||
![]() |
ab15b7f399 | ||
![]() |
a75f6e0c43 | ||
![]() |
1340bb2050 | ||
![]() |
d1177b6ad4 | ||
![]() |
b908998e6b | ||
![]() |
dfc0125800 | ||
![]() |
296abb23bb | ||
![]() |
7057f4dcbd | ||
![]() |
c4235be94a |
@ -2,15 +2,35 @@ version: 2.1
|
|||||||
|
|
||||||
executors:
|
executors:
|
||||||
aebuilder:
|
aebuilder:
|
||||||
|
parameters:
|
||||||
|
otp:
|
||||||
|
type: string
|
||||||
docker:
|
docker:
|
||||||
- image: aeternity/builder
|
- image: aeternity/builder:focal-<< parameters.otp >>
|
||||||
user: builder
|
user: builder
|
||||||
environment:
|
environment:
|
||||||
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
|
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
|
||||||
|
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
|
||||||
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build_and_test:
|
||||||
executor: aebuilder
|
parameters:
|
||||||
|
otp:
|
||||||
|
type: string
|
||||||
|
executor:
|
||||||
|
name: aebuilder
|
||||||
|
otp: << parameters.otp >>
|
||||||
steps:
|
steps:
|
||||||
- checkout
|
- checkout
|
||||||
- run: make test
|
- run: make test
|
||||||
|
- store_artifacts:
|
||||||
|
path: _build/test/logs
|
||||||
|
|
||||||
|
workflows:
|
||||||
|
commit:
|
||||||
|
jobs:
|
||||||
|
- build_and_test:
|
||||||
|
matrix:
|
||||||
|
parameters:
|
||||||
|
otp: ["otp24", "otp25", "otp26"]
|
||||||
|
12
README.md
12
README.md
@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
|
|||||||
it yourself. If you only need the size occasionally, you may traverse the
|
it yourself. If you only need the size occasionally, you may traverse the
|
||||||
table to count the elements.
|
table to count the elements.
|
||||||
|
|
||||||
|
When `mrdb` transactions abort, they will return a stacktrace caught
|
||||||
|
from within the transaction fun, giving much better debugging info.
|
||||||
|
This is different from how mnesia does it.
|
||||||
|
|
||||||
|
If behavior closer to mnesia's abort returns are needed, say, for backwards
|
||||||
|
compatibility, this can be controlled by setting the environment variable
|
||||||
|
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
|
||||||
|
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
|
||||||
|
For really performance-critical transactions which may abort often, it might
|
||||||
|
make a difference to set this option to `true`, since there is a cost involved
|
||||||
|
in producing stacktraces.
|
||||||
|
|
||||||
|
|
||||||
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
|
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
|
||||||
|
|
||||||
|
@ -2,4 +2,5 @@
|
|||||||
{application,mnesia_rocksdb}.
|
{application,mnesia_rocksdb}.
|
||||||
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
|
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
|
||||||
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
|
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
|
||||||
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
|
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
|
||||||
|
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.
|
||||||
|
@ -4,8 +4,8 @@
|
|||||||
{deps,
|
{deps,
|
||||||
[
|
[
|
||||||
{sext, "1.8.0"},
|
{sext, "1.8.0"},
|
||||||
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
|
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
|
||||||
{hut, "1.3.0"}
|
{hut, "1.4.0"}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{xref_checks, [
|
{xref_checks, [
|
||||||
@ -14,6 +14,10 @@
|
|||||||
deprecated_function_calls
|
deprecated_function_calls
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{dialyzer, [{plt_apps, all_deps},
|
||||||
|
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
|
||||||
|
]}.
|
||||||
|
|
||||||
{profiles,
|
{profiles,
|
||||||
[
|
[
|
||||||
{test,
|
{test,
|
||||||
|
@ -1,11 +1,4 @@
|
|||||||
%% -*- erlang-mode -*-
|
%% -*- erlang-mode -*-
|
||||||
case os:getenv("ERLANG_ROCKSDB_OPTS") of
|
|
||||||
false ->
|
|
||||||
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
|
|
||||||
_ ->
|
|
||||||
%% If manually set, we assume it's throught through
|
|
||||||
skip
|
|
||||||
end.
|
|
||||||
case os:getenv("DEBUG") of
|
case os:getenv("DEBUG") of
|
||||||
"true" ->
|
"true" ->
|
||||||
Opts = proplists:get_value(erl_opts, CONFIG, []),
|
Opts = proplists:get_value(erl_opts, CONFIG, []),
|
||||||
|
10
rebar.lock
10
rebar.lock
@ -1,15 +1,15 @@
|
|||||||
{"1.2.0",
|
{"1.2.0",
|
||||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
|
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
||||||
{<<"rocksdb">>,
|
{<<"rocksdb">>,
|
||||||
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
|
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
||||||
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
|
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
|
||||||
0},
|
0},
|
||||||
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
||||||
[
|
[
|
||||||
{pkg_hash,[
|
{pkg_hash,[
|
||||||
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
|
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
|
||||||
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
|
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
|
||||||
{pkg_hash_ext,[
|
{pkg_hash_ext,[
|
||||||
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
|
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
|
||||||
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
|
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
|
||||||
].
|
].
|
||||||
|
@ -346,14 +346,11 @@ semantics(_Alias, index_fun) -> fun index_f/4;
|
|||||||
semantics(_Alias, _) -> undefined.
|
semantics(_Alias, _) -> undefined.
|
||||||
|
|
||||||
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
|
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
|
||||||
case info(Alias, Tab, {index_consistent, PosInfo}) of
|
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
|
||||||
true -> true;
|
|
||||||
_ -> false
|
|
||||||
end.
|
|
||||||
|
|
||||||
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
|
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
|
||||||
when is_boolean(Bool) ->
|
when is_boolean(Bool) ->
|
||||||
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
|
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
|
||||||
|
|
||||||
|
|
||||||
%% PRIVATE FUN
|
%% PRIVATE FUN
|
||||||
@ -454,8 +451,13 @@ close_table(Alias, Tab) ->
|
|||||||
error ->
|
error ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
|
case get(mnesia_dumper_dets) of
|
||||||
close_table_(Alias, Tab)
|
undefined ->
|
||||||
|
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
|
||||||
|
close_table_(Alias, Tab);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
close_table_(Alias, Tab) ->
|
close_table_(Alias, Tab) ->
|
||||||
@ -798,7 +800,7 @@ handle_call({create_table, Tab, Props}, _From,
|
|||||||
exit:{aborted, Error} ->
|
exit:{aborted, Error} ->
|
||||||
{reply, {aborted, Error}, St}
|
{reply, {aborted, Error}, St}
|
||||||
end;
|
end;
|
||||||
handle_call({load_table, _LoadReason, Props}, _From,
|
handle_call({load_table, _LoadReason, Props}, _,
|
||||||
#st{alias = Alias, tab = Tab} = St) ->
|
#st{alias = Alias, tab = Tab} = St) ->
|
||||||
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
|
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
|
||||||
{reply, ok, St#st{status = active}};
|
{reply, ok, St#st{status = active}};
|
||||||
@ -826,7 +828,7 @@ handle_call({delete, Key}, _From, St) ->
|
|||||||
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
|
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
|
||||||
Res = mrdb:match_delete(get_ref(Tab), Pat),
|
Res = mrdb:match_delete(get_ref(Tab), Pat),
|
||||||
{reply, Res, St};
|
{reply, Res, St};
|
||||||
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
|
||||||
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
|
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
|
||||||
{reply, ok, St#st{status = undefined}};
|
{reply, ok, St#st{status = undefined}};
|
||||||
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
, read_info/2 %% (Alias, Tab)
|
, read_info/2 %% (Alias, Tab)
|
||||||
, read_info/4 %% (Alias, Tab, Key, Default)
|
, read_info/4 %% (Alias, Tab, Key, Default)
|
||||||
, write_info/4 %% (Alias, Tab, Key, Value)
|
, write_info/4 %% (Alias, Tab, Key, Value)
|
||||||
|
, delete_info/3 %% (Alias, Tab, Key)
|
||||||
, write_table_property/3 %% (Alias, Tab, Property)
|
, write_table_property/3 %% (Alias, Tab, Property)
|
||||||
]).
|
]).
|
||||||
|
|
||||||
@ -280,9 +281,20 @@ write_info(Alias, Tab, K, V) ->
|
|||||||
write_info_(get_ref({admin, Alias}), Tab, K, V).
|
write_info_(get_ref({admin, Alias}), Tab, K, V).
|
||||||
|
|
||||||
write_info_(Ref, Tab, K, V) ->
|
write_info_(Ref, Tab, K, V) ->
|
||||||
|
write_info_encv(Ref, Tab, K, term_to_binary(V)).
|
||||||
|
|
||||||
|
write_info_encv(Ref, Tab, K, V) ->
|
||||||
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
|
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
|
||||||
maybe_write_standalone_info(Ref, K, V),
|
maybe_write_standalone_info(Ref, K, V),
|
||||||
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
|
mrdb:rdb_put(Ref, EncK, V, []).
|
||||||
|
|
||||||
|
delete_info(Alias, Tab, K) ->
|
||||||
|
delete_info_(get_ref({admin, Alias}), Tab, K).
|
||||||
|
|
||||||
|
delete_info_(Ref, Tab, K) ->
|
||||||
|
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
|
||||||
|
maybe_delete_standalone_info(Ref, K),
|
||||||
|
mrdb:rdb_delete(Ref, EncK, []).
|
||||||
|
|
||||||
maybe_write_standalone_info(Ref, K, V) ->
|
maybe_write_standalone_info(Ref, K, V) ->
|
||||||
case Ref of
|
case Ref of
|
||||||
@ -295,6 +307,16 @@ maybe_write_standalone_info(Ref, K, V) ->
|
|||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_delete_standalone_info(Ref, K) ->
|
||||||
|
case Ref of
|
||||||
|
#{type := standalone, vsn := 1, db_ref := DbRef} ->
|
||||||
|
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
|
||||||
|
Key = <<?INFO_TAG, EncK/binary>>,
|
||||||
|
rocksdb:delete(DbRef, Key, []);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
|
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
|
||||||
call(Alias, {write_table_property, Tab, Prop}).
|
call(Alias, {write_table_property, Tab, Prop}).
|
||||||
|
|
||||||
@ -323,7 +345,6 @@ call(Alias, Req, Timeout) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
mrdb_mutex:ensure_tab(),
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
@ -405,12 +426,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
|
|||||||
%% We need to store the persistent ref explicitly here,
|
%% We need to store the persistent ref explicitly here,
|
||||||
%% since mnesia knows nothing of our admin table.
|
%% since mnesia knows nothing of our admin table.
|
||||||
AdminTab = {admin, Alias},
|
AdminTab = {admin, Alias},
|
||||||
|
Stats = mrdb_stats:new(),
|
||||||
CfI = update_cf_info(AdminTab, #{ status => open
|
CfI = update_cf_info(AdminTab, #{ status => open
|
||||||
, name => AdminTab
|
, name => AdminTab
|
||||||
, vsn => ?VSN
|
, vsn => ?VSN
|
||||||
, encoding => {sext,{value,term}}
|
, encoding => {sext,{value,term}}
|
||||||
, attr_pos => #{key => 1,
|
, attr_pos => #{key => 1,
|
||||||
value => 2}
|
value => 2}
|
||||||
|
, stats => Stats
|
||||||
, mountpoint => MP
|
, mountpoint => MP
|
||||||
, properties =>
|
, properties =>
|
||||||
#{ attributes => [key, val]
|
#{ attributes => [key, val]
|
||||||
@ -510,12 +533,17 @@ intersection(A, B) ->
|
|||||||
|
|
||||||
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
|
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
|
||||||
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
|
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
|
||||||
case create_trec(Alias, Name, Props, Backend, St) of
|
case find_cf(Alias, Name, Backend, St) of
|
||||||
{ok, NewCf} ->
|
{ok, TRec} ->
|
||||||
St1 = update_cf(Alias, Name, NewCf, St),
|
{reply, {ok, TRec}, St};
|
||||||
{reply, {ok, NewCf}, St1};
|
error ->
|
||||||
{error, _} = Error ->
|
case create_trec(Alias, Name, Props, Backend, St) of
|
||||||
{reply, Error, St}
|
{ok, NewCf} ->
|
||||||
|
St1 = update_cf(Alias, Name, NewCf, St),
|
||||||
|
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
|
||||||
|
{error, _} = Error ->
|
||||||
|
{reply, Error, St}
|
||||||
|
end
|
||||||
end;
|
end;
|
||||||
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
|
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
|
||||||
try
|
try
|
||||||
@ -628,8 +656,14 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
|
|||||||
case {Op, lists:member(I, Index)} of
|
case {Op, lists:member(I, Index)} of
|
||||||
{delete, true} ->
|
{delete, true} ->
|
||||||
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
|
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
|
||||||
|
delete_info(Alias, Main, {index_consistent, I}),
|
||||||
maybe_update_pt(Main, CfM1),
|
maybe_update_pt(Main, CfM1),
|
||||||
update_cf(Alias, Main, CfM1, St);
|
update_cf(Alias, Main, CfM1, St);
|
||||||
|
{create, _} ->
|
||||||
|
%% Due to a previous bug, this marker might linger
|
||||||
|
%% In any case, it mustn't be there for a newly created index
|
||||||
|
delete_info(Alias, Main, {index_consistent, I}),
|
||||||
|
St;
|
||||||
_ ->
|
_ ->
|
||||||
St
|
St
|
||||||
end;
|
end;
|
||||||
@ -640,6 +674,7 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
|
|||||||
maybe_update_main(_, _, _, St) ->
|
maybe_update_main(_, _, _, St) ->
|
||||||
St.
|
St.
|
||||||
|
|
||||||
|
|
||||||
%% The pt may not have been created yet. If so, don't do it here.
|
%% The pt may not have been created yet. If so, don't do it here.
|
||||||
maybe_update_pt(Name, Ref) ->
|
maybe_update_pt(Name, Ref) ->
|
||||||
case get_pt(Name, error) of
|
case get_pt(Name, error) of
|
||||||
@ -1223,7 +1258,18 @@ load_info_(Res, I, ARef, Tab) ->
|
|||||||
DecK = mnesia_rocksdb_lib:decode_key(K),
|
DecK = mnesia_rocksdb_lib:decode_key(K),
|
||||||
case read_info_(ARef, Tab, DecK, undefined) of
|
case read_info_(ARef, Tab, DecK, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
write_info_(ARef, Tab, DecK, V);
|
write_info_encv(ARef, Tab, DecK, V);
|
||||||
|
<<131,_/binary>> = Value ->
|
||||||
|
%% Due to a previous bug, info values could be double-encoded with binary_to_term()
|
||||||
|
try binary_to_term(Value) of
|
||||||
|
_DecVal ->
|
||||||
|
%% We haven't been storing erlang-term encoded data as info,
|
||||||
|
%% so assume this is double-encoded and correct
|
||||||
|
write_info_encv(ARef, Tab, DecK, Value)
|
||||||
|
catch
|
||||||
|
error:_ ->
|
||||||
|
skip
|
||||||
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
skip
|
skip
|
||||||
end,
|
end,
|
||||||
@ -1443,24 +1489,26 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
|
|||||||
%% not yet created
|
%% not yet created
|
||||||
CFs = cfs(CFs0),
|
CFs = cfs(CFs0),
|
||||||
file:make_dir(MP),
|
file:make_dir(MP),
|
||||||
OpenOpts = [ {create_if_missing, true}
|
OpenRes = rocksdb_open(MP, Opts, CFs),
|
||||||
, {create_missing_column_families, true}
|
|
||||||
, {merge_operator, erlang_merge_operator}
|
|
||||||
| Opts ],
|
|
||||||
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
|
|
||||||
map_cfs(OpenRes, CFs, Alias, Acc0);
|
map_cfs(OpenRes, CFs, Alias, Acc0);
|
||||||
false ->
|
false ->
|
||||||
{error, enoent};
|
{error, enoent};
|
||||||
true ->
|
true ->
|
||||||
%% Assumption: even an old rocksdb database file will have at least "default"
|
%% Assumption: even an old rocksdb database file will have at least "default"
|
||||||
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
|
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
|
||||||
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
|
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
|
||||||
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
|
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
open_opts(Opts) ->
|
||||||
|
[ {create_if_missing, true}
|
||||||
|
, {create_missing_column_families, true}
|
||||||
|
, {merge_operator, erlang_merge_operator}
|
||||||
|
| Opts ].
|
||||||
|
|
||||||
rocksdb_open(MP, Opts, CFs) ->
|
rocksdb_open(MP, Opts, CFs) ->
|
||||||
%% rocksdb:open(MP, Opts, CFs),
|
%% rocksdb:open(MP, Opts, CFs),
|
||||||
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
|
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
|
||||||
|
|
||||||
is_open(Alias, #st{backends = Bs}) ->
|
is_open(Alias, #st{backends = Bs}) ->
|
||||||
case maps:find(Alias, Bs) of
|
case maps:find(Alias, Bs) of
|
||||||
|
@ -42,4 +42,5 @@ start_link() ->
|
|||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
|
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
|
||||||
|
, ?CHILD(mnesia_rocksdb_params, worker)]} }.
|
||||||
|
193
src/mrdb.erl
193
src/mrdb.erl
@ -53,6 +53,7 @@
|
|||||||
, delete/2 , delete/3
|
, delete/2 , delete/3
|
||||||
, delete_object/2, delete_object/3
|
, delete_object/2, delete_object/3
|
||||||
, match_delete/2
|
, match_delete/2
|
||||||
|
, merge/3 , merge/4
|
||||||
, clear_table/1
|
, clear_table/1
|
||||||
, batch_write/2 , batch_write/3
|
, batch_write/2 , batch_write/3
|
||||||
, update_counter/3, update_counter/4
|
, update_counter/3, update_counter/4
|
||||||
@ -105,6 +106,8 @@
|
|||||||
-include("mnesia_rocksdb.hrl").
|
-include("mnesia_rocksdb.hrl").
|
||||||
-include("mnesia_rocksdb_int.hrl").
|
-include("mnesia_rocksdb_int.hrl").
|
||||||
|
|
||||||
|
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
|
||||||
|
|
||||||
-type tab_name() :: atom().
|
-type tab_name() :: atom().
|
||||||
-type alias() :: atom().
|
-type alias() :: atom().
|
||||||
-type admin_tab() :: {admin, alias()}.
|
-type admin_tab() :: {admin, alias()}.
|
||||||
@ -115,7 +118,9 @@
|
|||||||
| index()
|
| index()
|
||||||
| retainer().
|
| retainer().
|
||||||
|
|
||||||
-type retries() :: non_neg_integer().
|
-type inner() :: non_neg_integer().
|
||||||
|
-type outer() :: non_neg_integer().
|
||||||
|
-type retries() :: outer() | {inner(), outer()}.
|
||||||
|
|
||||||
%% activity type 'ets' makes no sense in this context
|
%% activity type 'ets' makes no sense in this context
|
||||||
-type mnesia_activity_type() :: transaction
|
-type mnesia_activity_type() :: transaction
|
||||||
@ -141,7 +146,7 @@
|
|||||||
|
|
||||||
-type tx_activity() :: #{ type := 'tx'
|
-type tx_activity() :: #{ type := 'tx'
|
||||||
, handle := tx_handle()
|
, handle := tx_handle()
|
||||||
, attempt := non_neg_integer() }.
|
, attempt := 'undefined' | retries() }.
|
||||||
-type batch_activity() :: #{ type := 'batch'
|
-type batch_activity() :: #{ type := 'batch'
|
||||||
, handle := batch_handle() }.
|
, handle := batch_handle() }.
|
||||||
-type activity() :: tx_activity() | batch_activity().
|
-type activity() :: tx_activity() | batch_activity().
|
||||||
@ -246,7 +251,7 @@ release_snapshot(SHandle) ->
|
|||||||
%% <li> `{tx, TxOpts}' - A `rocksdb' transaction with sligth modifications</li>
|
%% <li> `{tx, TxOpts}' - A `rocksdb' transaction with sligth modifications</li>
|
||||||
%% <li> `batch' - A `rocksdb' batch operation</li>
|
%% <li> `batch' - A `rocksdb' batch operation</li>
|
||||||
%% </ul>
|
%% </ul>
|
||||||
%%
|
%%
|
||||||
%% By default, transactions are combined with a snapshot with 1 retry.
|
%% By default, transactions are combined with a snapshot with 1 retry.
|
||||||
%% The snapshot ensures that writes from concurrent transactions don't leak into the transaction context.
|
%% The snapshot ensures that writes from concurrent transactions don't leak into the transaction context.
|
||||||
%% A transaction will be retried if it detects that the commit set conflicts with recent changes.
|
%% A transaction will be retried if it detects that the commit set conflicts with recent changes.
|
||||||
@ -254,6 +259,14 @@ release_snapshot(SHandle) ->
|
|||||||
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
|
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
|
||||||
%% the commit set. It will then be re-run again, until the retry count is exhausted.
|
%% the commit set. It will then be re-run again, until the retry count is exhausted.
|
||||||
%%
|
%%
|
||||||
|
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
|
||||||
|
%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by
|
||||||
|
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
|
||||||
|
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
|
||||||
|
%% in line) if there are outer retries remaining. When all retries are exhaused, the activity aborts
|
||||||
|
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
|
||||||
|
%% attempt, but only on outer retries (the first retry is always an outer retry).
|
||||||
|
%%
|
||||||
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
|
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
|
||||||
%%
|
%%
|
||||||
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
||||||
@ -269,41 +282,92 @@ activity(Type, Alias, F) ->
|
|||||||
#{ alias => Alias
|
#{ alias => Alias
|
||||||
, db_ref => DbRef }, TxCtxt);
|
, db_ref => DbRef }, TxCtxt);
|
||||||
batch ->
|
batch ->
|
||||||
Batch = get_batch_(DbRef),
|
Batch = init_batch_ref(DbRef),
|
||||||
#{ activity => #{ type => batch
|
#{ activity => #{ type => batch
|
||||||
, handle => Batch }
|
, handle => Batch }
|
||||||
, alias => Alias
|
, alias => Alias
|
||||||
, db_ref => DbRef }
|
, db_ref => DbRef }
|
||||||
end,
|
end,
|
||||||
do_activity(F, Alias, Ctxt, false).
|
do_activity(F, Alias, Ctxt).
|
||||||
|
|
||||||
do_activity(F, Alias, Ctxt, WithLock) ->
|
do_activity(F, Alias, Ctxt) ->
|
||||||
try run_f(F, Ctxt, WithLock, Alias) of
|
try try_f(F, Ctxt)
|
||||||
Res ->
|
|
||||||
try commit_and_pop(Res)
|
|
||||||
catch
|
|
||||||
throw:{?MODULE, busy} ->
|
|
||||||
do_activity(F, Alias, Ctxt, true)
|
|
||||||
end
|
|
||||||
catch
|
catch
|
||||||
|
throw:{?MODULE, busy} ->
|
||||||
|
retry_activity(F, Alias, Ctxt)
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_f(F, Ctxt) ->
|
||||||
|
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
|
||||||
|
|
||||||
|
try_f(false, F, Ctxt) ->
|
||||||
|
try run_f(F, Ctxt) of
|
||||||
|
Res ->
|
||||||
|
commit_and_pop(Res)
|
||||||
|
catch
|
||||||
|
throw:Something ->
|
||||||
|
abort_and_pop(throw, Something);
|
||||||
|
Cat:Err:T ->
|
||||||
|
%% Without capturing the stacktract here,
|
||||||
|
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
||||||
|
abort_and_pop(Cat, {Err, T})
|
||||||
|
end;
|
||||||
|
try_f(true, F, Ctxt) ->
|
||||||
|
try run_f(F, Ctxt) of
|
||||||
|
Res ->
|
||||||
|
commit_and_pop(Res)
|
||||||
|
catch
|
||||||
|
throw:Something ->
|
||||||
|
abort_and_pop(throw, Something);
|
||||||
Cat:Err ->
|
Cat:Err ->
|
||||||
|
%% Without capturing the stacktract here,
|
||||||
|
%% debugging gets pretty difficult
|
||||||
abort_and_pop(Cat, Err)
|
abort_and_pop(Cat, Err)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
run_f(F, Ctxt, false, _) ->
|
|
||||||
push_ctxt(Ctxt),
|
|
||||||
F();
|
|
||||||
run_f(F, Ctxt, true, Alias) ->
|
|
||||||
mrdb_mutex:do(
|
|
||||||
Alias,
|
|
||||||
fun() ->
|
|
||||||
push_ctxt(incr_attempt(Ctxt)),
|
|
||||||
F()
|
|
||||||
end).
|
|
||||||
|
|
||||||
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
|
run_f(F, Ctxt) ->
|
||||||
|
push_ctxt(Ctxt),
|
||||||
|
F().
|
||||||
|
|
||||||
|
incr_attempt(0, {_,O}) when O > 0 ->
|
||||||
|
{outer, {0,1}};
|
||||||
|
incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O),
|
||||||
|
is_integer(Ri), is_integer(Ro) ->
|
||||||
|
if I < Ri -> {inner, {I+1, O}};
|
||||||
|
O < Ro -> {outer, {0, O+1}};
|
||||||
|
true ->
|
||||||
|
error
|
||||||
|
end;
|
||||||
|
incr_attempt(_, _) ->
|
||||||
|
error.
|
||||||
|
|
||||||
|
retry_activity(F, Alias, #{activity := #{ type := Type
|
||||||
|
, attempt := A
|
||||||
|
, retries := R} = Act} = Ctxt) ->
|
||||||
|
case incr_attempt(A, R) of
|
||||||
|
{RetryCtxt, A1} ->
|
||||||
|
Act1 = Act#{attempt := A1},
|
||||||
|
Ctxt1 = Ctxt#{activity := Act1},
|
||||||
|
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
|
||||||
|
catch
|
||||||
|
throw:{?MODULE, busy} ->
|
||||||
|
retry_activity(F, Alias, Ctxt1)
|
||||||
|
end;
|
||||||
|
error ->
|
||||||
|
return_abort(Type, error, retry_limit)
|
||||||
|
end.
|
||||||
|
|
||||||
|
retry_activity_(inner, F, Alias, Ctxt) ->
|
||||||
|
mrdb_stats:incr(Alias, inner_retries, 1),
|
||||||
|
try_f(F, Ctxt);
|
||||||
|
retry_activity_(outer, F, Alias, Ctxt) ->
|
||||||
|
mrdb_stats:incr(Alias, outer_retries, 1),
|
||||||
|
mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end).
|
||||||
|
|
||||||
|
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
|
||||||
{ok, TxH} = rdb_transaction(DbRef, []),
|
{ok, TxH} = rdb_transaction(DbRef, []),
|
||||||
Act1 = Act#{attempt := A+1, handle := TxH},
|
Act1 = Act#{handle := TxH},
|
||||||
C1 = C#{ activity := Act1 },
|
C1 = C#{ activity := Act1 },
|
||||||
case maps:is_key(snapshot, C) of
|
case maps:is_key(snapshot, C) of
|
||||||
true ->
|
true ->
|
||||||
@ -368,7 +432,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
|
|||||||
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
|
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
|
||||||
|
|
||||||
check_tx_opts(Opts) ->
|
check_tx_opts(Opts) ->
|
||||||
case maps:without([no_snapshot, retries], Opts) of
|
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
|
||||||
Other when map_size(Other) > 0 ->
|
Other when map_size(Other) > 0 ->
|
||||||
abort({invalid_tx_opts, maps:keys(Other)});
|
abort({invalid_tx_opts, maps:keys(Other)});
|
||||||
_ ->
|
_ ->
|
||||||
@ -376,10 +440,14 @@ check_tx_opts(Opts) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
check_retries(#{retries := Retries} = Opts) ->
|
check_retries(#{retries := Retries} = Opts) ->
|
||||||
if is_integer(Retries), Retries >= 0 ->
|
case Retries of
|
||||||
Opts;
|
_ when is_integer(Retries), Retries >= 0 ->
|
||||||
true ->
|
Opts#{retries := {0, Retries}};
|
||||||
error({invalid_tx_option, {retries, Retries}})
|
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
|
||||||
|
Inner >= 0, Outer >= 0 ->
|
||||||
|
Opts;
|
||||||
|
_ ->
|
||||||
|
error({invalid_tx_option, {retries, Retries}})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
|
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
|
||||||
@ -394,7 +462,7 @@ create_tx(Opts, DbRef) ->
|
|||||||
{ok, TxH} = rdb_transaction(DbRef, []),
|
{ok, TxH} = rdb_transaction(DbRef, []),
|
||||||
Opts#{activity => maps:merge(Opts, #{ type => tx
|
Opts#{activity => maps:merge(Opts, #{ type => tx
|
||||||
, handle => TxH
|
, handle => TxH
|
||||||
, attempt => 1})}.
|
, attempt => 0 })}.
|
||||||
|
|
||||||
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
|
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
|
||||||
case NoSnap of
|
case NoSnap of
|
||||||
@ -414,8 +482,7 @@ commit_and_pop(Res) ->
|
|||||||
Res;
|
Res;
|
||||||
{error, {error, "Resource busy" ++ _ = Busy}} ->
|
{error, {error, "Resource busy" ++ _ = Busy}} ->
|
||||||
case A of
|
case A of
|
||||||
#{retries := Retries, attempt := Att}
|
#{retries := {I,O}} when I > 0; O > 0 ->
|
||||||
when Att =< Retries ->
|
|
||||||
throw({?MODULE, busy});
|
throw({?MODULE, busy});
|
||||||
_ ->
|
_ ->
|
||||||
error({error, Busy})
|
error({error, Busy})
|
||||||
@ -477,6 +544,11 @@ re_throw(Cat, Err) ->
|
|||||||
mnesia_compatible_aborts() ->
|
mnesia_compatible_aborts() ->
|
||||||
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
||||||
|
|
||||||
|
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
|
||||||
|
Bool;
|
||||||
|
mnesia_compatible_aborts(_) ->
|
||||||
|
mnesia_compatible_aborts().
|
||||||
|
|
||||||
fix_error({aborted, Err}) ->
|
fix_error({aborted, Err}) ->
|
||||||
Err;
|
Err;
|
||||||
fix_error(Err) ->
|
fix_error(Err) ->
|
||||||
@ -486,7 +558,7 @@ rdb_transaction(DbRef, Opts) ->
|
|||||||
rocksdb:transaction(DbRef, Opts).
|
rocksdb:transaction(DbRef, Opts).
|
||||||
|
|
||||||
rdb_transaction_commit_and_pop(H) ->
|
rdb_transaction_commit_and_pop(H) ->
|
||||||
try rdb_transaction_commit(H)
|
try rdb_transaction_commit(H)
|
||||||
after
|
after
|
||||||
pop_ctxt()
|
pop_ctxt()
|
||||||
end.
|
end.
|
||||||
@ -507,10 +579,13 @@ rdb_write_batch_and_pop(BatchRef, C) ->
|
|||||||
pop_ctxt()
|
pop_ctxt()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
rdb_release_batch(?BATCH_REF_DUMMY) ->
|
||||||
|
ok;
|
||||||
rdb_release_batch(H) ->
|
rdb_release_batch(H) ->
|
||||||
rocksdb:release_batch(H).
|
rocksdb:release_batch(H).
|
||||||
|
|
||||||
%% @doc Aborts an ongoing {@link activity/2}
|
%% @doc Aborts an ongoing {@link activity/2}
|
||||||
|
-spec abort(_) -> no_return().
|
||||||
abort(Reason) ->
|
abort(Reason) ->
|
||||||
case mnesia_compatible_aborts() of
|
case mnesia_compatible_aborts() of
|
||||||
true ->
|
true ->
|
||||||
@ -529,7 +604,7 @@ new_tx(#{activity := _}, _) ->
|
|||||||
new_tx(Tab, Opts) ->
|
new_tx(Tab, Opts) ->
|
||||||
#{db_ref := DbRef} = R = ensure_ref(Tab),
|
#{db_ref := DbRef} = R = ensure_ref(Tab),
|
||||||
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
|
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
|
||||||
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
|
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
|
||||||
|
|
||||||
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
|
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
|
||||||
tx_ref(Tab, TxH) ->
|
tx_ref(Tab, TxH) ->
|
||||||
@ -539,7 +614,7 @@ tx_ref(Tab, TxH) ->
|
|||||||
#{activity := #{type := tx, handle := OtherTxH}} ->
|
#{activity := #{type := tx, handle := OtherTxH}} ->
|
||||||
error({tx_handle_conflict, OtherTxH});
|
error({tx_handle_conflict, OtherTxH});
|
||||||
R ->
|
R ->
|
||||||
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
|
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec tx_commit(tx_handle() | db_ref()) -> ok.
|
-spec tx_commit(tx_handle() | db_ref()) -> ok.
|
||||||
@ -666,6 +741,21 @@ insert(Tab, Obj0, Opts) ->
|
|||||||
EncVal = encode_val(Obj, Ref),
|
EncVal = encode_val(Obj, Ref),
|
||||||
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
|
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
|
||||||
|
|
||||||
|
merge(Tab, Key, MergeOp) ->
|
||||||
|
merge(Tab, Key, MergeOp, []).
|
||||||
|
|
||||||
|
merge(Tab, Key, MergeOp, Opts) ->
|
||||||
|
#{encoding := Enc} = Ref = ensure_ref(Tab),
|
||||||
|
case Enc of
|
||||||
|
{_, {value, term}} ->
|
||||||
|
merge_(Ref, Key, MergeOp, Opts);
|
||||||
|
_ ->
|
||||||
|
abort(badarg)
|
||||||
|
end.
|
||||||
|
|
||||||
|
merge_(Ref, Key, MergeOp, Opts) ->
|
||||||
|
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
|
||||||
|
|
||||||
validate_obj(Obj, #{mode := mnesia}) ->
|
validate_obj(Obj, #{mode := mnesia}) ->
|
||||||
Obj;
|
Obj;
|
||||||
validate_obj(Obj, #{attr_pos := AP,
|
validate_obj(Obj, #{attr_pos := AP,
|
||||||
@ -798,7 +888,7 @@ update_index_do_bag(Ixs, Name, R, Key, Obj, Opts) ->
|
|||||||
not_found
|
not_found
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_index_do([{_Pos,ordered} = Ix|Ixs], Name, R, Key, Obj, Rest, Opts) ->
|
update_index_do([{_Pos,ordered} = Ix|Ixs], Name, R, Key, Obj, Rest, Opts) ->
|
||||||
Tab = {Name, index, Ix},
|
Tab = {Name, index, Ix},
|
||||||
#{ix_vals_f := IxValsF} = IRef = ensure_ref(Tab, R),
|
#{ix_vals_f := IxValsF} = IRef = ensure_ref(Tab, R),
|
||||||
@ -889,7 +979,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
|
|||||||
_ when is_atom(Ix) ->
|
_ when is_atom(Ix) ->
|
||||||
{attr_pos(Ix, Ref), ordered};
|
{attr_pos(Ix, Ref), ordered};
|
||||||
{_} ->
|
{_} ->
|
||||||
Ix;
|
{Ix, ordered};
|
||||||
_ when is_integer(Ix) ->
|
_ when is_integer(Ix) ->
|
||||||
{Ix, ordered}
|
{Ix, ordered}
|
||||||
end,
|
end,
|
||||||
@ -970,7 +1060,7 @@ alias_of(Tab) ->
|
|||||||
%% and when releasing, all batches are released. This will not ensure
|
%% and when releasing, all batches are released. This will not ensure
|
||||||
%% atomicity, but there is no way in rocksdb to achieve atomicity
|
%% atomicity, but there is no way in rocksdb to achieve atomicity
|
||||||
%% across db instances. At least, data should end up where you expect.
|
%% across db instances. At least, data should end up where you expect.
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
-spec as_batch(ref_or_tab(), fun( (db_ref()) -> Res )) -> Res.
|
-spec as_batch(ref_or_tab(), fun( (db_ref()) -> Res )) -> Res.
|
||||||
as_batch(Tab, F) ->
|
as_batch(Tab, F) ->
|
||||||
@ -1010,18 +1100,20 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
|
|||||||
get_batch(_) ->
|
get_batch(_) ->
|
||||||
{error, badarg}.
|
{error, badarg}.
|
||||||
|
|
||||||
get_batch_(DbRef) ->
|
init_batch_ref(DbRef) ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
{ok, Batch} = rdb_batch(),
|
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
|
||||||
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
|
|
||||||
Ref.
|
Ref.
|
||||||
|
|
||||||
|
get_batch_(DbRef) -> Ref = make_ref(), {ok, Batch} = rdb_batch(),
|
||||||
|
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}), Ref.
|
||||||
|
|
||||||
get_batch_(DbRef, BatchRef) ->
|
get_batch_(DbRef, BatchRef) ->
|
||||||
Key = batch_ref_key(BatchRef),
|
Key = batch_ref_key(BatchRef),
|
||||||
case pdict_get(Key) of
|
case pdict_get(Key) of
|
||||||
undefined ->
|
undefined ->
|
||||||
error(stale_batch_ref);
|
error(stale_batch_ref);
|
||||||
#{DbRef := Batch} ->
|
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
|
||||||
Batch;
|
Batch;
|
||||||
Map ->
|
Map ->
|
||||||
{ok, Batch} = rdb_batch(),
|
{ok, Batch} = rdb_batch(),
|
||||||
@ -1050,7 +1142,9 @@ write_batches(BatchRef, Opts) ->
|
|||||||
pdict_erase(Key),
|
pdict_erase(Key),
|
||||||
ret_batch_write_acc(
|
ret_batch_write_acc(
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun(DbRef, Batch, Acc) ->
|
fun(_, ?BATCH_REF_DUMMY, Acc) ->
|
||||||
|
Acc;
|
||||||
|
(DbRef, Batch, Acc) ->
|
||||||
case rocksdb:write_batch(DbRef, Batch, Opts) of
|
case rocksdb:write_batch(DbRef, Batch, Opts) of
|
||||||
ok ->
|
ok ->
|
||||||
Acc;
|
Acc;
|
||||||
@ -1269,7 +1363,7 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
|
|||||||
true = valid_limit(Limit),
|
true = valid_limit(Limit),
|
||||||
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
||||||
|
|
||||||
valid_limit(L) ->
|
valid_limit(L) ->
|
||||||
case L of
|
case L of
|
||||||
infinity ->
|
infinity ->
|
||||||
true;
|
true;
|
||||||
@ -1494,8 +1588,9 @@ rdb_iterator(R) -> rdb_iterator(R, []).
|
|||||||
rdb_iterator(R, Opts) ->
|
rdb_iterator(R, Opts) ->
|
||||||
rdb_iterator_(R, read_opts(R, Opts)).
|
rdb_iterator_(R, read_opts(R, Opts)).
|
||||||
|
|
||||||
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
|
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
|
||||||
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
|
%% Note: versions before 1.8.0 expected DbRef as first argument
|
||||||
|
rocksdb:transaction_iterator(TxH, CfH, ROpts);
|
||||||
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
|
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
|
||||||
rocksdb:iterator(DbRef, CfH, ROpts).
|
rocksdb:iterator(DbRef, CfH, ROpts).
|
||||||
|
|
||||||
@ -1506,11 +1601,11 @@ rdb_merge_(#{db_ref := DbRef, cf_handle := CfH}, K, Op, WOpts) ->
|
|||||||
rocksdb:merge(DbRef, CfH, K, Op, WOpts).
|
rocksdb:merge(DbRef, CfH, K, Op, WOpts).
|
||||||
|
|
||||||
write_opts(#{write_opts := Os}, Opts) -> Os ++ Opts;
|
write_opts(#{write_opts := Os}, Opts) -> Os ++ Opts;
|
||||||
write_opts(_, Opts) ->
|
write_opts(_, Opts) ->
|
||||||
Opts.
|
Opts.
|
||||||
|
|
||||||
read_opts(#{read_opts := Os}, Opts) -> Os ++ Opts;
|
read_opts(#{read_opts := Os}, Opts) -> Os ++ Opts;
|
||||||
read_opts(_, Opts) ->
|
read_opts(_, Opts) ->
|
||||||
Opts.
|
Opts.
|
||||||
|
|
||||||
-define(EOT, '$end_of_table').
|
-define(EOT, '$end_of_table').
|
||||||
|
@ -6,11 +6,14 @@
|
|||||||
, iterator_move/2
|
, iterator_move/2
|
||||||
, iterator/2
|
, iterator/2
|
||||||
, iterator_close/1
|
, iterator_close/1
|
||||||
|
, fold/4
|
||||||
|
, rev_fold/4
|
||||||
|
, index_ref/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(mrdb_ix_iter, { i :: mrdb:iterator()
|
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
|
||||||
, type = set :: set | bag
|
, type = set :: set | bag
|
||||||
, sub :: mrdb:ref() | pid()
|
, sub :: pid() | mrdb:db_ref()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type ix_iterator() :: #mrdb_ix_iter{}.
|
-type ix_iterator() :: #mrdb_ix_iter{}.
|
||||||
@ -19,7 +22,7 @@
|
|||||||
|
|
||||||
-type object() :: tuple().
|
-type object() :: tuple().
|
||||||
|
|
||||||
-record(subst, { i :: mrdb:iterator()
|
-record(subst, { i :: mrdb:mrdb_iterator()
|
||||||
, vals_f
|
, vals_f
|
||||||
, cur
|
, cur
|
||||||
, mref }).
|
, mref }).
|
||||||
@ -62,6 +65,41 @@ iterator(Tab, IxPos) ->
|
|||||||
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
|
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
|
||||||
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
|
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
|
||||||
|
|
||||||
|
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
|
||||||
|
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
|
||||||
|
when Acc :: any().
|
||||||
|
%% Folds over the index table corresponding to Tab and IxPos.
|
||||||
|
%% The fold fun is called with the index key and the corresponding object,
|
||||||
|
%% or [] if there is no such object.
|
||||||
|
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
|
||||||
|
fold_(Tab, IxPos, first, next, FoldFun, Acc).
|
||||||
|
|
||||||
|
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
|
||||||
|
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
|
||||||
|
when Acc :: any().
|
||||||
|
%% Like fold/4 above, but folds over the index in the reverse order.
|
||||||
|
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
|
||||||
|
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
|
||||||
|
|
||||||
|
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
|
||||||
|
with_iterator(
|
||||||
|
Tab, IxPos,
|
||||||
|
fun(I) ->
|
||||||
|
iter_fold(I, Start, Dir, FoldFun, Acc)
|
||||||
|
end).
|
||||||
|
|
||||||
|
iter_fold(I, Start, Dir, Fun, Acc) ->
|
||||||
|
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
|
||||||
|
|
||||||
|
iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
|
||||||
|
iter_fold_(iterator_move(I, Dir), I, Dir, Fun, Fun(IxVal, Obj, Acc));
|
||||||
|
iter_fold_({error, _}, _, _, _, Acc) ->
|
||||||
|
Acc.
|
||||||
|
|
||||||
|
index_ref(Tab, Pos) ->
|
||||||
|
TRef = mrdb:ensure_ref(Tab),
|
||||||
|
ensure_index_ref(Pos, TRef).
|
||||||
|
|
||||||
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||||
case mrdb:iterator_move(I, Dir) of
|
case mrdb:iterator_move(I, Dir) of
|
||||||
{ok, {{FKey, PKey}}} ->
|
{ok, {{FKey, PKey}}} ->
|
||||||
@ -83,6 +121,7 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
|||||||
Other
|
Other
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
|
||||||
opt_read(R, Key) ->
|
opt_read(R, Key) ->
|
||||||
case mrdb:read(R, Key, []) of
|
case mrdb:read(R, Key, []) of
|
||||||
[Obj] ->
|
[Obj] ->
|
||||||
|
@ -3,79 +3,109 @@
|
|||||||
|
|
||||||
-export([ do/2 ]).
|
-export([ do/2 ]).
|
||||||
|
|
||||||
-export([ ensure_tab/0 ]).
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-endif.
|
||||||
|
|
||||||
-define(LOCK_TAB, ?MODULE).
|
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
|
||||||
|
%% critical section.
|
||||||
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
|
|
||||||
%% The claim operation is done using an atomic list of two updates:
|
|
||||||
%% first, incrementing with 0 - this returns the previous value
|
|
||||||
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
|
|
||||||
%% regardless of previous value. This means that if [0,1] is returned, the resource
|
|
||||||
%% was not locked previously; if [1,1] is returned, it was.
|
|
||||||
%%
|
%%
|
||||||
%% Releasing the resource is done by deleting the resource. If we just decrement,
|
%% Releasing the resource is done by notifying the server.
|
||||||
%% we will end up with lingering unlocked resources, so we might as well delete.
|
%%
|
||||||
%% Either operation is atomic, and the claim op creates the object if it's missing.
|
%% Previous implementations tested:
|
||||||
|
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
|
||||||
|
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
|
||||||
|
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
|
||||||
|
%% This is a pretty simple implementation, but it lacks ordering, and appeared
|
||||||
|
%% to sometimes lead to starvation.
|
||||||
|
%% * A duplicate_bag ets table: These are defined such that insertion order is
|
||||||
|
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
|
||||||
|
%% Unfortunately, performance plummeted when tested with > 25 concurrent
|
||||||
|
%% processes. While this is likely a very high number, given that we're talking
|
||||||
|
%% about contention in a system using optimistic concurrency, it's never good
|
||||||
|
%% if performance falls off a cliff.
|
||||||
|
|
||||||
do(Rsrc, F) when is_function(F, 0) ->
|
do(Rsrc, F) when is_function(F, 0) ->
|
||||||
true = claim(Rsrc),
|
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
|
||||||
try F()
|
try F()
|
||||||
after
|
after
|
||||||
release(Rsrc)
|
mrdb_mutex_serializer:done(Rsrc, Ref)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
claim(Rsrc) ->
|
-ifdef(TEST).
|
||||||
case claim_(Rsrc) of
|
|
||||||
true -> true;
|
mutex_test_() ->
|
||||||
false -> busy_wait(Rsrc, 1000)
|
{foreach,
|
||||||
|
fun setup/0,
|
||||||
|
fun cleanup/1,
|
||||||
|
[
|
||||||
|
{"Check that all operations complete", fun swarm_do/0}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
setup() ->
|
||||||
|
case whereis(mrdb_mutex_serializer) of
|
||||||
|
undefined ->
|
||||||
|
{ok, Pid} = mrdb_mutex_serializer:start_link(),
|
||||||
|
Pid;
|
||||||
|
Pid ->
|
||||||
|
Pid
|
||||||
end.
|
end.
|
||||||
|
|
||||||
claim_(Rsrc) ->
|
cleanup(Pid) ->
|
||||||
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
|
unlink(Pid),
|
||||||
{2, 1, 1, 1}], {Rsrc, 0}) of
|
exit(Pid, kill).
|
||||||
[0, 1] ->
|
|
||||||
%% have lock
|
|
||||||
true;
|
|
||||||
[1, 1] ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% The busy-wait function makes use of the fact that we can read a timer to find out
|
swarm_do() ->
|
||||||
%% if it still has time remaining. This reduces the need for selective receive, looking
|
Rsrc = ?LINE,
|
||||||
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
|
Pid = spawn(fun() -> collect([]) end),
|
||||||
%% also be necessary for the `read_timer/1` value to refresh.
|
L = lists:seq(1, 1000),
|
||||||
%%
|
Evens = [X || X <- L, is_even(X)],
|
||||||
busy_wait(Rsrc, Timeout) ->
|
Pids = [spawn_monitor(fun() ->
|
||||||
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
|
send_even(Rsrc, N, Pid)
|
||||||
do_wait(Rsrc, Ref).
|
end) || N <- lists:seq(1,1000)],
|
||||||
|
await_pids(Pids),
|
||||||
do_wait(Rsrc, Ref) ->
|
Results = fetch(Pid),
|
||||||
erlang:yield(),
|
{incorrect_results, []} = {incorrect_results, Results -- Evens},
|
||||||
case erlang:read_timer(Ref) of
|
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
|
||||||
false ->
|
|
||||||
erlang:cancel_timer(Ref),
|
|
||||||
error(lock_wait_timeout);
|
|
||||||
_ ->
|
|
||||||
case claim_(Rsrc) of
|
|
||||||
true ->
|
|
||||||
erlang:cancel_timer(Ref),
|
|
||||||
ok;
|
|
||||||
false ->
|
|
||||||
do_wait(Rsrc, Ref)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
release(Rsrc) ->
|
|
||||||
ets:delete(?LOCK_TAB, Rsrc),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
collect(Acc) ->
|
||||||
%% Called by the process holding the ets table.
|
receive
|
||||||
ensure_tab() ->
|
{_, result, N} ->
|
||||||
case ets:info(?LOCK_TAB, name) of
|
collect([N|Acc]);
|
||||||
undefined ->
|
{From, fetch} ->
|
||||||
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
|
From ! {fetch_reply, Acc},
|
||||||
_ ->
|
done
|
||||||
true
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
fetch(Pid) ->
|
||||||
|
Pid ! {self(), fetch},
|
||||||
|
receive
|
||||||
|
{fetch_reply, Result} ->
|
||||||
|
Result
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_even(N) ->
|
||||||
|
(N rem 2) =:= 0.
|
||||||
|
|
||||||
|
await_pids([{_, MRef}|Pids]) ->
|
||||||
|
receive
|
||||||
|
{'DOWN', MRef, _, _, _} ->
|
||||||
|
await_pids(Pids)
|
||||||
|
after 10000 ->
|
||||||
|
error(timeout)
|
||||||
|
end;
|
||||||
|
await_pids([]) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
send_even(Rsrc, N, Pid) ->
|
||||||
|
do(Rsrc, fun() ->
|
||||||
|
case is_even(N) of
|
||||||
|
true ->
|
||||||
|
Pid ! {self(), result, N};
|
||||||
|
false ->
|
||||||
|
exit(not_even)
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
98
src/mrdb_mutex_serializer.erl
Normal file
98
src/mrdb_mutex_serializer.erl
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
-module(mrdb_mutex_serializer).
|
||||||
|
|
||||||
|
-behavior(gen_server).
|
||||||
|
|
||||||
|
-export([wait/1,
|
||||||
|
done/2]).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([init/1,
|
||||||
|
handle_call/3,
|
||||||
|
handle_cast/2,
|
||||||
|
handle_info/2,
|
||||||
|
terminate/2,
|
||||||
|
code_change/3]).
|
||||||
|
|
||||||
|
-record(st, { queues = #{}
|
||||||
|
, empty_q = queue:new() }). %% perhaps silly optimization
|
||||||
|
|
||||||
|
wait(Rsrc) ->
|
||||||
|
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
|
||||||
|
|
||||||
|
done(Rsrc, Ref) ->
|
||||||
|
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
{ok, #st{}}.
|
||||||
|
|
||||||
|
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
|
||||||
|
, empty_q = NewQ } = St) ->
|
||||||
|
MRef = erlang:monitor(process, Pid),
|
||||||
|
Q0 = maps:get(Rsrc, Queues, NewQ),
|
||||||
|
WasEmpty = queue:is_empty(Q0),
|
||||||
|
Q1 = queue:in({From, MRef}, Q0),
|
||||||
|
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
|
||||||
|
case WasEmpty of
|
||||||
|
true ->
|
||||||
|
{reply, {ok, MRef}, St1};
|
||||||
|
false ->
|
||||||
|
{noreply, St1}
|
||||||
|
end.
|
||||||
|
|
||||||
|
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
|
||||||
|
case maps:find(Rsrc, Queues) of
|
||||||
|
{ok, Q} ->
|
||||||
|
case queue:out(Q) of
|
||||||
|
{{value, {_From, MRef}}, Q1} ->
|
||||||
|
erlang:demonitor(MRef),
|
||||||
|
ok = maybe_dispatch_one(Q1),
|
||||||
|
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
|
||||||
|
{_, _} ->
|
||||||
|
%% Not the lock holder
|
||||||
|
{noreply, St}
|
||||||
|
end;
|
||||||
|
error ->
|
||||||
|
{noreply, St}
|
||||||
|
end.
|
||||||
|
|
||||||
|
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
|
||||||
|
Queues1 =
|
||||||
|
maps:map(
|
||||||
|
fun(_Rsrc, Q) ->
|
||||||
|
drop_or_filter(Q, MRef)
|
||||||
|
end, Queues),
|
||||||
|
{noreply, St#st{ queues = Queues1 }};
|
||||||
|
handle_info(_, St) ->
|
||||||
|
{noreply, St}.
|
||||||
|
|
||||||
|
terminate(_, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_FromVsn, St, _Extra) ->
|
||||||
|
{ok, St}.
|
||||||
|
|
||||||
|
%% In this function, we don't actually pop
|
||||||
|
maybe_dispatch_one(Q) ->
|
||||||
|
case queue:peek(Q) of
|
||||||
|
empty ->
|
||||||
|
ok;
|
||||||
|
{value, {From, MRef}} ->
|
||||||
|
gen_server:reply(From, {ok, MRef}),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
drop_or_filter(Q, MRef) ->
|
||||||
|
case queue:peek(Q) of
|
||||||
|
{value, {_, MRef}} ->
|
||||||
|
Q1 = queue:drop(Q),
|
||||||
|
ok = maybe_dispatch_one(Q1),
|
||||||
|
Q1;
|
||||||
|
{value, _Other} ->
|
||||||
|
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
|
||||||
|
empty ->
|
||||||
|
Q
|
||||||
|
end.
|
74
src/mrdb_stats.erl
Normal file
74
src/mrdb_stats.erl
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
|
||||||
|
%% @doc Statistics API for the mnesia_rocksdb plugin
|
||||||
|
%%
|
||||||
|
%% Some counters are maintained for each active alias. Currently, the following
|
||||||
|
%% counters are supported:
|
||||||
|
%% * inner_retries
|
||||||
|
%% * outer_retries
|
||||||
|
%%
|
||||||
|
-module(mrdb_stats).
|
||||||
|
|
||||||
|
-export([new/0]).
|
||||||
|
|
||||||
|
-export([incr/3,
|
||||||
|
get/1,
|
||||||
|
get/2]).
|
||||||
|
|
||||||
|
-type alias() :: mnesia_rocksdb:alias().
|
||||||
|
-type db_ref() :: mrdb:db_ref().
|
||||||
|
-type counter() :: atom().
|
||||||
|
-type increment() :: integer().
|
||||||
|
|
||||||
|
-type counters() :: #{ counter() := integer() }.
|
||||||
|
|
||||||
|
new() ->
|
||||||
|
#{ ref => counters:new(map_size(ctr_meta()), [write_concurrency])
|
||||||
|
, meta => ctr_meta()}.
|
||||||
|
|
||||||
|
ctr_meta() ->
|
||||||
|
#{ inner_retries => 1
|
||||||
|
, outer_retries => 2 }.
|
||||||
|
|
||||||
|
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
|
||||||
|
%% @doc Increment `Ctr' counter for `Alias` with increment `N'.
|
||||||
|
%%
|
||||||
|
%% Note that the first argument may also be a `db_ref()' map,
|
||||||
|
%% corresponding to `mrdb:get_ref({admin, Alias})'.
|
||||||
|
%% @end
|
||||||
|
incr(Alias, Ctr, N) when is_atom(Alias) ->
|
||||||
|
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
|
||||||
|
incr_(Ref, Meta, Ctr, N);
|
||||||
|
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
|
||||||
|
incr_(Ref, Meta, Ctr, N).
|
||||||
|
|
||||||
|
-spec get(alias() | db_ref(), counter()) -> integer().
|
||||||
|
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
|
||||||
|
%% @end
|
||||||
|
get(Alias, Ctr) when is_atom(Alias) ->
|
||||||
|
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
|
||||||
|
get_(Ref, Meta, Ctr);
|
||||||
|
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
|
||||||
|
get_(Ref, Meta, Ctr).
|
||||||
|
|
||||||
|
-spec get(alias() | db_ref()) -> counters().
|
||||||
|
%% @doc Fetches all known counters for `Alias', in the form of a map,
|
||||||
|
%% `#{Counter => Value}'.
|
||||||
|
%% @end
|
||||||
|
get(Alias) when is_atom(Alias) ->
|
||||||
|
get_(mrdb:get_ref({admin, Alias}));
|
||||||
|
get(Ref) when is_map(Ref) ->
|
||||||
|
get_(Ref).
|
||||||
|
|
||||||
|
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({K, P}, M) ->
|
||||||
|
M#{K := counters:get(Ref, P)}
|
||||||
|
end, Meta, maps:to_list(Meta)).
|
||||||
|
|
||||||
|
get_(Ref, Meta, Attr) ->
|
||||||
|
Ix = maps:get(Attr, Meta),
|
||||||
|
counters:get(Ref, Ix).
|
||||||
|
|
||||||
|
incr_(Ref, Meta, Attr, N) ->
|
||||||
|
Ix = maps:get(Attr, Meta),
|
||||||
|
counters:add(Ref, Ix, N).
|
@ -24,8 +24,12 @@
|
|||||||
, mrdb_abort/1
|
, mrdb_abort/1
|
||||||
, mrdb_two_procs/1
|
, mrdb_two_procs/1
|
||||||
, mrdb_two_procs_tx_restart/1
|
, mrdb_two_procs_tx_restart/1
|
||||||
|
, mrdb_two_procs_tx_inner_restart/1
|
||||||
, mrdb_two_procs_snap/1
|
, mrdb_two_procs_snap/1
|
||||||
, mrdb_three_procs/1
|
, mrdb_three_procs/1
|
||||||
|
, create_counters/1
|
||||||
|
, update_counters/1
|
||||||
|
, restart_node/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
@ -41,7 +45,8 @@ all() ->
|
|||||||
groups() ->
|
groups() ->
|
||||||
[
|
[
|
||||||
{all_tests, [sequence], [ {group, checks}
|
{all_tests, [sequence], [ {group, checks}
|
||||||
, {group, mrdb} ]}
|
, {group, mrdb}
|
||||||
|
, {group, counters}]}
|
||||||
%% , error_handling ]}
|
%% , error_handling ]}
|
||||||
, {checks, [sequence], [ encoding_sext_attrs
|
, {checks, [sequence], [ encoding_sext_attrs
|
||||||
, encoding_binary_binary
|
, encoding_binary_binary
|
||||||
@ -53,8 +58,13 @@ groups() ->
|
|||||||
, mrdb_abort
|
, mrdb_abort
|
||||||
, mrdb_two_procs
|
, mrdb_two_procs
|
||||||
, mrdb_two_procs_tx_restart
|
, mrdb_two_procs_tx_restart
|
||||||
|
, mrdb_two_procs_tx_inner_restart
|
||||||
, mrdb_two_procs_snap
|
, mrdb_two_procs_snap
|
||||||
, mrdb_three_procs ]}
|
, mrdb_three_procs ]}
|
||||||
|
, {counters, [sequence], [ create_counters
|
||||||
|
, update_counters
|
||||||
|
, restart_node
|
||||||
|
, update_counters ]}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
||||||
@ -267,7 +277,7 @@ mrdb_abort(Config) ->
|
|||||||
Pre = mrdb:read(tx_abort, a),
|
Pre = mrdb:read(tx_abort, a),
|
||||||
D0 = get_dict(),
|
D0 = get_dict(),
|
||||||
TRes = try mrdb:activity(
|
TRes = try mrdb:activity(
|
||||||
tx, rdb,
|
{tx, #{mnesia_compatible => true}}, rdb,
|
||||||
fun() ->
|
fun() ->
|
||||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||||
error(abort_here),
|
error(abort_here),
|
||||||
@ -287,10 +297,17 @@ mrdb_abort(Config) ->
|
|||||||
mrdb_two_procs(Config) ->
|
mrdb_two_procs(Config) ->
|
||||||
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
|
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
|
||||||
tr_flags(
|
tr_flags(
|
||||||
{self(), [call, sos, p]},
|
{self(), [call, sos, p, 'receive']},
|
||||||
tr_patterns(
|
tr_patterns(
|
||||||
mrdb, [ {mrdb, insert, 2, x}
|
mrdb, [ {mrdb, insert, 2, x}
|
||||||
, {mrdb, read, 2, x}
|
, {mrdb, read, 2, x}
|
||||||
|
, {mrdb, retry_activity, 3, x}
|
||||||
|
, {mrdb, try_f, 2, x}
|
||||||
|
, {mrdb, incr_attempt, 2, x}
|
||||||
|
, {mrdb_mutex, do, 2, x}
|
||||||
|
, {mrdb_mutex_serializer, do, 2, x}
|
||||||
|
, {?MODULE, wait_for_other, 2, x}
|
||||||
|
, {?MODULE, go_ahead_other, 1, x}
|
||||||
, {mrdb, activity, x}], tr_opts()))).
|
, {mrdb, activity, x}], tr_opts()))).
|
||||||
|
|
||||||
mrdb_two_procs_(Config) ->
|
mrdb_two_procs_(Config) ->
|
||||||
@ -314,11 +331,16 @@ mrdb_two_procs_(Config) ->
|
|||||||
Pre = mrdb:read(R, a),
|
Pre = mrdb:read(R, a),
|
||||||
go_ahead_other(POther),
|
go_ahead_other(POther),
|
||||||
await_other_down(POther, MRef, ?LINE),
|
await_other_down(POther, MRef, ?LINE),
|
||||||
|
%% The test proc is still inside the transaction, and POther
|
||||||
|
%% has terminated/committed. If we now read 'a', we should see
|
||||||
|
%% the value that POther wrote (no isolation).
|
||||||
[{R, a, 17}] = mrdb:read(R, a),
|
[{R, a, 17}] = mrdb:read(R, a),
|
||||||
ok = mrdb:insert(R, {R, a, 18})
|
ok = mrdb:insert(R, {R, a, 18})
|
||||||
end,
|
end,
|
||||||
go_ahead_other(1, POther),
|
go_ahead_other(0, POther),
|
||||||
Do0 = get_dict(),
|
Do0 = get_dict(),
|
||||||
|
%% Our transaction should fail due to the resource conflict, and because
|
||||||
|
%% we're not using retries.
|
||||||
try mrdb:activity({tx, #{no_snapshot => true,
|
try mrdb:activity({tx, #{no_snapshot => true,
|
||||||
retries => 0}}, rdb, F1) of
|
retries => 0}}, rdb, F1) of
|
||||||
ok -> error(unexpected)
|
ok -> error(unexpected)
|
||||||
@ -339,6 +361,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
|||||||
R = ?FUNCTION_NAME,
|
R = ?FUNCTION_NAME,
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
Created = create_tabs([{R, []}], Config),
|
Created = create_tabs([{R, []}], Config),
|
||||||
|
check_stats(rdb),
|
||||||
mrdb:insert(R, {R, a, 1}),
|
mrdb:insert(R, {R, a, 1}),
|
||||||
Pre = mrdb:read(R, a),
|
Pre = mrdb:read(R, a),
|
||||||
F0 = fun() ->
|
F0 = fun() ->
|
||||||
@ -354,7 +377,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
|||||||
OtherWrite = [{R, a, 17}],
|
OtherWrite = [{R, a, 17}],
|
||||||
Att = get_attempt(),
|
Att = get_attempt(),
|
||||||
Expected = case Att of
|
Expected = case Att of
|
||||||
1 -> Pre;
|
0 -> Pre;
|
||||||
_ -> OtherWrite
|
_ -> OtherWrite
|
||||||
end,
|
end,
|
||||||
Expected = mrdb:read(R, a),
|
Expected = mrdb:read(R, a),
|
||||||
@ -363,14 +386,104 @@ mrdb_two_procs_tx_restart_(Config) ->
|
|||||||
OtherWrite = mrdb:read(R, a),
|
OtherWrite = mrdb:read(R, a),
|
||||||
ok = mrdb:insert(R, {R, a, 18})
|
ok = mrdb:insert(R, {R, a, 18})
|
||||||
end,
|
end,
|
||||||
go_ahead_other(1, POther),
|
go_ahead_other(0, POther),
|
||||||
Do0 = get_dict(),
|
Do0 = get_dict(),
|
||||||
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
|
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
|
||||||
dictionary_unchanged(Do0),
|
dictionary_unchanged(Do0),
|
||||||
|
check_stats(rdb),
|
||||||
[{R, a, 18}] = mrdb:read(R, a),
|
[{R, a, 18}] = mrdb:read(R, a),
|
||||||
delete_tabs(Created),
|
delete_tabs(Created),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
mrdb_two_procs_tx_inner_restart(Config) ->
|
||||||
|
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
|
||||||
|
dbg_tr_opts()).
|
||||||
|
|
||||||
|
mrdb_two_procs_tx_inner_restart_(Config) ->
|
||||||
|
R = ?FUNCTION_NAME,
|
||||||
|
Parent = self(),
|
||||||
|
Created = create_tabs([{R, []}], Config),
|
||||||
|
mrdb:insert(R, {R, a, 1}),
|
||||||
|
mrdb:insert(R, {R, b, 1}),
|
||||||
|
PreA = mrdb:read(R, a),
|
||||||
|
PreB = mrdb:read(R, b),
|
||||||
|
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
|
||||||
|
F0 = fun() ->
|
||||||
|
wait_for_other(Parent, ?LINE),
|
||||||
|
ok = mrdb:insert(R, {R, a, 17}),
|
||||||
|
wait_for_other(Parent, ?LINE)
|
||||||
|
end,
|
||||||
|
F1 = fun() ->
|
||||||
|
wait_for_other(Parent, ?LINE),
|
||||||
|
ok = mrdb:insert(R, {R, b, 147}),
|
||||||
|
wait_for_other(Parent, ?LINE)
|
||||||
|
end,
|
||||||
|
|
||||||
|
Spawn = fun(F) ->
|
||||||
|
Res = spawn_opt(
|
||||||
|
fun() ->
|
||||||
|
ok = mrdb:activity(tx, rdb, F)
|
||||||
|
end, [monitor]),
|
||||||
|
Res
|
||||||
|
end,
|
||||||
|
{P1, MRef1} = Spawn(F0),
|
||||||
|
{P2, MRef2} = Spawn(F1),
|
||||||
|
F2 = fun() ->
|
||||||
|
PostA = [{R, a, 17}], % once F0 (P1) has committed
|
||||||
|
PostB = [{R, b, 147}], % once F1 (P2) has committed
|
||||||
|
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
|
||||||
|
BRes = mrdb:read(R, b),
|
||||||
|
Att = get_attempt(),
|
||||||
|
ct:log("Att = ~p", [Att]),
|
||||||
|
case Att of
|
||||||
|
0 -> % This is the first run (no retry yet)
|
||||||
|
ARes = PreA,
|
||||||
|
BRes = PreB,
|
||||||
|
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
|
||||||
|
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
|
||||||
|
await_other_down(P1, MRef1, ?LINE),
|
||||||
|
PostA = mrdb:read(R, a); % now, P1's write should leak through here
|
||||||
|
{0, 1} -> % This is the first (outer) retry
|
||||||
|
go_ahead_other(0, P2), % Let P2 write
|
||||||
|
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
|
||||||
|
await_other_down(P2, MRef2, ?LINE),
|
||||||
|
PostA = mrdb:read(R, a), % now we should see writes from both P1
|
||||||
|
%% On OTP 23, the following step might fail due to timing, even
|
||||||
|
%% though the trace output looks as expected. Possibly some quirk
|
||||||
|
%% with leakage propagation time in rocksdb. Let's retry to be sure.
|
||||||
|
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
|
||||||
|
{1, 1} ->
|
||||||
|
PostA = mrdb:read(R, a),
|
||||||
|
PostB = mrdb:read(R, b),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
mrdb:insert(R, {R, a, 18}),
|
||||||
|
mrdb:insert(R, {R, b, 18})
|
||||||
|
end,
|
||||||
|
Do0 = get_dict(),
|
||||||
|
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
|
||||||
|
check_stats(rdb),
|
||||||
|
dictionary_unchanged(Do0),
|
||||||
|
[{R, a, 18}] = mrdb:read(R, a),
|
||||||
|
[{R, b, 18}] = mrdb:read(R, b),
|
||||||
|
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
|
||||||
|
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
|
||||||
|
delete_tabs(Created),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
try_until(Result, F) ->
|
||||||
|
try_until(Result, F, 10).
|
||||||
|
|
||||||
|
try_until(Result, F, N) when N > 0 ->
|
||||||
|
case F() of
|
||||||
|
Result ->
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
receive after 100 -> ok end,
|
||||||
|
try_until(Result, F, N-1)
|
||||||
|
end;
|
||||||
|
try_until(Result, F, _) ->
|
||||||
|
error({badmatch, {Result, F}}).
|
||||||
|
|
||||||
%
|
%
|
||||||
%% For testing purposes, we use side-effects inside the transactions
|
%% For testing purposes, we use side-effects inside the transactions
|
||||||
@ -383,7 +496,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
|||||||
%% attempt, and ignore the sync ops on retries.
|
%% attempt, and ignore the sync ops on retries.
|
||||||
%%
|
%%
|
||||||
-define(IF_FIRST(N, Expr),
|
-define(IF_FIRST(N, Expr),
|
||||||
if N == 1 ->
|
if N == 0 ->
|
||||||
Expr;
|
Expr;
|
||||||
true ->
|
true ->
|
||||||
ok
|
ok
|
||||||
@ -413,8 +526,8 @@ mrdb_two_procs_snap(Config) ->
|
|||||||
go_ahead_other(Att, POther),
|
go_ahead_other(Att, POther),
|
||||||
ARes = mrdb:read(R, a),
|
ARes = mrdb:read(R, a),
|
||||||
ARes = case Att of
|
ARes = case Att of
|
||||||
1 -> Pre;
|
0 -> Pre;
|
||||||
2 -> [{R, a, 17}]
|
_ -> [{R, a, 17}]
|
||||||
end,
|
end,
|
||||||
await_other_down(POther, MRef, ?LINE),
|
await_other_down(POther, MRef, ?LINE),
|
||||||
PreB = mrdb:read(R, b),
|
PreB = mrdb:read(R, b),
|
||||||
@ -434,7 +547,7 @@ mrdb_two_procs_snap(Config) ->
|
|||||||
%% We make sure that P2 commits before finishing the other two, and P3 and the
|
%% We make sure that P2 commits before finishing the other two, and P3 and the
|
||||||
%% main thread sync, so as to maximize the contention for the retry lock.
|
%% main thread sync, so as to maximize the contention for the retry lock.
|
||||||
mrdb_three_procs(Config) ->
|
mrdb_three_procs(Config) ->
|
||||||
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
|
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
|
||||||
|
|
||||||
mrdb_three_procs_(Config) ->
|
mrdb_three_procs_(Config) ->
|
||||||
R = ?FUNCTION_NAME,
|
R = ?FUNCTION_NAME,
|
||||||
@ -452,7 +565,7 @@ mrdb_three_procs_(Config) ->
|
|||||||
spawn_opt(fun() ->
|
spawn_opt(fun() ->
|
||||||
D0 = get_dict(),
|
D0 = get_dict(),
|
||||||
do_when_p_allows(
|
do_when_p_allows(
|
||||||
1, Parent, ?LINE,
|
0, Parent, ?LINE,
|
||||||
fun() ->
|
fun() ->
|
||||||
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
|
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
|
||||||
end),
|
end),
|
||||||
@ -488,8 +601,8 @@ mrdb_three_procs_(Config) ->
|
|||||||
fun() ->
|
fun() ->
|
||||||
Att = get_attempt(),
|
Att = get_attempt(),
|
||||||
ARes = case Att of
|
ARes = case Att of
|
||||||
1 -> [A0];
|
0 -> [A0];
|
||||||
2 -> [A1]
|
_ -> [A1]
|
||||||
end,
|
end,
|
||||||
%% First, ensure that P2 tx is running
|
%% First, ensure that P2 tx is running
|
||||||
go_ahead_other(Att, P2),
|
go_ahead_other(Att, P2),
|
||||||
@ -519,6 +632,7 @@ tr_opts() ->
|
|||||||
, {?MODULE, await_other_down, 3, x}
|
, {?MODULE, await_other_down, 3, x}
|
||||||
, {?MODULE, do_when_p_allows, 4, x}
|
, {?MODULE, do_when_p_allows, 4, x}
|
||||||
, {?MODULE, allow_p, 3, x}
|
, {?MODULE, allow_p, 3, x}
|
||||||
|
, {?MODULE, check_stats, 1, x}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
light_tr_opts() ->
|
light_tr_opts() ->
|
||||||
@ -529,6 +643,23 @@ light_tr_opts() ->
|
|||||||
, {mrdb, read, 2, x}
|
, {mrdb, read, 2, x}
|
||||||
, {mrdb, activity, x} ], tr_opts())).
|
, {mrdb, activity, x} ], tr_opts())).
|
||||||
|
|
||||||
|
dbg_tr_opts() ->
|
||||||
|
tr_flags(
|
||||||
|
{self(), [call, sos, p, 'receive']},
|
||||||
|
tr_patterns(
|
||||||
|
mrdb, [ {mrdb, insert, 2, x}
|
||||||
|
, {mrdb, read, 2, x}
|
||||||
|
, {mrdb, retry_activity, 3, x}
|
||||||
|
, {mrdb, try_f, 2, x}
|
||||||
|
, {mrdb, incr_attempt, 2, x}
|
||||||
|
, {mrdb, current_context, 0, x}
|
||||||
|
, {mrdb_mutex, do, 2, x}
|
||||||
|
, {mrdb_mutex_serializer, do, 2, x}
|
||||||
|
, {?MODULE, wait_for_other, 2, x}
|
||||||
|
, {?MODULE, go_ahead_other, 1, x}
|
||||||
|
, {?MODULE, try_until, 3, x}
|
||||||
|
, {mrdb, activity, x} ], tr_opts())).
|
||||||
|
|
||||||
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
|
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
|
||||||
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
|
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
|
||||||
Opts#{patterns => Ps ++ Pats1}.
|
Opts#{patterns => Ps ++ Pats1}.
|
||||||
@ -542,12 +673,13 @@ wait_for_other(Parent, L) ->
|
|||||||
wait_for_other(Att, Parent, L) ->
|
wait_for_other(Att, Parent, L) ->
|
||||||
wait_for_other(Att, Parent, 1000, L).
|
wait_for_other(Att, Parent, 1000, L).
|
||||||
|
|
||||||
wait_for_other(1, Parent, Timeout, L) ->
|
wait_for_other(0, Parent, Timeout, L) ->
|
||||||
MRef = monitor(process, Parent),
|
MRef = monitor(process, Parent),
|
||||||
Parent ! {self(), ready},
|
Parent ! {self(), ready},
|
||||||
receive
|
receive
|
||||||
{Parent, cont} ->
|
{Parent, cont} ->
|
||||||
demonitor(MRef),
|
demonitor(MRef),
|
||||||
|
Parent ! {self(), cont_ack},
|
||||||
ok;
|
ok;
|
||||||
{'DOWN', MRef, _, _, Reason} ->
|
{'DOWN', MRef, _, _, Reason} ->
|
||||||
ct:log("Parent died, Reason = ~p", [Reason]),
|
ct:log("Parent died, Reason = ~p", [Reason]),
|
||||||
@ -586,7 +718,13 @@ go_ahead_other(Att, POther, Timeout) ->
|
|||||||
go_ahead_other_(POther, Timeout) ->
|
go_ahead_other_(POther, Timeout) ->
|
||||||
receive
|
receive
|
||||||
{POther, ready} ->
|
{POther, ready} ->
|
||||||
POther ! {self(), cont}
|
POther ! {self(), cont},
|
||||||
|
receive
|
||||||
|
{POther, cont_ack} ->
|
||||||
|
ok
|
||||||
|
after Timeout ->
|
||||||
|
error(cont_ack_timeout)
|
||||||
|
end
|
||||||
after Timeout ->
|
after Timeout ->
|
||||||
error(go_ahead_timeout)
|
error(go_ahead_timeout)
|
||||||
end.
|
end.
|
||||||
@ -619,6 +757,35 @@ get_attempt() ->
|
|||||||
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
|
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
|
||||||
Attempt.
|
Attempt.
|
||||||
|
|
||||||
|
create_counters(_Config) ->
|
||||||
|
create_tab(counters, []),
|
||||||
|
mrdb:insert(counters, {counters, c0, 1}),
|
||||||
|
mrdb:update_counter(counters, c1, 1),
|
||||||
|
[{counters, c0, 1}] = mrdb:read(counters, c0),
|
||||||
|
[{counters, c1, 1}] = mrdb:read(counters, c1),
|
||||||
|
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
restart_node(_Config) ->
|
||||||
|
mnesia:stop(),
|
||||||
|
ok = mnesia:start(),
|
||||||
|
ct:log("mnesia restarted", []),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
update_counters(_Config) ->
|
||||||
|
[{counters, c0, C0Prev}] = mrdb:read(counters, c0),
|
||||||
|
[{counters, c1, C1Prev}] = mrdb:read(counters, c1),
|
||||||
|
ct:log("co: ~p, c1: ~p", [C0Prev, C1Prev]),
|
||||||
|
ok = mrdb:update_counter(counters, c0, 1),
|
||||||
|
ok = mrdb:update_counter(counters, c1, 1),
|
||||||
|
ct:log("Incremented c0 and c1 by 1", []),
|
||||||
|
C0 = C0Prev + 1,
|
||||||
|
C1 = C1Prev + 1,
|
||||||
|
[{counters, c0, C0}] = mrdb:read(counters, c0),
|
||||||
|
[{counters, c1, C1}] = mrdb:read(counters, c1),
|
||||||
|
ct:log("c0: ~p, c1: ~p", [C0, C1]),
|
||||||
|
ok.
|
||||||
|
|
||||||
create_tabs(Tabs, Config) ->
|
create_tabs(Tabs, Config) ->
|
||||||
Res = lists:map(fun create_tab/1, Tabs),
|
Res = lists:map(fun create_tab/1, Tabs),
|
||||||
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
|
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
|
||||||
@ -645,3 +812,8 @@ dictionary_unchanged(Old) ->
|
|||||||
, added := [] } = #{ deleted => Old -- New
|
, added := [] } = #{ deleted => Old -- New
|
||||||
, added => New -- Old },
|
, added => New -- Old },
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
check_stats(Alias) ->
|
||||||
|
Stats = mrdb_stats:get(Alias),
|
||||||
|
ct:log("Stats: ~p", [Stats]),
|
||||||
|
Stats.
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
-export([
|
-export([
|
||||||
index_plugin_mgmt/1
|
index_plugin_mgmt/1
|
||||||
, add_indexes/1
|
, add_indexes/1
|
||||||
|
, delete_indexes/1
|
||||||
, create_bag_index/1
|
, create_bag_index/1
|
||||||
, create_ordered_index/1
|
, create_ordered_index/1
|
||||||
, test_1_ram_copies/1
|
, test_1_ram_copies/1
|
||||||
@ -40,10 +41,12 @@
|
|||||||
, fail_1_disc_only/1
|
, fail_1_disc_only/1
|
||||||
, plugin_ram_copies1/1
|
, plugin_ram_copies1/1
|
||||||
, plugin_ram_copies2/1
|
, plugin_ram_copies2/1
|
||||||
|
, plugin_ram_copies3/1
|
||||||
, plugin_disc_copies/1
|
, plugin_disc_copies/1
|
||||||
, fail_plugin_disc_only/1
|
, fail_plugin_disc_only/1
|
||||||
, plugin_disc_copies_bag/1
|
, plugin_disc_copies_bag/1
|
||||||
, plugin_rdb_ordered/1
|
, plugin_rdb_ordered/1
|
||||||
|
, plugin_rdb_none/1
|
||||||
, index_iterator/1
|
, index_iterator/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
@ -73,10 +76,12 @@ run(Config) ->
|
|||||||
{pfx},mnesia_rocksdb, ix_prefixes),
|
{pfx},mnesia_rocksdb, ix_prefixes),
|
||||||
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
|
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
|
||||||
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
|
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
|
||||||
|
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
|
||||||
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
|
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
|
||||||
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
|
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
|
||||||
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
|
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
|
||||||
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
|
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
|
||||||
|
test_index_plugin(cfg([pl3, rdb, none], Config)),
|
||||||
index_plugin_mgmt(Config),
|
index_plugin_mgmt(Config),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
@ -94,6 +99,7 @@ groups() ->
|
|||||||
, create_ordered_index
|
, create_ordered_index
|
||||||
, index_plugin_mgmt
|
, index_plugin_mgmt
|
||||||
, add_indexes
|
, add_indexes
|
||||||
|
, delete_indexes
|
||||||
]}
|
]}
|
||||||
, {access, [sequence], [
|
, {access, [sequence], [
|
||||||
test_1_ram_copies
|
test_1_ram_copies
|
||||||
@ -104,10 +110,12 @@ groups() ->
|
|||||||
, {plugin, [sequence], [
|
, {plugin, [sequence], [
|
||||||
plugin_ram_copies1
|
plugin_ram_copies1
|
||||||
, plugin_ram_copies2
|
, plugin_ram_copies2
|
||||||
|
, plugin_ram_copies3
|
||||||
, plugin_disc_copies
|
, plugin_disc_copies
|
||||||
, fail_plugin_disc_only
|
, fail_plugin_disc_only
|
||||||
, plugin_disc_copies_bag
|
, plugin_disc_copies_bag
|
||||||
, plugin_rdb_ordered
|
, plugin_rdb_ordered
|
||||||
|
, plugin_rdb_none
|
||||||
]}
|
]}
|
||||||
].
|
].
|
||||||
|
|
||||||
@ -177,11 +185,14 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
|
|||||||
|
|
||||||
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
|
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
|
||||||
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
|
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
|
||||||
|
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
|
||||||
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
|
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
|
||||||
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
|
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
|
||||||
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
|
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
|
||||||
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
|
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
|
||||||
|
|
||||||
|
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
|
||||||
|
|
||||||
test(N, Type, T) ->
|
test(N, Type, T) ->
|
||||||
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
|
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
|
||||||
{attributes,[k,a,b,c]},
|
{attributes,[k,a,b,c]},
|
||||||
@ -204,7 +215,7 @@ add_del_indexes() ->
|
|||||||
test_index_plugin(Config) ->
|
test_index_plugin(Config) ->
|
||||||
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
|
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
|
||||||
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
|
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
|
||||||
{index, [{{pfx}, IxType}]}]),
|
{index, [ixtype(IxType)]}]),
|
||||||
mnesia:dirty_write({Tab, "foobar", "sentence"}),
|
mnesia:dirty_write({Tab, "foobar", "sentence"}),
|
||||||
mnesia:dirty_write({Tab, "yellow", "sensor"}),
|
mnesia:dirty_write({Tab, "yellow", "sensor"}),
|
||||||
mnesia:dirty_write({Tab, "truth", "white"}),
|
mnesia:dirty_write({Tab, "truth", "white"}),
|
||||||
@ -216,13 +227,27 @@ test_index_plugin(Config) ->
|
|||||||
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
||||||
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
||||||
Tab, <<"foo">>, {pfx});
|
Tab, <<"foo">>, {pfx});
|
||||||
IxType == ordered ->
|
IxType == ordered; IxType == none ->
|
||||||
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
|
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
|
||||||
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
||||||
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
||||||
Tab, <<"foo">>, {pfx})
|
Tab, <<"foo">>, {pfx})
|
||||||
|
end,
|
||||||
|
if Type == rdb ->
|
||||||
|
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
|
||||||
|
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
|
||||||
|
[{Tab,"foobar","sentence"}] = mrdb:index_read(
|
||||||
|
Tab, <<"foo">>, {pfx});
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
ixtype(T) when T==bag;
|
||||||
|
T==ordered ->
|
||||||
|
{{pfx}, T};
|
||||||
|
ixtype(none) ->
|
||||||
|
{pfx}.
|
||||||
|
|
||||||
create_bag_index(_Config) ->
|
create_bag_index(_Config) ->
|
||||||
{aborted, {combine_error, _, _}} =
|
{aborted, {combine_error, _, _}} =
|
||||||
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
|
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
|
||||||
@ -239,6 +264,25 @@ add_indexes(_Config) ->
|
|||||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
delete_indexes(_Config) ->
|
||||||
|
T = ?TAB(t1),
|
||||||
|
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
|
||||||
|
ok = mrdb:insert(T, {T, a, 1, 1}),
|
||||||
|
ok = mrdb:insert(T, {T, b, 1, 2}),
|
||||||
|
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||||
|
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
|
||||||
|
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
|
||||||
|
{atomic, ok} = mnesia:del_table_index(T, a),
|
||||||
|
ok = mrdb:delete(T, b),
|
||||||
|
ok = mrdb:insert(T, {T, c, 2, 3}),
|
||||||
|
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||||
|
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
|
||||||
|
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
ix_fold_acc(K, V, Acc) ->
|
||||||
|
[{K, V} | Acc].
|
||||||
|
|
||||||
index_plugin_mgmt(_Config) ->
|
index_plugin_mgmt(_Config) ->
|
||||||
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
|
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
|
||||||
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),
|
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
-module(mrdb_bench).
|
-module(mrdb_bench).
|
||||||
|
|
||||||
-compile(export_all).
|
-compile([export_all, nowarn_export_all]).
|
||||||
|
|
||||||
init() ->
|
init() ->
|
||||||
mnesia:delete_schema([node()]),
|
mnesia:delete_schema([node()]),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user