Compare commits

...

57 Commits

Author SHA1 Message Date
318f84bbaf Merge pull request 'Ensure the erlang_merge_operator is always loaded' (#5) from uw-merge-operator into master
Reviewed-on: #5
2025-06-17 18:05:58 +09:00
Ulf Wiger
0691d8b055 Ensure the erlang_merge_operator is always loaded 2025-06-15 20:43:36 +02:00
ac69c8564f Merge pull request 'Consistent support for merge ops' (#4) from uw-merge-ops into master
Reviewed-on: #4
2025-06-16 03:41:44 +09:00
Ulf Wiger
2ca7f36eb1 Consistent support for merge ops 2025-06-02 20:07:05 +02:00
Ulf Wiger
56d78768d9 Produce stacktraces unless mnesia_compatible 2025-03-23 21:06:21 +01:00
a768d8008f Merge remote-tracking branch 'ae/master' into emqx 2024-10-17 10:54:42 +09:00
Richard Carlsson
9cea41cb9a
Merge pull request #57 from richcarl/use-fixed-emqx
Use latest version of emqx fork with compilation fix
2024-10-14 12:17:30 +02:00
Richard Carlsson
d42055c7ac Update erlang-rocksdb to use latest patch fixing compilation issues 2024-10-13 14:30:56 +02:00
Richard Carlsson
0ae7ecd41d Use our patched version of the EMQX fork 2024-10-09 13:53:01 +02:00
Richard Carlsson
ece9db2b09
Merge pull request #56 from richcarl/use-emqx-fork
Use emqx fork of erlang-rocksdb
2024-09-09 14:04:40 +02:00
Richard Carlsson
0e4382d5f7 Use emqx fork of erlang-rocksdb 2024-08-23 11:01:46 +02:00
Ulf Wiger
d9d82d7ead Merge branch 'uw-catchup' into 'master'
Fix wrong index preparation in mrdb:index_read_/3

See merge request ioecs/mnesia_rocksdb!1
2024-08-14 09:24:16 +00:00
Ulf Wiger
34285da6d8
Merge pull request #55 from havelkadragan/master
Fix wrong index preparation in mrdb:index_read_/3
2024-08-13 18:21:32 +02:00
crossroad61
cb123ee28a
Merge pull request #1 from aeternity/uw-ix-plugin-tests
Improve tests (fail without Havelka's fix)
2024-08-13 12:45:56 +02:00
Ulf Wiger
996ae82717 Improve tests (fail without Havelka's fix) 2024-08-13 10:36:31 +02:00
dragan
d6b86524fd Fix wrong index preparation in mrdb:index_read_/3
In the case when the index is of type {_}, the index is returned as-is
instead of being put into a tuple {Ix, ordered}.

An example

Given the following index plugin implementation:
-module(key6).

-record(cgh, {key, val}).

-export([k6/3
        , add_index/0
        , create_table/0
        , tst_bad_type/0
    ]).

k6(_, _, #cgh{key = {Name, K8, _Cid, _Vn}}) ->
    [list_to_binary([atom_to_binary(Name),
     list_to_binary(lists:sublist(K8, 6))])].

add_index() ->
    mnesia_schema:add_index_plugin({k6}, key6, k6).

create_table() ->
    mnesia:create_table(cgh,
    [{attributes, record_info(fields, cgh)},
     {type, ordered_set},
     {index, [{k6}]},
     {rocksdb_copies, [node()]},
     {local_content, true}]).

tst_bad_type() ->
    E1 = #cgh{key = {n1, "u1vgrjkh",{ecgi,238,6,213020,50},3}, val = ok},
    mnesia:dirty_write(E1),
    Index1 = list_to_binary([atom_to_binary(n1),
                             list_to_binary(lists:sublist("u1vgrjkh", 6))]),
    mrdb:index_read(cgh, Index1, {k6}).

-- testing --
Eshell V14.2.2 (press Ctrl+G to abort, type help(). for help)
(rock@bang)2> mnesia:create_schema([node()]).
ok
(rock@bang)3> mnesia:start().
ok
(rock@bang)4> mnesia_rocksdb:register().
{ok,rocksdb_copies}
(rock@bang)5> l(key6).
{module,key6}
(rock@bang)6> key6:add_index().
{atomic,ok}
(rock@bang)7> key6:create_table().
{atomic,ok}
(rock@bang)8> key6:tst_bad_type().
** exception exit: {aborted,{bad_type,{cgh,index,{k6}}}}
     in function  mnesia:abort/1 (mnesia.erl, line 362)
     in call from mrdb:ensure_ref/1 (~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl, line 609)
     in call from mrdb:index_read_/3 (~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl, line 945)

-- after patch --

(rock@bang)9> c(mrdb).
Recompiling ~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl
{ok,mrdb}
(rock@bang)10> key6:tst_bad_type().
[{cgh,{n1,"u1vgrjkh",{ecgi,238,6,213020,50},3},ok}]
(rock@bang)11>
2024-08-12 14:32:20 +02:00
Thomas Arts
296813ef7b
Merge pull request #51 from aeternity/ta-rocksdb-1.8.0
Towards version 7.10.2 of rocksdb via version 1.8.0 of erlang_rocksdb
2024-03-19 14:41:50 +01:00
Thomas Arts
16d0533acf Fix dialyzer errors and warnings 2024-03-19 14:08:14 +01:00
Thomas Arts
1d6ae82c6d OTP24-26 2024-03-19 14:08:14 +01:00
Ulf Wiger
71ebaf2d66 New args for rocksdb:transaction_iterator() 2024-03-18 17:40:42 +01:00
Ulf Wiger
f2b6116d31
Merge pull request #47 from aeternity/uw-robustify-tx-test
Add retry for bleed-through check
2023-10-12 18:19:48 +02:00
Ulf Wiger
33ee7929d4
Merge pull request #44 from aeternity/uw-index-metadata
Adding/deleting indexes cleans up index metadata
2023-10-12 18:19:16 +02:00
Ulf Wiger
78669bb8bf
Merge pull request #42 from mariari/mariari/rebar3-compatible-otp-25
Update rebar3 bin to be compatible with OTP 25
2023-10-09 14:47:25 +02:00
Ulf Wiger
346decee6e
Merge pull request #49 from aeternity/ci_matrix
Add CI build matrix for OTP versions
2023-10-09 14:30:47 +02:00
Dincho Todorov
4da0fce567 Add CI build matrix for OTP versions 2023-10-09 13:22:13 +03:00
Ulf Wiger
e7d42f9500 Add retry for bleed-through check 2023-10-05 13:53:13 +02:00
Ulf Wiger
699a7d5920
Merge pull request #29 from aeternity/uw-bump-hut-dep
Change hut dep to 1.4.0
2023-10-05 11:00:53 +01:00
Ulf Wiger
57f02078bc Fix typo 2023-10-05 11:53:41 +02:00
Ulf Wiger
791cec41db Adding/deleting indexes cleans up index metadata 2023-10-05 11:53:41 +02:00
Ulf Wiger
6eff62df6d
Merge pull request #45 from aeternity/uw-ci-improvements
Run multiple OTP vsns in CI
2023-10-05 10:52:54 +01:00
Ulf Wiger
2e430fc5eb remove 'build' sublevel 2023-10-05 11:21:03 +02:00
Ulf Wiger
e76a01f8c4 add ci workflow 2023-10-05 11:12:32 +02:00
Ulf Wiger
4d0a78612a Fix ci config 2023-10-05 11:08:12 +02:00
Ulf Wiger
eeb6aff242 Run multiple OTP vsns in CI 2023-10-05 11:00:49 +02:00
mariari
f11e16e29f Update rebar3 bin to be compatible with OTP 25
This is the same OTP binary from

af2988a5a2
2023-08-31 03:12:12 +08:00
Ulf Wiger
0aecf5ef01
Merge pull request #35 from aeternity/uw-different-mutex
rewrite transaction retry mutex
2022-11-04 10:06:58 +01:00
Ulf Wiger
465a220bfe Update comment about mutex implementation 2022-11-03 13:37:52 +01:00
Ulf Wiger
19140c738b Don't warn for export_all in mrdb_bench 2022-11-03 12:57:54 +01:00
Ulf Wiger
bed66b2998 Remove html docs 2022-11-03 12:54:50 +01:00
Ulf Wiger
3635eac717 Test case for inner retries; add mrdb_stats.erl; update docs 2022-11-03 12:41:14 +01:00
Ulf Wiger
95abe4e36e Mutex server with fifo queues 2022-11-01 10:11:20 +01:00
Ulf Wiger
7c729bd932 Use a serializing mutex 2022-11-01 10:09:48 +01:00
Ulf Wiger
4489e5d743
Merge pull request #34 from aeternity/uw-batch-release
Don't try to release dummy batch ref
2022-10-31 17:01:15 +01:00
Ulf Wiger
d1a6bf22d5 Don't try to release dummy batch ref 2022-10-31 16:48:23 +01:00
Ulf Wiger
b65e82ed71
Merge pull request #33 from aeternity/uw-batch-on-demand
Begin dirty activity with batch ref dummy
2022-10-26 13:16:48 +02:00
Ulf Wiger
ee9e7eac67
Merge pull request #32 from aeternity/uw-push-pop-error
Tx push at retry before mutex instead of after
2022-10-26 13:16:28 +02:00
Ulf Wiger
ce2be519b4 Begin dirty activity with batch ref dummy 2022-10-24 15:15:01 +02:00
Ulf Wiger
8073a0daa5 Tx push at retry before mutex instead of after 2022-10-19 13:19:48 +02:00
Ulf Wiger
ab15b7f399
Merge pull request #30 from aeternity/otp23_builder
Switch to OTP23
2022-08-29 12:28:59 +02:00
Dincho Todorov
a75f6e0c43
Switch to OTP23 2022-08-29 12:40:50 +03:00
Ulf Wiger
1340bb2050 Change hut dep to 1.4.0 2022-08-15 17:16:29 +02:00
Ulf Wiger
d1177b6ad4
Merge pull request #27 from aeternity/uw-accept-repeated-create
Ignore certain mnesia_dumper close_table requests
2022-08-03 09:49:58 +02:00
Ulf Wiger
b908998e6b Check pdict for dumper state at close_table 2022-08-02 17:07:53 +02:00
Ulf Wiger
dfc0125800 More dets-like load/close behavior 2022-07-25 15:33:00 +02:00
Ulf Wiger
296abb23bb Fix double encoding of info data 2022-07-22 16:33:32 +02:00
Ulf Wiger
7057f4dcbd Remove rocksdb opts default, safer index consistency check 2022-07-22 15:31:55 +02:00
Ulf Wiger
c4235be94a Be more lenient about mnesia asking more than once 2022-07-15 14:14:06 +02:00
18 changed files with 810 additions and 177 deletions

View File

@ -2,15 +2,35 @@ version: 2.1
executors:
aebuilder:
parameters:
otp:
type: string
docker:
- image: aeternity/builder
- image: aeternity/builder:focal-<< parameters.otp >>
user: builder
environment:
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
jobs:
build:
executor: aebuilder
build_and_test:
parameters:
otp:
type: string
executor:
name: aebuilder
otp: << parameters.otp >>
steps:
- checkout
- run: make test
- store_artifacts:
path: _build/test/logs
workflows:
commit:
jobs:
- build_and_test:
matrix:
parameters:
otp: ["otp24", "otp25", "otp26"]

View File

@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
it yourself. If you only need the size occasionally, you may traverse the
table to count the elements.
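For example, a counting sketch (assuming `mrdb:fold/3`, which applies the fold
fun as `Fun(Object, Acc)`; `mytab` is a placeholder for an existing table):

    Size = mrdb:fold(mytab, fun(_Obj, N) -> N + 1 end, 0).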
When `mrdb` transactions abort, they will return a stacktrace caught
from within the transaction fun, giving much better debugging info.
This is different from how mnesia does it.
If behavior closer to mnesia's abort returns is needed, say, for backwards
compatibility, this can be controlled by setting the environment variable
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
For really performance-critical transactions which may abort often, it might
make a difference to set this option to `true`, since there is a cost involved
in producing stacktraces.
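For example (a minimal sketch; the `rdb` alias, the `mytab` table and the fun body
are placeholders for your own setup):

    %% Globally, e.g. on the erl command line:
    %%   erl -mnesia_rocksdb mnesia_compatible_aborts true
    %% Or per activity:
    mrdb:activity({tx, #{mnesia_compatible => true}}, rdb,
                  fun() ->
                          mrdb:insert(mytab, {mytab, some_key, some_value})
                  end).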
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###

View File

@ -2,4 +2,5 @@
{application,mnesia_rocksdb}.
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.

View File

@ -4,8 +4,8 @@
{deps,
[
{sext, "1.8.0"},
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
{hut, "1.3.0"}
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
{hut, "1.4.0"}
]}.
{xref_checks, [
@ -14,6 +14,10 @@
deprecated_function_calls
]}.
{dialyzer, [{plt_apps, all_deps},
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
]}.
{profiles,
[
{test,

View File

@ -1,11 +1,4 @@
%% -*- erlang-mode -*-
case os:getenv("ERLANG_ROCKSDB_OPTS") of
false ->
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
_ ->
%% If manually set, we assume it's throught through
skip
end.
case os:getenv("DEBUG") of
"true" ->
Opts = proplists:get_value(erl_opts, CONFIG, []),

View File

@ -1,15 +1,15 @@
{"1.2.0",
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
{<<"rocksdb">>,
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
{git,"https://github.com/emqx/erlang-rocksdb.git",
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
0},
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
[
{pkg_hash,[
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
{pkg_hash_ext,[
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
].

BIN
rebar3

Binary file not shown.

View File

@ -346,14 +346,11 @@ semantics(_Alias, index_fun) -> fun index_f/4;
semantics(_Alias, _) -> undefined.
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
case info(Alias, Tab, {index_consistent, PosInfo}) of
true -> true;
_ -> false
end.
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
when is_boolean(Bool) ->
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
%% PRIVATE FUN
@ -454,8 +451,13 @@ close_table(Alias, Tab) ->
error ->
ok;
_ ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab)
case get(mnesia_dumper_dets) of
undefined ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab);
_ ->
ok
end
end.
close_table_(Alias, Tab) ->
@ -798,7 +800,7 @@ handle_call({create_table, Tab, Props}, _From,
exit:{aborted, Error} ->
{reply, {aborted, Error}, St}
end;
handle_call({load_table, _LoadReason, Props}, _From,
handle_call({load_table, _LoadReason, Props}, _,
#st{alias = Alias, tab = Tab} = St) ->
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St#st{status = active}};
@ -826,7 +828,7 @@ handle_call({delete, Key}, _From, St) ->
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
Res = mrdb:match_delete(get_ref(Tab), Pat),
{reply, Res, St};
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
{reply, ok, St#st{status = undefined}};
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->

View File

@ -33,6 +33,7 @@
, read_info/2 %% (Alias, Tab)
, read_info/4 %% (Alias, Tab, Key, Default)
, write_info/4 %% (Alias, Tab, Key, Value)
, delete_info/3 %% (Alias, Tab, Key)
, write_table_property/3 %% (Alias, Tab, Property)
]).
@ -280,9 +281,20 @@ write_info(Alias, Tab, K, V) ->
write_info_(get_ref({admin, Alias}), Tab, K, V).
write_info_(Ref, Tab, K, V) ->
write_info_encv(Ref, Tab, K, term_to_binary(V)).
write_info_encv(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
mrdb:rdb_put(Ref, EncK, V, []).
delete_info(Alias, Tab, K) ->
delete_info_(get_ref({admin, Alias}), Tab, K).
delete_info_(Ref, Tab, K) ->
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
maybe_delete_standalone_info(Ref, K),
mrdb:rdb_delete(Ref, EncK, []).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
@ -295,6 +307,16 @@ maybe_write_standalone_info(Ref, K, V) ->
ok
end.
maybe_delete_standalone_info(Ref, K) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
rocksdb:delete(DbRef, Key, []);
_ ->
ok
end.
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}).
@ -323,7 +345,6 @@ call(Alias, Req, Timeout) ->
end.
start_link() ->
mrdb_mutex:ensure_tab(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
@ -405,12 +426,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
%% We need to store the persistent ref explicitly here,
%% since mnesia knows nothing of our admin table.
AdminTab = {admin, Alias},
Stats = mrdb_stats:new(),
CfI = update_cf_info(AdminTab, #{ status => open
, name => AdminTab
, vsn => ?VSN
, encoding => {sext,{value,term}}
, attr_pos => #{key => 1,
value => 2}
, stats => Stats
, mountpoint => MP
, properties =>
#{ attributes => [key, val]
@ -510,12 +533,17 @@ intersection(A, B) ->
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, St1};
{error, _} = Error ->
{reply, Error, St}
case find_cf(Alias, Name, Backend, St) of
{ok, TRec} ->
{reply, {ok, TRec}, St};
error ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
{error, _} = Error ->
{reply, Error, St}
end
end;
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
try
@ -628,8 +656,14 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
case {Op, lists:member(I, Index)} of
{delete, true} ->
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
delete_info(Alias, Main, {index_consistent, I}),
maybe_update_pt(Main, CfM1),
update_cf(Alias, Main, CfM1, St);
{create, _} ->
%% Due to a previous bug, this marker might linger
%% In any case, it mustn't be there for a newly created index
delete_info(Alias, Main, {index_consistent, I}),
St;
_ ->
St
end;
@ -640,6 +674,7 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
maybe_update_main(_, _, _, St) ->
St.
%% The pt may not have been created yet. If so, don't do it here.
maybe_update_pt(Name, Ref) ->
case get_pt(Name, error) of
@ -1223,7 +1258,18 @@ load_info_(Res, I, ARef, Tab) ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_(ARef, Tab, DecK, V);
write_info_encv(ARef, Tab, DecK, V);
<<131,_/binary>> = Value ->
%% Due to a previous bug, info values could be double-encoded with term_to_binary()
try binary_to_term(Value) of
_DecVal ->
%% We haven't been storing erlang-term encoded data as info,
%% so assume this is double-encoded and correct it
write_info_encv(ARef, Tab, DecK, Value)
catch
error:_ ->
skip
end;
_ ->
skip
end,
@ -1443,24 +1489,26 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
%% not yet created
CFs = cfs(CFs0),
file:make_dir(MP),
OpenOpts = [ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ],
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
OpenRes = rocksdb_open(MP, Opts, CFs),
map_cfs(OpenRes, CFs, Alias, Acc0);
false ->
{error, enoent};
true ->
%% Assumption: even an old rocksdb database file will have at least "default"
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
end.
open_opts(Opts) ->
[ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ].
rocksdb_open(MP, Opts, CFs) ->
%% rocksdb:open(MP, Opts, CFs),
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
is_open(Alias, #st{backends = Bs}) ->
case maps:find(Alias, Bs) of

View File

@ -42,4 +42,5 @@ start_link() ->
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
, ?CHILD(mnesia_rocksdb_params, worker)]} }.

View File

@ -53,6 +53,7 @@
, delete/2 , delete/3
, delete_object/2, delete_object/3
, match_delete/2
, merge/3 , merge/4
, clear_table/1
, batch_write/2 , batch_write/3
, update_counter/3, update_counter/4
@ -105,6 +106,8 @@
-include("mnesia_rocksdb.hrl").
-include("mnesia_rocksdb_int.hrl").
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
-type tab_name() :: atom().
-type alias() :: atom().
-type admin_tab() :: {admin, alias()}.
@ -115,7 +118,9 @@
| index()
| retainer().
-type retries() :: non_neg_integer().
-type inner() :: non_neg_integer().
-type outer() :: non_neg_integer().
-type retries() :: outer() | {inner(), outer()}.
%% activity type 'ets' makes no sense in this context
-type mnesia_activity_type() :: transaction
@ -141,7 +146,7 @@
-type tx_activity() :: #{ type := 'tx'
, handle := tx_handle()
, attempt := non_neg_integer() }.
, attempt := 'undefined' | retries() }.
-type batch_activity() :: #{ type := 'batch'
, handle := batch_handle() }.
-type activity() :: tx_activity() | batch_activity().
@ -254,6 +259,14 @@ release_snapshot(SHandle) ->
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
%% the commit set. It will then be re-run again, until the retry count is exhausted.
%%
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
%% in line) if there are outer retries remaining. When all retries are exhausted, the activity aborts
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
%% attempt, but only on outer retries (the first retry is always an outer retry).
%%
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
%%
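%% A usage sketch (illustrative only; `my_tab' and the fun body are placeholders):
%%
%%   mrdb:activity({tx, #{retries => {2, 5}, no_snapshot => true}}, rdb,
%%                 fun() ->
%%                         [{my_tab, k, V}] = mrdb:read(my_tab, k),
%%                         mrdb:insert(my_tab, {my_tab, k, V + 1})
%%                 end).
%%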
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
@ -269,41 +282,92 @@ activity(Type, Alias, F) ->
#{ alias => Alias
, db_ref => DbRef }, TxCtxt);
batch ->
Batch = get_batch_(DbRef),
Batch = init_batch_ref(DbRef),
#{ activity => #{ type => batch
, handle => Batch }
, alias => Alias
, db_ref => DbRef }
end,
do_activity(F, Alias, Ctxt, false).
do_activity(F, Alias, Ctxt).
do_activity(F, Alias, Ctxt, WithLock) ->
try run_f(F, Ctxt, WithLock, Alias) of
Res ->
try commit_and_pop(Res)
catch
throw:{?MODULE, busy} ->
do_activity(F, Alias, Ctxt, true)
end
do_activity(F, Alias, Ctxt) ->
try try_f(F, Ctxt)
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt)
end.
try_f(F, Ctxt) ->
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
try_f(false, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(throw, Something);
Cat:Err:T ->
%% Without capturing the stacktract here,
%% debugging gets pretty difficult. Incompatible with mnesia, though.
abort_and_pop(Cat, {Err, T})
end;
try_f(true, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(throw, Something);
Cat:Err ->
%% Without capturing the stacktrace here,
%% debugging gets pretty difficult
abort_and_pop(Cat, Err)
end.
run_f(F, Ctxt, false, _) ->
push_ctxt(Ctxt),
F();
run_f(F, Ctxt, true, Alias) ->
mrdb_mutex:do(
Alias,
fun() ->
push_ctxt(incr_attempt(Ctxt)),
F()
end).
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
run_f(F, Ctxt) ->
push_ctxt(Ctxt),
F().
incr_attempt(0, {_,O}) when O > 0 ->
{outer, {0,1}};
incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O),
is_integer(Ri), is_integer(Ro) ->
if I < Ri -> {inner, {I+1, O}};
O < Ro -> {outer, {0, O+1}};
true ->
error
end;
incr_attempt(_, _) ->
error.
retry_activity(F, Alias, #{activity := #{ type := Type
, attempt := A
, retries := R} = Act} = Ctxt) ->
case incr_attempt(A, R) of
{RetryCtxt, A1} ->
Act1 = Act#{attempt := A1},
Ctxt1 = Ctxt#{activity := Act1},
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt1)
end;
error ->
return_abort(Type, error, retry_limit)
end.
retry_activity_(inner, F, Alias, Ctxt) ->
mrdb_stats:incr(Alias, inner_retries, 1),
try_f(F, Ctxt);
retry_activity_(outer, F, Alias, Ctxt) ->
mrdb_stats:incr(Alias, outer_retries, 1),
mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end).
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Act1 = Act#{attempt := A+1, handle := TxH},
Act1 = Act#{handle := TxH},
C1 = C#{ activity := Act1 },
case maps:is_key(snapshot, C) of
true ->
@ -368,7 +432,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
check_tx_opts(Opts) ->
case maps:without([no_snapshot, retries], Opts) of
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
Other when map_size(Other) > 0 ->
abort({invalid_tx_opts, maps:keys(Other)});
_ ->
@ -376,10 +440,14 @@ check_tx_opts(Opts) ->
end.
check_retries(#{retries := Retries} = Opts) ->
if is_integer(Retries), Retries >= 0 ->
Opts;
true ->
error({invalid_tx_option, {retries, Retries}})
case Retries of
_ when is_integer(Retries), Retries >= 0 ->
Opts#{retries := {0, Retries}};
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
Inner >= 0, Outer >= 0 ->
Opts;
_ ->
error({invalid_tx_option, {retries, Retries}})
end.
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
@ -394,7 +462,7 @@ create_tx(Opts, DbRef) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Opts#{activity => maps:merge(Opts, #{ type => tx
, handle => TxH
, attempt => 1})}.
, attempt => 0 })}.
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
case NoSnap of
@ -414,8 +482,7 @@ commit_and_pop(Res) ->
Res;
{error, {error, "Resource busy" ++ _ = Busy}} ->
case A of
#{retries := Retries, attempt := Att}
when Att =< Retries ->
#{retries := {I,O}} when I > 0; O > 0 ->
throw({?MODULE, busy});
_ ->
error({error, Busy})
@ -477,6 +544,11 @@ re_throw(Cat, Err) ->
mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
Bool;
mnesia_compatible_aborts(_) ->
mnesia_compatible_aborts().
fix_error({aborted, Err}) ->
Err;
fix_error(Err) ->
@ -507,10 +579,13 @@ rdb_write_batch_and_pop(BatchRef, C) ->
pop_ctxt()
end.
rdb_release_batch(?BATCH_REF_DUMMY) ->
ok;
rdb_release_batch(H) ->
rocksdb:release_batch(H).
%% @doc Aborts an ongoing {@link activity/2}
-spec abort(_) -> no_return().
abort(Reason) ->
case mnesia_compatible_aborts() of
true ->
@ -529,7 +604,7 @@ new_tx(#{activity := _}, _) ->
new_tx(Tab, Opts) ->
#{db_ref := DbRef} = R = ensure_ref(Tab),
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
tx_ref(Tab, TxH) ->
@ -539,7 +614,7 @@ tx_ref(Tab, TxH) ->
#{activity := #{type := tx, handle := OtherTxH}} ->
error({tx_handle_conflict, OtherTxH});
R ->
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
end.
-spec tx_commit(tx_handle() | db_ref()) -> ok.
@ -666,6 +741,21 @@ insert(Tab, Obj0, Opts) ->
EncVal = encode_val(Obj, Ref),
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
merge(Tab, Key, MergeOp) ->
merge(Tab, Key, MergeOp, []).
merge(Tab, Key, MergeOp, Opts) ->
#{encoding := Enc} = Ref = ensure_ref(Tab),
case Enc of
{_, {value, term}} ->
merge_(Ref, Key, MergeOp, Opts);
_ ->
abort(badarg)
end.
merge_(Ref, Key, MergeOp, Opts) ->
rdb_merge(Ref, encode_key(Key), MergeOp, Opts).
validate_obj(Obj, #{mode := mnesia}) ->
Obj;
validate_obj(Obj, #{attr_pos := AP,
@ -889,7 +979,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
_ when is_atom(Ix) ->
{attr_pos(Ix, Ref), ordered};
{_} ->
Ix;
{Ix, ordered};
_ when is_integer(Ix) ->
{Ix, ordered}
end,
@ -1010,18 +1100,20 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
get_batch(_) ->
{error, badarg}.
get_batch_(DbRef) ->
init_batch_ref(DbRef) ->
Ref = make_ref(),
{ok, Batch} = rdb_batch(),
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
Ref.
get_batch_(DbRef) ->
    Ref = make_ref(),
    {ok, Batch} = rdb_batch(),
    pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
    Ref.
get_batch_(DbRef, BatchRef) ->
Key = batch_ref_key(BatchRef),
case pdict_get(Key) of
undefined ->
error(stale_batch_ref);
#{DbRef := Batch} ->
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
Batch;
Map ->
{ok, Batch} = rdb_batch(),
@ -1050,7 +1142,9 @@ write_batches(BatchRef, Opts) ->
pdict_erase(Key),
ret_batch_write_acc(
maps:fold(
fun(DbRef, Batch, Acc) ->
fun(_, ?BATCH_REF_DUMMY, Acc) ->
Acc;
(DbRef, Batch, Acc) ->
case rocksdb:write_batch(DbRef, Batch, Opts) of
ok ->
Acc;
@ -1494,8 +1588,9 @@ rdb_iterator(R) -> rdb_iterator(R, []).
rdb_iterator(R, Opts) ->
rdb_iterator_(R, read_opts(R, Opts)).
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
%% Note: versions before 1.8.0 expected DbRef as first argument
rocksdb:transaction_iterator(TxH, CfH, ROpts);
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
rocksdb:iterator(DbRef, CfH, ROpts).

View File

@ -6,11 +6,14 @@
, iterator_move/2
, iterator/2
, iterator_close/1
, fold/4
, rev_fold/4
, index_ref/2
]).
-record(mrdb_ix_iter, { i :: mrdb:iterator()
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
, type = set :: set | bag
, sub :: mrdb:ref() | pid()
, sub :: pid() | mrdb:db_ref()
}).
-type ix_iterator() :: #mrdb_ix_iter{}.
@ -19,7 +22,7 @@
-type object() :: tuple().
-record(subst, { i :: mrdb:iterator()
-record(subst, { i :: mrdb:mrdb_iterator()
, vals_f
, cur
, mref }).
@ -62,6 +65,41 @@ iterator(Tab, IxPos) ->
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Folds over the index table corresponding to Tab and IxPos.
%% The fold fun is called with the index key and the corresponding object,
%% or [] if there is no such object.
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, first, next, FoldFun, Acc).
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Like fold/4 above, but folds over the index in the reverse order.
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
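%% Usage sketch (illustrative; assumes a table `t' with an index on attribute `a'):
%%
%%   Pairs = mrdb_index:fold(t, a,
%%                           fun(IxVal, Obj, Acc) -> [{IxVal, Obj} | Acc] end,
%%                           []).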
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
with_iterator(
Tab, IxPos,
fun(I) ->
iter_fold(I, Start, Dir, FoldFun, Acc)
end).
iter_fold(I, Start, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Dir), I, Dir, Fun, Fun(IxVal, Obj, Acc));
iter_fold_({error, _}, _, _, _, Acc) ->
Acc.
index_ref(Tab, Pos) ->
TRef = mrdb:ensure_ref(Tab),
ensure_index_ref(Pos, TRef).
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case mrdb:iterator_move(I, Dir) of
{ok, {{FKey, PKey}}} ->
@ -83,6 +121,7 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
Other
end.
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
opt_read(R, Key) ->
case mrdb:read(R, Key, []) of
[Obj] ->

View File

@ -3,79 +3,109 @@
-export([ do/2 ]).
-export([ ensure_tab/0 ]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
%% critical section.
%%
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
%% Releasing the resource is done by notifying the server.
%%
%% Previous implementations tested:
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
%% This is a pretty simple implementation, but it lacks ordering, and appeared
%% to sometimes lead to starvation.
%% * A duplicate_bag ets table: These are defined such that insertion order is
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
%% Unfortunately, performance plummeted when tested with > 25 concurrent
%% processes. While this is likely a very high number, given that we're talking
%% about contention in a system using optimistic concurrency, it's never good
%% if performance falls off a cliff.
do(Rsrc, F) when is_function(F, 0) ->
true = claim(Rsrc),
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
try F()
after
release(Rsrc)
mrdb_mutex_serializer:done(Rsrc, Ref)
end.
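%% Usage sketch (illustrative; `do_work/0' is a placeholder for the caller's
%% critical section, and `rdb' for the alias used as the resource):
%%
%%   Result = mrdb_mutex:do(rdb, fun() -> do_work() end).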
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
-ifdef(TEST).
mutex_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Check that all operations complete", fun swarm_do/0}
]}.
setup() ->
case whereis(mrdb_mutex_serializer) of
undefined ->
{ok, Pid} = mrdb_mutex_serializer:start_link(),
Pid;
Pid ->
Pid
end.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
false
end.
cleanup(Pid) ->
unlink(Pid),
exit(Pid, kill).
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
swarm_do() ->
Rsrc = ?LINE,
Pid = spawn(fun() -> collect([]) end),
L = lists:seq(1, 1000),
Evens = [X || X <- L, is_even(X)],
Pids = [spawn_monitor(fun() ->
send_even(Rsrc, N, Pid)
end) || N <- lists:seq(1,1000)],
await_pids(Pids),
Results = fetch(Pid),
{incorrect_results, []} = {incorrect_results, Results -- Evens},
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
_ ->
true
collect(Acc) ->
receive
{_, result, N} ->
collect([N|Acc]);
{From, fetch} ->
From ! {fetch_reply, Acc},
done
end.
fetch(Pid) ->
Pid ! {self(), fetch},
receive
{fetch_reply, Result} ->
Result
end.
is_even(N) ->
(N rem 2) =:= 0.
await_pids([{_, MRef}|Pids]) ->
receive
{'DOWN', MRef, _, _, _} ->
await_pids(Pids)
after 10000 ->
error(timeout)
end;
await_pids([]) ->
ok.
send_even(Rsrc, N, Pid) ->
do(Rsrc, fun() ->
case is_even(N) of
true ->
Pid ! {self(), result, N};
false ->
exit(not_even)
end
end).
-endif.

View File

@ -0,0 +1,98 @@
-module(mrdb_mutex_serializer).
-behavior(gen_server).
-export([wait/1,
done/2]).
-export([start_link/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(st, { queues = #{}
, empty_q = queue:new() }). %% perhaps silly optimization
wait(Rsrc) ->
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
done(Rsrc, Ref) ->
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
, empty_q = NewQ } = St) ->
MRef = erlang:monitor(process, Pid),
Q0 = maps:get(Rsrc, Queues, NewQ),
WasEmpty = queue:is_empty(Q0),
Q1 = queue:in({From, MRef}, Q0),
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
case WasEmpty of
true ->
{reply, {ok, MRef}, St1};
false ->
{noreply, St1}
end.
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
case maps:find(Rsrc, Queues) of
{ok, Q} ->
case queue:out(Q) of
{{value, {_From, MRef}}, Q1} ->
erlang:demonitor(MRef),
ok = maybe_dispatch_one(Q1),
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
{_, _} ->
%% Not the lock holder
{noreply, St}
end;
error ->
{noreply, St}
end.
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
Queues1 =
maps:map(
fun(_Rsrc, Q) ->
drop_or_filter(Q, MRef)
end, Queues),
{noreply, St#st{ queues = Queues1 }};
handle_info(_, St) ->
{noreply, St}.
terminate(_, _) ->
ok.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
%% In this function, we don't actually pop
maybe_dispatch_one(Q) ->
case queue:peek(Q) of
empty ->
ok;
{value, {From, MRef}} ->
gen_server:reply(From, {ok, MRef}),
ok
end.
drop_or_filter(Q, MRef) ->
case queue:peek(Q) of
{value, {_, MRef}} ->
Q1 = queue:drop(Q),
ok = maybe_dispatch_one(Q1),
Q1;
{value, _Other} ->
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
empty ->
Q
end.

74
src/mrdb_stats.erl Normal file
View File

@ -0,0 +1,74 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%% @doc Statistics API for the mnesia_rocksdb plugin
%%
%% Some counters are maintained for each active alias. Currently, the following
%% counters are supported:
%% * inner_retries
%% * outer_retries
%%
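%% A usage sketch (illustrative; assumes the `rdb' alias is active):
%%
%%   ok = mrdb_stats:incr(rdb, inner_retries, 1),
%%   Inner = mrdb_stats:get(rdb, inner_retries),
%%   #{inner_retries := _, outer_retries := _} = mrdb_stats:get(rdb).
%%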
-module(mrdb_stats).
-export([new/0]).
-export([incr/3,
get/1,
get/2]).
-type alias() :: mnesia_rocksdb:alias().
-type db_ref() :: mrdb:db_ref().
-type counter() :: atom().
-type increment() :: integer().
-type counters() :: #{ counter() := integer() }.
new() ->
#{ ref => counters:new(map_size(ctr_meta()), [write_concurrency])
, meta => ctr_meta()}.
ctr_meta() ->
#{ inner_retries => 1
, outer_retries => 2 }.
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
%% @doc Increment `Ctr' counter for `Alias` with increment `N'.
%%
%% Note that the first argument may also be a `db_ref()' map,
%% corresponding to `mrdb:get_ref({admin, Alias})'.
%% @end
incr(Alias, Ctr, N) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
incr_(Ref, Meta, Ctr, N);
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
incr_(Ref, Meta, Ctr, N).
-spec get(alias() | db_ref(), counter()) -> integer().
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
%% @end
get(Alias, Ctr) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
get_(Ref, Meta, Ctr);
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
get_(Ref, Meta, Ctr).
-spec get(alias() | db_ref()) -> counters().
%% @doc Fetches all known counters for `Alias', in the form of a map,
%% `#{Counter => Value}'.
%% @end
get(Alias) when is_atom(Alias) ->
get_(mrdb:get_ref({admin, Alias}));
get(Ref) when is_map(Ref) ->
get_(Ref).
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
lists:foldl(
fun({K, P}, M) ->
M#{K := counters:get(Ref, P)}
end, Meta, maps:to_list(Meta)).
get_(Ref, Meta, Attr) ->
Ix = maps:get(Attr, Meta),
counters:get(Ref, Ix).
incr_(Ref, Meta, Attr, N) ->
Ix = maps:get(Attr, Meta),
counters:add(Ref, Ix, N).

View File

@ -24,8 +24,12 @@
, mrdb_abort/1
, mrdb_two_procs/1
, mrdb_two_procs_tx_restart/1
, mrdb_two_procs_tx_inner_restart/1
, mrdb_two_procs_snap/1
, mrdb_three_procs/1
, create_counters/1
, update_counters/1
, restart_node/1
]).
-include_lib("common_test/include/ct.hrl").
@ -41,7 +45,8 @@ all() ->
groups() ->
[
{all_tests, [sequence], [ {group, checks}
, {group, mrdb} ]}
, {group, mrdb}
, {group, counters}]}
%% , error_handling ]}
, {checks, [sequence], [ encoding_sext_attrs
, encoding_binary_binary
@ -53,8 +58,13 @@ groups() ->
, mrdb_abort
, mrdb_two_procs
, mrdb_two_procs_tx_restart
, mrdb_two_procs_tx_inner_restart
, mrdb_two_procs_snap
, mrdb_three_procs ]}
, {counters, [sequence], [ create_counters
, update_counters
, restart_node
, update_counters ]}
].
@ -267,7 +277,7 @@ mrdb_abort(Config) ->
Pre = mrdb:read(tx_abort, a),
D0 = get_dict(),
TRes = try mrdb:activity(
tx, rdb,
{tx, #{mnesia_compatible => true}}, rdb,
fun() ->
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
error(abort_here),
@ -287,10 +297,17 @@ mrdb_abort(Config) ->
mrdb_two_procs(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
tr_flags(
{self(), [call, sos, p]},
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {mrdb, activity, x}], tr_opts()))).
mrdb_two_procs_(Config) ->
@ -314,11 +331,16 @@ mrdb_two_procs_(Config) ->
Pre = mrdb:read(R, a),
go_ahead_other(POther),
await_other_down(POther, MRef, ?LINE),
%% The test proc is still inside the transaction, and POther
%% has terminated/committed. If we now read 'a', we should see
%% the value that POther wrote (no isolation).
[{R, a, 17}] = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
%% Our transaction should fail due to the resource conflict, and because
%% we're not using retries.
try mrdb:activity({tx, #{no_snapshot => true,
retries => 0}}, rdb, F1) of
ok -> error(unexpected)
@ -339,6 +361,7 @@ mrdb_two_procs_tx_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
check_stats(rdb),
mrdb:insert(R, {R, a, 1}),
Pre = mrdb:read(R, a),
F0 = fun() ->
@ -354,7 +377,7 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = [{R, a, 17}],
Att = get_attempt(),
Expected = case Att of
1 -> Pre;
0 -> Pre;
_ -> OtherWrite
end,
Expected = mrdb:read(R, a),
@ -363,14 +386,104 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
dictionary_unchanged(Do0),
check_stats(rdb),
[{R, a, 18}] = mrdb:read(R, a),
delete_tabs(Created),
ok.
mrdb_two_procs_tx_inner_restart(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
dbg_tr_opts()).
mrdb_two_procs_tx_inner_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
mrdb:insert(R, {R, a, 1}),
mrdb:insert(R, {R, b, 1}),
PreA = mrdb:read(R, a),
PreB = mrdb:read(R, b),
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
F0 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, a, 17}),
wait_for_other(Parent, ?LINE)
end,
F1 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, b, 147}),
wait_for_other(Parent, ?LINE)
end,
Spawn = fun(F) ->
Res = spawn_opt(
fun() ->
ok = mrdb:activity(tx, rdb, F)
end, [monitor]),
Res
end,
{P1, MRef1} = Spawn(F0),
{P2, MRef2} = Spawn(F1),
F2 = fun() ->
PostA = [{R, a, 17}], % once F0 (P1) has committed
PostB = [{R, b, 147}], % once F1 (P2) has committed
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
BRes = mrdb:read(R, b),
Att = get_attempt(),
ct:log("Att = ~p", [Att]),
case Att of
0 -> % This is the first run (no retry yet)
ARes = PreA,
BRes = PreB,
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
await_other_down(P1, MRef1, ?LINE),
PostA = mrdb:read(R, a); % now, P1's write should leak through here
{0, 1} -> % This is the first (outer) retry
go_ahead_other(0, P2), % Let P2 write
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
await_other_down(P2, MRef2, ?LINE),
PostA = mrdb:read(R, a), % now we should see writes from both P1
%% On OTP 23, the following step might fail due to timing, even
%% though the trace output looks as expected. Possibly some quirk
%% with leakage propagation time in rocksdb. Let's retry to be sure.
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
{1, 1} ->
PostA = mrdb:read(R, a),
PostB = mrdb:read(R, b),
ok
end,
mrdb:insert(R, {R, a, 18}),
mrdb:insert(R, {R, b, 18})
end,
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
check_stats(rdb),
dictionary_unchanged(Do0),
[{R, a, 18}] = mrdb:read(R, a),
[{R, b, 18}] = mrdb:read(R, b),
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
delete_tabs(Created),
ok.
try_until(Result, F) ->
try_until(Result, F, 10).
try_until(Result, F, N) when N > 0 ->
case F() of
Result ->
ok;
_ ->
receive after 100 -> ok end,
try_until(Result, F, N-1)
end;
try_until(Result, F, _) ->
error({badmatch, {Result, F}}).
%
%% For testing purposes, we use side-effects inside the transactions
@ -383,7 +496,7 @@ mrdb_two_procs_tx_restart_(Config) ->
%% attempt, and ignore the sync ops on retries.
%%
-define(IF_FIRST(N, Expr),
if N == 1 ->
if N == 0 ->
Expr;
true ->
ok
@ -413,8 +526,8 @@ mrdb_two_procs_snap(Config) ->
go_ahead_other(Att, POther),
ARes = mrdb:read(R, a),
ARes = case Att of
1 -> Pre;
2 -> [{R, a, 17}]
0 -> Pre;
_ -> [{R, a, 17}]
end,
await_other_down(POther, MRef, ?LINE),
PreB = mrdb:read(R, b),
@ -434,7 +547,7 @@ mrdb_two_procs_snap(Config) ->
%% We make sure that P2 commits before finishing the other two, and P3 and the
%% main thread sync, so as to maximize the contention for the retry lock.
mrdb_three_procs(Config) ->
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
mrdb_three_procs_(Config) ->
R = ?FUNCTION_NAME,
@ -452,7 +565,7 @@ mrdb_three_procs_(Config) ->
spawn_opt(fun() ->
D0 = get_dict(),
do_when_p_allows(
1, Parent, ?LINE,
0, Parent, ?LINE,
fun() ->
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
end),
@ -488,8 +601,8 @@ mrdb_three_procs_(Config) ->
fun() ->
Att = get_attempt(),
ARes = case Att of
1 -> [A0];
2 -> [A1]
0 -> [A0];
_ -> [A1]
end,
%% First, ensure that P2 tx is running
go_ahead_other(Att, P2),
@ -519,6 +632,7 @@ tr_opts() ->
, {?MODULE, await_other_down, 3, x}
, {?MODULE, do_when_p_allows, 4, x}
, {?MODULE, allow_p, 3, x}
, {?MODULE, check_stats, 1, x}
]}.
light_tr_opts() ->
@ -529,6 +643,23 @@ light_tr_opts() ->
, {mrdb, read, 2, x}
, {mrdb, activity, x} ], tr_opts())).
dbg_tr_opts() ->
tr_flags(
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb, current_context, 0, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {?MODULE, try_until, 3, x}
, {mrdb, activity, x} ], tr_opts())).
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
Opts#{patterns => Ps ++ Pats1}.
@ -542,12 +673,13 @@ wait_for_other(Parent, L) ->
wait_for_other(Att, Parent, L) ->
wait_for_other(Att, Parent, 1000, L).
wait_for_other(1, Parent, Timeout, L) ->
wait_for_other(0, Parent, Timeout, L) ->
MRef = monitor(process, Parent),
Parent ! {self(), ready},
receive
{Parent, cont} ->
demonitor(MRef),
Parent ! {self(), cont_ack},
ok;
{'DOWN', MRef, _, _, Reason} ->
ct:log("Parent died, Reason = ~p", [Reason]),
@ -586,7 +718,13 @@ go_ahead_other(Att, POther, Timeout) ->
go_ahead_other_(POther, Timeout) ->
receive
{POther, ready} ->
POther ! {self(), cont}
POther ! {self(), cont},
receive
{POther, cont_ack} ->
ok
after Timeout ->
error(cont_ack_timeout)
end
after Timeout ->
error(go_ahead_timeout)
end.
@ -619,6 +757,35 @@ get_attempt() ->
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
Attempt.
create_counters(_Config) ->
create_tab(counters, []),
mrdb:insert(counters, {counters, c0, 1}),
mrdb:update_counter(counters, c1, 1),
[{counters, c0, 1}] = mrdb:read(counters, c0),
[{counters, c1, 1}] = mrdb:read(counters, c1),
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
ok.
restart_node(_Config) ->
mnesia:stop(),
ok = mnesia:start(),
ct:log("mnesia restarted", []),
ok.
update_counters(_Config) ->
[{counters, c0, C0Prev}] = mrdb:read(counters, c0),
[{counters, c1, C1Prev}] = mrdb:read(counters, c1),
ct:log("co: ~p, c1: ~p", [C0Prev, C1Prev]),
ok = mrdb:update_counter(counters, c0, 1),
ok = mrdb:update_counter(counters, c1, 1),
ct:log("Incremented c0 and c1 by 1", []),
C0 = C0Prev + 1,
C1 = C1Prev + 1,
[{counters, c0, C0}] = mrdb:read(counters, c0),
[{counters, c1, C1}] = mrdb:read(counters, c1),
ct:log("c0: ~p, c1: ~p", [C0, C1]),
ok.
create_tabs(Tabs, Config) ->
Res = lists:map(fun create_tab/1, Tabs),
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
@ -645,3 +812,8 @@ dictionary_unchanged(Old) ->
, added := [] } = #{ deleted => Old -- New
, added => New -- Old },
ok.
check_stats(Alias) ->
Stats = mrdb_stats:get(Alias),
ct:log("Stats: ~p", [Stats]),
Stats.

View File

@ -33,6 +33,7 @@
-export([
index_plugin_mgmt/1
, add_indexes/1
, delete_indexes/1
, create_bag_index/1
, create_ordered_index/1
, test_1_ram_copies/1
@ -40,10 +41,12 @@
, fail_1_disc_only/1
, plugin_ram_copies1/1
, plugin_ram_copies2/1
, plugin_ram_copies3/1
, plugin_disc_copies/1
, fail_plugin_disc_only/1
, plugin_disc_copies_bag/1
, plugin_rdb_ordered/1
, plugin_rdb_none/1
, index_iterator/1
]).
@ -73,10 +76,12 @@ run(Config) ->
{pfx},mnesia_rocksdb, ix_prefixes),
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
test_index_plugin(cfg([pl3, rdb, none], Config)),
index_plugin_mgmt(Config),
ok.
@ -94,6 +99,7 @@ groups() ->
, create_ordered_index
, index_plugin_mgmt
, add_indexes
, delete_indexes
]}
, {access, [sequence], [
test_1_ram_copies
@ -104,10 +110,12 @@ groups() ->
, {plugin, [sequence], [
plugin_ram_copies1
, plugin_ram_copies2
, plugin_ram_copies3
, plugin_disc_copies
, fail_plugin_disc_only
, plugin_disc_copies_bag
, plugin_rdb_ordered
, plugin_rdb_none
]}
].
@ -177,11 +185,14 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
test(N, Type, T) ->
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
{attributes,[k,a,b,c]},
@ -204,7 +215,7 @@ add_del_indexes() ->
test_index_plugin(Config) ->
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
{index, [{{pfx}, IxType}]}]),
{index, [ixtype(IxType)]}]),
mnesia:dirty_write({Tab, "foobar", "sentence"}),
mnesia:dirty_write({Tab, "yellow", "sensor"}),
mnesia:dirty_write({Tab, "truth", "white"}),
@ -216,13 +227,27 @@ test_index_plugin(Config) ->
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx});
IxType == ordered ->
IxType == ordered; IxType == none ->
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx})
end,
if Type == rdb ->
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mrdb:index_read(
Tab, <<"foo">>, {pfx});
true ->
ok
end.
ixtype(T) when T==bag;
T==ordered ->
{{pfx}, T};
ixtype(none) ->
{pfx}.
create_bag_index(_Config) ->
{aborted, {combine_error, _, _}} =
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
@ -239,6 +264,25 @@ add_indexes(_Config) ->
{atomic, ok} = mnesia:add_table_index(T, a),
ok.
delete_indexes(_Config) ->
T = ?TAB(t1),
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
ok = mrdb:insert(T, {T, a, 1, 1}),
ok = mrdb:insert(T, {T, b, 1, 2}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
{atomic, ok} = mnesia:del_table_index(T, a),
ok = mrdb:delete(T, b),
ok = mrdb:insert(T, {T, c, 2, 3}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
ok.
ix_fold_acc(K, V, Acc) ->
[{K, V} | Acc].
index_plugin_mgmt(_Config) ->
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),

View File

@ -1,6 +1,6 @@
-module(mrdb_bench).
-compile(export_all).
-compile([export_all, nowarn_export_all]).
init() ->
mnesia:delete_schema([node()]),