71 Commits

Author SHA1 Message Date
Ulf Wiger ce0c09aec6 rocksdb pr #55 merged into master 2026-04-14 14:57:00 +02:00
Ulf Wiger 6c1a343c82 Improve mnesia-compat abort returns, use latest rocksdb 2026-04-12 14:47:00 +02:00
uwiger 6246f3f2e1 Merge pull request 'Add mrdb:ms/2 for match spec production' (#9) from uw-mrdb-ms into master
Reviewed-on: #9
2025-10-11 02:22:24 +09:00
Ulf Wiger c6969238c7 Add mrdb:ms/2 for match spec production 2025-10-10 14:01:25 +02:00
uwiger d6a4dca5f6 Merge pull request 'Fix dialyzer issues (also for test)' (#8) from uw-dialyzer-issues into master
Reviewed-on: #8
2025-08-05 20:06:05 +09:00
Ulf Wiger b97b727605 Fix dialyzer issues (also for test) 2025-08-01 21:14:56 +02:00
uwiger e0542f7f52 Merge pull request 'Add mrdb_index:select() et al' (#7) from uw-index-select into master
Reviewed-on: #7
2025-07-09 14:39:52 +09:00
Ulf Wiger 13ccdb373a Fix pattern-matching index select 2025-07-04 16:43:51 +02:00
Ulf Wiger 29d5d6f170 Add mrdb_index:select() et al 2025-07-02 14:36:38 +02:00
uwiger 4e3c9e83c8 Merge pull request 'Add mrdb:fold_reverse(), mrdb:select_reverse() etc. Remove vsn 1 support.' (#6) from uw-fold_reverse into master
Reviewed-on: #6
2025-06-25 03:13:52 +09:00
Ulf Wiger f9352b6ff0 Remove accidental unicode NBSP char 2025-06-19 20:18:52 +02:00
Ulf Wiger 04e6063ce0 Tests for fold, fold_reverse, select_reverse, etc. 2025-06-19 20:12:06 +02:00
Ulf Wiger 323fcea7a7 Remove legacy support for vsn 1 standalone tabs 2025-06-19 20:12:06 +02:00
Ulf Wiger d8b6ab788e Fix prefixed rdb_fold, add mrdb:rdb_rev_fold() 2025-06-19 20:12:06 +02:00
uwiger 318f84bbaf Merge pull request 'Ensure the erlang_merge_operator is always loaded' (#5) from uw-merge-operator into master
Reviewed-on: #5
2025-06-17 18:05:58 +09:00
Ulf Wiger 0691d8b055 Ensure the erlang_merge_operator is always loaded 2025-06-15 20:43:36 +02:00
uwiger ac69c8564f Merge pull request 'Consistent support for merge ops' (#4) from uw-merge-ops into master
Reviewed-on: #4
2025-06-16 03:41:44 +09:00
Ulf Wiger 2ca7f36eb1 Consistent support for merge ops 2025-06-02 20:07:05 +02:00
Ulf Wiger 56d78768d9 Produce stacktraces unless mnesia_compatible 2025-03-23 21:06:21 +01:00
zxq9 a768d8008f Merge remote-tracking branch 'ae/master' into emqx 2024-10-17 10:54:42 +09:00
Richard Carlsson 9cea41cb9a Merge pull request #57 from richcarl/use-fixed-emqx
Use latest version of emqx fork with compilation fix
2024-10-14 12:17:30 +02:00
Richard Carlsson d42055c7ac Update erlang-rocksdb to use latest patch fixing compilation issues 2024-10-13 14:30:56 +02:00
Richard Carlsson 0ae7ecd41d Use our patched version of the EMQX fork 2024-10-09 13:53:01 +02:00
Richard Carlsson ece9db2b09 Merge pull request #56 from richcarl/use-emqx-fork
Use emqx fork of erlang-rocksdb
2024-09-09 14:04:40 +02:00
Richard Carlsson 0e4382d5f7 Use emqx fork of erlang-rocksdb 2024-08-23 11:01:46 +02:00
Ulf Wiger d9d82d7ead Merge branch 'uw-catchup' into 'master'
Fix wrong index preparation in mrdb:index_read_/3

See merge request ioecs/mnesia_rocksdb!1
2024-08-14 09:24:16 +00:00
Ulf Wiger 34285da6d8 Merge pull request #55 from havelkadragan/master
Fix wrong index preparation in mrdb:index_read_/3
2024-08-13 18:21:32 +02:00
crossroad61 cb123ee28a Merge pull request #1 from aeternity/uw-ix-plugin-tests
Improve tests (fail without Havelka's fix)
2024-08-13 12:45:56 +02:00
Ulf Wiger 996ae82717 Improve tests (fail without Havelka's fix) 2024-08-13 10:36:31 +02:00
dragan d6b86524fd Fix wrong index preparation in mrdb:index_read_/3
In the case when index is of type {_}, index is returned as is
instead of putting it into a tuple {Ix, ordered}.

An example

Given the following index plugin implementation:
-module(key6).

-record(cgh, {key, val}).

-export([k6/3
        , add_index/0
        , create_table/0
        , tst_bad_type/0
    ]).

k6(_, _, #cgh{key = {Name, K8, _Cid, _Vn}}) ->
    [list_to_binary([atom_to_binary(Name),
     list_to_binary(lists:sublist(K8, 6))])].

add_index() ->
    mnesia_schema:add_index_plugin({k6}, key6, k6).

create_table() ->
    mnesia:create_table(cgh,
    [{attributes, record_info(fields, cgh)},
     {type, ordered_set},
     {index, [{k6}]},
     {rocksdb_copies, [node()]},
     {local_content, true}]).

tst_bad_type() ->
    E1 = #cgh{key = {n1, "u1vgrjkh",{ecgi,238,6,213020,50},3}, val = ok},
    mnesia:dirty_write(E1),
    Index1 = list_to_binary([atom_to_binary(n1),
                             list_to_binary(lists:sublist("u1vgrjkh", 6))]),
    mrdb:index_read(cgh, Index1, {k6}).

-- testing --
Eshell V14.2.2 (press Ctrl+G to abort, type help(). for help)
(rock@bang)2> mnesia:create_schema([node()]).
ok
(rock@bang)3> mnesia:start().
ok
(rock@bang)4> mnesia_rocksdb:register().
{ok,rocksdb_copies}
(rock@bang)5> l(key6).
{module,key6}
(rock@bang)6> key6:add_index().
{atomic,ok}
(rock@bang)7> key6:create_table().
{atomic,ok}
(rock@bang)8> key6:tst_bad_type().
** exception exit: {aborted,{bad_type,{cgh,index,{k6}}}}
     in function  mnesia:abort/1 (mnesia.erl, line 362)
     in call from mrdb:ensure_ref/1 (~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl, line 609)
     in call from mrdb:index_read_/3 (~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl, line 945)

-- after patch --

(rock@bang)9> c(mrdb).
Recompiling ~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl
{ok,mrdb}
(rock@bang)10> key6:tst_bad_type().
[{cgh,{n1,"u1vgrjkh",{ecgi,238,6,213020,50},3},ok}]
(rock@bang)11>
2024-08-12 14:32:20 +02:00
Thomas Arts 296813ef7b Merge pull request #51 from aeternity/ta-rocksdb-1.8.0
Towards version 7.10.2 of rocksdb via version 1.8.0 or erlang_rocksdb
2024-03-19 14:41:50 +01:00
Thomas Arts 16d0533acf Fix dialyzer errors and warnings 2024-03-19 14:08:14 +01:00
Thomas Arts 1d6ae82c6d OTP24-26 2024-03-19 14:08:14 +01:00
Ulf Wiger 71ebaf2d66 New args for rocksdb:transaction_iterator() 2024-03-18 17:40:42 +01:00
Ulf Wiger f2b6116d31 Merge pull request #47 from aeternity/uw-robustify-tx-test
Add retry for bleed-through check
2023-10-12 18:19:48 +02:00
Ulf Wiger 33ee7929d4 Merge pull request #44 from aeternity/uw-index-metadata
Adding/deleting indexes cleans up index metadata
2023-10-12 18:19:16 +02:00
Ulf Wiger 78669bb8bf Merge pull request #42 from mariari/mariari/rebar3-compatible-otp-25
Update rebar3 bin to be compatible with OTP 25
2023-10-09 14:47:25 +02:00
Ulf Wiger 346decee6e Merge pull request #49 from aeternity/ci_matrix
Add CI build matrix for OTP versions
2023-10-09 14:30:47 +02:00
Dincho Todorov 4da0fce567 Add CI build matrix for OTP versions 2023-10-09 13:22:13 +03:00
Ulf Wiger e7d42f9500 Add retry for bleed-through check 2023-10-05 13:53:13 +02:00
Ulf Wiger 699a7d5920 Merge pull request #29 from aeternity/uw-bump-hut-dep
Change hut dep to 1.4.0
2023-10-05 11:00:53 +01:00
Ulf Wiger 57f02078bc Fix typo 2023-10-05 11:53:41 +02:00
Ulf Wiger 791cec41db Adding/deleting indexes cleans up index metadata 2023-10-05 11:53:41 +02:00
Ulf Wiger 6eff62df6d Merge pull request #45 from aeternity/uw-ci-improvements
Run multiple OTP vsns in CI
2023-10-05 10:52:54 +01:00
Ulf Wiger 2e430fc5eb remove 'build' sublevel 2023-10-05 11:21:03 +02:00
Ulf Wiger e76a01f8c4 add ci workflow 2023-10-05 11:12:32 +02:00
Ulf Wiger 4d0a78612a Fix ci config 2023-10-05 11:08:12 +02:00
Ulf Wiger eeb6aff242 Run multiple OTP vsns in CI 2023-10-05 11:00:49 +02:00
mariari f11e16e29f Update rebar3 bin to be compatible with OTP 25
This is the same OTP binary from

https://gitlab.com/vans/erlang-rocksdb/-/commit/af2988a5a27393646de1bb35cfc3644ba4b651b0
2023-08-31 03:12:12 +08:00
Ulf Wiger 0aecf5ef01 Merge pull request #35 from aeternity/uw-different-mutex
rewrite transaction retry mutex
2022-11-04 10:06:58 +01:00
Ulf Wiger 465a220bfe Update comment about mutex implementation 2022-11-03 13:37:52 +01:00
Ulf Wiger 19140c738b Don't warn for export_all in mrdb_bench 2022-11-03 12:57:54 +01:00
Ulf Wiger bed66b2998 Remove html docs 2022-11-03 12:54:50 +01:00
Ulf Wiger 3635eac717 Test case for inner retries; add mrdb_stats.erl; update docs 2022-11-03 12:41:14 +01:00
Ulf Wiger 95abe4e36e Mutex server with fifo queues 2022-11-01 10:11:20 +01:00
Ulf Wiger 7c729bd932 Use a serializing mutex 2022-11-01 10:09:48 +01:00
Ulf Wiger 4489e5d743 Merge pull request #34 from aeternity/uw-batch-release
Don't try to release dummy batch ref
2022-10-31 17:01:15 +01:00
Ulf Wiger d1a6bf22d5 Don't try to release dummy batch ref 2022-10-31 16:48:23 +01:00
Ulf Wiger b65e82ed71 Merge pull request #33 from aeternity/uw-batch-on-demand
Begin dirty activity with batch ref dummy
2022-10-26 13:16:48 +02:00
Ulf Wiger ee9e7eac67 Merge pull request #32 from aeternity/uw-push-pop-error
Tx push at retry before mutex instead of after
2022-10-26 13:16:28 +02:00
Ulf Wiger ce2be519b4 Begin dirty activity with batch ref dummy 2022-10-24 15:15:01 +02:00
Ulf Wiger 8073a0daa5 Tx push at retry before mutex instead of after 2022-10-19 13:19:48 +02:00
Ulf Wiger ab15b7f399 Merge pull request #30 from aeternity/otp23_builder
Switch to OTP23
2022-08-29 12:28:59 +02:00
Dincho Todorov a75f6e0c43 Switch to OTP23 2022-08-29 12:40:50 +03:00
Ulf Wiger 1340bb2050 Change hut dep to 1.4.0 2022-08-15 17:16:29 +02:00
Ulf Wiger d1177b6ad4 Merge pull request #27 from aeternity/uw-accept-repeated-create
Ignore certain mnesia_dumper close_table requests
2022-08-03 09:49:58 +02:00
Ulf Wiger b908998e6b Check pdict for dumper state at close_table 2022-08-02 17:07:53 +02:00
Ulf Wiger dfc0125800 More dets-like load/close behavior 2022-07-25 15:33:00 +02:00
Ulf Wiger 296abb23bb Fix double encoding of info data 2022-07-22 16:33:32 +02:00
Ulf Wiger 7057f4dcbd Remove rocksdb opts refault, safer index consistency check 2022-07-22 15:31:55 +02:00
Ulf Wiger c4235be94a Be more lenient about mnesia asking more than once 2022-07-15 14:14:06 +02:00
28 changed files with 1453 additions and 615 deletions
+23 -3
View File
@@ -2,15 +2,35 @@ version: 2.1
executors:
aebuilder:
parameters:
otp:
type: string
docker:
- image: aeternity/builder
- image: aeternity/builder:focal-<< parameters.otp >>
user: builder
environment:
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
jobs:
build:
executor: aebuilder
build_and_test:
parameters:
otp:
type: string
executor:
name: aebuilder
otp: << parameters.otp >>
steps:
- checkout
- run: make test
- store_artifacts:
path: _build/test/logs
workflows:
commit:
jobs:
- build_and_test:
matrix:
parameters:
otp: ["otp24", "otp25", "otp26"]
+12
View File
@@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
it yourself. If you only need the size occasionally, you may traverse the
table to count the elements.
When `mrdb` transactions abort, they will return a stacktrace caught
from within the transaction fun, giving much better debugging info.
This is different from how mnesia does it.
If behavior closer to mnesia's abort returns is needed, say, for backwards
compatibility, this can be controlled by setting the environment variable
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
For really performance-critical transactions which may abort often, it might
make a difference to set this option to `true`, since there is a cost involved
in producing stacktraces.
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
+2 -1
View File
@@ -2,4 +2,5 @@
{application,mnesia_rocksdb}.
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.
+10 -3
View File
@@ -4,8 +4,8 @@
{deps,
[
{sext, "1.8.0"},
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
{hut, "1.3.0"}
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref, "09df04e"}}},
{hut, "1.4.0"}
]}.
{xref_checks, [
@@ -14,6 +14,10 @@
deprecated_function_calls
]}.
{dialyzer, [{plt_apps, all_deps},
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
]}.
{profiles,
[
{test,
@@ -22,7 +26,10 @@
, {meck, "0.9.2"}
, {trace_runner, {git, "https://github.com/uwiger/trace_runner.git",
{ref, "2e56677"}}}
]}
]},
{dialyzer, [{plt_apps, all_deps},
{base_plt_apps, [erts, kernel, stdlib, mnesia, runtime_tools, eunit,
proper, trace_runner, common_test]}]}
]},
{edown,
%% Use as `rebar3 as edown do edoc`
-7
View File
@@ -1,11 +1,4 @@
%% -*- erlang-mode -*-
case os:getenv("ERLANG_ROCKSDB_OPTS") of
false ->
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
_ ->
%% If manually set, we assume it's thought through
skip
end.
case os:getenv("DEBUG") of
"true" ->
Opts = proplists:get_value(erl_opts, CONFIG, []),
+5 -5
View File
@@ -1,15 +1,15 @@
{"1.2.0",
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
{<<"rocksdb">>,
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
{git,"https://github.com/emqx/erlang-rocksdb.git",
{ref,"09df04e02f250a5075bb93cafdd3b637cab81d96"}},
0},
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
[
{pkg_hash,[
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
{pkg_hash_ext,[
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
].
BIN
View File
Binary file not shown.
+16 -27
View File
@@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) ->
i_show_table(I, Move, Limit, Ref) ->
case rocksdb:iterator_move(I, Move) of
{ok, EncKey, EncVal} ->
{Type,Val} =
case EncKey of
<< ?INFO_TAG, K/binary >> ->
K1 = decode_key(K, Ref),
V = decode_val(EncVal, K1, Ref),
{info,V};
_ ->
K = decode_key(EncKey, Ref),
V = decode_val(EncVal, K, Ref),
{data,V}
end,
io:fwrite("~p: ~p~n", [Type, Val]),
K = decode_key(EncKey, Ref),
Val = decode_val(EncVal, K, Ref),
io:fwrite("~p~n", [Val]),
i_show_table(I, next, Limit-1, Ref);
_ ->
ok
@@ -346,14 +337,11 @@ semantics(_Alias, index_fun) -> fun index_f/4;
semantics(_Alias, _) -> undefined.
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
case info(Alias, Tab, {index_consistent, PosInfo}) of
true -> true;
_ -> false
end.
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
when is_boolean(Bool) ->
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
%% PRIVATE FUN
@@ -454,8 +442,13 @@ close_table(Alias, Tab) ->
error ->
ok;
_ ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab)
case get(mnesia_dumper_dets) of
undefined ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab);
_ ->
ok
end
end.
close_table_(Alias, Tab) ->
@@ -673,11 +666,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
#{semantics := Sem} = Ref = get_ref(Tab),
Start = case Ref of
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
_ -> first
end,
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
First = fun(I) -> rocksdb:iterator_move(I, first) end,
F = case Sem of
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end
@@ -798,7 +787,7 @@ handle_call({create_table, Tab, Props}, _From,
exit:{aborted, Error} ->
{reply, {aborted, Error}, St}
end;
handle_call({load_table, _LoadReason, Props}, _From,
handle_call({load_table, _LoadReason, Props}, _,
#st{alias = Alias, tab = Tab} = St) ->
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St#st{status = active}};
@@ -826,7 +815,7 @@ handle_call({delete, Key}, _From, St) ->
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
Res = mrdb:match_delete(get_ref(Tab), Pat),
{reply, Res, St};
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
{reply, ok, St#st{status = undefined}};
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
+58 -191
View File
@@ -33,6 +33,7 @@
, read_info/2 %% (Alias, Tab)
, read_info/4 %% (Alias, Tab, Key, Default)
, write_info/4 %% (Alias, Tab, Key, Value)
, delete_info/3 %% (Alias, Tab, Key)
, write_table_property/3 %% (Alias, Tab, Property)
]).
@@ -77,7 +78,7 @@
| {add_aliases, [alias()]}
| {write_table_property, tabname(), tuple()}
| {remove_aliases, [alias()]}
| {migrate, [{tabname(), map()}], rpt()}
| {migrate, [tabname() | {tabname(), map()}], rpt()}
| {prep_close, table()}
| {close_table, table()}
| {clear_table, table() | cf() }.
@@ -249,58 +250,38 @@ get_info_res(Res, Default) ->
error(E)
end.
%% Admin info: metadata written by the admin proc to keep track of
%% the derived status of tables (such as detected version and encoding
%% of existing standalone tables.)
%%
write_admin_info(K, V, Alias, Name) ->
mrdb:rdb_put(get_ref({admin, Alias}),
admin_info_key(K, Name),
term_to_binary(V)).
read_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
{ok, Bin} ->
{ok, binary_to_term(Bin)};
_ ->
error
end.
delete_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
admin_info_key(K, Name) ->
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
%% Table metadata info maintained by users
%%
write_info(Alias, Tab, K, V) ->
write_info_(get_ref({admin, Alias}), Tab, K, V).
write_info_(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
write_info_encv(Ref, Tab, K, term_to_binary(V)).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
EncV = mnesia_rocksdb_lib:encode_val(V, term),
rocksdb:put(DbRef, Key, EncV, []);
_ ->
ok
end.
write_info_encv(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
mrdb:rdb_put(Ref, EncK, V, []).
delete_info(Alias, Tab, K) ->
delete_info_(get_ref({admin, Alias}), Tab, K).
delete_info_(Ref, Tab, K) ->
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
mrdb:rdb_delete(Ref, EncK, []).
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}).
-spec migrate_standalone(alias(), Tabs) -> Res when
Tabs :: [tabname() | {tabname(),map()}],
Res :: [{tabname(), {ok,any()} | {error, any()}}].
migrate_standalone(Alias, Tabs) ->
migrate_standalone(Alias, Tabs, undefined).
-spec migrate_standalone(alias(), Tabs, Rpt) -> Res when
Tabs :: [tabname() | {tabname(),map()}],
Rpt :: 'undefined' | pid() | atom(),
Res :: [{tabname(), {ok,any()} | {error, any()}}].
migrate_standalone(Alias, Tabs, Rpt0) ->
Rpt = case Rpt0 of
undefined -> undefined;
@@ -323,7 +304,6 @@ call(Alias, Req, Timeout) ->
end.
start_link() ->
mrdb_mutex:ensure_tab(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
@@ -405,12 +385,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
%% We need to store the persistent ref explicitly here,
%% since mnesia knows nothing of our admin table.
AdminTab = {admin, Alias},
Stats = mrdb_stats:new(),
CfI = update_cf_info(AdminTab, #{ status => open
, name => AdminTab
, vsn => ?VSN
, encoding => {sext,{value,term}}
, attr_pos => #{key => 1,
value => 2}
, stats => Stats
, mountpoint => MP
, properties =>
#{ attributes => [key, val]
@@ -510,12 +492,17 @@ intersection(A, B) ->
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, St1};
{error, _} = Error ->
{reply, Error, St}
case find_cf(Alias, Name, Backend, St) of
{ok, TRec} ->
{reply, {ok, TRec}, St};
error ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
{error, _} = Error ->
{reply, Error, St}
end
end;
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
try
@@ -628,8 +615,14 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
case {Op, lists:member(I, Index)} of
{delete, true} ->
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
delete_info(Alias, Main, {index_consistent, I}),
maybe_update_pt(Main, CfM1),
update_cf(Alias, Main, CfM1, St);
{create, _} ->
%% Due to a previous bug, this marker might linger.
%% In any case, it mustn't be there for a newly created index.
delete_info(Alias, Main, {index_consistent, I}),
St;
_ ->
St
end;
@@ -640,6 +633,7 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
maybe_update_main(_, _, _, St) ->
St.
%% The pt may not have been created yet. If so, don't do it here.
maybe_update_pt(Name, Ref) ->
case get_pt(Name, error) of
@@ -778,10 +772,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
false ->
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
{false, MP} ->
create_table_as_standalone(Alias, Name, true, MP, TRec, St);
create_table_as_standalone(Alias, Name, MP, TRec, St);
{true, MP} ->
?log(debug, "will create ~p as standalone and migrate", [Name]),
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
case create_table_as_standalone(Alias, Name, MP, TRec, St) of
{ok, OldTRec, _} ->
?log(info, "Migrating ~p to column family", [Name]),
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
@@ -790,8 +784,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
end
end;
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
{Exists, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
{_, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, MP, TRec, St).
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
?log(debug, "Migrate to cf (~p)", [Name]),
@@ -1071,29 +1065,12 @@ table_exists_as_standalone(Name) ->
end,
{Exists, MP}.
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) ->
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
{ok, #{type := standalone, vsn := Vsn1,
encoding := Enc1} = Cf, _St1} = Ok ->
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
Alias, Name),
case Vsn1 of
1 ->
load_info(Alias, Name, Cf);
_ ->
skip
end,
Ok;
Other ->
Other
end.
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
create_table_as_standalone(Alias, Name, MP, TRec, St) ->
Vsn = check_version(TRec),
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St).
do_open_standalone(true, Alias, Name, MP, TRec1, St).
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
#st{standalone = Ts} = St) ->
Opts = rocksdb_opts_from_trec(TRec0),
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
@@ -1103,135 +1080,24 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
DbRec1 = DbRec#{ cfs => CfNames,
mountpoint => MP },
TRec = maps:merge(TRec0, DbRec#{type => standalone}),
TRec1 = guess_table_vsn_and_encoding(Exists, TRec),
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{error, _} = Err ->
?log(debug, "open_db error: ~p", [Err]),
Err
end.
%% When opening a standalone table, chances are it's a legacy table
%% where legacy encoding is already in place. We try to read the
%% first object and apply legacy encoding. If successful, we set
%% legacy encoding in the TRec. If we migrate the data to a column
%% family, we should apply the defined encoding for the cf.
%%
%% The first object can either be an info object (in the legacy case)
%% or a data object, with a sext-encoded key, and a term_to_binary-
%% encoded object as value, where the key position is set to [].
%% The info objects would be sext-encoded key + term-encoded value.
guess_table_vsn_and_encoding(false, TRec) ->
TRec;
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
alias := Alias, name := Name} = R) ->
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
{ok, {V, E}} ->
R#{vsn => V, encoding => E};
error ->
R1 = set_default_guess(R),
mrdb:with_rdb_iterator(
R1, fun(I) ->
guess_table_vsn_and_encoding_(
mrdb:rdb_iterator_move(I, first), I, As, R1)
end)
end.
set_default_guess(#{type := standalone} = R) ->
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
1 ->
R#{vsn => 1, encoding => {sext, {object, term}}};
V ->
R#{vsn => V}
end.
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
Arity = length(As) + 1,
case K of
<<?INFO_TAG, EncK/binary>> ->
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
mnesia_rocksdb_lib:decode(V, term)},
%% This is a vsn 1 standalone table
R#{vsn => 1, encoding => {sext, {object, term}}}
catch
error:_ ->
R
end;
_ ->
Enc = guess_obj_encoding(K, V, Arity),
R#{encoding => Enc}
end;
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
R.
guess_obj_encoding(K, V, Arity) ->
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
guess_encoding(Bin) ->
try {sext, sext:decode(Bin)}
catch
error:_ ->
try {term, binary_to_term(Bin)}
catch
error:_ -> raw
end
end.
guess_key_encoding(Bin) ->
case guess_encoding(Bin) of
raw -> raw;
{Enc, _} -> Enc
end.
guess_val_encoding(Bin, Arity) ->
case guess_encoding(Bin) of
raw -> {value, raw};
{Enc, Term} ->
if is_tuple(Term), size(Term) == Arity,
element(2, Term) == [] ->
{object, Enc};
true ->
{value, Enc}
end
end.
%% This is slightly different from `rocksdb:is_empty/1`, since it allows
%% for the presence of some metadata, and still considers it empty if there
%% is no user data.
table_is_empty(#{} = DbRec) ->
Start = iterator_data_start(DbRec),
mrdb:with_rdb_iterator(
DbRec, fun(I) ->
case mrdb:rdb_iterator_move(I, Start) of
case mrdb:rdb_iterator_move(I, first) of
{ok, _, _} -> false;
_ -> true
end
end).
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
iterator_data_start(_) -> first.
load_info(Alias, Name, Cf) ->
ARef = get_ref({admin, Alias}),
mrdb:with_rdb_iterator(
Cf, fun(I) ->
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
end).
load_info_(Res, I, ARef, Tab) ->
case Res of
{ok, << ?INFO_TAG, K/binary >>, V} ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_(ARef, Tab, DecK, V);
_ ->
skip
end,
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
_ ->
ok
end.
check_version(TRec) ->
user_property(mrdb_version, TRec, ?VSN).
@@ -1443,24 +1309,26 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
%% not yet created
CFs = cfs(CFs0),
file:make_dir(MP),
OpenOpts = [ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ],
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
OpenRes = rocksdb_open(MP, Opts, CFs),
map_cfs(OpenRes, CFs, Alias, Acc0);
false ->
{error, enoent};
true ->
%% Assumption: even an old rocksdb database file will have at least "default"
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
end.
open_opts(Opts) ->
[ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ].
rocksdb_open(MP, Opts, CFs) ->
%% rocksdb:open(MP, Opts, CFs),
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
is_open(Alias, #st{backends = Bs}) ->
case maps:find(Alias, Bs) of
@@ -1559,7 +1427,6 @@ close_and_delete_standalone(#{alias := Alias,
case get_table_mountpoint(Alias, Name, St) of
{ok, MP} ->
close_and_delete(DbRef, MP),
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
error ->
St
+2 -1
View File
@@ -42,4 +42,5 @@ start_link() ->
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
, ?CHILD(mnesia_rocksdb_params, worker)]} }.
+288 -101
View File
@@ -53,6 +53,7 @@
, delete/2 , delete/3
, delete_object/2, delete_object/3
, match_delete/2
, merge/3 , merge/4
, clear_table/1
, batch_write/2 , batch_write/3
, update_counter/3, update_counter/4
@@ -60,14 +61,18 @@
, get_batch/1
, snapshot/1
, release_snapshot/1
, first/1 , first/2
, next/2 , next/3
, prev/2 , prev/3
, last/1 , last/2
, select/2 , select/3
, first/1 , first/2
, next/2 , next/3
, prev/2 , prev/3
, last/1 , last/2
, select/2 , select/3
, select_reverse/2, select_reverse/3
, select/1
, fold/3 , fold/4 , fold/5
, rdb_fold/4 , rdb_fold/5
, ms/2
, fold/3 , fold/4 , fold/5
, fold_reverse/3 , fold_reverse/4, fold_reverse/5
, rdb_fold/4 , rdb_fold/5
, rdb_fold_reverse/4, rdb_fold_reverse/5
, write_info/3
, read_info/2
, read_info/1
@@ -94,17 +99,25 @@
, encode_val/2
, decode_val/3 ]).
%% TODO: OTP 28.3 dialyzer is quite pesky about opaque types. Find a way
%% to structure this, and firm up the type specs.
-dialyzer([no_opaque, no_return]).
-dialyzer([{nowarn_function, [activity/3]}]).
-export_type( [ mrdb_iterator/0
, itr_handle/0
, iterator_action/0
, db_ref/0
, ref_or_tab/0
, index_position/0
, match_pattern/0
]).
-include("mnesia_rocksdb.hrl").
-include("mnesia_rocksdb_int.hrl").
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
-type tab_name() :: atom().
-type alias() :: atom().
-type admin_tab() :: {admin, alias()}.
@@ -115,7 +128,12 @@
| index()
| retainer().
-type retries() :: non_neg_integer().
-type inner() :: non_neg_integer().
-type outer() :: non_neg_integer().
-type retries() :: outer() | {inner(), outer()}.
-type matchpat_map() :: #{atom() => _}.
-type match_pattern() :: matchpat_map() | ets:match_pattern().
%% activity type 'ets' makes no sense in this context
-type mnesia_activity_type() :: transaction
@@ -124,14 +142,15 @@
| sync_dirty.
-type tx_options() :: #{ retries => retries()
, no_snapshot => boolean() }.
, no_snapshot => boolean()
, mnesia_compatible => boolean() }.
-type mrdb_activity_type() :: tx | {tx, tx_options()} | batch.
-type activity_type() :: mrdb_activity_type() | mnesia_activity_type().
-type key() :: any().
-type obj() :: tuple().
-type index_position() :: atom() | pos().
-type index_position() :: atom() | pos() | plugin_ix_pos().
-type db_handle() :: rocksdb:db_handle().
-type cf_handle() :: rocksdb:cf_handle().
@@ -141,16 +160,20 @@
-type tx_activity() :: #{ type := 'tx'
, handle := tx_handle()
, attempt := non_neg_integer() }.
, attempt := 'undefined' | retries() }.
-type batch_activity() :: #{ type := 'batch'
, handle := batch_handle() }.
-type activity() :: tx_activity() | batch_activity().
-type pos() :: non_neg_integer().
-type plugin_ix_pos() :: {atom()}.
-type propkey() :: any().
-type propvalue() :: any().
-type properties() :: #{ record_name := atom()
, attributes := [atom()]
, index := [{pos(), bag | ordered}]
, user_properties => #{propkey() => propvalue()}
}.
-type semantics() :: bag | set.
-type key_encoding() :: 'raw' | 'sext' | 'term'.
@@ -161,7 +184,7 @@
-type attr_pos() :: #{atom() := pos()}.
-type db_ref() :: #{ name => table()
, alias => atom()
, alias => alias()
, vsn => non_neg_integer()
, db_ref := db_handle()
, cf_handle := cf_handle()
@@ -176,6 +199,11 @@
, activity => activity()
, _ => _}.
-type context() :: #{ 'activity' => activity()
, 'alias' => alias()
, 'retries' => retries()
, 'db_ref' => db_ref() }.
-type error() :: {error, any()}.
-type ref_or_tab() :: table() | db_ref().
@@ -254,12 +282,20 @@ release_snapshot(SHandle) ->
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
%% the commit set. It will then be re-run again, until the retry count is exhausted.
%%
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
%% single number, `Retries', is analogous to `{0, Retries}'. Each outer retry requests a 'mutex lock' by
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
%% in line) if there are outer retries remaining. When all retries are exhausted, the activity aborts
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
%% attempt, but only on outer retries (the first retry is always an outer retry).
%%
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries(), mnesia_compatible => boolean()}'.
%%
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
%% `batch | async_dirty | sync_dirty' are synonyms.
%% @end
-spec activity(activity_type(), alias(), fun( () -> Res )) -> Res.
-spec activity(activity_type(), alias(), fun( () -> any() )) -> any().
activity(Type, Alias, F) ->
#{db_ref := DbRef} = ensure_ref({admin, Alias}),
Ctxt = case tx_type(Type) of
@@ -269,41 +305,93 @@ activity(Type, Alias, F) ->
#{ alias => Alias
, db_ref => DbRef }, TxCtxt);
batch ->
Batch = get_batch_(DbRef),
Batch = init_batch_ref(DbRef),
#{ activity => #{ type => batch
, handle => Batch }
, alias => Alias
, db_ref => DbRef }
end,
do_activity(F, Alias, Ctxt, false).
do_activity(F, Alias, Ctxt).
do_activity(F, Alias, Ctxt, WithLock) ->
try run_f(F, Ctxt, WithLock, Alias) of
Res ->
try commit_and_pop(Res)
catch
throw:{?MODULE, busy} ->
do_activity(F, Alias, Ctxt, true)
end
-spec do_activity(fun(() -> any()), alias(), context()) -> any().
do_activity(F, Alias, Ctxt) ->
try try_f(F, Ctxt)
catch
Cat:Err ->
abort_and_pop(Cat, Err)
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt)
end.
run_f(F, Ctxt, false, _) ->
push_ctxt(Ctxt),
F();
run_f(F, Ctxt, true, Alias) ->
mrdb_mutex:do(
Alias,
fun() ->
push_ctxt(incr_attempt(Ctxt)),
F()
end).
%% Runs F in the context Ctxt, first resolving whether aborts for this
%% activity should be reported mnesia-style.
try_f(F, Ctxt) ->
    Compat = mnesia_compatible_aborts(Ctxt),
    try_f(Compat, F, Ctxt).
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
%% Runs F via run_f/2 and, on normal return, commits via commit_and_pop/1.
%% On an exception raised by run_f/2, aborts via abort_and_pop/3.
%% The first argument says whether mnesia-compatible abort reporting is used.
%% Note that commit_and_pop/1 is called in the `of' section, so exceptions it
%% raises are NOT caught here; they propagate to the caller.
try_f(false, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(false, throw, Something);
Cat:Err:T ->
%% Without capturing the stack trace here,
%% debugging gets pretty difficult. Incompatible with mnesia, though.
abort_and_pop(false, Cat, {Err, T})
end;
try_f(true, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(true, throw, Something);
Cat:Err ->
%% Mnesia-compatible mode: no stack trace is captured in this clause;
%% only the exception class and reason are passed on.
abort_and_pop(true, Cat, Err)
end.
%% Pushes Ctxt via push_ctxt/1 and then calls F(), returning its result.
%% The context must be pushed before F runs.
run_f(F, Ctxt) ->
push_ctxt(Ctxt),
F().
%% Computes the next attempt counter from the current attempt and the
%% retry limits.
%%
%% The current attempt is either 0 (no retry yet) or {Inner, Outer}; the
%% limits are {MaxInner, MaxOuter}. Returns {inner, NextAttempt} or
%% {outer, NextAttempt}, or `error' when no retries remain (or the
%% arguments have an unexpected shape).
incr_attempt(0, {_, MaxOuter}) when MaxOuter > 0 ->
    %% The very first retry is always an outer retry.
    {outer, {0, 1}};
incr_attempt({Inner, Outer}, {MaxInner, MaxOuter})
  when is_integer(Inner), is_integer(Outer),
       is_integer(MaxInner), is_integer(MaxOuter),
       Inner < MaxInner ->
    {inner, {Inner + 1, Outer}};
incr_attempt({Inner, Outer}, {MaxInner, MaxOuter})
  when is_integer(Inner), is_integer(Outer),
       is_integer(MaxInner), is_integer(MaxOuter),
       Outer < MaxOuter ->
    %% Inner retries used up: start a new outer round.
    {outer, {0, Outer + 1}};
incr_attempt(_, _) ->
    error.
%% Re-runs an activity after a `{?MODULE, busy}' throw.
%%
%% The next attempt counter is computed by incr_attempt/2 from the activity's
%% `attempt' and `retries' values. If a retry is allowed, the context is
%% updated with the new attempt, restarted via restart_ctxt/1 and run through
%% retry_activity_/4; should that again throw `{?MODULE, busy}', this function
%% recurses with the updated context. When incr_attempt/2 returns `error',
%% the activity is aborted with reason `retry_limit' via return_abort/4.
retry_activity(F, Alias, #{activity := #{ type := Type
, attempt := A
, retries := R} = Act} = Ctxt) ->
case incr_attempt(A, R) of
{RetryCtxt, A1} ->
%% RetryCtxt is `inner' or `outer'; A1 is the new attempt counter
Act1 = Act#{attempt := A1},
Ctxt1 = Ctxt#{activity := Act1},
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt1)
end;
error ->
return_abort(mnesia_compatible_aborts(Ctxt), Type, error, retry_limit)
end.
%% Performs one retry of Fun in Ctxt and bumps the matching retry counter
%% through mrdb_stats:incr/3. An `inner' retry runs directly; an `outer'
%% retry runs inside mrdb_mutex:do/2 for Alias.
retry_activity_(inner, Fun, Alias, Ctxt) ->
    mrdb_stats:incr(Alias, inner_retries, 1),
    try_f(Fun, Ctxt);
retry_activity_(outer, Fun, Alias, Ctxt) ->
    mrdb_stats:incr(Alias, outer_retries, 1),
    Guarded = fun() -> try_f(Fun, Ctxt) end,
    mrdb_mutex:do(Alias, Guarded).
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Act1 = Act#{attempt := A+1, handle := TxH},
Act1 = Act#{handle := TxH},
C1 = C#{ activity := Act1 },
case maps:is_key(snapshot, C) of
true ->
@@ -348,6 +436,7 @@ current_context() ->
undefined
end.
-spec tx_type(activity_type()) -> {'tx', map()} | 'batch'.
tx_type(T) ->
case T of
_ when T==batch;
@@ -368,7 +457,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
check_tx_opts(Opts) ->
case maps:without([no_snapshot, retries], Opts) of
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
Other when map_size(Other) > 0 ->
abort({invalid_tx_opts, maps:keys(Other)});
_ ->
@@ -376,10 +465,14 @@ check_tx_opts(Opts) ->
end.
check_retries(#{retries := Retries} = Opts) ->
if is_integer(Retries), Retries >= 0 ->
Opts;
true ->
error({invalid_tx_option, {retries, Retries}})
case Retries of
_ when is_integer(Retries), Retries >= 0 ->
Opts#{retries := {0, Retries}};
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
Inner >= 0, Outer >= 0 ->
Opts;
_ ->
error({invalid_tx_option, {retries, Retries}})
end.
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
@@ -394,7 +487,7 @@ create_tx(Opts, DbRef) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Opts#{activity => maps:merge(Opts, #{ type => tx
, handle => TxH
, attempt => 1})}.
, attempt => 0 })}.
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
case NoSnap of
@@ -414,8 +507,7 @@ commit_and_pop(Res) ->
Res;
{error, {error, "Resource busy" ++ _ = Busy}} ->
case A of
#{retries := Retries, attempt := Att}
when Att =< Retries ->
#{retries := {I,O}} when I > 0; O > 0 ->
throw({?MODULE, busy});
_ ->
error({error, Busy})
@@ -431,8 +523,8 @@ commit_and_pop(Res) ->
end
end.
-spec abort_and_pop(atom(), any()) -> no_return().
abort_and_pop(Cat, Err) ->
-spec abort_and_pop(boolean(), atom(), any()) -> no_return().
abort_and_pop(Compat, Cat, Err) ->
%% We can pop the context right away, since there is no
%% complex failure handling (like retry-on-busy) for rollback.
#{activity := #{type := Type, handle := H}} = pop_ctxt(),
@@ -440,23 +532,29 @@ abort_and_pop(Cat, Err) ->
tx -> ok = rdb_transaction_rollback(H);
batch -> ok = release_batches(H)
end,
return_abort(Type, Cat, Err).
return_abort(Compat, Type, Cat, Err).
-spec return_abort(batch | tx, atom(), any()) -> no_return().
return_abort(batch, Cat, Err) ->
-spec return_abort(boolean(), batch | tx, atom(), any()) -> no_return().
return_abort(_, batch, Cat, Err) ->
re_throw(Cat, Err);
return_abort(tx, Cat, Err) ->
case mnesia_compatible_aborts() of
return_abort(Compat, tx, Cat, Err) ->
case Compat of
true ->
%% Mnesia always captures stack traces, but this could actually become a
%% performance issue in some cases (generally, it's better not to lazily
%% produce stack traces.) Since we want to pushe the option checking for
%% produce stack traces.) Since we want to push the option checking for
%% mnesia-abort-style compatibility to AFTER detecting an abort, we don't
%% order a stack trace initially, and instead insert an empty list.
%% (The exact stack trace wouldn't be the same anyway.)
%% NOTE: This behavior changed in mnesia-4.23.5.1, so if we really want
%% to stay compatible, we have to special-case things.
Err1 =
case Cat of
error -> {fix_error(Err), []};
error ->
case newer_mnesia() of
true -> fix_error(Err);
false -> {fix_error(Err), []}
end;
exit -> fix_error(Err);
throw -> {throw, Err}
end,
@@ -465,6 +563,11 @@ return_abort(tx, Cat, Err) ->
re_throw(Cat, Err)
end.
%% Some rollback return values changed in mnesia in vsn 4.23.5.1
%%
%% Returns `true' when the mnesia application version is strictly greater
%% than 4.23.5. The version is compared component-wise as integers, since a
%% plain string comparison orders e.g. "4.9" after "4.23.5" and "4.100"
%% before it.
newer_mnesia() ->
    {ok, Vsn} = application:get_key(mnesia, vsn),
    vsn_to_ints(Vsn) > [4, 23, 5].

%% Splits a dotted version string into a list of integers.
vsn_to_ints(Vsn) ->
    [vsn_component(Part) || Part <- string:lexemes(Vsn, ".")].

%% Reads the leading integer of one version component; a component without
%% a leading integer counts as 0.
vsn_component(Part) ->
    case string:to_integer(Part) of
        {Int, _Rest} when is_integer(Int) -> Int;
        {error, _} -> 0
    end.
-spec re_throw(atom(), any()) -> no_return().
re_throw(Cat, Err) ->
case Cat of
@@ -473,10 +576,14 @@ re_throw(Cat, Err) ->
throw -> throw(Err)
end.
%% Returns the `mnesia_compatible_aborts' setting as read through
%% mnesia_rocksdb_admin:get_cached_env/2, with `false' as the default.
mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
%% Tells whether aborts should be mnesia-compatible for the given context.
%% A `mnesia_compatible' value in the context's activity map takes
%% precedence; otherwise the result of mnesia_compatible_aborts/0 is used.
mnesia_compatible_aborts(Ctxt) ->
    case Ctxt of
        #{activity := #{mnesia_compatible := Compat}} ->
            Compat;
        _ ->
            mnesia_compatible_aborts()
    end.
fix_error({aborted, Err}) ->
Err;
fix_error(Err) ->
@@ -507,10 +614,13 @@ rdb_write_batch_and_pop(BatchRef, C) ->
pop_ctxt()
end.
%% Releases a rocksdb batch handle. The ?BATCH_REF_DUMMY placeholder is not
%% a real batch, so there is nothing to release for it.
rdb_release_batch(Handle) ->
    case Handle of
        ?BATCH_REF_DUMMY ->
            ok;
        _ ->
            rocksdb:release_batch(Handle)
    end.
%% @doc Aborts an ongoing {@link activity/2}
-spec abort(_) -> no_return().
abort(Reason) ->
case mnesia_compatible_aborts() of
true ->
@@ -529,7 +639,7 @@ new_tx(#{activity := _}, _) ->
new_tx(Tab, Opts) ->
#{db_ref := DbRef} = R = ensure_ref(Tab),
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
tx_ref(Tab, TxH) ->
@@ -539,7 +649,7 @@ tx_ref(Tab, TxH) ->
#{activity := #{type := tx, handle := OtherTxH}} ->
error({tx_handle_conflict, OtherTxH});
R ->
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
end.
-spec tx_commit(tx_handle() | db_ref()) -> ok.
@@ -586,10 +696,25 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
%% Copies the `snapshot' and `activity' entries (when present) from R into
%% Ref, overriding any values Ref already holds for those keys.
inherit_ctxt(Ref, R) ->
    Inherited = maps:with([snapshot, activity], R),
    maps:merge(Ref, Inherited).
%% @doc Create an iterator on table `Tab' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates. No read options are given; an empty
%% option list is passed on to `with_iterator/3'.
%% @equiv with_iterator(Tab, Fun, [])
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
with_iterator(Tab, Fun) ->
with_iterator(Tab, Fun, []).
%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%%
%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator
%% will return `{ok, Obj}' where `Obj' is the complete decoded object.
%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}.
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res.
with_iterator(Tab, Fun, Opts) ->
R = ensure_ref(Tab),
@@ -666,6 +791,21 @@ insert(Tab, Obj0, Opts) ->
EncVal = encode_val(Obj, Ref),
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
%% Applies MergeOp to the object stored under Key in Tab, using no options.
merge(Tab, Key, MergeOp) ->
    merge(Tab, Key, MergeOp, []).

%% Applies MergeOp to the object stored under Key in Tab.
%% Only tables whose value encoding is `{value, term}' are accepted;
%% for any other encoding the call aborts with `badarg'.
merge(Tab, Key, MergeOp, Opts) ->
    Ref = #{encoding := Encoding} = ensure_ref(Tab),
    Mergeable = case Encoding of
                    {_, {value, term}} -> true;
                    _ -> false
                end,
    if Mergeable ->
            merge_(Ref, Key, MergeOp, Opts);
       true ->
            abort(badarg)
    end.
%% Issues the merge operation against the table described by Ref.
%% The key is encoded with the table's own key encoding (taken from Ref),
%% the same way insert/3 encodes keys, so that the merge addresses the
%% same rocksdb key that a write would.
merge_(Ref, Key, MergeOp, Opts) ->
    rdb_merge(Ref, encode_key(Key, Ref), MergeOp, Opts).
validate_obj(Obj, #{mode := mnesia}) ->
Obj;
validate_obj(Obj, #{attr_pos := AP,
@@ -889,7 +1029,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
_ when is_atom(Ix) ->
{attr_pos(Ix, Ref), ordered};
{_} ->
Ix;
{Ix, ordered};
_ when is_integer(Ix) ->
{Ix, ordered}
end,
@@ -1010,18 +1150,20 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
get_batch(_) ->
{error, badarg}.
get_batch_(DbRef) ->
init_batch_ref(DbRef) ->
Ref = make_ref(),
{ok, Batch} = rdb_batch(),
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
Ref.
%% Creates a new batch via rdb_batch/0, registers it through pdict_put/2
%% under the key {mrdb_batch, BatchRef} in a map keyed by DbRef, and
%% returns the freshly made reference.
get_batch_(DbRef) ->
    BatchRef = make_ref(),
    {ok, NewBatch} = rdb_batch(),
    pdict_put({mrdb_batch, BatchRef}, #{DbRef => NewBatch}),
    BatchRef.
get_batch_(DbRef, BatchRef) ->
Key = batch_ref_key(BatchRef),
case pdict_get(Key) of
undefined ->
error(stale_batch_ref);
#{DbRef := Batch} ->
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
Batch;
Map ->
{ok, Batch} = rdb_batch(),
@@ -1050,7 +1192,9 @@ write_batches(BatchRef, Opts) ->
pdict_erase(Key),
ret_batch_write_acc(
maps:fold(
fun(DbRef, Batch, Acc) ->
fun(_, ?BATCH_REF_DUMMY, Acc) ->
Acc;
(DbRef, Batch, Acc) ->
case rocksdb:write_batch(DbRef, Batch, Opts) of
ok ->
Acc;
@@ -1206,9 +1350,65 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit),
mrdb_select:select(ensure_ref(Tab), Pat, Limit).
%% Like select_reverse/3, without a limit on the number of results.
select_reverse(Tab, Pat) ->
    select_reverse(Tab, Pat, infinity).

%% Runs the select in reverse direction by delegating to
%% mrdb_select:select_reverse/3. Limit is `infinity' or a positive integer.
select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
    true = valid_limit(Limit),
    Ref = ensure_ref(Tab),
    mrdb_select:select_reverse(Ref, Pat, Limit).
%% Passes the continuation Cont on to mrdb_select:select/1 and returns
%% its result.
select(Cont) ->
mrdb_select:select(Cont).
%% @doc Produce a match specification for select(), supporting map-based match patterns
%%
%% Using record syntax in match patterns tends to conflict with type checking. This
%% function offers an alternative approach, drawing on the fact that mnesia_rocksdb
%% keeps the record name and attribute names readily available as persistent terms.
%%
%% When using the map-based representation, the match pattern is built by matching
%% attribute names to map elements; any attribute not found in the map gets set to '_'.
%% Thus, ```[{#balance{key = {Acct,'$1'},_='_'},[{'>=','$1',Height}],['$_']}]''' can be
%% created as ```ms(balance,[{#{key => {Acct,'$1'}},[{'>=','$1',Height}],['$_']}])'''.
%%
%% Head patterns that are already tuples or atoms are passed through unchanged.
%%
%% This has the advantage over `ms_transform' that it can handle bound variables
%% in the match pattern. The function works on all mnesia table types.
%% @end
-spec ms(ref_or_tab(), [{match_pattern(), [_], [_]}]) -> ets:match_spec().
ms(Tab, Pat) ->
    #{ record_name := RecName
     , attributes  := Attrs } = any_tab_props(Tab),
    [{headpat(RecName, Attrs, Head), Guards, Body}
     || {Head, Guards, Body} <- Pat].
%% Fetches table properties (at least record name and attributes) for Tab.
%% The mrdb reference is tried first; if that exits with
%% {aborted, {bad_type, _}}, the properties are read from mnesia instead.
any_tab_props(Tab) ->
    try
        mrdb_props(Tab)
    catch
        exit:{aborted, {bad_type, _}} ->
            mnesia_props(Tab)
    end.

%% Returns the `properties' map found in the table reference of Tab.
mrdb_props(Tab) ->
    #{properties := Properties} = get_ref(Tab),
    Properties.

%% Reads the properties from mnesia; any `aborted' exit is re-raised as
%% a {bad_type, Tab} abort.
mnesia_props(Tab) ->
    try
        mnesia_props_(Tab)
    catch
        exit:{aborted, _} ->
            mnesia:abort({bad_type, Tab})
    end.

%% Builds a properties map from mnesia:table_info/2.
mnesia_props_(Tab) ->
    RecName = mnesia:table_info(Tab, record_name),
    Attrs = mnesia:table_info(Tab, attributes),
    #{ record_name => RecName
     , attributes  => Attrs }.
%% Builds a match-spec head pattern.
%% A map head is expanded into a record-shaped tuple: RecName followed by
%% one element per attribute in Attrs, taken from the map or '_' when the
%% attribute is absent. Tuple and atom heads are returned as they are.
headpat(RecName, Attrs, Hd) when is_map(Hd) ->
    Fields = [maps:get(Attr, Hd, '_') || Attr <- Attrs],
    list_to_tuple([RecName | Fields]);
headpat(_, _, Hd) when is_tuple(Hd); is_atom(Hd) ->
    Hd.
%% Clears Tab by calling match_delete/2 with the match-all pattern '_'.
clear_table(Tab) ->
match_delete(Tab, '_').
@@ -1260,6 +1460,16 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit),
mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
%% Reverse fold over all objects in Tab (match-all spec, no limit).
fold_reverse(Tab, Fun, Acc) ->
    MatchAll = [{'_', [], ['$_']}],
    fold_reverse(Tab, Fun, Acc, MatchAll).

%% Reverse fold over the objects selected by MatchSpec, with no limit.
fold_reverse(Tab, Fun, Acc, MatchSpec) ->
    fold_reverse(Tab, Fun, Acc, MatchSpec, infinity).

%% Reverse fold over the objects selected by MatchSpec. The limit is
%% validated before the table reference is resolved; the actual fold is
%% done by mrdb_select:fold_reverse/5.
fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) ->
    true = valid_limit(Limit),
    Ref = ensure_ref(Tab),
    mrdb_select:fold_reverse(Ref, Fun, Acc, MatchSpec, Limit).
%% Same as rdb_fold/5 with the limit set to `infinity'.
%% Fun must be a fun of arity 3 and Prefix a binary.
rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) ->
rdb_fold(Tab, Fun, Acc, Prefix, infinity).
@@ -1269,6 +1479,15 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
true = valid_limit(Limit),
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
%% Same as rdb_fold_reverse/5 with the limit set to `infinity'.
rdb_fold_reverse(Tab, Fun, Acc, Prefix)
  when is_function(Fun, 3), is_binary(Prefix) ->
    rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity).

%% Reverse fold delegated to mrdb_select:rdb_fold_reverse/5. Fun must be
%% a fun of arity 3 and Prefix a binary; the limit is validated before
%% the table reference is resolved.
rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit)
  when is_function(Fun, 3), is_binary(Prefix) ->
    true = valid_limit(Limit),
    Ref = ensure_ref(Tab),
    mrdb_select:rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit).
valid_limit(L) ->
case L of
infinity ->
@@ -1280,15 +1499,7 @@ valid_limit(L) ->
end.
write_info(Tab, K, V) ->
R = ensure_ref(Tab),
Alias = case R of
#{type := standalone, vsn := 1, alias := A} = TRef ->
%% Also write on legacy info format
write_info_standalone(TRef, K, V),
A;
#{alias := A} ->
A
end,
#{alias := Alias} = ensure_ref(Tab),
write_info_(ensure_ref({admin, Alias}), Tab, K, V).
write_info_(#{} = R, Tab, K, V) ->
@@ -1305,13 +1516,8 @@ read_info(Tab, K) ->
read_info(Tab, K, Default) when K==size; K==memory ->
read_direct_info_(ensure_ref(Tab), K, Default);
read_info(Tab, K, Default) ->
#{alias := Alias} = R = ensure_ref(Tab),
case R of
#{type := standalone, vsn := 1} = TRef ->
read_info_standalone(TRef, K, Default);
#{alias := Alias} ->
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
end.
#{alias := Alias} = ensure_ref(Tab),
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
read_direct_info_(R, memory, _Def) ->
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
@@ -1335,26 +1541,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
%%rocksdb_boolean(<<"1">>) -> true;
%%rocksdb_boolean(<<"0">>) -> false.
write_info_standalone(#{} = R, K, V) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
EncV = term_to_binary(V),
rdb_put(R, EncK, EncV, write_opts(R, [])).
read_info_standalone(#{} = R, K, Default) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
get_info_res(Res, Default) ->
case Res of
not_found ->
Default;
{ok, Bin} ->
%% no fancy tricks when encoding/decoding info values
binary_to_term(Bin);
{error, E} ->
error(E)
end.
%% insert_bag_v2(Ref, K, V, Opts) ->
%% rdb_merge(Ref, K, {list_append, [V]}
@@ -1494,8 +1680,9 @@ rdb_iterator(R) -> rdb_iterator(R, []).
rdb_iterator(R, Opts) ->
rdb_iterator_(R, read_opts(R, Opts)).
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
%% Note: versions before 1.8.0 expected DbRef as first argument
rocksdb:transaction_iterator(TxH, CfH, ROpts);
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
rocksdb:iterator(DbRef, CfH, ROpts).
+90 -3
View File
@@ -6,11 +6,18 @@
, iterator_move/2
, iterator/2
, iterator_close/1
, fold/4
, rev_fold/4
, index_ref/2
, select/3
, select/4
, select_reverse/3
, select_reverse/4
]).
-record(mrdb_ix_iter, { i :: mrdb:iterator()
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
, type = set :: set | bag
, sub :: mrdb:ref() | pid()
, sub :: pid() | mrdb:db_ref()
}).
-type ix_iterator() :: #mrdb_ix_iter{}.
@@ -19,7 +26,7 @@
-type object() :: tuple().
-record(subst, { i :: mrdb:iterator()
-record(subst, { i :: mrdb:mrdb_iterator()
, vals_f
, cur
, mref }).
@@ -30,6 +37,11 @@
-export_type([ ix_iterator/0 ]).
%% Resolves Tab to its table reference and returns the reference of the
%% index identified by Ix, as produced by ensure_index_ref/2.
-spec index_ref(mrdb:ref_or_tab(), mrdb:index_position()) -> mrdb:db_ref().
index_ref(Tab, Ix) ->
    #{} = TabRef = mrdb:ensure_ref(Tab),
    ensure_index_ref(Ix, TabRef).
-spec with_iterator(mrdb:ref_or_tab(), mrdb:index_position(), fun( (ix_iterator()) -> Res)) -> Res.
with_iterator(Tab, IxPos, Fun) when is_function(Fun, 1) ->
{ok, I} = iterator(Tab, IxPos),
@@ -62,6 +74,57 @@ iterator(Tab, IxPos) ->
%% Moves the index iterator in direction Dir, dispatching on whether the
%% iterator's type is `set' or `bag'.
iterator_move(#mrdb_ix_iter{type = set} = IxIter, Dir) ->
    iterator_move_set(IxIter, Dir);
iterator_move(#mrdb_ix_iter{type = bag} = IxIter, Dir) ->
    iterator_move_bag(IxIter, Dir).
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Folds over the index table corresponding to Tab and IxPos.
%% The fold fun is called with the index key and the corresponding object,
%% or [] if there is no such object.
%% Iteration starts at the `first' entry and proceeds with `next'.
%% FoldFun must be a fun of arity 3.
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, first, next, FoldFun, Acc).
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Like fold/4, but folds over the index in the reverse order:
%% iteration starts at the `last' entry and proceeds with `prev'.
%% FoldFun must be a fun of arity 3.
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
%% Shared implementation of fold/4 and rev_fold/4: opens an index iterator
%% through with_iterator/3 and folds from position Start in direction Dir.
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
    Folder = fun(Iter) ->
                     iter_fold(Iter, Start, Dir, FoldFun, Acc)
             end,
    with_iterator(Tab, IxPos, Folder).
%% Select over index Ix of Tab using match spec MS, without a limit.
select(Tab, Ix, MS) ->
    select(Tab, Ix, MS, infinity).

%% Select over index Ix of Tab using match spec MS, in forward direction.
select(Tab, Ix, MS, Limit) ->
    mrdb_select:select(ix_select_ref(Tab, Ix, MS), MS, Limit).

%% Reverse-direction counterpart of select/3.
select_reverse(Tab, Ix, MS) ->
    select_reverse(Tab, Ix, MS, infinity).

%% Reverse-direction counterpart of select/4.
select_reverse(Tab, Ix, MS, Limit) ->
    mrdb_select:select_reverse(ix_select_ref(Tab, Ix, MS), MS, Limit).

%% Builds the index reference handed to mrdb_select: the index ref of Tab
%% for Ix, extended with an object-deriving fun for the main table and the
%% pre-match spec computed from MS.
ix_select_ref(Tab, Ix, MS) ->
    #{} = TabRef = mrdb:ensure_ref(Tab),
    #{} = IxRef = ensure_index_ref(Ix, TabRef),
    PreMS = pre_match_spec(MS),
    IxRef#{ derive_obj_f => mk_derive_obj_f(TabRef)
          , pre_ms => PreMS }.
%% Positions the iterator at Start and folds in direction Dir.
iter_fold(Iter, Start, Dir, Fun, Acc) ->
    First = iterator_move(Iter, Start),
    iter_fold_(First, Iter, Dir, Fun, Acc).

%% Fold loop: for each {ok, IxVal, Obj} the iterator is advanced first and
%% then Fun is applied; an {error, _} move result ends the fold.
iter_fold_({ok, IxVal, Obj}, Iter, Dir, Fun, Acc) ->
    Next = iterator_move(Iter, Dir),
    Acc1 = Fun(IxVal, Obj, Acc),
    iter_fold_(Next, Iter, Dir, Fun, Acc1);
iter_fold_({error, _}, _, _, _, Acc) ->
    Acc.
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case mrdb:iterator_move(I, Dir) of
{ok, {{FKey, PKey}}} ->
@@ -83,6 +146,30 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
Other
end.
%% Rewrites each match-spec clause whose head is a {KeyPat, ObjPat} pair so
%% that the key part becomes {KeyPat, '_'}; all other clauses are kept
%% unchanged.
pre_match_spec(MS) ->
    [pre_match_clause(Clause) || Clause <- MS].

pre_match_clause({{KeyPat, ObjPat}, Guards, MatchBody}) ->
    {{{KeyPat, '_'}, ObjPat}, Guards, MatchBody};
pre_match_clause(Other) ->
    Other.
%% Used for mrdb_select:select()
%% The select operation folds over index keys and, for matching keys, calls
%% the `derive_obj_f/1` fun, which normally just does `Obj -> [Obj]`.
%% For indexes, the fun built here reads the object from the main table
%% (Ref) and returns `[{IndexKey, Object}]` if it exists, which is what the
%% match spec should operate on, or `[]` if it doesn't.
%%
mk_derive_obj_f(Ref) ->
    fun({{IxKey, ObjKey}}) ->
            case mrdb:read(Ref, ObjKey, []) of
                [] ->
                    [];
                [Obj] ->
                    [{IxKey, Obj}]
            end
    end.
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
opt_read(R, Key) ->
case mrdb:read(R, Key, []) of
[Obj] ->
+92 -62
View File
@@ -3,79 +3,109 @@
-export([ do/2 ]).
-export([ ensure_tab/0 ]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
%% critical section.
%%
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
%% Releasing the resource is done by notifying the server.
%%
%% Previous implementations tested:
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
%% This is a pretty simple implementation, but it lacks ordering, and appeared
%% to sometimes lead to starvation.
%% * A duplicate_bag ets table: These are defined such that insertion order is
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
%% Unfortunately, performance plummeted when tested with > 25 concurrent
%% processes. While this is likely a very high number, given that we're talking
%% about contention in a system using optimistic concurrency, it's never good
%% if performance falls off a cliff.
do(Rsrc, F) when is_function(F, 0) ->
true = claim(Rsrc),
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
try F()
after
release(Rsrc)
mrdb_mutex_serializer:done(Rsrc, Ref)
end.
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
-ifdef(TEST).
%% EUnit test generator: runs each listed test between setup/0 and cleanup/1.
mutex_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Check that all operations complete", fun swarm_do/0}
]}.
%% Test fixture: returns the pid of the registered mrdb_mutex_serializer,
%% starting (and linking to) one if none is registered.
setup() ->
    case whereis(mrdb_mutex_serializer) of
        undefined ->
            {ok, Started} = mrdb_mutex_serializer:start_link(),
            Started;
        Running ->
            Running
    end.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
false
end.
%% Test fixture teardown: unlinks from Pid first, so that killing it
%% does not take the test process down with it.
cleanup(Pid) ->
unlink(Pid),
exit(Pid, kill).
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
%% Spawns 1000 workers that all go through do/2 on the same resource.
%% Workers with an even number report it to a collector; workers with an
%% odd number exit inside the critical section. The test passes if exactly
%% the even numbers were collected.
swarm_do() ->
    Rsrc = ?LINE,
    Collector = spawn(fun() -> collect([]) end),
    Numbers = lists:seq(1, 1000),
    Evens = [X || X <- Numbers, is_even(X)],
    Workers = [spawn_monitor(fun() ->
                                     send_even(Rsrc, N, Collector)
                             end) || N <- Numbers],
    await_pids(Workers),
    Results = fetch(Collector),
    {incorrect_results, []} = {incorrect_results, Results -- Evens},
    {missing_correct_results, []} = {missing_correct_results, Evens -- Results},
    ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
_ ->
true
%% Collector loop: accumulates the N of every {_, result, N} message and,
%% on {From, fetch}, replies with the accumulated list and terminates.
collect(Acc) ->
receive
{_, result, N} ->
collect([N|Acc]);
{From, fetch} ->
From ! {fetch_reply, Acc},
done
end.
%% Asks the collector Pid for its accumulated results and waits for the
%% reply. NOTE(review): there is no `after' clause, so this blocks
%% indefinitely if no reply arrives.
fetch(Pid) ->
Pid ! {self(), fetch},
receive
{fetch_reply, Result} ->
Result
end.
%% True if the integer N is divisible by 2.
is_even(N) ->
    case N rem 2 of
        0 -> true;
        _ -> false
    end.
%% Waits, in list order, for the 'DOWN' message of every monitored process
%% in the {Pid, MRef} list; raises error(timeout) if one does not arrive
%% within 10 seconds.
await_pids([]) ->
    ok;
await_pids([{_Pid, MRef} | Rest]) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            await_pids(Rest)
    after 10000 ->
            error(timeout)
    end.
%% Inside the critical section for Rsrc: reports N to Pid if N is even,
%% otherwise exits with `not_even' while holding the lock.
send_even(Rsrc, N, Pid) ->
    Critical = fun() ->
                       case is_even(N) of
                           true ->
                               Pid ! {self(), result, N};
                           false ->
                               exit(not_even)
                       end
               end,
    do(Rsrc, Critical).
-endif.
+98
View File
@@ -0,0 +1,98 @@
-module(mrdb_mutex_serializer).
-behavior(gen_server).
-export([wait/1,
done/2]).
-export([start_link/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(st, { queues = #{}
, empty_q = queue:new() }). %% perhaps silly optimization
%% Blocks, without timeout, until the server replies to the {wait, Rsrc}
%% call. The reply is {ok, Ref}; Ref is the value to pass to done/2.
wait(Rsrc) ->
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
%% Asynchronously (cast) notifies the server that the holder identified
%% by Ref is done with Rsrc.
done(Rsrc, Ref) ->
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
%% Starts the server, registered locally under the module name.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% gen_server callback: the initial state is the default #st{} record.
init([]) ->
{ok, #st{}}.
%% Enqueues the caller for Rsrc and monitors it. If the queue was empty,
%% the caller gets the lock right away ({ok, MRef} reply); otherwise the
%% reply is deferred until the caller reaches the head of the queue.
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
                                              , empty_q = EmptyQ } = St) ->
    MRef = erlang:monitor(process, Pid),
    Q0 = maps:get(Rsrc, Queues, EmptyQ),
    Q1 = queue:in({From, MRef}, Q0),
    St1 = St#st{ queues = Queues#{Rsrc => Q1} },
    case queue:is_empty(Q0) of
        true ->
            {reply, {ok, MRef}, St1};
        false ->
            {noreply, St1}
    end.
%% Releases the lock on Rsrc, provided MRef identifies the current holder
%% (the head of the queue); the next waiter, if any, is then granted the
%% lock. A `done' from anyone else, or for an unknown resource, is ignored.
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
    case maps:find(Rsrc, Queues) of
        {ok, Q} ->
            case queue:out(Q) of
                {{value, {_From, MRef}}, Q1} ->
                    %% `flush' removes a 'DOWN' message that may already be
                    %% in the mailbox, so that handle_info/2 doesn't scan
                    %% all queues for a monitor that is no longer tracked.
                    erlang:demonitor(MRef, [flush]),
                    ok = maybe_dispatch_one(Q1),
                    {noreply, St#st{ queues = store_queue(Rsrc, Q1, Queues) }};
                {_, _} ->
                    %% Not the lock holder
                    {noreply, St}
            end;
        error ->
            {noreply, St}
    end.

%% Stores the queue for Rsrc, dropping the entry altogether when the queue
%% is empty so that the map doesn't accumulate idle resources.
store_queue(Rsrc, Q, Queues) ->
    case queue:is_empty(Q) of
        true  -> maps:remove(Rsrc, Queues);
        false -> Queues#{Rsrc := Q}
    end.
%% A monitored process went down: remove its entry from every queue
%% (see drop_or_filter/2). All other messages are ignored.
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
    Purge = fun(_Rsrc, Q) -> drop_or_filter(Q, MRef) end,
    {noreply, St#st{ queues = maps:map(Purge, Queues) }};
handle_info(_, St) ->
    {noreply, St}.
%% gen_server callback: nothing to clean up.
terminate(_, _) ->
ok.
%% gen_server callback: the state is kept unchanged across code changes.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
%% Grants the lock to the process at the head of the queue, if any, by
%% replying {ok, MRef} to its pending call. The entry is only peeked at,
%% not popped: it stays at the head of the queue.
maybe_dispatch_one(Q) ->
    case queue:peek(Q) of
        {value, {From, MRef}} ->
            gen_server:reply(From, {ok, MRef}),
            ok;
        empty ->
            ok
    end.
%% Removes the entry with monitor reference MRef from queue Q.
%% If that entry is at the head of the queue, it is dropped and the lock
%% is handed to the next entry; otherwise any entry with MRef is filtered
%% out.
drop_or_filter(Q, MRef) ->
    case queue:peek(Q) of
        empty ->
            Q;
        {value, {_, MRef}} ->
            Rest = queue:drop(Q),
            ok = maybe_dispatch_one(Rest),
            Rest;
        {value, _Other} ->
            queue:filter(fun({_, Ref}) -> Ref =/= MRef end, Q)
    end.
+157 -40
View File
@@ -1,11 +1,15 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mrdb_select).
-export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
-export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select_reverse/3
, select_reverse/4
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, fold_reverse/5
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
]).
-export([continuation_info/2]).
@@ -25,26 +29,41 @@
, compiled_ms
, limit
, key_only = false % TODO: not used
, direction = forward % TODO: not used
, direction = forward
, pre_ms
, derive_obj_f = fun unit_l/1
}).
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
select(Ref, MS, false, Limit).
select(Ref, MS, false, forward, Limit).
select(Ref, MS, AccKeys, Limit)
select(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, forward, Limit).
%% Reverse-direction select without key accumulation (AccKeys = false).
select_reverse(Ref, MS, Limit) ->
select(Ref, MS, false, reverse, Limit).
%% Reverse-direction select; AccKeys is passed on to select/5.
select_reverse(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, reverse, Limit).
select(Ref, MS, AccKeys, Dir, Limit)
when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
Sel = mk_sel(Ref, MS, Limit),
Sel = mk_sel(Ref, MS, Dir, Limit),
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
mk_sel(#{name := Tab} = Ref, MS, Limit) ->
Keypat = keypat(MS, keypos(Tab), Ref),
mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
MSpre = maps:get(pre_ms, Ref, MS),
Keypat = keypat(MSpre, keypos(Tab), Ref),
#sel{tab = Tab,
ref = Ref,
keypat = Keypat,
ms = MS,
compiled_ms = ets:match_spec_compile(MS),
ms = MSpre,
pre_ms = MSpre,
compiled_ms = ms_compile(MS),
key_only = needs_key_only(MS),
limit = Limit}.
direction = Dir,
limit = Limit,
derive_obj_f = derive_f(Ref)}.
select(Cont) ->
case Cont of
@@ -59,11 +78,18 @@ continuation_info(_, _) -> undefined.
continuation_info_(ref, #sel{ref = Ref}) -> Ref;
continuation_info_(ms, #sel{ms = MS }) -> MS;
continuation_info_(pre_ms, #sel{pre_ms = MSp}) -> MSp;
continuation_info_(limit, #sel{limit = L }) -> L;
continuation_info_(direction, #sel{direction = Dir}) -> Dir;
continuation_info_(_, _) -> undefined.
%% Fold in forward direction; see do_fold/6.
fold(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, forward, Limit).
%% Fold in reverse direction; see do_fold/6.
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
{AccKeys, F} =
if is_function(Fun, 3) ->
{true, fun({K, Obj}, Acc1) ->
@@ -74,7 +100,7 @@ fold(Ref, Fun, Acc, MS, Limit) ->
true ->
mrdb:abort(invalid_fold_fun)
end,
fold_(select(Ref, MS, AccKeys, Limit), F, Acc).
fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
fold_('$end_of_table', _, Acc) ->
Acc;
@@ -86,12 +112,57 @@ fold_({L, Cont}, Fun, Acc) ->
rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rocksdb:iterator_move(I, first(Ref)),
MovRes = fwd_init_seek(I, Prefix),
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end).
first(#{vsn := 1}) -> <<?DATA_START>>;
first(_) -> first.
%% Folds backwards over the raw key/value pairs of Ref that match Prefix.
%% A rocksdb iterator is opened for the duration of the fold and is first
%% positioned by rev_init_seek/2.
rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
    Folder = fun(Iter) ->
                     Start = rev_init_seek(Iter, Prefix),
                     i_rdb_fold_reverse(Start, Iter, Prefix, Fun, Acc, Limit)
             end,
    mrdb:with_rdb_iterator(Ref, Folder).
%% Positions iterator I for a forward traversal of keys matching Pfx.
fwd_init_seek(I, Pfx) ->
    Target = fwd_init_seek_tgt(Pfx),
    rocksdb:iterator_move(I, Target).

%% Initial move target for a forward traversal: `first' for the empty
%% prefix, otherwise a seek to the prefix itself.
fwd_init_seek_tgt(Prefix) ->
    case Prefix of
        <<>> -> first;
        _    -> {seek, Prefix}
    end.
%% Positions iterator I for a reverse traversal of keys matching Pfx.
%% The target comes from rev_init_seek_tgt/1: either `last', or a seek to
%% an 'incremented' prefix.
rev_init_seek(I, Pfx) ->
    case rev_init_seek_tgt(Pfx) of
        last ->
            i_move(I, last);
        {seek, Bin} ->
            seek_or_seek_for_prev(I, Bin)
    end.

%% Seeks to Bin. This fails if we seek past the end of the table; then,
%% seek_for_prev is tried instead (which fails if the table is empty).
%% This is because rocksdb lacks a "seek backward to last matching prefix".
seek_or_seek_for_prev(I, Bin) ->
    case i_move(I, {seek, Bin}) of
        {ok, _, _} = Ok ->
            Ok;
        {error, invalid_iterator} ->
            i_move(I, {seek_for_prev, Bin})
    end.
%% Initial move target for a reverse traversal: `last' for the empty
%% prefix or when the prefix cannot be incremented, otherwise a seek to
%% the incremented prefix.
rev_init_seek_tgt(<<>>) ->
    last;
rev_init_seek_tgt(Prefix) ->
    case incr_prefix(Prefix) of
        Next when is_binary(Next) ->
            {seek, Next};
        last ->
            last
    end.
%% Returns the prefix 'incremented by one' (read as a big-endian unsigned
%% integer), or `last' if the prefix is empty or too close to the maximum
%% value for its size to be incremented.
%%
%% The result always has the same byte size as the input. Leading zero
%% bytes must be preserved: a shorter binary would sort differently and
%% would therefore be the wrong seek target.
incr_prefix(<<>>) -> last;
incr_prefix(Pfx) when is_binary(Pfx) ->
    Bits = bit_size(Pfx),
    <<PfxI:Bits>> = Pfx,
    MaxI = (1 bsl Bits) - 1,
    case PfxI + 1 of
        I1 when I1 >= MaxI -> last;
        I1 ->
            <<I1:Bits>>
    end.
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
@@ -104,20 +175,37 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold(_, _, _, _, Acc, _) ->
Acc.
%% Reverse fold loop over a positioned rocksdb iterator.
%%
%% For each `{ok, Key, Val}' position, while `Limit' is still > 0:
%%  - a key matching the prefix is folded into the accumulator and counts
%%    against the limit;
%%  - a non-matching key that sorts after the prefix is skipped (the
%%    initial reverse seek may land beyond the prefix range);
%%  - any other non-matching key ends the fold.
%% Any other iterator result, or an exhausted limit, returns the
%% accumulator as is.
i_rdb_fold_reverse({ok, Key, Val}, Iter, Pfx, Fun, Acc, Limit) when Limit > 0 ->
    case is_prefix(Pfx, Key) of
        true ->
            %% Step the iterator before invoking the user fun.
            Prev = rocksdb:iterator_move(Iter, prev),
            Acc1 = Fun(Key, Val, Acc),
            i_rdb_fold_reverse(Prev, Iter, Pfx, Fun, Acc1, decr(Limit));
        false when Key > Pfx ->
            Prev = rocksdb:iterator_move(Iter, prev),
            i_rdb_fold_reverse(Prev, Iter, Pfx, Fun, Acc, Limit);
        false ->
            Acc
    end;
i_rdb_fold_reverse(_, _, _, _, Acc, _) ->
    Acc.
i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS
, limit = Limit
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) ->
StartKey = case {Pfx, Vsn, Enc} of
{<<>>, 1, {sext, _}} ->
<<?DATA_START>>;
{_, _, {term, _}} ->
<<>>;
_ ->
Pfx
end,
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc).
, direction = Dir
, ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
{MoveRes, Sel} = case Enc of
{term, _} ->
%% No defined ordering - do forward select
{i_move(I, first), Sel0#sel{direction = forward}};
_ ->
case Dir of
forward ->
{fwd_init_seek(I, Pfx), Sel0};
reverse ->
{rev_init_seek(I, Pfx), Sel0}
end
end,
select_traverse(MoveRes, Limit, Pfx, MS, I, Sel, AccKeys, Acc).
needs_key_only([Pat]) ->
needs_key_only_(Pat);
@@ -193,16 +281,17 @@ map_vars([H|T], P) ->
map_vars([], _) ->
[].
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
AccKeys, Acc) ->
case is_prefix(Pfx, K) of
true ->
DecKey = decode_key(K, R),
Rec = decode_val(V, DecKey, R),
case ets:match_spec_run([Rec], MS) of
Rec0 = decode_val(V, DecKey, R),
RecL = derive_object(Rec0, Sel),
case ms_run(RecL, MS) of
[] ->
select_traverse(
rocksdb:iterator_move(I, next), Limit, Pfx, MS,
rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
[Match] ->
Acc1 = if AccKeys ->
@@ -212,6 +301,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end;
false when Dir == reverse, K > Pfx ->
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
false when Limit == infinity ->
lists:reverse(Acc);
false ->
@@ -220,6 +312,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
%% Maps a traversal direction to the rocksdb iterator action that advances
%% one step in that direction.
next_or_prev(forward) -> next;
next_or_prev(reverse) -> prev.
select_return(infinity, {L, '$end_of_table'}) ->
L;
select_return(_, Ret) ->
@@ -239,29 +334,40 @@ decr(I) when is_integer(I) ->
decr(infinity) ->
infinity.
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc),
fun(sel) -> Sel;
(cont) ->
mrdb:with_rdb_iterator(
Ref,
fun(NewI) ->
select_traverse(iterator_next(NewI, K),
select_traverse(iterator_next(NewI, K, Dir),
Limit, Pfx, MS, NewI, Sel,
AccKeys, [])
end)
end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) ->
case rocksdb:iterator_move(I, K) of
iterator_next(I, K, Dir) ->
case i_move(I, K, Dir) of
{ok, K, _} ->
rocksdb:iterator_move(I, next);
rocksdb:iterator_move(I, next_or_prev(Dir));
Other ->
Other
end.
%% Thin wrapper around rocksdb:iterator_move/2.
i_move(I, Tgt) ->
rocksdb:iterator_move(I, Tgt).
%% Positions iterator `I' relative to key `K' for the given direction:
%% `forward' passes `K' directly as the move target, while `reverse' uses
%% `{seek_for_prev, K}'.
i_move(I, K, forward) ->
    rocksdb:iterator_move(I, K);
i_move(I, K, reverse) ->
    rocksdb:iterator_move(I, {seek_for_prev, K}).
%% Applies the `derive_obj_f' fun stored in the #sel{} record to `R' and
%% returns its result.
derive_object(R, #sel{derive_obj_f = F}) ->
F(R).
keypat([H|T], KeyPos, Ref) ->
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
@@ -283,3 +389,14 @@ keypat_pfx({HeadPat,_Gs,_}, KeyPos, #{encoding := {sext,_}}) when is_tuple(HeadP
keypat_pfx(_, _, _) ->
<<>>.
%% Returns the `derive_obj_f' entry of the given map when it is a fun of
%% arity 1; for any other argument, falls back to unit_l/1.
derive_f(#{derive_obj_f := F}) when is_function(F, 1) -> F;
derive_f(_) -> fun unit_l/1.
%% Wraps a term in a single-element list.
unit_l(X) ->
[X].
%% Compiles a match specification with ets:match_spec_compile/1.
ms_compile(MS) ->
ets:match_spec_compile(MS).
%% Runs the compiled match specification `MS' over the list `RecL' with
%% ets:match_spec_run/2 and returns the list of results.
ms_run(RecL, MS) ->
ets:match_spec_run(RecL, MS).
+74
View File
@@ -0,0 +1,74 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%% @doc Statistics API for the mnesia_rocksdb plugin
%%
%% Some counters are maintained for each active alias. Currently, the following
%% counters are supported:
%% * inner_retries
%% * outer_retries
%%
-module(mrdb_stats).

-export([new/0]).

-export([incr/3,
         get/1,
         get/2]).

-type alias() :: mnesia_rocksdb:alias().
-type db_ref() :: mrdb:db_ref().
-type counter() :: atom().
-type increment() :: integer().

-type counters() :: #{ counter() := integer() }.

%% Creates a fresh statistics state: a `counters' array with one slot per
%% known counter (under `ref'), together with the `meta' map that gives
%% each counter name its slot index.
new() ->
    Meta = ctr_meta(),
    #{ ref => counters:new(map_size(Meta), [write_concurrency])
     , meta => Meta }.

%% Known counters, mapped to their index in the `counters' array.
ctr_meta() ->
    #{ inner_retries => 1
     , outer_retries => 2 }.

-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
%% @doc Increment `Ctr' counter for `Alias' with increment `N'.
%%
%% Note that the first argument may also be a `db_ref()' map,
%% corresponding to `mrdb:get_ref({admin, Alias})'.
%% @end
incr(Alias, Ctr, N) when is_atom(Alias) ->
    #{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
    incr_(Ref, Meta, Ctr, N);
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
    incr_(Ref, Meta, Ctr, N).

-spec get(alias() | db_ref(), counter()) -> integer().
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
%% @end
get(Alias, Ctr) when is_atom(Alias) ->
    #{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
    get_(Ref, Meta, Ctr);
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
    get_(Ref, Meta, Ctr).

-spec get(alias() | db_ref()) -> counters().
%% @doc Fetches all known counters for `Alias', in the form of a map,
%% `#{Counter => Value}'.
%% @end
get(Alias) when is_atom(Alias) ->
    get_(mrdb:get_ref({admin, Alias}));
get(Ref) when is_map(Ref) ->
    get_(Ref).

%% Builds the `#{Counter => Value}' map by starting from the meta map and
%% replacing each index with the current value found at that index.
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
    lists:foldl(
      fun({K, P}, M) ->
              M#{K := counters:get(Ref, P)}
      end, Meta, maps:to_list(Meta)).

%% Reads a single counter. Fails with `{badkey, Attr}' (from maps:get/2)
%% if `Attr' is not a known counter.
get_(Ref, Meta, Attr) ->
    Ix = maps:get(Attr, Meta),
    counters:get(Ref, Ix).

%% Adds `N' to a single counter. Fails with `{badkey, Attr}' (from
%% maps:get/2) if `Attr' is not a known counter.
incr_(Ref, Meta, Attr, N) ->
    Ix = maps:get(Attr, Meta),
    counters:add(Ref, Ix, N).
@@ -1,64 +0,0 @@
%% -------------------------------------------------------------------
%%
%% basho_bench: Benchmarking Suite
%%
%% Copyright (c) 2009-2010 Basho Techonologies
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(basho_bench_driver_mnesia_rocksdb).

-export([new/1,
         run/4]).

-include("mnesia_rocksdb_basho_bench.hrl").

%% ====================================================================
%% API
%% ====================================================================

%% Driver initialisation: reads the `backend' (default `ram_copies') and
%% `mnesia_table' (default `t') settings from the basho_bench config,
%% brings up mnesia with that table, and uses the table name as the
%% driver state.
new(_Id) ->
    Backend = basho_bench_config:get(backend, ram_copies),
    Table = basho_bench_config:get(mnesia_table, t),
    ok = bootstrap_mnesia(Table, Backend),
    {ok, Table}.

%% Creates a schema on the local node with `rocksdb_copies' registered as
%% a backend type, starts mnesia, creates `Table' with storage type
%% `Backend' and waits up to 10 seconds for it to load.
bootstrap_mnesia(Table, Backend) ->
    BackendTypes = [{rocksdb_copies, mnesia_rocksdb}],
    ok = mnesia:create_schema([node()], [{backend_types, BackendTypes}]),
    ok = mnesia:start(),
    {atomic, ok} = mnesia:create_table(Table, [{Backend, [node()]}]),
    mnesia:wait_for_tables([Table], 10000).

%% Executes one benchmark operation (`get', `put' or `delete') as a dirty
%% mnesia operation on the table held in the driver state. The state is
%% returned unchanged.
run(get, KeyGen, _ValueGen, Table) ->
    Key = KeyGen(),
    case mnesia:dirty_read({Table, Key}) of
        [{_, Key, _}] ->
            {ok, Table};
        [] ->
            {ok, Table}
    end;
run(put, KeyGen, ValueGen, Table) ->
    Key = KeyGen(),
    Value = ValueGen(),
    ok = mnesia:dirty_write({Table, Key, Value}),
    {ok, Table};
run(delete, KeyGen, _ValueGen, Table) ->
    Key = KeyGen(),
    ok = mnesia:dirty_delete({Table, Key}),
    {ok, Table}.
+205 -22
View File
@@ -24,8 +24,12 @@
, mrdb_abort/1
, mrdb_two_procs/1
, mrdb_two_procs_tx_restart/1
, mrdb_two_procs_tx_inner_restart/1
, mrdb_two_procs_snap/1
, mrdb_three_procs/1
, create_counters/1
, update_counters/1
, restart_node/1
]).
-include_lib("common_test/include/ct.hrl").
@@ -41,7 +45,8 @@ all() ->
groups() ->
[
{all_tests, [sequence], [ {group, checks}
, {group, mrdb} ]}
, {group, mrdb}
, {group, counters}]}
%% , error_handling ]}
, {checks, [sequence], [ encoding_sext_attrs
, encoding_binary_binary
@@ -53,8 +58,13 @@ groups() ->
, mrdb_abort
, mrdb_two_procs
, mrdb_two_procs_tx_restart
, mrdb_two_procs_tx_inner_restart
, mrdb_two_procs_snap
, mrdb_three_procs ]}
, {counters, [sequence], [ create_counters
, update_counters
, restart_node
, update_counters ]}
].
@@ -206,6 +216,7 @@ mrdb_transactions_(Config) ->
delete_tabs(Created),
ok.
-dialyzer(no_return).
mrdb_abort_reasons(_Config) ->
Prev = mnesia_rocksdb_admin:set_and_cache_env(mnesia_compatible_aborts, true),
X = some_value,
@@ -224,6 +235,7 @@ compare_txs(Type, F) ->
ct:log("Mrdb = ~p/~p", [Type, EMr]),
case {Type, EMn, EMr} of
{error, {some_value, [_|_]}, {some_value, []}} -> ok;
{error, E, E} -> ok;
{throw, {throw, some_value}, {throw, some_value}} -> ok;
{exit, some_value, some_value} -> ok;
{abort, some_value, some_value} -> ok
@@ -266,31 +278,47 @@ mrdb_abort(Config) ->
mrdb:insert(tx_abort, {tx_abort, a, 1}),
Pre = mrdb:read(tx_abort, a),
D0 = get_dict(),
ActivityF = fun() ->
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
error(abort_here),
ok = mrdb:insert(tx_abort, [{tx_abort, a, N+1}]),
noooo
end,
TRes = try mrdb:activity(
tx, rdb,
fun() ->
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
error(abort_here),
ok = mrdb:insert(tx_abort, [{tx_abort, a, N+1}]),
noooo
end)
{tx, #{mnesia_compatible => true}}, rdb,
ActivityF)
catch
error:abort_here ->
exit:{aborted, {abort_here, []}} ->
ok
end,
dictionary_unchanged(D0),
ok = TRes,
Pre = mrdb:read(tx_abort, a),
TRes1 = try mrdb:activity(tx, rdb, ActivityF)
catch
error:{abort_here, [_|_]} ->
ok
end,
dictionary_unchanged(D0),
ok = TRes1,
Pre = mrdb:read(tx_abort, a),
delete_tabs(Created),
ok.
mrdb_two_procs(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
tr_flags(
{self(), [call, sos, p]},
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {mrdb, activity, x}], tr_opts()))).
mrdb_two_procs_(Config) ->
@@ -314,11 +342,16 @@ mrdb_two_procs_(Config) ->
Pre = mrdb:read(R, a),
go_ahead_other(POther),
await_other_down(POther, MRef, ?LINE),
%% The test proc is still inside the transaction, and POther
%% has terminated/committed. If we now read 'a', we should see
%% the value that POther wrote (no isolation).
[{R, a, 17}] = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
%% Our transaction should fail due to the resource conflict, and because
%% we're not using retries.
try mrdb:activity({tx, #{no_snapshot => true,
retries => 0}}, rdb, F1) of
ok -> error(unexpected)
@@ -339,6 +372,7 @@ mrdb_two_procs_tx_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
check_stats(rdb),
mrdb:insert(R, {R, a, 1}),
Pre = mrdb:read(R, a),
F0 = fun() ->
@@ -354,7 +388,7 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = [{R, a, 17}],
Att = get_attempt(),
Expected = case Att of
1 -> Pre;
0 -> Pre;
_ -> OtherWrite
end,
Expected = mrdb:read(R, a),
@@ -363,14 +397,104 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
dictionary_unchanged(Do0),
check_stats(rdb),
[{R, a, 18}] = mrdb:read(R, a),
delete_tabs(Created),
ok.
%% Test case entry point: runs mrdb_two_procs_tx_inner_restart_/1 under
%% tr_ct:with_trace/3, using the trace options from dbg_tr_opts/0.
mrdb_two_procs_tx_inner_restart(Config) ->
    TestFun = fun mrdb_two_procs_tx_inner_restart_/1,
    TraceOpts = dbg_tr_opts(),
    tr_ct:with_trace(TestFun, Config, TraceOpts).
%% Test body. Two helper processes (P1 running F0, P2 running F1) each
%% write one key inside their own transaction, while this process runs F2
%% in a transaction with `retries => {1,1}'. F2 branches on the value
%% reported by get_attempt/0 and releases one helper per attempt, so that
%% each helper's commit conflicts with the pre-image F2 has just read.
%% Afterwards the test checks that both keys hold F2's value (18) and that
%% the `inner_retries' and `outer_retries' counters have each grown by
%% exactly 1.
mrdb_two_procs_tx_inner_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
mrdb:insert(R, {R, a, 1}),
mrdb:insert(R, {R, b, 1}),
PreA = mrdb:read(R, a),
PreB = mrdb:read(R, b),
%% Baseline counter values, compared against the values after the run.
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
F0 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, a, 17}),
wait_for_other(Parent, ?LINE)
end,
F1 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, b, 147}),
wait_for_other(Parent, ?LINE)
end,
%% Runs F as a transaction in a new, monitored process.
Spawn = fun(F) ->
Res = spawn_opt(
fun() ->
ok = mrdb:activity(tx, rdb, F)
end, [monitor]),
Res
end,
{P1, MRef1} = Spawn(F0),
{P2, MRef2} = Spawn(F1),
F2 = fun() ->
PostA = [{R, a, 17}], % once F0 (P1) has committed
PostB = [{R, b, 147}], % once F1 (P2) has committed
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
BRes = mrdb:read(R, b),
Att = get_attempt(),
ct:log("Att = ~p", [Att]),
case Att of
0 -> % This is the first run (no retry yet)
ARes = PreA,
BRes = PreB,
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
await_other_down(P1, MRef1, ?LINE),
PostA = mrdb:read(R, a); % now, P1's write should leak through here
{0, 1} -> % This is the first (outer) retry
go_ahead_other(0, P2), % Let P2 write
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
await_other_down(P2, MRef2, ?LINE),
PostA = mrdb:read(R, a), % now we should see writes from both P1
%% On OTP 23, the following step might fail due to timing, even
%% though the trace output looks as expected. Possibly some quirk
%% with leakage propagation time in rocksdb. Let's retry to be sure.
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
{1, 1} ->
PostA = mrdb:read(R, a),
PostB = mrdb:read(R, b),
ok
end,
mrdb:insert(R, {R, a, 18}),
mrdb:insert(R, {R, b, 18})
end,
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
check_stats(rdb),
dictionary_unchanged(Do0),
[{R, a, 18}] = mrdb:read(R, a),
[{R, b, 18}] = mrdb:read(R, b),
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
%% Tagged tuple so that a mismatch shows up as `{restarts, ...}' in the
%% badmatch report.
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
delete_tabs(Created),
ok.
%% Calls `F()' repeatedly until it returns `Result', pausing 100 ms
%% between attempts. Returns `ok' on success; raises
%% `error({badmatch, {Result, F}})' once the attempts are used up.
%% try_until/2 allows 10 attempts.
try_until(Result, F) ->
    try_until(Result, F, 10).

try_until(Expected, F, Left) when Left > 0 ->
    case F() of
        Expected ->
            ok;
        _Other ->
            receive after 100 -> ok end,
            try_until(Expected, F, Left - 1)
    end;
try_until(Expected, F, _) ->
    error({badmatch, {Expected, F}}).
%
%% For testing purposes, we use side-effects inside the transactions
@@ -383,7 +507,7 @@ mrdb_two_procs_tx_restart_(Config) ->
%% attempt, and ignore the sync ops on retries.
%%
-define(IF_FIRST(N, Expr),
if N == 1 ->
if N == 0 ->
Expr;
true ->
ok
@@ -413,8 +537,8 @@ mrdb_two_procs_snap(Config) ->
go_ahead_other(Att, POther),
ARes = mrdb:read(R, a),
ARes = case Att of
1 -> Pre;
2 -> [{R, a, 17}]
0 -> Pre;
_ -> [{R, a, 17}]
end,
await_other_down(POther, MRef, ?LINE),
PreB = mrdb:read(R, b),
@@ -434,7 +558,7 @@ mrdb_two_procs_snap(Config) ->
%% We make sure that P2 commits before finishing the other two, and P3 and the
%% main thread sync, so as to maximize the contention for the retry lock.
mrdb_three_procs(Config) ->
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
mrdb_three_procs_(Config) ->
R = ?FUNCTION_NAME,
@@ -452,7 +576,7 @@ mrdb_three_procs_(Config) ->
spawn_opt(fun() ->
D0 = get_dict(),
do_when_p_allows(
1, Parent, ?LINE,
0, Parent, ?LINE,
fun() ->
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
end),
@@ -488,8 +612,8 @@ mrdb_three_procs_(Config) ->
fun() ->
Att = get_attempt(),
ARes = case Att of
1 -> [A0];
2 -> [A1]
0 -> [A0];
_ -> [A1]
end,
%% First, ensure that P2 tx is running
go_ahead_other(Att, P2),
@@ -519,6 +643,7 @@ tr_opts() ->
, {?MODULE, await_other_down, 3, x}
, {?MODULE, do_when_p_allows, 4, x}
, {?MODULE, allow_p, 3, x}
, {?MODULE, check_stats, 1, x}
]}.
light_tr_opts() ->
@@ -529,6 +654,23 @@ light_tr_opts() ->
, {mrdb, read, 2, x}
, {mrdb, activity, x} ], tr_opts())).
%% Trace options for the more verbose ("debug") traced test cases.
%% Starts from tr_opts(), substitutes the `mrdb' trace patterns listed
%% below via tr_patterns/3, and passes the result to tr_flags/2 together
%% with the flags `[call, sos, p, 'receive']' for the calling process.
dbg_tr_opts() ->
tr_flags(
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb, current_context, 0, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {?MODULE, try_until, 3, x}
, {mrdb, activity, x} ], tr_opts())).
%% Replaces the trace patterns for module `Mod' in `Opts': every existing
%% pattern whose first element is `Mod' is dropped, and `Ps' is put in
%% front of the remaining patterns.
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
    Others = lists:filter(fun(P) -> element(1, P) =/= Mod end, Pats),
    Opts#{patterns => Ps ++ Others}.
@@ -542,12 +684,13 @@ wait_for_other(Parent, L) ->
wait_for_other(Att, Parent, L) ->
wait_for_other(Att, Parent, 1000, L).
wait_for_other(1, Parent, Timeout, L) ->
wait_for_other(0, Parent, Timeout, L) ->
MRef = monitor(process, Parent),
Parent ! {self(), ready},
receive
{Parent, cont} ->
demonitor(MRef),
Parent ! {self(), cont_ack},
ok;
{'DOWN', MRef, _, _, Reason} ->
ct:log("Parent died, Reason = ~p", [Reason]),
@@ -586,7 +729,13 @@ go_ahead_other(Att, POther, Timeout) ->
go_ahead_other_(POther, Timeout) ->
receive
{POther, ready} ->
POther ! {self(), cont}
POther ! {self(), cont},
receive
{POther, cont_ack} ->
ok
after Timeout ->
error(cont_ack_timeout)
end
after Timeout ->
error(go_ahead_timeout)
end.
@@ -619,6 +768,35 @@ get_attempt() ->
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
Attempt.
%% Creates the `counters' table and seeds it with two objects: `c0' is
%% written with mrdb:insert/2, while `c1' is created implicitly by
%% mrdb:update_counter/3 on a key that does not exist yet. Both are then
%% read back and expected to hold the value 1.
create_counters(_Config) ->
create_tab(counters, []),
mrdb:insert(counters, {counters, c0, 1}),
mrdb:update_counter(counters, c1, 1),
[{counters, c0, 1}] = mrdb:read(counters, c0),
[{counters, c1, 1}] = mrdb:read(counters, c1),
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
ok.
%% Stops and restarts mnesia. In the `counters' group this case sits
%% between two runs of update_counters/1.
restart_node(_Config) ->
mnesia:stop(),
ok = mnesia:start(),
ct:log("mnesia restarted", []),
ok.
%% Reads the current values of counters `c0' and `c1', increments each by
%% 1 with mrdb:update_counter/3, and verifies that a subsequent read
%% returns the previous value plus one. Requires that both objects already
%% exist in the `counters' table.
update_counters(_Config) ->
    [{counters, c0, C0Prev}] = mrdb:read(counters, c0),
    [{counters, c1, C1Prev}] = mrdb:read(counters, c1),
    %% Values before the increment.
    ct:log("c0: ~p, c1: ~p", [C0Prev, C1Prev]),
    ok = mrdb:update_counter(counters, c0, 1),
    ok = mrdb:update_counter(counters, c1, 1),
    ct:log("Incremented c0 and c1 by 1", []),
    C0 = C0Prev + 1,
    C1 = C1Prev + 1,
    %% C0/C1 are already bound, so these matches assert the new values.
    [{counters, c0, C0}] = mrdb:read(counters, c0),
    [{counters, c1, C1}] = mrdb:read(counters, c1),
    ct:log("c0: ~p, c1: ~p", [C0, C1]),
    ok.
create_tabs(Tabs, Config) ->
Res = lists:map(fun create_tab/1, Tabs),
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
@@ -645,3 +823,8 @@ dictionary_unchanged(Old) ->
, added := [] } = #{ deleted => Old -- New
, added => New -- Old },
ok.
%% Fetches all counters for `Alias' via mrdb_stats:get/1, logs them to the
%% test log and returns them.
check_stats(Alias) ->
Stats = mrdb_stats:get(Alias),
ct:log("Stats: ~p", [Stats]),
Stats.
-15
View File
@@ -1,15 +0,0 @@
-define(FAIL_MSG(Str, Args), ?ERROR(Str, Args), basho_bench_app:halt_or_kill()).
-define(STD_ERR(Str, Args), io:format(standard_error, Str, Args)).
-define(CONSOLE(Str, Args), lager:info(Str, Args)).
-define(DEBUG(Str, Args), lager:debug(Str, Args)).
-define(INFO(Str, Args), lager:info(Str, Args)).
-define(WARN(Str, Args), lager:warning(Str, Args)).
-define(ERROR(Str, Args), lager:error(Str, Args)).
-define(FMT(Str, Args), lists:flatten(io_lib:format(Str, Args))).
-define(VAL_GEN_BLOB_CFG, value_generator_blob_file).
-define(VAL_GEN_SRC_SIZE, value_generator_source_size).
@@ -1,18 +0,0 @@
{mode, max}.
{duration, 10}.
{concurrent, 1}.
{driver, basho_bench_driver_mnesia_rocksdb}.
{key_generator, {int_to_bin,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
{operations, [{get, 2}, {put, 2}, {delete, 1}]}.
{code_paths, []}.
{mnesia_table, doc}.
{backend, disc_only_copies}.
@@ -1,19 +0,0 @@
{mode, max}.
{duration, 10}.
{concurrent, 1}.
{driver, basho_bench_driver_mnesia_rocksdb}.
{key_generator, {int_to_bin,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
{operations, [{get, 2}, {put, 2}, {delete, 1}]}.
{code_paths, ["/Users/uwiger/git/rocksdb",
"/Users/uwiger/git/mnesia_rocksdb"]}.
{mnesia_table, rdb}.
{backend, rocksdb_copies}.
+1
View File
@@ -22,6 +22,7 @@
-record(t, {k, i, v}).
-dialyzer({nowarn_function, run/1}).
run(Sz) ->
mnesia:stop(),
init(),
+17 -17
View File
@@ -79,20 +79,20 @@ cleanup() ->
os:cmd("rm *.BUP").
mods(0) ->
[];
mods(1) ->
[
{l, mnesia_rocksdb},
{g, rocksdb}
];
mods(2) ->
[
%% {l, mnesia_monitor},
{g, mnesia_rocksdb},
{l, mnesia_bup},
{g, mnesia_lib},
{g, mnesia_schema},
%% {g, mnesia_loader},
{g, mnesia_index},
{l, mnesia_tm}
].
[].
%% mods(1) ->
%% [
%% {l, mnesia_rocksdb},
%% {g, rocksdb}
%% ];
%% mods(2) ->
%% [
%% %% {l, mnesia_monitor},
%% {g, mnesia_rocksdb},
%% {l, mnesia_bup},
%% {g, mnesia_lib},
%% {g, mnesia_schema},
%% %% {g, mnesia_loader},
%% {g, mnesia_index},
%% {l, mnesia_tm}
%% ].
+61 -4
View File
@@ -33,6 +33,7 @@
-export([
index_plugin_mgmt/1
, add_indexes/1
, delete_indexes/1
, create_bag_index/1
, create_ordered_index/1
, test_1_ram_copies/1
@@ -40,10 +41,12 @@
, fail_1_disc_only/1
, plugin_ram_copies1/1
, plugin_ram_copies2/1
, plugin_ram_copies3/1
, plugin_disc_copies/1
, fail_plugin_disc_only/1
, plugin_disc_copies_bag/1
, plugin_rdb_ordered/1
, plugin_rdb_none/1
, index_iterator/1
]).
@@ -73,10 +76,12 @@ run(Config) ->
{pfx},mnesia_rocksdb, ix_prefixes),
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
test_index_plugin(cfg([pl3, rdb, none], Config)),
index_plugin_mgmt(Config),
ok.
@@ -94,6 +99,7 @@ groups() ->
, create_ordered_index
, index_plugin_mgmt
, add_indexes
, delete_indexes
]}
, {access, [sequence], [
test_1_ram_copies
@@ -104,10 +110,12 @@ groups() ->
, {plugin, [sequence], [
plugin_ram_copies1
, plugin_ram_copies2
, plugin_ram_copies3
, plugin_disc_copies
, fail_plugin_disc_only
, plugin_disc_copies_bag
, plugin_rdb_ordered
, plugin_rdb_none
]}
].
@@ -144,8 +152,8 @@ end_per_testcase(_, _) ->
%% ======================================================================
cfg([Tab, Type, IxType], Config) ->
[{my_config, #{tab => Tab, type => Type, ixtype => IxType}} | Config];
cfg(Cfg, Config) when is_map(Cfg) -> [{my_config, Cfg} | Config].
[{my_config, #{tab => Tab, type => Type, ixtype => IxType}} | Config].
%% cfg(Cfg, Config) when is_map(Cfg) -> [{my_config, Cfg} | Config].
cfg(Config) -> ?config(my_config, Config).
@@ -177,11 +185,14 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
test(N, Type, T) ->
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
{attributes,[k,a,b,c]},
@@ -204,7 +215,7 @@ add_del_indexes() ->
test_index_plugin(Config) ->
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
{index, [{{pfx}, IxType}]}]),
{index, [ixtype(IxType)]}]),
mnesia:dirty_write({Tab, "foobar", "sentence"}),
mnesia:dirty_write({Tab, "yellow", "sensor"}),
mnesia:dirty_write({Tab, "truth", "white"}),
@@ -216,13 +227,40 @@ test_index_plugin(Config) ->
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx});
IxType == ordered ->
IxType == ordered; IxType == none ->
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx})
end,
if Type == rdb ->
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
ok = test_select(Tab,{pfx},[{'_', [], ['$_']}]),
MS2 = [{{<<"whi">>,{Tab,"truth",'_'}},[],['$_']}],
[_] = mrdb_index:select(Tab, {pfx}, MS2),
ok = test_select(Tab, {pfx}, MS2),
[{Tab,"foobar","sentence"}] = mrdb:index_read(
Tab, <<"foo">>, {pfx});
true ->
ok
end.
%% Runs the same match spec through mrdb_index:select/3 and
%% mrdb_index:select_reverse/3, logs both results, and asserts that the
%% reverse result is exactly the forward result in reversed order.
test_select(Tab, Ix, MS) ->
    Fwd = mrdb_index:select(Tab, Ix, MS),
    ct:log("mrdb_index:select(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, Fwd]),
    Rev = mrdb_index:select_reverse(Tab, Ix, MS),
    ct:log("mrdb_index:select_reverse(~p, ~p, ~p) -> ~p", [Tab, Ix, MS, Rev]),
    %% Matching on a pair keeps both values visible in a badmatch report.
    {Fwd, Fwd} = {Fwd, lists:reverse(Rev)},
    ok.
%% Builds the index specification for the `{pfx}' plugin index: with an
%% explicit index type for `bag' and `ordered', and as a bare `{pfx}' when
%% no type is requested (`none').
ixtype(bag) ->
    {{pfx}, bag};
ixtype(ordered) ->
    {{pfx}, ordered};
ixtype(none) ->
    {pfx}.
create_bag_index(_Config) ->
{aborted, {combine_error, _, _}} =
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
@@ -239,6 +277,25 @@ add_indexes(_Config) ->
{atomic, ok} = mnesia:add_table_index(T, a),
ok.
%% Checks that an index on attribute `a' can be added, deleted and added
%% again. The table is modified while the index is absent (object `b'
%% deleted, object `c' inserted), and the index contents returned by
%% mrdb_index:rev_fold/4 after re-adding are expected to reflect the
%% table as it is at that point.
delete_indexes(_Config) ->
T = ?TAB(t1),
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
ok = mrdb:insert(T, {T, a, 1, 1}),
ok = mrdb:insert(T, {T, b, 1, 2}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
{atomic, ok} = mnesia:del_table_index(T, a),
ok = mrdb:delete(T, b),
ok = mrdb:insert(T, {T, c, 2, 3}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
ok.
%% Fold fun that collects each visited `{K, V}' pair by prepending it to
%% the accumulator.
ix_fold_acc(K, V, Acc) ->
[{K, V} | Acc].
index_plugin_mgmt(_Config) ->
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),
@@ -48,14 +48,18 @@
-record(st, {}).
-define(KEYS, [a,b,c]).
-dialyzer({no_return, [basic_test_/0, test/1, prop_seq/0]}).
basic_test_() ->
{timeout, 60000, [fun() -> test(100) end]}.
-dialyzer({no_opaque, test/1}).
test(N) ->
setup_mnesia(),
true = proper:quickcheck(?MODULE:prop_seq(), N),
ok.
-dialyzer({no_opaque, prop_seq/0}).
prop_seq() ->
?FORALL(Cmds, proper_statem:commands(?MODULE),
begin
+4 -4
View File
@@ -260,10 +260,10 @@ plain_transform1(Fun, [F|Fs]) when is_atom(element(1,F)) ->
continue ->
[list_to_tuple(plain_transform1(Fun, tuple_to_list(F))) |
plain_transform1(Fun, Fs)];
{done, NewF} ->
[NewF | Fs];
{error, Reason} ->
io:format("Error: ~p (~p)~n", [F,Reason]);
%% {done, NewF} ->
%% [NewF | Fs];
%% {error, Reason} ->
%% io:format("Error: ~p (~p)~n", [F,Reason]);
NewF when is_tuple(NewF) ->
[NewF | plain_transform1(Fun, Fs)]
end;
+1 -1
View File
@@ -1,6 +1,6 @@
-module(mrdb_bench).
-compile(export_all).
-compile([export_all, nowarn_export_all]).
init() ->
mnesia:delete_schema([node()]),
+226
View File
@@ -0,0 +1,226 @@
-module(mrdb_fold_SUITE).
-export([
all/0
, groups/0
, suite/0
, init_per_suite/1
, end_per_suite/1
, init_per_group/2
, end_per_group/2
, init_per_testcase/2
, end_per_testcase/2
]).
-export([
no_prefix_fwd_rdb_fold/1
, prefixed_fwd_rdb_fold/1
, no_prefix_rev_rdb_fold/1
, prefixed_rev_rdb_fold/1
, prefixed_rev_rdb_fold_max/1
, fwd_fold/1
, filtered_fwd_fold/1
, rev_fold/1
, filtered_rev_fold/1
, fwd_select/1
, rev_select/1
, select_cont/1
, rev_select_cont/1
]).
-include_lib("common_test/include/ct.hrl").
-record(r, {k, v}).
%% Common Test suite info callback; this suite sets no suite-level options.
suite() ->
    [].
%% Everything runs through the single top-level group `all_tests`.
all() ->
    TopGroup = {group, all_tests},
    [TopGroup].
%% Group definitions. `all_tests` runs the three sub-groups in sequence;
%% each sub-group in turn runs its test cases in sequence.
groups() ->
    SubGroups = [{group, rdb_fold}, {group, fold}, {group, select}],
    RdbFoldCases = [no_prefix_fwd_rdb_fold,
                    prefixed_fwd_rdb_fold,
                    no_prefix_rev_rdb_fold,
                    prefixed_rev_rdb_fold,
                    prefixed_rev_rdb_fold_max],
    FoldCases = [fwd_fold,
                 filtered_fwd_fold,
                 rev_fold,
                 filtered_rev_fold],
    SelectCases = [fwd_select,
                   rev_select,
                   select_cont,
                   rev_select_cont],
    [{all_tests, [sequence], SubGroups},
     {rdb_fold, [sequence], RdbFoldCases},
     {fold, [sequence], FoldCases},
     {select, [sequence], SelectCases}].
%% Stops mnesia (whatever its current state) and restarts it through the
%% test library with the `reset` argument. Config is passed on unchanged.
init_per_suite(Config) ->
    _ = mnesia:stop(),
    ok = mnesia_rocksdb_tlib:start_mnesia(reset),
    Config.
%% Suite teardown: stops mnesia; the result of the stop call is ignored.
end_per_suite(_Config) ->
    _ = mnesia:stop(),
    ok.
%% Group setup. The `rdb_fold` and `fold` groups get their table created
%% (and filled) via mk_tab/1; every other group needs no setup here.
init_per_group(rdb_fold = Group, Config) ->
    mk_tab(Group),
    Config;
init_per_group(fold = Group, Config) ->
    mk_tab(Group),
    Config;
init_per_group(_Other, Config) ->
    Config.
%% No per-group teardown is needed.
end_per_group(_Group, _Config) ->
    ok.
%% No per-testcase setup; the config is returned untouched.
init_per_testcase(_Case, Config) ->
    Config.
%% No per-testcase teardown is needed.
end_per_testcase(_Case, _Config) ->
    ok.
%% Creates the table belonging to group G and fills it with test data.
%% If the table already exists it is left as is. Any other result from
%% table creation crashes with case_clause.
mk_tab(G) ->
    Tab = tab_name(G),
    Created = create_tab(Tab),
    case Created of
        {aborted, {already_exists, Tab}} ->
            ok;
        {atomic, ok} ->
            fill_tab(Tab)
    end.
%% Maps a test group to the table it uses: `fold` uses `r`, `rdb_fold`
%% uses `t`. No mapping is defined for the `select` group.
tab_name(fold) ->
    r;
%% tab_name(select ) -> r;
tab_name(rdb_fold) ->
    t.
%% Creates table T as an `rdb` table on the local node, with the
%% table-specific options from tab_opts/1 appended. Returns the
%% mnesia:create_table/2 result as is.
create_tab(T) ->
    mnesia:create_table(T, [{rdb, [node()]} | tab_opts(T)]).
%% Table-specific creation options. Both tables have attributes [k, v].
%% `r` is an ordered_set; `t` sets the mrdb_encoding user property to
%% {raw, raw}.
tab_opts(r) ->
    Attrs = {attributes, [k, v]},
    [Attrs, {type, ordered_set}];
tab_opts(t) ->
    Attrs = {attributes, [k, v]},
    Encoding = {mrdb_encoding, {raw, raw}},
    [Attrs, {user_properties, [Encoding]}].
%% Forward raw fold over table t with an empty prefix and no limit.
%% Expects every raw object, in reverse of raw_objs() order because
%% simple_rdb_acc/3 prepends each entry. The tuple match makes a failure
%% show both the actual and the expected list.
no_prefix_fwd_rdb_fold(_Config) ->
    Want = lists:reverse(raw_objs()),
    Got = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
    {Got, Got} = {Got, Want}.
%% Forward fold over table r without a match spec. Expects all objects,
%% in reverse of objs() order because simple_acc/2 prepends each one.
fwd_fold(_Config) ->
    Want = lists:reverse(objs()),
    Got = mrdb:fold(r, fun simple_acc/2, []),
    {Got, Got} = {Got, Want}.
%% Forward raw fold over table t restricted to the prefix <<"aa">>.
%% Expects only the raw objects whose key has that prefix, reversed by
%% the prepending accumulator.
prefixed_fwd_rdb_fold(_Config) ->
    Prefix = <<"aa">>,
    Want = lists:reverse(prefixed_raw_objs(Prefix)),
    Got = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Prefix, infinity),
    {Got, Got} = {Got, Want}.
%% Forward fold over table r with a match spec selecting keys of the form
%% {a,b,_}. The expected list is produced by running the same match spec
%% over objs() locally, reversed by the prepending accumulator.
filtered_fwd_fold(_Config) ->
    MatchSpec = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
    Want = lists:reverse(filtered_objs(MatchSpec)),
    Got = mrdb:fold(r, fun simple_acc/2, [], MatchSpec, infinity),
    {Got, Got} = {Got, Want}.
%% Reverse raw fold over table t with an empty prefix and no limit.
%% Expects every raw object in raw_objs() order (the reverse traversal
%% combined with the prepending accumulator).
no_prefix_rev_rdb_fold(_Config) ->
    Want = raw_objs(),
    Got = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity),
    {Got, Got} = {Got, Want}.
%% Reverse fold over table r without a match spec. Expects all objects in
%% objs() order.
rev_fold(_Config) ->
    Want = objs(),
    Got = mrdb:fold_reverse(r, fun simple_acc/2, []),
    {Got, Got} = {Got, Want}.
%% Reverse raw fold over table t restricted to the prefix <<"aa">>.
%% Expects the raw objects with that key prefix, in raw_objs() order.
prefixed_rev_rdb_fold(_Config) ->
    Prefix = <<"aa">>,
    Want = prefixed_raw_objs(Prefix),
    Got = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Prefix, infinity),
    {Got, Got} = {Got, Want}.
%% Reverse fold over table r with a match spec selecting keys of the form
%% {a,b,_}. Expects the locally filtered objs(), in their original order.
filtered_rev_fold(_Config) ->
    MatchSpec = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
    Want = filtered_objs(MatchSpec),
    Got = mrdb:fold_reverse(r, fun simple_acc/2, [], MatchSpec, infinity),
    {Got, Got} = {Got, Want}.
%% Reverse raw fold over table t with the prefix <<255,255>>, i.e. a
%% prefix made up solely of maximum-valued bytes.
%% NOTE(review): presumably targets the boundary case where no larger
%% key with the same length exists to seek from - confirm against mrdb.
prefixed_rev_rdb_fold_max(_Config) ->
    Prefix = <<255,255>>,
    Want = prefixed_raw_objs(Prefix),
    Got = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Prefix, infinity),
    {Got, Got} = {Got, Want}.
%% Forward select on table r with a match spec for keys of the form
%% {a,b,_}. Expects the same objects, in the same order, as running the
%% match spec over objs() locally.
fwd_select(_Config) ->
    MatchSpec = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
    Want = filtered_objs(MatchSpec),
    Got = mrdb:select(r, MatchSpec),
    {Got, Got} = {Got, Want}.
%% Reverse select on table r with a match spec for keys of the form
%% {a,b,_}. Expects the locally filtered objs() in reversed order.
rev_select(_Config) ->
    MatchSpec = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
    Want = lists:reverse(filtered_objs(MatchSpec)),
    Got = mrdb:select_reverse(r, MatchSpec),
    {Got, Got} = {Got, Want}.
%% Forward select with a limit of 1, stepping through the continuation
%% one object at a time and checking each against the locally filtered
%% objs().
select_cont(_Config) ->
    MatchSpec = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
    Want = filtered_objs(MatchSpec),
    FirstChunk = mrdb:select(r, MatchSpec, 1),
    select_cont_loop(FirstChunk, Want).
%% Reverse select with a limit of 1, stepping through the continuation
%% one object at a time and checking each against the locally filtered
%% objs() in reversed order.
rev_select_cont(_Config) ->
    MatchSpec = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
    Want = lists:reverse(filtered_objs(MatchSpec)),
    FirstChunk = mrdb:select_reverse(r, MatchSpec, 1),
    select_cont_loop(FirstChunk, Want).
%% Walks a limit-1 select result against the list of expected objects.
%% Finishes with ok when the select reports an empty final chunk with
%% '$end_of_table' and no expected objects remain. Otherwise the single
%% returned object must equal the next expected one, and the walk goes on
%% via mrdb:select/1 with the continuation. Any other shape crashes with
%% function_clause. Used for both forward and reverse selects.
select_cont_loop({[], '$end_of_table'}, []) ->
    ok;
select_cont_loop({[Next], Cont}, [Next | Remaining]) ->
    NextChunk = mrdb:select(Cont),
    select_cont_loop(NextChunk, Remaining).
%% Fold callback for table r: prepends the #r{} record to the accumulator.
%% Anything that is not an #r{} record crashes with function_clause.
simple_acc(#r{} = Rec, Sofar) ->
    [Rec | Sofar].
%% Raw fold callback: prepends the {Key, Value} pair to the accumulator.
simple_rdb_acc(Key, Val, Sofar) ->
    Entry = {Key, Val},
    [Entry | Sofar].
%% Populates a freshly created test table with its fixture data:
%% table t gets the {Key, Value} pairs from raw_objs() (stored as
%% {t, Key, Value}), table r gets the #r{} records from objs().
%% Every insert is asserted to return ok, so a failing insert crashes
%% right here instead of surfacing later as a confusing mismatch in a
%% fold/select test case. Returns ok.
fill_tab(t = Tab) ->
    lists:foreach(
      fun({K, V}) ->
              ok = mrdb:insert(Tab, {Tab, K, V})
      end, raw_objs());
fill_tab(r = Tab) ->
    lists:foreach(
      fun(Obj) ->
              ok = mrdb:insert(Tab, Obj)
      end, objs()).
%% Returns the entries of raw_objs() whose key starts with Pfx, keeping
%% their original order.
prefixed_raw_objs(Pfx) ->
    HasPrefix = fun({Key, _Val}) -> is_prefix(Pfx, Key) end,
    lists:filter(HasPrefix, raw_objs()).
%% Fixture data for the raw-encoded table t: {Key, Value} binary pairs,
%% listed in ascending key order.
raw_objs() ->
    [{<<"aa">>, <<"1">>},
     {<<"aa1">>, <<"2">>},
     {<<"ab">>, <<"3">>},
     {<<"ab1">>, <<"4">>},
     {<<255,255>>, <<"5">>}].
%% Reference result for the select/fold tests: runs match spec MS over
%% the objs() fixture list locally, using the ets match spec engine.
filtered_objs(MS) ->
    ets:match_spec_run(objs(), ets:match_spec_compile(MS)).
%% Fixture data for table r: #r{} records with 3-tuple keys, listed in
%% ascending key order.
objs() ->
    KeyVals = [{{a,a,1}, 1},
               {{a,b,2}, 2},
               {{a,b,3}, 3},
               {{a,c,4}, 4},
               {{b,b,5}, 5}],
    [#r{k = Key, v = Val} || {Key, Val} <- KeyVals].
%% copied from mrdb_select.erl
%% Returns true if binary A is a prefix of binary B (an empty A or A
%% equal to B counts), false otherwise. Non-binary arguments crash with
%% function_clause.
is_prefix(A, B) when is_binary(A), is_binary(B) ->
    PfxSize = byte_size(A),
    %% The size check guards binary:part/3 against a B shorter than A.
    byte_size(B) >= PfxSize andalso binary:part(B, 0, PfxSize) =:= A.