67 Commits

Author SHA1 Message Date
uwiger 4e3c9e83c8 Merge pull request 'Add mrdb:fold_reverse(), mrdb:select_reverse() etc. Remove vsn 1 support.' (#6) from uw-fold_reverse into master
Reviewed-on: #6
2025-06-25 03:13:52 +09:00
Ulf Wiger f9352b6ff0 Remove accidental unicode NBSP char 2025-06-19 20:18:52 +02:00
Ulf Wiger 04e6063ce0 Tests for fold, fold_reverse, select_reverse, etc. 2025-06-19 20:12:06 +02:00
Ulf Wiger 323fcea7a7 Remove legacy support for vsn 1 standalone tabs 2025-06-19 20:12:06 +02:00
Ulf Wiger d8b6ab788e Fix prefixed rdb_fold, add mrdb:rdb_rev_fold() 2025-06-19 20:12:06 +02:00
uwiger 318f84bbaf Merge pull request 'Ensure the erlang_merge_operator is always loaded' (#5) from uw-merge-operator into master
Reviewed-on: #5
2025-06-17 18:05:58 +09:00
Ulf Wiger 0691d8b055 Ensure the erlang_merge_operator is always loaded 2025-06-15 20:43:36 +02:00
uwiger ac69c8564f Merge pull request 'Consistent support for merge ops' (#4) from uw-merge-ops into master
Reviewed-on: #4
2025-06-16 03:41:44 +09:00
Ulf Wiger 2ca7f36eb1 Consistent support for merge ops 2025-06-02 20:07:05 +02:00
Ulf Wiger 56d78768d9 Produce stacktraces unless mnesia_compatible 2025-03-23 21:06:21 +01:00
zxq9 a768d8008f Merge remote-tracking branch 'ae/master' into emqx 2024-10-17 10:54:42 +09:00
Richard Carlsson 9cea41cb9a Merge pull request #57 from richcarl/use-fixed-emqx
Use latest version of emqx fork with compilation fix
2024-10-14 12:17:30 +02:00
Richard Carlsson d42055c7ac Update erlang-rocksdb to use latest patch fixing compilation issues 2024-10-13 14:30:56 +02:00
Richard Carlsson 0ae7ecd41d Use our patched version of the EMQX fork 2024-10-09 13:53:01 +02:00
Richard Carlsson ece9db2b09 Merge pull request #56 from richcarl/use-emqx-fork
Use emqx fork of erlang-rocksdb
2024-09-09 14:04:40 +02:00
Richard Carlsson 0e4382d5f7 Use emqx fork of erlang-rocksdb 2024-08-23 11:01:46 +02:00
Ulf Wiger d9d82d7ead Merge branch 'uw-catchup' into 'master'
Fix wrong index preparation in mrdb:index_read_/3

See merge request ioecs/mnesia_rocksdb!1
2024-08-14 09:24:16 +00:00
Ulf Wiger 34285da6d8 Merge pull request #55 from havelkadragan/master
Fix wrong index preparation in mrdb:index_read_/3
2024-08-13 18:21:32 +02:00
crossroad61 cb123ee28a Merge pull request #1 from aeternity/uw-ix-plugin-tests
Improve tests (fail without Havelka's fix)
2024-08-13 12:45:56 +02:00
Ulf Wiger 996ae82717 Improve tests (fail without Havelka's fix) 2024-08-13 10:36:31 +02:00
dragan d6b86524fd Fix wrong index preparation in mrdb:index_read_/3
In the case when index is of type {_}, index is returned as is
instead of putting it into a tuple {Ix, ordered}.

An example

Given the following index plugin implementation:
-module(key6).

-record(cgh, {key, val}).

-export([k6/3
        , add_index/0
        , create_table/0
        , tst_bad_type/0
    ]).

k6(_, _, #cgh{key = {Name, K8, _Cid, _Vn}}) ->
    [list_to_binary([atom_to_binary(Name),
     list_to_binary(lists:sublist(K8, 6))])].

add_index() ->
    mnesia_schema:add_index_plugin({k6}, key6, k6).

create_table() ->
    mnesia:create_table(cgh,
    [{attributes, record_info(fields, cgh)},
     {type, ordered_set},
     {index, [{k6}]},
     {rocksdb_copies, [node()]},
     {local_content, true}]).

tst_bad_type() ->
    E1 = #cgh{key = {n1, "u1vgrjkh",{ecgi,238,6,213020,50},3}, val = ok},
    mnesia:dirty_write(E1),
    Index1 = list_to_binary([atom_to_binary(n1),
                             list_to_binary(lists:sublist("u1vgrjkh", 6))]),
    mrdb:index_read(cgh, Index1, {k6}).

-- testing --
Eshell V14.2.2 (press Ctrl+G to abort, type help(). for help)
(rock@bang)2> mnesia:create_schema([node()]).
ok
(rock@bang)3> mnesia:start().
ok
(rock@bang)4> mnesia_rocksdb:register().
{ok,rocksdb_copies}
(rock@bang)5> l(key6).
{module,key6}
(rock@bang)6> key6:add_index().
{atomic,ok}
(rock@bang)7> key6:create_table().
{atomic,ok}
(rock@bang)8> key6:tst_bad_type().
** exception exit: {aborted,{bad_type,{cgh,index,{k6}}}}
     in function  mnesia:abort/1 (mnesia.erl, line 362)
     in call from mrdb:ensure_ref/1 (~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl, line 609)
     in call from mrdb:index_read_/3 (~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl, line 945)

-- after patch --

(rock@bang)9> c(mrdb).
Recompiling ~/projects/erlang/github/mnesia_rocksdb/src/mrdb.erl
{ok,mrdb}
(rock@bang)10> key6:tst_bad_type().
[{cgh,{n1,"u1vgrjkh",{ecgi,238,6,213020,50},3},ok}]
(rock@bang)11>
2024-08-12 14:32:20 +02:00
Thomas Arts 296813ef7b Merge pull request #51 from aeternity/ta-rocksdb-1.8.0
Towards version 7.10.2 of rocksdb via version 1.8.0 of erlang_rocksdb
2024-03-19 14:41:50 +01:00
Thomas Arts 16d0533acf Fix dialyzer errors and warnings 2024-03-19 14:08:14 +01:00
Thomas Arts 1d6ae82c6d OTP24-26 2024-03-19 14:08:14 +01:00
Ulf Wiger 71ebaf2d66 New args for rocksdb:transaction_iterator() 2024-03-18 17:40:42 +01:00
Ulf Wiger f2b6116d31 Merge pull request #47 from aeternity/uw-robustify-tx-test
Add retry for bleed-through check
2023-10-12 18:19:48 +02:00
Ulf Wiger 33ee7929d4 Merge pull request #44 from aeternity/uw-index-metadata
Adding/deleting indexes cleans up index metadata
2023-10-12 18:19:16 +02:00
Ulf Wiger 78669bb8bf Merge pull request #42 from mariari/mariari/rebar3-compatible-otp-25
Update rebar3 bin to be compatible with OTP 25
2023-10-09 14:47:25 +02:00
Ulf Wiger 346decee6e Merge pull request #49 from aeternity/ci_matrix
Add CI build matrix for OTP versions
2023-10-09 14:30:47 +02:00
Dincho Todorov 4da0fce567 Add CI build matrix for OTP versions 2023-10-09 13:22:13 +03:00
Ulf Wiger e7d42f9500 Add retry for bleed-through check 2023-10-05 13:53:13 +02:00
Ulf Wiger 699a7d5920 Merge pull request #29 from aeternity/uw-bump-hut-dep
Change hut dep to 1.4.0
2023-10-05 11:00:53 +01:00
Ulf Wiger 57f02078bc Fix typo 2023-10-05 11:53:41 +02:00
Ulf Wiger 791cec41db Adding/deleting indexes cleans up index metadata 2023-10-05 11:53:41 +02:00
Ulf Wiger 6eff62df6d Merge pull request #45 from aeternity/uw-ci-improvements
Run multiple OTP vsns in CI
2023-10-05 10:52:54 +01:00
Ulf Wiger 2e430fc5eb remove 'build' sublevel 2023-10-05 11:21:03 +02:00
Ulf Wiger e76a01f8c4 add ci workflow 2023-10-05 11:12:32 +02:00
Ulf Wiger 4d0a78612a Fix ci config 2023-10-05 11:08:12 +02:00
Ulf Wiger eeb6aff242 Run multiple OTP vsns in CI 2023-10-05 11:00:49 +02:00
mariari f11e16e29f Update rebar3 bin to be compatible with OTP 25
This is the same OTP binary from

https://gitlab.com/vans/erlang-rocksdb/-/commit/af2988a5a27393646de1bb35cfc3644ba4b651b0
2023-08-31 03:12:12 +08:00
Ulf Wiger 0aecf5ef01 Merge pull request #35 from aeternity/uw-different-mutex
rewrite transaction retry mutex
2022-11-04 10:06:58 +01:00
Ulf Wiger 465a220bfe Update comment about mutex implementation 2022-11-03 13:37:52 +01:00
Ulf Wiger 19140c738b Don't warn for export_all in mrdb_bench 2022-11-03 12:57:54 +01:00
Ulf Wiger bed66b2998 Remove html docs 2022-11-03 12:54:50 +01:00
Ulf Wiger 3635eac717 Test case for inner retries; add mrdb_stats.erl; update docs 2022-11-03 12:41:14 +01:00
Ulf Wiger 95abe4e36e Mutex server with fifo queues 2022-11-01 10:11:20 +01:00
Ulf Wiger 7c729bd932 Use a serializing mutex 2022-11-01 10:09:48 +01:00
Ulf Wiger 4489e5d743 Merge pull request #34 from aeternity/uw-batch-release
Don't try to release dummy batch ref
2022-10-31 17:01:15 +01:00
Ulf Wiger d1a6bf22d5 Don't try to release dummy batch ref 2022-10-31 16:48:23 +01:00
Ulf Wiger b65e82ed71 Merge pull request #33 from aeternity/uw-batch-on-demand
Begin dirty activity with batch ref dummy
2022-10-26 13:16:48 +02:00
Ulf Wiger ee9e7eac67 Merge pull request #32 from aeternity/uw-push-pop-error
Tx push at retry before mutex instead of after
2022-10-26 13:16:28 +02:00
Ulf Wiger ce2be519b4 Begin dirty activity with batch ref dummy 2022-10-24 15:15:01 +02:00
Ulf Wiger 8073a0daa5 Tx push at retry before mutex instead of after 2022-10-19 13:19:48 +02:00
Ulf Wiger ab15b7f399 Merge pull request #30 from aeternity/otp23_builder
Switch to OTP23
2022-08-29 12:28:59 +02:00
Dincho Todorov a75f6e0c43 Switch to OTP23 2022-08-29 12:40:50 +03:00
Ulf Wiger 1340bb2050 Change hut dep to 1.4.0 2022-08-15 17:16:29 +02:00
Ulf Wiger d1177b6ad4 Merge pull request #27 from aeternity/uw-accept-repeated-create
Ignore certain mnesia_dumper close_table requests
2022-08-03 09:49:58 +02:00
Ulf Wiger b908998e6b Check pdict for dumper state at close_table 2022-08-02 17:07:53 +02:00
Ulf Wiger dfc0125800 More dets-like load/close behavior 2022-07-25 15:33:00 +02:00
Ulf Wiger 296abb23bb Fix double encoding of info data 2022-07-22 16:33:32 +02:00
Ulf Wiger 7057f4dcbd Remove rocksdb opts refault, safer index consistency check 2022-07-22 15:31:55 +02:00
Ulf Wiger c4235be94a Be more lenient about mnesia asking more than once 2022-07-15 14:14:06 +02:00
Ulf Wiger 302aa1252b Merge pull request #26 from aeternity/uw-fix-db-migration
Migration chunk handling was broken. Add progress reporting support
2022-07-13 12:50:38 +02:00
Ulf Wiger fde2e1194e Fix type to satisfy Dialyzer 2022-07-13 11:35:40 +02:00
Ulf Wiger c4f7b7ac02 Migration chunk handling was broken. Add progress reporting support 2022-07-13 11:19:47 +02:00
Ulf Wiger 226a3b8e91 Merge pull request #25 from shahryarjb/patch-1
Add Erlang flag for README block code
2022-07-11 16:15:13 +02:00
Shahryar Tavakkoli 73784fe765 Add Erlang flag for README block code 2022-07-11 18:35:45 +04:30
21 changed files with 1351 additions and 487 deletions
+23 -3
View File
@@ -2,15 +2,35 @@ version: 2.1
executors:
aebuilder:
parameters:
otp:
type: string
docker:
- image: aeternity/builder
- image: aeternity/builder:focal-<< parameters.otp >>
user: builder
environment:
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
jobs:
build:
executor: aebuilder
build_and_test:
parameters:
otp:
type: string
executor:
name: aebuilder
otp: << parameters.otp >>
steps:
- checkout
- run: make test
- store_artifacts:
path: _build/test/logs
workflows:
commit:
jobs:
- build_and_test:
matrix:
parameters:
otp: ["otp24", "otp25", "otp26"]
+17 -5
View File
@@ -80,7 +80,7 @@ RocksDB supports a number of customization options. These can be specified
by providing a `{Key, Value}` list named `rocksdb_opts` under `user_properties`,
for example:
```
```erlang
mnesia:create_table(foo, [{rocksdb_copies, [node()]},
...
{user_properties,
@@ -93,7 +93,7 @@ for information on configuration parameters. Also see the section below on handl
The default configuration for tables in `mnesia_rocksdb` is:
```
```erlang
default_open_opts() ->
[ {create_if_missing, true}
, {cache_size,
@@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
it yourself. If you only need the size occasionally, you may traverse the
table to count the elements.
When `mrdb` transactions abort, they will return a stacktrace caught
from within the transaction fun, giving much better debugging info.
This is different from how mnesia does it.
If behavior closer to mnesia's abort returns is needed, say, for backwards
compatibility, this can be controlled by setting the environment variable
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
For really performance-critical transactions which may abort often, it might
make a difference to set this option to `true`, since there is a cost involved
in producing stacktraces.
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
@@ -195,7 +207,7 @@ our example. It returns a list of index terms.
Given the following index plugin implementation:
```
```erlang
-module(words).
-export([words_f/3]).
@@ -212,7 +224,7 @@ words_(_) ->
We can register the plugin and use it in table definitions:
```
```erlang
Eshell V12.1.3 (abort with ^G)
1> mnesia:start().
ok
@@ -228,7 +240,7 @@ as an exported function along the node's code path.
To see what happens when we insert an object, we can turn on call trace.
```
```erlang
4> dbg:tracer().
{ok,<0.108.0>}
5> dbg:tp(words, x).
+2 -1
View File
@@ -2,4 +2,5 @@
{application,mnesia_rocksdb}.
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.
+6 -2
View File
@@ -4,8 +4,8 @@
{deps,
[
{sext, "1.8.0"},
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
{hut, "1.3.0"}
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
{hut, "1.4.0"}
]}.
{xref_checks, [
@@ -14,6 +14,10 @@
deprecated_function_calls
]}.
{dialyzer, [{plt_apps, all_deps},
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
]}.
{profiles,
[
{test,
-7
View File
@@ -1,11 +1,4 @@
%% -*- erlang-mode -*-
case os:getenv("ERLANG_ROCKSDB_OPTS") of
false ->
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
_ ->
%% If manually set, we assume it's thought through
skip
end.
case os:getenv("DEBUG") of
"true" ->
Opts = proplists:get_value(erl_opts, CONFIG, []),
+5 -5
View File
@@ -1,15 +1,15 @@
{"1.2.0",
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
{<<"rocksdb">>,
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
{git,"https://github.com/emqx/erlang-rocksdb.git",
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
0},
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
[
{pkg_hash,[
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
{pkg_hash_ext,[
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
].
BIN
View File
Binary file not shown.
+16 -27
View File
@@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) ->
i_show_table(I, Move, Limit, Ref) ->
case rocksdb:iterator_move(I, Move) of
{ok, EncKey, EncVal} ->
{Type,Val} =
case EncKey of
<< ?INFO_TAG, K/binary >> ->
K1 = decode_key(K, Ref),
V = decode_val(EncVal, K1, Ref),
{info,V};
_ ->
K = decode_key(EncKey, Ref),
V = decode_val(EncVal, K, Ref),
{data,V}
end,
io:fwrite("~p: ~p~n", [Type, Val]),
K = decode_key(EncKey, Ref),
Val = decode_val(EncVal, K, Ref),
io:fwrite("~p~n", [Val]),
i_show_table(I, next, Limit-1, Ref);
_ ->
ok
@@ -346,14 +337,11 @@ semantics(_Alias, index_fun) -> fun index_f/4;
semantics(_Alias, _) -> undefined.
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
case info(Alias, Tab, {index_consistent, PosInfo}) of
true -> true;
_ -> false
end.
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
when is_boolean(Bool) ->
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
%% PRIVATE FUN
@@ -454,8 +442,13 @@ close_table(Alias, Tab) ->
error ->
ok;
_ ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab)
case get(mnesia_dumper_dets) of
undefined ->
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
close_table_(Alias, Tab);
_ ->
ok
end
end.
close_table_(Alias, Tab) ->
@@ -673,11 +666,7 @@ select(_Alias, Tab, Ms, Limit) when Limit==infinity; is_integer(Limit) ->
slot(_Alias, Tab, Pos) when is_integer(Pos), Pos >= 0 ->
#{semantics := Sem} = Ref = get_ref(Tab),
Start = case Ref of
#{type := standalone, vsn := 1} -> <<?DATA_START>>;
_ -> first
end,
First = fun(I) -> rocksdb:iterator_move(I, Start) end,
First = fun(I) -> rocksdb:iterator_move(I, first) end,
F = case Sem of
bag -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end;
_ -> fun(I) -> slot_iter_set(First(I), I, 0, Pos, Ref) end
@@ -798,7 +787,7 @@ handle_call({create_table, Tab, Props}, _From,
exit:{aborted, Error} ->
{reply, {aborted, Error}, St}
end;
handle_call({load_table, _LoadReason, Props}, _From,
handle_call({load_table, _LoadReason, Props}, _,
#st{alias = Alias, tab = Tab} = St) ->
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
{reply, ok, St#st{status = active}};
@@ -826,7 +815,7 @@ handle_call({delete, Key}, _From, St) ->
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
Res = mrdb:match_delete(get_ref(Tab), Pat),
{reply, Res, St};
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
{reply, ok, St#st{status = undefined}};
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
+203 -227
View File
@@ -18,7 +18,8 @@
, clear_table/1
]).
-export([ migrate_standalone/2 ]).
-export([ migrate_standalone/2
, migrate_standalone/3 ]).
-export([ start_link/0
, init/1
@@ -32,6 +33,7 @@
, read_info/2 %% (Alias, Tab)
, read_info/4 %% (Alias, Tab, Key, Default)
, write_info/4 %% (Alias, Tab, Key, Value)
, delete_info/3 %% (Alias, Tab, Key)
, write_table_property/3 %% (Alias, Tab, Property)
]).
@@ -66,6 +68,8 @@
-type cf() :: mrdb:db_ref().
-type rpt() :: undefined | map().
-type req() :: {create_table, table(), properties()}
| {delete_table, table()}
| {load_table, table(), properties()}
@@ -74,7 +78,7 @@
| {add_aliases, [alias()]}
| {write_table_property, tabname(), tuple()}
| {remove_aliases, [alias()]}
| {migrate, [{tabname(), map()}]}
| {migrate, [{tabname(), map()}], rpt()}
| {prep_close, table()}
| {close_table, table()}
| {clear_table, table() | cf() }.
@@ -246,57 +250,37 @@ get_info_res(Res, Default) ->
error(E)
end.
%% Admin info: metadata written by the admin proc to keep track of
%% the derived status of tables (such as detected version and encoding
%% of existing standalone tables.)
%%
write_admin_info(K, V, Alias, Name) ->
mrdb:rdb_put(get_ref({admin, Alias}),
admin_info_key(K, Name),
term_to_binary(V)).
read_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
{ok, Bin} ->
{ok, binary_to_term(Bin)};
_ ->
error
end.
delete_admin_info(K, Alias, Name) ->
EncK = admin_info_key(K, Name),
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
admin_info_key(K, Name) ->
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
%% Table metadata info maintained by users
%%
write_info(Alias, Tab, K, V) ->
write_info_(get_ref({admin, Alias}), Tab, K, V).
write_info_(Ref, Tab, K, V) ->
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
maybe_write_standalone_info(Ref, K, V),
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
write_info_encv(Ref, Tab, K, term_to_binary(V)).
maybe_write_standalone_info(Ref, K, V) ->
case Ref of
#{type := standalone, vsn := 1, db_ref := DbRef} ->
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
Key = <<?INFO_TAG, EncK/binary>>,
EncV = mnesia_rocksdb_lib:encode_val(V, term),
rocksdb:put(DbRef, Key, EncV, []);
_ ->
ok
end.
%% Write an info entry for `Tab' into the table referenced by `Ref'.
%% The key is `{info, Tab, Key}', sext-encoded; the value `EncV' is
%% handed to mrdb:rdb_put/4 exactly as given (no encoding is applied here).
write_info_encv(Ref, Tab, Key, EncV) ->
    InfoKey = mnesia_rocksdb_lib:encode_key({info, Tab, Key}, sext),
    mrdb:rdb_put(Ref, InfoKey, EncV, []).
%% Delete the info entry `Key' of table `Tab', using the admin table
%% ref of `Alias' as the storage location.
delete_info(Alias, Tab, Key) ->
    AdminRef = get_ref({admin, Alias}),
    delete_info_(AdminRef, Tab, Key).

%% Same as delete_info/3, but operating on an explicit ref.
%% The entry is addressed by the sext-encoded key `{info, Tab, Key}'.
delete_info_(Ref, Tab, Key) ->
    InfoKey = mnesia_rocksdb_lib:encode_key({info, Tab, Key}, sext),
    mrdb:rdb_delete(Ref, InfoKey, []).
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}).
migrate_standalone(Alias, Tabs) ->
call(Alias, {migrate, Tabs}).
migrate_standalone(Alias, Tabs, undefined).
%% Request migration of `Tabs' via the admin server for `Alias'.
%%
%% The third argument selects progress reporting:
%%   undefined - no reporting; the request carries `undefined'.
%%   To        - any other term is wrapped in a report map
%%               `#{to => To, tag => migrate_standalone}'.
migrate_standalone(Alias, Tabs, undefined) ->
    call(Alias, {migrate, Tabs, undefined});
migrate_standalone(Alias, Tabs, To) ->
    Rpt = #{to => To, tag => migrate_standalone},
    call(Alias, {migrate, Tabs, Rpt}).
-spec call(alias() | [], req()) -> no_return() | any().
call(Alias, Req) ->
@@ -313,13 +297,13 @@ call(Alias, Req, Timeout) ->
end.
start_link() ->
mrdb_mutex:ensure_tab(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
_ = maybe_initial_meta(), %% bootstrap pt
Opts = default_opts(),
process_flag(trap_exit, true),
ensure_cf_cache(),
mnesia:subscribe({table, schema, simple}),
{ok, recover_state(#st{default_opts = Opts})}.
@@ -394,12 +378,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
%% We need to store the persistent ref explicitly here,
%% since mnesia knows nothing of our admin table.
AdminTab = {admin, Alias},
Stats = mrdb_stats:new(),
CfI = update_cf_info(AdminTab, #{ status => open
, name => AdminTab
, vsn => ?VSN
, encoding => {sext,{value,term}}
, attr_pos => #{key => 1,
value => 2}
, stats => Stats
, mountpoint => MP
, properties =>
#{ attributes => [key, val]
@@ -499,12 +485,17 @@ intersection(A, B) ->
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, St1};
{error, _} = Error ->
{reply, Error, St}
case find_cf(Alias, Name, Backend, St) of
{ok, TRec} ->
{reply, {ok, TRec}, St};
error ->
case create_trec(Alias, Name, Props, Backend, St) of
{ok, NewCf} ->
St1 = update_cf(Alias, Name, NewCf, St),
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
{error, _} = Error ->
{reply, Error, St}
end
end;
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
try
@@ -588,10 +579,10 @@ handle_req(Alias, {write_table_property, Tab, Prop}, Backend, St) ->
_ ->
{reply, {error, not_found}, St}
end;
handle_req(Alias, {migrate, Tabs0}, Backend, St) ->
case prepare_migration(Alias, Tabs0, St) of
handle_req(Alias, {migrate, Tabs0, Rpt}, Backend, St) ->
case prepare_migration(Alias, Tabs0, Rpt, St) of
{ok, Tabs} ->
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, St),
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, Rpt, St),
{reply, Res, St1};
{error, _} = Error ->
{reply, Error, St}
@@ -617,8 +608,14 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
case {Op, lists:member(I, Index)} of
{delete, true} ->
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
delete_info(Alias, Main, {index_consistent, I}),
maybe_update_pt(Main, CfM1),
update_cf(Alias, Main, CfM1, St);
{create, _} ->
%% Due to a previous bug, this marker might linger
%% In any case, it mustn't be there for a newly created index
delete_info(Alias, Main, {index_consistent, I}),
St;
_ ->
St
end;
@@ -629,6 +626,7 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
maybe_update_main(_, _, _, St) ->
St.
%% The pt may not have been created yet. If so, don't do it here.
maybe_update_pt(Name, Ref) ->
case get_pt(Name, error) of
@@ -767,25 +765,26 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
false ->
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
{false, MP} ->
create_table_as_standalone(Alias, Name, true, MP, TRec, St);
create_table_as_standalone(Alias, Name, MP, TRec, St);
{true, MP} ->
?log(debug, "will create ~p as standalone and migrate", [Name]),
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
case create_table_as_standalone(Alias, Name, MP, TRec, St) of
{ok, OldTRec, _} ->
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, St);
?log(info, "Migrating ~p to column family", [Name]),
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
_Other ->
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St)
end
end;
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
{Exists, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
{_, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, MP, TRec, St).
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, St) ->
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
?log(debug, "Migrate to cf (~p)", [Name]),
{ok, NewCf, St1} = create_table_as_cf(
Alias, Name, TRec#{db_ref => DbRef}, St),
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, St1),
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, Rpt, St1),
{ok, NewCf, St2}.
%% Return {Migrate, MP} iff table exists standalone; just false if it doesn't
@@ -802,17 +801,33 @@ should_we_migrate_standalone(#{name := Name}) ->
false
end.
prepare_migration(Alias, Tabs, St) ->
prepare_migration(Alias, Tabs, Rpt, St) ->
Res = lists:map(fun(T) ->
prepare_migration_(Alias, T, St)
end, Tabs),
Res1 = add_related_tabs(Res, maps:get(Alias, St#st.backends), Alias, St),
case [E || {error, _} = E <- Res1] of
[] -> {ok, Res1};
[] ->
rpt(Rpt, "Will migrate ~p~n", [[T || {T,_,_} <- Res1]]),
{ok, Res1};
[_|_] = Errors ->
rpt(Rpt, "Errors encountered: ~p~n", [Errors]),
{error, Errors}
end.
%% Emit a report stamped with the current system time in milliseconds.
rpt(Rpt, Fmt, Args) ->
    Now = erlang:system_time(millisecond),
    rpt(Rpt, Now, Fmt, Args).

%% Emit a report with an explicit timestamp.
%% With `undefined' as the report descriptor nothing is sent. Otherwise the
%% descriptor map, extended with `time', `fmt' and `args', is sent to the
%% destination found under its `to' key as `{mnesia_rocksdb, report, Map}'.
%% Always returns `ok'.
rpt(undefined, _Time, _Fmt, _Args) ->
    ok;
rpt(#{to := Dest} = Info, Time, Fmt, Args) ->
    Report = Info#{time => Time, fmt => Fmt, args => Args},
    Dest ! {mnesia_rocksdb, report, Report},
    ok.
%% Send a `{mnesia_rocksdb, report, progress}' message to the report
%% destination whenever the running count is a multiple of 100000
%% (which includes 0). In that case the sent message is the return value;
%% in every other case (no destination, or count not a multiple) it is `ok'.
maybe_progress(#{to := Dest}, Count) when Count rem 100000 =:= 0 ->
    erlang:send(Dest, {mnesia_rocksdb, report, progress});
maybe_progress(_Rpt, _Count) ->
    ok.
add_related_tabs(Ts, Backend, Alias, St) ->
lists:flatmap(
fun({error,_} = E) -> [E];
@@ -839,29 +854,31 @@ prepare_migration_(Alias, T, #st{} = St) ->
{error, {no_such_table, TName}}
end.
do_migrate_tabs(Alias, Tabs, Backend, St) ->
do_migrate_tabs(Alias, Tabs, Backend, Rpt, St) ->
lists:mapfoldl(fun(T, St1) ->
do_migrate_table(Alias, T, Backend, St1)
do_migrate_table(Alias, T, Backend, Rpt, St1)
end, St, Tabs).
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, St) when is_map(TRec0) ->
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, Rpt, St) when is_map(TRec0) ->
T0 = erlang:system_time(millisecond),
rpt(Rpt, T0, "Migrate ~p~n", [Name]),
TRec = maps:without([encoding, vsn], TRec0),
maybe_write_user_props(TRec),
{ok, CF, St1} = create_cf_and_migrate(Alias, Name, OldTRec,
TRec, Backend, St),
TRec, Backend, Rpt, St),
put_pt(Name, CF),
T1 = erlang:system_time(millisecond),
rpt(Rpt, T1, "~nDone (~p)~n", [Name]),
Time = T1 - T0,
io:fwrite("~p migrated, ~p ms~n", [Name, Time]),
{{Name, {ok, Time}}, St1}.
migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
#st{standalone = Ts} = St) ->
Rpt, #st{standalone = Ts} = St) ->
ChunkSz = chunk_size(TRec),
KeyPos = mnesia_rocksdb_lib:keypos(T),
migrate_to_cf(mrdb:select(OldTRec, [{'_',[],['$_']}], ChunkSz),
TRec, OldTRec, KeyPos),
TRec, OldTRec, KeyPos, set_count(0, Rpt)),
case maps:is_key({Alias,T}, Ts)
andalso table_is_empty(OldTRec) of
true ->
@@ -871,27 +888,36 @@ migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
{ok, St}
end.
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos) ->
mrdb:as_batch(
Cf,
fun(New) ->
mrdb:as_batch(
DbRec,
fun(Old) ->
lists:foreach(
fun(Obj) ->
mrdb:insert(New, Obj),
mrdb:delete(Old, element(KeyPos,Obj))
end, L)
end)
end),
migrate_to_cf(cont(Cont), Cf, DbRec, KeyPos);
migrate_to_cf('$end_of_table', _, _, _) ->
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos, Rpt) ->
Count0 = get_count(Rpt),
Count = mrdb:as_batch(
Cf,
fun(New) ->
mrdb:as_batch(
DbRec,
fun(Old) ->
lists:foldl(
fun(Obj, C) ->
mrdb:insert(New, Obj),
mrdb:delete(Old, element(KeyPos,Obj)),
maybe_progress(Rpt, C),
C + 1
end, Count0, L)
end)
end),
migrate_to_cf(mrdb_select:select(Cont), Cf, DbRec, KeyPos, set_count(Count, Rpt));
migrate_to_cf('$end_of_table', _, _, _, _) ->
ok.
cont('$end_of_table' = E) -> E;
cont(F) when is_function(F,0) ->
F().
%% Read the running object count from a report descriptor.
%% The count lives under the key `{count}'; it is 0 when reporting is
%% disabled (`undefined') or when the map has no such key yet.
get_count(undefined) ->
    0;
get_count(#{{count} := Count}) ->
    Count;
get_count(#{}) ->
    0.
%% Store the running object count in a report descriptor under the key
%% `{count}'. When reporting is disabled (`undefined') there is nowhere to
%% keep it, so `undefined' is returned unchanged.
set_count(_Count, undefined) ->
    undefined;
set_count(Count, #{} = Rpt) ->
    maps:put({count}, Count, Rpt).
%% Number of objects fetched per select chunk during migration.
%% The argument is currently ignored; the size is fixed at 300.
chunk_size(_TRec) ->
    300.
@@ -1032,29 +1058,12 @@ table_exists_as_standalone(Name) ->
end,
{Exists, MP}.
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) ->
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
{ok, #{type := standalone, vsn := Vsn1,
encoding := Enc1} = Cf, _St1} = Ok ->
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
Alias, Name),
case Vsn1 of
1 ->
load_info(Alias, Name, Cf);
_ ->
skip
end,
Ok;
Other ->
Other
end.
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
create_table_as_standalone(Alias, Name, MP, TRec, St) ->
Vsn = check_version(TRec),
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St).
do_open_standalone(true, Alias, Name, MP, TRec1, St).
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
#st{standalone = Ts} = St) ->
Opts = rocksdb_opts_from_trec(TRec0),
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
@@ -1064,135 +1073,24 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
DbRec1 = DbRec#{ cfs => CfNames,
mountpoint => MP },
TRec = maps:merge(TRec0, DbRec#{type => standalone}),
TRec1 = guess_table_vsn_and_encoding(Exists, TRec),
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
{error, _} = Err ->
?log(debug, "open_db error: ~p", [Err]),
Err
end.
%% When opening a standalone table, chances are it's a legacy table
%% where legacy encoding is already in place. We try to read the
%% first object and apply legacy encoding. If successful, we set
%% legacy encoding in the TRec. If we migrate the data to a column
%% family, we should apply the defined encoding for the cf.
%%
%% The first object can either be an info object (in the legacy case)
%% or a data object, with a sext-encoded key, and a term_to_binary-
%% encoded object as value, where the key position is set to [].
%% The info objects would be sext-encoded key + term-encoded value.
guess_table_vsn_and_encoding(false, TRec) ->
TRec;
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
alias := Alias, name := Name} = R) ->
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
{ok, {V, E}} ->
R#{vsn => V, encoding => E};
error ->
R1 = set_default_guess(R),
mrdb:with_rdb_iterator(
R1, fun(I) ->
guess_table_vsn_and_encoding_(
mrdb:rdb_iterator_move(I, first), I, As, R1)
end)
end.
set_default_guess(#{type := standalone} = R) ->
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
1 ->
R#{vsn => 1, encoding => {sext, {object, term}}};
V ->
R#{vsn => V}
end.
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
Arity = length(As) + 1,
case K of
<<?INFO_TAG, EncK/binary>> ->
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
mnesia_rocksdb_lib:decode(V, term)},
%% This is a vsn 1 standalone table
R#{vsn => 1, encoding => {sext, {object, term}}}
catch
error:_ ->
R
end;
_ ->
Enc = guess_obj_encoding(K, V, Arity),
R#{encoding => Enc}
end;
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
R.
guess_obj_encoding(K, V, Arity) ->
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
guess_encoding(Bin) ->
try {sext, sext:decode(Bin)}
catch
error:_ ->
try {term, binary_to_term(Bin)}
catch
error:_ -> raw
end
end.
guess_key_encoding(Bin) ->
case guess_encoding(Bin) of
raw -> raw;
{Enc, _} -> Enc
end.
guess_val_encoding(Bin, Arity) ->
case guess_encoding(Bin) of
raw -> {value, raw};
{Enc, Term} ->
if is_tuple(Term), size(Term) == Arity,
element(2, Term) == [] ->
{object, Enc};
true ->
{value, Enc}
end
end.
%% This is slightly different from `rocksdb:is_empty/1`, since it allows
%% for the presence of some metadata, and still considers it empty if there
%% is no user data.
table_is_empty(#{} = DbRec) ->
Start = iterator_data_start(DbRec),
mrdb:with_rdb_iterator(
DbRec, fun(I) ->
case mrdb:rdb_iterator_move(I, Start) of
case mrdb:rdb_iterator_move(I, first) of
{ok, _, _} -> false;
_ -> true
end
end).
iterator_data_start(#{vsn := 1}) -> <<?DATA_START>>;
iterator_data_start(_) -> first.
load_info(Alias, Name, Cf) ->
ARef = get_ref({admin, Alias}),
mrdb:with_rdb_iterator(
Cf, fun(I) ->
load_info_(rocksdb:iterator_move(I, first), I, ARef, Name)
end).
load_info_(Res, I, ARef, Tab) ->
case Res of
{ok, << ?INFO_TAG, K/binary >>, V} ->
DecK = mnesia_rocksdb_lib:decode_key(K),
case read_info_(ARef, Tab, DecK, undefined) of
undefined ->
write_info_(ARef, Tab, DecK, V);
_ ->
skip
end,
load_info_(rocksdb:iterator_move(I, next), I, ARef, Tab);
_ ->
ok
end.
check_version(TRec) ->
user_property(mrdb_version, TRec, ?VSN).
@@ -1251,7 +1149,7 @@ rocksdb_opts_from_trec(TRec) ->
create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
CfName = tab_to_cf_name(Name),
case rocksdb:create_column_family(DbRef, CfName, cfopts()) of
case create_column_family(DbRef, CfName, cfopts(), R) of
{ok, CfH} ->
R1 = check_version_and_encoding(R#{ cf_handle => CfH
, type => column_family }),
@@ -1260,6 +1158,82 @@ create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
Error
end.
%% Obtain a handle for column family `CfName', creating it only if needed.
%%
%% If `column_family_exists/2' reports that the CF is already present, the
%% handle is looked up via `find_active_cf_handle/2'; a missing handle yields
%% `{error, {no_handle_for_existing_cf, CfName}}'. Otherwise the CF is
%% created through `rocksdb:create_column_family/3'. The outcome is passed to
%% `maybe_note_active_cf/2' before being returned unchanged.
create_column_family(DbRef, CfName, CfOpts, R) ->
    Outcome =
        case column_family_exists(CfName, R) of
            false ->
                rocksdb:create_column_family(DbRef, CfName, CfOpts);
            true ->
                case find_active_cf_handle(DbRef, CfName) of
                    {ok, _} = Found ->
                        Found;
                    error ->
                        {error, {no_handle_for_existing_cf, CfName}}
                end
        end,
    maybe_note_active_cf(Outcome, CfName),
    Outcome.
%% True if `CfName' is among the column families listed for the database.
%%
%% A map carrying `mountpoint' is queried directly with
%% `rocksdb:list_column_families/2'; any non-`{ok, _}' reply counts as "does
%% not exist". A map that only carries `alias' is resolved to the admin ref
%% for that alias first; if no such ref is found the answer is `false'.
column_family_exists(CfName, #{mountpoint := MountPoint}) ->
    case rocksdb:list_column_families(MountPoint, []) of
        {ok, Existing} ->
            lists:member(CfName, Existing);
        _NotListable ->
            false
    end;
column_family_exists(CfName, #{alias := Alias}) ->
    case get_ref({admin, Alias}, error) of
        error ->
            false;
        AdminRef ->
            column_family_exists(CfName, AdminRef)
    end.
%% Column family handle caching ======================================================
%%
%% At least as far as I can tell, there is no way to query erlang-rocksdb for currently
%% active column family handles. This can become an issue e.g. during table migration
%% from standalone to column families, where the meta structures aren't updated until
%% after completed migration. A transient error during migration should be addressable
%% by simply retrying, but if the CF has already been created, and we've lost the handle,
%% there is no easy way to get it back. Unfortunately, if we start caching CFs, we also
%% need to garbage collect them.
%%
-define(CFH_CACHE, mnesia_rocksdb_cf_handle_cache).
%% Create the public, named `ordered_set' table used as the CF handle cache
%% unless a table with that name already exists. Returns `true' when the
%% table was already there, otherwise whatever `ets:new/2' returns.
ensure_cf_cache() ->
    AlreadyThere = ets:info(?CFH_CACHE, name) =/= undefined,
    AlreadyThere
        orelse ets:new(?CFH_CACHE, [ordered_set, public, named_table]).
%% Record a successfully obtained column family handle in the handle cache.
%% The cache entry is the 1-tuple `{{CfName, CfH}}', i.e. the name/handle pair
%% is the key. Any result other than `{ok, CfH}' is ignored and yields `false'.
maybe_note_active_cf({ok, CfH}, CfName) ->
ets:insert(?CFH_CACHE, {{CfName, CfH}});
maybe_note_active_cf(_, _) ->
false.
%% Look for a still-usable cached handle for `CfName'.
%%
%% Every cached handle registered under `CfName' is probed with
%% `check_cfh/4'. Returns `{ok, CfH}' for the last handle that passed the
%% probe, or `error' if none did (or none was cached).
find_active_cf_handle(DbRef, CfName) ->
    MatchSpec = [{{{CfName,'$1'}}, [], ['$1']}],
    Probe = fun(CfH, Found) -> check_cfh(DbRef, CfH, CfName, Found) end,
    lists:foldl(Probe, error, ets:select(?CFH_CACHE, MatchSpec)).
%% Probe one cached handle by trying to open an iterator on it.
%%
%% If the iterator opens, it is closed again and `{ok, CfH}' is returned.
%% If opening fails, the stale cache entry is deleted and `Acc' is returned
%% unchanged.
check_cfh(DbRef, CfH, CfName, Acc) ->
    case rocksdb:iterator(DbRef, CfH, []) of
        {error, _} ->
            ets:delete(?CFH_CACHE, {CfName, CfH}),
            Acc;
        {ok, Iter} ->
            rocksdb:iterator_close(Iter),
            {ok, CfH}
    end.
%% Remove the cache entry keyed on `{CfName, CfH}' from the handle cache.
drop_cached_cf(CfName, CfH) ->
ets:delete(?CFH_CACHE, {CfName, CfH}).
%%
%% ===================================================================================
do_prep_close(Name, Backend, St) ->
RelTabs = get_related_resources(Name, Backend),
erase_pt_list([Name | RelTabs]),
@@ -1302,6 +1276,7 @@ do_delete_table(Alias, Name, Backend, #st{} = St) ->
case Where of
#{db_ref := DbRef, cf_handle := CfH, type := column_family} ->
rocksdb:drop_column_family(DbRef, CfH),
drop_cached_cf(tab_to_cf_name(Name), CfH),
rocksdb:destroy_column_family(DbRef, CfH),
{ok, delete_cf(Alias, Name, St)};
#{type := standalone} = R ->
@@ -1327,24 +1302,26 @@ open_db_(MP, Alias, Opts, CFs0, CreateIfMissing) ->
%% not yet created
CFs = cfs(CFs0),
file:make_dir(MP),
OpenOpts = [ {create_if_missing, true}
, {create_missing_column_families, true}
, {merge_operator, erlang_merge_operator}
| Opts ],
OpenRes = mnesia_rocksdb_lib:open_rocksdb(MP, OpenOpts, CFs),
OpenRes = rocksdb_open(MP, Opts, CFs),
map_cfs(OpenRes, CFs, Alias, Acc0);
false ->
{error, enoent};
true ->
%% Assumption: even an old rocksdb database file will have at least "default"
{ok,CFs} = rocksdb:list_column_families(MP, Opts),
CFs1 = [{CF,[]} || CF <- CFs], %% TODO: this really needs more checking
CFs1 = [{CF, cfopts()} || CF <- CFs], %% TODO: this really needs more checking
map_cfs(rocksdb_open(MP, Opts, CFs1), CFs1, Alias, Acc0)
end.
%% Prepend the options that are always passed when opening a database
%% (create-if-missing for both the db and its column families, plus the
%% erlang merge operator) to the caller-supplied `Opts'.
open_opts(Opts) ->
    Mandatory = [ {create_if_missing, true}
                , {create_missing_column_families, true}
                , {merge_operator, erlang_merge_operator} ],
    Mandatory ++ Opts.
rocksdb_open(MP, Opts, CFs) ->
%% rocksdb:open(MP, Opts, CFs),
mnesia_rocksdb_lib:open_rocksdb(MP, Opts, CFs).
mnesia_rocksdb_lib:open_rocksdb(MP, open_opts(Opts), CFs).
is_open(Alias, #st{backends = Bs}) ->
case maps:find(Alias, Bs) of
@@ -1443,7 +1420,6 @@ close_and_delete_standalone(#{alias := Alias,
case get_table_mountpoint(Alias, Name, St) of
{ok, MP} ->
close_and_delete(DbRef, MP),
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
error ->
St
+2 -1
View File
@@ -42,4 +42,5 @@ start_link() ->
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
, ?CHILD(mnesia_rocksdb_params, worker)]} }.
+198 -93
View File
@@ -53,6 +53,7 @@
, delete/2 , delete/3
, delete_object/2, delete_object/3
, match_delete/2
, merge/3 , merge/4
, clear_table/1
, batch_write/2 , batch_write/3
, update_counter/3, update_counter/4
@@ -60,14 +61,17 @@
, get_batch/1
, snapshot/1
, release_snapshot/1
, first/1 , first/2
, next/2 , next/3
, prev/2 , prev/3
, last/1 , last/2
, select/2 , select/3
, first/1 , first/2
, next/2 , next/3
, prev/2 , prev/3
, last/1 , last/2
, select/2 , select/3
, select_reverse/2, select_reverse/3
, select/1
, fold/3 , fold/4 , fold/5
, rdb_fold/4 , rdb_fold/5
, fold/3 , fold/4 , fold/5
, fold_reverse/3 , fold_reverse/4, fold_reverse/5
, rdb_fold/4 , rdb_fold/5
, rdb_fold_reverse/4, rdb_fold_reverse/5
, write_info/3
, read_info/2
, read_info/1
@@ -105,6 +109,8 @@
-include("mnesia_rocksdb.hrl").
-include("mnesia_rocksdb_int.hrl").
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
-type tab_name() :: atom().
-type alias() :: atom().
-type admin_tab() :: {admin, alias()}.
@@ -115,7 +121,9 @@
| index()
| retainer().
-type retries() :: non_neg_integer().
-type inner() :: non_neg_integer().
-type outer() :: non_neg_integer().
-type retries() :: outer() | {inner(), outer()}.
%% activity type 'ets' makes no sense in this context
-type mnesia_activity_type() :: transaction
@@ -141,7 +149,7 @@
-type tx_activity() :: #{ type := 'tx'
, handle := tx_handle()
, attempt := non_neg_integer() }.
, attempt := 'undefined' | retries() }.
-type batch_activity() :: #{ type := 'batch'
, handle := batch_handle() }.
-type activity() :: tx_activity() | batch_activity().
@@ -246,7 +254,7 @@ release_snapshot(SHandle) ->
%% <li> `{tx, TxOpts}' - A `rocksdb' transaction with slight modifications</li>
%% <li> `batch' - A `rocksdb' batch operation</li>
%% </ul>
%%
%%
%% By default, transactions are combined with a snapshot with 1 retry.
%% The snapshot ensures that writes from concurrent transactions don't leak into the transaction context.
%% A transaction will be retried if it detects that the commit set conflicts with recent changes.
@@ -254,6 +262,14 @@ release_snapshot(SHandle) ->
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
%% the commit set. It will then be re-run again, until the retry count is exhausted.
%%
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
%% single number, `Retries', is analogous to `{0, Retries}'. Each outer retry requests a 'mutex lock' by
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
%% in line) if there are outer retries remaining. When all retries are exhausted, the activity aborts
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
%% attempt, but only on outer retries (the first retry is always an outer retry).
%%
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
%%
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
@@ -269,41 +285,92 @@ activity(Type, Alias, F) ->
#{ alias => Alias
, db_ref => DbRef }, TxCtxt);
batch ->
Batch = get_batch_(DbRef),
Batch = init_batch_ref(DbRef),
#{ activity => #{ type => batch
, handle => Batch }
, alias => Alias
, db_ref => DbRef }
end,
do_activity(F, Alias, Ctxt, false).
do_activity(F, Alias, Ctxt).
do_activity(F, Alias, Ctxt, WithLock) ->
try run_f(F, Ctxt, WithLock, Alias) of
Res ->
try commit_and_pop(Res)
catch
throw:{?MODULE, busy} ->
do_activity(F, Alias, Ctxt, true)
end
do_activity(F, Alias, Ctxt) ->
try try_f(F, Ctxt)
catch
throw:{?MODULE, busy} ->
retry_activity(F, Alias, Ctxt)
end.
%% Run `F' once in context `Ctxt' and commit the result. The value of
%% `mnesia_compatible_aborts(Ctxt)' selects how exceptions are reported.
try_f(F, Ctxt) ->
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
%% try_f(MnesiaCompatible, F, Ctxt)
%%
%% Both clauses commit through `commit_and_pop/1' on normal return and pass
%% throws to `abort_and_pop/2' unchanged; they differ only in how the other
%% exception classes are reported. Note that `commit_and_pop/1' runs in the
%% `of' section of the `try', so exceptions raised while committing are not
%% intercepted here and propagate to the caller.
try_f(false, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(throw, Something);
Cat:Err:T ->
%% Without capturing the stacktrace here,
%% debugging gets pretty difficult. Incompatible with mnesia, though.
abort_and_pop(Cat, {Err, T})
end;
try_f(true, F, Ctxt) ->
try run_f(F, Ctxt) of
Res ->
commit_and_pop(Res)
catch
throw:Something ->
abort_and_pop(throw, Something);
Cat:Err ->
%% Mnesia-compatible mode: the stacktrace is deliberately not
%% captured, so the bare `Err' is passed on as the abort reason.
abort_and_pop(Cat, Err)
end.
run_f(F, Ctxt, false, _) ->
push_ctxt(Ctxt),
F();
run_f(F, Ctxt, true, Alias) ->
mrdb_mutex:do(
Alias,
fun() ->
push_ctxt(incr_attempt(Ctxt)),
F()
end).
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
run_f(F, Ctxt) ->
push_ctxt(Ctxt),
F().
%% Compute the next attempt counter and the kind of retry it represents.
%%
%% `Attempt' is `0' before the first retry and `{Inner, Outer}' afterwards;
%% the second argument is the `{InnerLimit, OuterLimit}' retry budget.
%% Returns `{inner | outer, NextAttempt}', or `error' when the budget is
%% used up or the arguments have any other shape. The first retry is always
%% an outer one, and each outer retry resets the inner counter to 0.
incr_attempt(0, {_InnerLimit, OuterLimit}) when OuterLimit > 0 ->
    {outer, {0, 1}};
incr_attempt({I, O}, {InnerLimit, OuterLimit})
  when is_integer(I), is_integer(O),
       is_integer(InnerLimit), is_integer(OuterLimit),
       I < InnerLimit ->
    {inner, {I + 1, O}};
incr_attempt({I, O}, {InnerLimit, OuterLimit})
  when is_integer(I), is_integer(O),
       is_integer(InnerLimit), is_integer(OuterLimit),
       O < OuterLimit ->
    {outer, {0, O + 1}};
incr_attempt(_, _) ->
    error.
%% Re-run activity `F' after a `busy' commit conflict.
%%
%% The attempt counter is advanced with `incr_attempt/2'. When the retry
%% budget is exhausted the activity is aborted with `retry_limit'. Otherwise
%% the context is restarted via `restart_ctxt/1' and the activity is run
%% again as an inner or outer retry; a further `{?MODULE, busy}' throw
%% triggers the next round, starting from the updated counter.
retry_activity(F, Alias, #{activity := #{ type := Type
                                        , attempt := A
                                        , retries := R} = Act} = Ctxt) ->
    case incr_attempt(A, R) of
        error ->
            return_abort(Type, error, retry_limit);
        {RetryKind, NextAttempt} ->
            NextCtxt = Ctxt#{activity := Act#{attempt := NextAttempt}},
            try
                retry_activity_(RetryKind, F, Alias, restart_ctxt(NextCtxt))
            catch
                throw:{?MODULE, busy} ->
                    retry_activity(F, Alias, NextCtxt)
            end
    end.
%% Perform one retry of the given kind, bumping the matching statistics
%% counter first. An inner retry runs `try_f/2' directly; an outer retry
%% runs it inside `mrdb_mutex:do/2' for `Alias'.
retry_activity_(inner, F, Alias, Ctxt) ->
    mrdb_stats:incr(Alias, inner_retries, 1),
    try_f(F, Ctxt);
retry_activity_(outer, F, Alias, Ctxt) ->
    mrdb_stats:incr(Alias, outer_retries, 1),
    Serialized = fun() -> try_f(F, Ctxt) end,
    mrdb_mutex:do(Alias, Serialized).
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Act1 = Act#{attempt := A+1, handle := TxH},
Act1 = Act#{handle := TxH},
C1 = C#{ activity := Act1 },
case maps:is_key(snapshot, C) of
true ->
@@ -368,7 +435,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
check_tx_opts(Opts) ->
case maps:without([no_snapshot, retries], Opts) of
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
Other when map_size(Other) > 0 ->
abort({invalid_tx_opts, maps:keys(Other)});
_ ->
@@ -376,10 +443,14 @@ check_tx_opts(Opts) ->
end.
check_retries(#{retries := Retries} = Opts) ->
if is_integer(Retries), Retries >= 0 ->
Opts;
true ->
error({invalid_tx_option, {retries, Retries}})
case Retries of
_ when is_integer(Retries), Retries >= 0 ->
Opts#{retries := {0, Retries}};
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
Inner >= 0, Outer >= 0 ->
Opts;
_ ->
error({invalid_tx_option, {retries, Retries}})
end.
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
@@ -394,7 +465,7 @@ create_tx(Opts, DbRef) ->
{ok, TxH} = rdb_transaction(DbRef, []),
Opts#{activity => maps:merge(Opts, #{ type => tx
, handle => TxH
, attempt => 1})}.
, attempt => 0 })}.
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
case NoSnap of
@@ -414,8 +485,7 @@ commit_and_pop(Res) ->
Res;
{error, {error, "Resource busy" ++ _ = Busy}} ->
case A of
#{retries := Retries, attempt := Att}
when Att =< Retries ->
#{retries := {I,O}} when I > 0; O > 0 ->
throw({?MODULE, busy});
_ ->
error({error, Busy})
@@ -473,10 +543,14 @@ re_throw(Cat, Err) ->
throw -> throw(Err)
end.
mnesia_compatible_aborts() ->
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
Bool;
mnesia_compatible_aborts(_) ->
mnesia_compatible_aborts().
fix_error({aborted, Err}) ->
Err;
fix_error(Err) ->
@@ -486,7 +560,7 @@ rdb_transaction(DbRef, Opts) ->
rocksdb:transaction(DbRef, Opts).
rdb_transaction_commit_and_pop(H) ->
try rdb_transaction_commit(H)
try rdb_transaction_commit(H)
after
pop_ctxt()
end.
@@ -507,10 +581,13 @@ rdb_write_batch_and_pop(BatchRef, C) ->
pop_ctxt()
end.
rdb_release_batch(?BATCH_REF_DUMMY) ->
ok;
rdb_release_batch(H) ->
rocksdb:release_batch(H).
%% @doc Aborts an ongoing {@link activity/2}
-spec abort(_) -> no_return().
abort(Reason) ->
case mnesia_compatible_aborts() of
true ->
@@ -529,7 +606,7 @@ new_tx(#{activity := _}, _) ->
new_tx(Tab, Opts) ->
#{db_ref := DbRef} = R = ensure_ref(Tab),
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
tx_ref(Tab, TxH) ->
@@ -539,7 +616,7 @@ tx_ref(Tab, TxH) ->
#{activity := #{type := tx, handle := OtherTxH}} ->
error({tx_handle_conflict, OtherTxH});
R ->
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
end.
-spec tx_commit(tx_handle() | db_ref()) -> ok.
@@ -586,10 +663,25 @@ maybe_tx_ctxt([#{activity := #{type := Type} = A} = C|_], R) ->
inherit_ctxt(Ref, R) ->
maps:merge(Ref, maps:with([snapshot, activity], R)).
%% @doc Create an iterator on table `Tab' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%% @equiv with_iterator(Tab, Fun, [])
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res )) -> Res.
with_iterator(Tab, Fun) ->
with_iterator(Tab, Fun, []).
%% @doc Create an iterator on table `Tab' with `ReadOptions' for the duration of `Fun'
%%
%% The iterator is passed to the provided fun as `Fun(Iterator)', and is
%% closed once the fun terminates.
%%
%% The iterator respects `mnesia_rocksdb' metadata, so accesses through the iterator
%% will return `{ok, Obj}' where `Obj' is the complete decoded object.
%% For rocksdb-level iterators, see {@link with_rdb_iterator/3}.
%% @end
-spec with_iterator(ref_or_tab(), fun( (mrdb_iterator()) -> Res ), read_options()) -> Res.
with_iterator(Tab, Fun, Opts) ->
R = ensure_ref(Tab),
@@ -666,6 +758,21 @@ insert(Tab, Obj0, Opts) ->
EncVal = encode_val(Obj, Ref),
insert_(Ref, Key, encode_key(Key, Ref), EncVal, Obj, Opts).
merge(Tab, Key, MergeOp) ->
merge(Tab, Key, MergeOp, []).
%% Apply merge operation `MergeOp' to the value stored under `Key' in `Tab'.
%%
%% Only tables whose value encoding is `{value, term}' are accepted (the key
%% encoding may be anything); for every other encoding the call aborts with
%% `badarg'.
merge(Tab, Key, MergeOp, Opts) ->
    Ref = #{encoding := Encoding} = ensure_ref(Tab),
    case Encoding of
        {_KeyEnc, {value, term}} ->
            merge_(Ref, Key, MergeOp, Opts);
        _Unsupported ->
            abort(badarg)
    end.
%% Encode `Key' and hand the merge operation to `rdb_merge/4'.
%%
%% The key is encoded with the table's own key encoding (taken from `Ref'),
%% exactly as `insert/3' does. merge/4 admits tables with any key encoding,
%% so encoding without `Ref' would address the wrong rocksdb key for tables
%% whose keys use a different encoding than the default.
merge_(Ref, Key, MergeOp, Opts) ->
    rdb_merge(Ref, encode_key(Key, Ref), MergeOp, Opts).
validate_obj(Obj, #{mode := mnesia}) ->
Obj;
validate_obj(Obj, #{attr_pos := AP,
@@ -798,7 +905,7 @@ update_index_do_bag(Ixs, Name, R, Key, Obj, Opts) ->
not_found
end
end.
update_index_do([{_Pos,ordered} = Ix|Ixs], Name, R, Key, Obj, Rest, Opts) ->
Tab = {Name, index, Ix},
#{ix_vals_f := IxValsF} = IRef = ensure_ref(Tab, R),
@@ -889,7 +996,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
_ when is_atom(Ix) ->
{attr_pos(Ix, Ref), ordered};
{_} ->
Ix;
{Ix, ordered};
_ when is_integer(Ix) ->
{Ix, ordered}
end,
@@ -970,7 +1077,7 @@ alias_of(Tab) ->
%% and when releasing, all batches are released. This will not ensure
%% atomicity, but there is no way in rocksdb to achieve atomicity
%% across db instances. At least, data should end up where you expect.
%%
%%
%% @end
-spec as_batch(ref_or_tab(), fun( (db_ref()) -> Res )) -> Res.
as_batch(Tab, F) ->
@@ -1010,18 +1117,20 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
get_batch(_) ->
{error, badarg}.
get_batch_(DbRef) ->
init_batch_ref(DbRef) ->
Ref = make_ref(),
{ok, Batch} = rdb_batch(),
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
Ref.
get_batch_(DbRef) -> Ref = make_ref(), {ok, Batch} = rdb_batch(),
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}), Ref.
get_batch_(DbRef, BatchRef) ->
Key = batch_ref_key(BatchRef),
case pdict_get(Key) of
undefined ->
error(stale_batch_ref);
#{DbRef := Batch} ->
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
Batch;
Map ->
{ok, Batch} = rdb_batch(),
@@ -1050,7 +1159,9 @@ write_batches(BatchRef, Opts) ->
pdict_erase(Key),
ret_batch_write_acc(
maps:fold(
fun(DbRef, Batch, Acc) ->
fun(_, ?BATCH_REF_DUMMY, Acc) ->
Acc;
(DbRef, Batch, Acc) ->
case rocksdb:write_batch(DbRef, Batch, Opts) of
ok ->
Acc;
@@ -1206,6 +1317,13 @@ select(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit),
mrdb_select:select(ensure_ref(Tab), Pat, Limit).
select_reverse(Tab, Pat) ->
select_reverse(Tab, Pat, infinity).
select_reverse(Tab, Pat, Limit) when Limit == infinity; is_integer(Limit), Limit > 0 ->
true = valid_limit(Limit),
mrdb_select:select_reverse(ensure_ref(Tab), Pat, Limit).
select(Cont) ->
mrdb_select:select(Cont).
@@ -1260,6 +1378,16 @@ fold(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit),
mrdb_select:fold(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
%% fold_reverse/3: fold over `Tab' in reverse, using a match spec that
%% selects every object (`[{'_', [], ['$_']}]').
fold_reverse(Tab, Fun, Acc) ->
fold_reverse(Tab, Fun, Acc, [{'_', [], ['$_']}]).
%% fold_reverse/4: as fold_reverse/5 with `Limit' set to `infinity'.
fold_reverse(Tab, Fun, Acc, MatchSpec) ->
fold_reverse(Tab, Fun, Acc, MatchSpec, infinity).
%% fold_reverse/5: checks `Limit' with `valid_limit/1' (anything but `true'
%% is a badmatch), resolves `Tab' to a ref and delegates to
%% `mrdb_select:fold_reverse/5'.
fold_reverse(Tab, Fun, Acc, MatchSpec, Limit) ->
true = valid_limit(Limit),
mrdb_select:fold_reverse(ensure_ref(Tab), Fun, Acc, MatchSpec, Limit).
rdb_fold(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) ->
rdb_fold(Tab, Fun, Acc, Prefix, infinity).
@@ -1269,7 +1397,16 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
true = valid_limit(Limit),
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
valid_limit(L) ->
%% rdb_fold_reverse/4: as rdb_fold_reverse/5 with `Limit' set to `infinity'.
%% `Fun' must be a fun of arity 3 and `Prefix' a binary.
rdb_fold_reverse(Tab, Fun, Acc, Prefix) when is_function(Fun, 3)
, is_binary(Prefix) ->
rdb_fold_reverse(Tab, Fun, Acc, Prefix, infinity).
%% rdb_fold_reverse/5: checks `Limit' with `valid_limit/1', resolves `Tab'
%% to a ref and delegates to `mrdb_select:rdb_fold_reverse/5'.
rdb_fold_reverse(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
, is_binary(Prefix) ->
true = valid_limit(Limit),
mrdb_select:rdb_fold_reverse(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
valid_limit(L) ->
case L of
infinity ->
true;
@@ -1280,15 +1417,7 @@ valid_limit(L) ->
end.
write_info(Tab, K, V) ->
R = ensure_ref(Tab),
Alias = case R of
#{type := standalone, vsn := 1, alias := A} = TRef ->
%% Also write on legacy info format
write_info_standalone(TRef, K, V),
A;
#{alias := A} ->
A
end,
#{alias := Alias} = ensure_ref(Tab),
write_info_(ensure_ref({admin, Alias}), Tab, K, V).
write_info_(#{} = R, Tab, K, V) ->
@@ -1305,13 +1434,8 @@ read_info(Tab, K) ->
read_info(Tab, K, Default) when K==size; K==memory ->
read_direct_info_(ensure_ref(Tab), K, Default);
read_info(Tab, K, Default) ->
#{alias := Alias} = R = ensure_ref(Tab),
case R of
#{type := standalone, vsn := 1} = TRef ->
read_info_standalone(TRef, K, Default);
#{alias := Alias} ->
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
end.
#{alias := Alias} = ensure_ref(Tab),
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
read_direct_info_(R, memory, _Def) ->
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
@@ -1335,26 +1459,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
%%rocksdb_boolean(<<"1">>) -> true;
%%rocksdb_boolean(<<"0">>) -> false.
write_info_standalone(#{} = R, K, V) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
EncV = term_to_binary(V),
rdb_put(R, EncK, EncV, write_opts(R, [])).
read_info_standalone(#{} = R, K, Default) ->
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
get_info_res(Res, Default) ->
case Res of
not_found ->
Default;
{ok, Bin} ->
%% no fancy tricks when encoding/decoding info values
binary_to_term(Bin);
{error, E} ->
error(E)
end.
%% insert_bag_v2(Ref, K, V, Opts) ->
%% rdb_merge(Ref, K, {list_append, [V]}
@@ -1494,8 +1598,9 @@ rdb_iterator(R) -> rdb_iterator(R, []).
rdb_iterator(R, Opts) ->
rdb_iterator_(R, read_opts(R, Opts)).
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
%% Note: versions before 1.8.0 expected DbRef as first argument
rocksdb:transaction_iterator(TxH, CfH, ROpts);
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
rocksdb:iterator(DbRef, CfH, ROpts).
@@ -1506,11 +1611,11 @@ rdb_merge_(#{db_ref := DbRef, cf_handle := CfH}, K, Op, WOpts) ->
rocksdb:merge(DbRef, CfH, K, Op, WOpts).
write_opts(#{write_opts := Os}, Opts) -> Os ++ Opts;
write_opts(_, Opts) ->
write_opts(_, Opts) ->
Opts.
read_opts(#{read_opts := Os}, Opts) -> Os ++ Opts;
read_opts(_, Opts) ->
read_opts(_, Opts) ->
Opts.
-define(EOT, '$end_of_table').
+42 -3
View File
@@ -6,11 +6,14 @@
, iterator_move/2
, iterator/2
, iterator_close/1
, fold/4
, rev_fold/4
, index_ref/2
]).
-record(mrdb_ix_iter, { i :: mrdb:iterator()
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
, type = set :: set | bag
, sub :: mrdb:ref() | pid()
, sub :: pid() | mrdb:db_ref()
}).
-type ix_iterator() :: #mrdb_ix_iter{}.
@@ -19,7 +22,7 @@
-type object() :: tuple().
-record(subst, { i :: mrdb:iterator()
-record(subst, { i :: mrdb:mrdb_iterator()
, vals_f
, cur
, mref }).
@@ -62,6 +65,41 @@ iterator(Tab, IxPos) ->
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Folds over the index table corresponding to Tab and IxPos.
%% The fold fun is called with the index key and the corresponding object,
%% or [] if there is no such object.
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, first, next, FoldFun, Acc).
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
when Acc :: any().
%% Like fold/4 above, but folds over the index in the reverse order.
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
with_iterator(
Tab, IxPos,
fun(I) ->
iter_fold(I, Start, Dir, FoldFun, Acc)
end).
%% Position iterator `I' at `Start' and fold from there, stepping with `Dir'.
iter_fold(I, Start, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
%% Fold loop: for each `{ok, IxVal, Obj}' the fold fun is applied as
%% `Fun(IxVal, Obj, Acc)' and the iterator is moved one step in `Dir'.
%% Any `{error, _}' from the iterator ends the fold and returns `Acc'.
%% Only these two result shapes are handled.
iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
iter_fold_(iterator_move(I, Dir), I, Dir, Fun, Fun(IxVal, Obj, Acc));
iter_fold_({error, _}, _, _, _, Acc) ->
Acc.
index_ref(Tab, Pos) ->
TRef = mrdb:ensure_ref(Tab),
ensure_index_ref(Pos, TRef).
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
case mrdb:iterator_move(I, Dir) of
{ok, {{FKey, PKey}}} ->
@@ -83,6 +121,7 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
Other
end.
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
opt_read(R, Key) ->
case mrdb:read(R, Key, []) of
[Obj] ->
+92 -62
View File
@@ -3,79 +3,109 @@
-export([ do/2 ]).
-export([ ensure_tab/0 ]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
%% critical section.
%%
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
%% Releasing the resource is done by notifying the server.
%%
%% Previous implementations tested:
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
%% This is a pretty simple implementation, but it lacks ordering, and appeared
%% to sometimes lead to starvation.
%% * A duplicate_bag ets table: These are defined such that insertion order is
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
%% Unfortunately, performance plummeted when tested with > 25 concurrent
%% processes. While this is likely a very high number, given that we're talking
%% about contention in a system using optimistic concurrency, it's never good
%% if performance falls off a cliff.
do(Rsrc, F) when is_function(F, 0) ->
true = claim(Rsrc),
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
try F()
after
release(Rsrc)
mrdb_mutex_serializer:done(Rsrc, Ref)
end.
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
-ifdef(TEST).
mutex_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Check that all operations complete", fun swarm_do/0}
]}.
setup() ->
case whereis(mrdb_mutex_serializer) of
undefined ->
{ok, Pid} = mrdb_mutex_serializer:start_link(),
Pid;
Pid ->
Pid
end.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
false
end.
cleanup(Pid) ->
unlink(Pid),
exit(Pid, kill).
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
swarm_do() ->
Rsrc = ?LINE,
Pid = spawn(fun() -> collect([]) end),
L = lists:seq(1, 1000),
Evens = [X || X <- L, is_even(X)],
Pids = [spawn_monitor(fun() ->
send_even(Rsrc, N, Pid)
end) || N <- lists:seq(1,1000)],
await_pids(Pids),
Results = fetch(Pid),
{incorrect_results, []} = {incorrect_results, Results -- Evens},
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
_ ->
true
collect(Acc) ->
receive
{_, result, N} ->
collect([N|Acc]);
{From, fetch} ->
From ! {fetch_reply, Acc},
done
end.
fetch(Pid) ->
Pid ! {self(), fetch},
receive
{fetch_reply, Result} ->
Result
end.
is_even(N) ->
(N rem 2) =:= 0.
await_pids([{_, MRef}|Pids]) ->
receive
{'DOWN', MRef, _, _, _} ->
await_pids(Pids)
after 10000 ->
error(timeout)
end;
await_pids([]) ->
ok.
send_even(Rsrc, N, Pid) ->
do(Rsrc, fun() ->
case is_even(N) of
true ->
Pid ! {self(), result, N};
false ->
exit(not_even)
end
end).
-endif.
+98
View File
@@ -0,0 +1,98 @@
-module(mrdb_mutex_serializer).
-behavior(gen_server).
-export([wait/1,
done/2]).
-export([start_link/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(st, { queues = #{}
, empty_q = queue:new() }). %% perhaps silly optimization
%% Queue up for `Rsrc' and block (no timeout) until the server grants it.
%% The server replies `{ok, Ref}'; `Ref' must be passed to `done/2'.
wait(Rsrc) ->
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
%% Tell the server that the holder identified by `Ref' is finished with
%% `Rsrc'. Asynchronous (cast).
done(Rsrc, Ref) ->
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
%% A `wait' request: monitor the caller and append it to the queue for
%% `Rsrc'. If the queue was empty the caller becomes the holder at once and
%% gets `{ok, MRef}' immediately; otherwise the reply is deferred until the
%% caller reaches the head of the queue.
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
                                              , empty_q = NewQ } = St) ->
    MRef = erlang:monitor(process, Pid),
    Waiting = maps:get(Rsrc, Queues, NewQ),
    St1 = St#st{ queues = Queues#{Rsrc => queue:in({From, MRef}, Waiting)} },
    case queue:is_empty(Waiting) of
        true ->
            {reply, {ok, MRef}, St1};
        false ->
            {noreply, St1}
    end.
%% A `done' notification: if `MRef' identifies the entry at the head of the
%% queue for `Rsrc' (i.e. the current holder), that entry is removed and the
%% next waiter, if any, is granted the resource. Notifications from anyone
%% else, or for an unknown resource, are ignored.
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
    case maps:find(Rsrc, Queues) of
        {ok, Q} ->
            case queue:out(Q) of
                {{value, {_From, MRef}}, Q1} ->
                    %% `flush' also discards a 'DOWN' message that may
                    %% already have been delivered for this monitor, so that
                    %% handle_info/2 does not scan every queue for a
                    %% reference that has just been removed.
                    erlang:demonitor(MRef, [flush]),
                    ok = maybe_dispatch_one(Q1),
                    {noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
                {_, _} ->
                    %% Not the lock holder
                    {noreply, St}
            end;
        error ->
            {noreply, St}
    end.
%% A monitored process died: purge its entry (identified by `MRef') from
%% every queue via `drop_or_filter/2'. All other messages are ignored.
handle_info({'DOWN', MRef, process, _Pid, _Reason}, #st{queues = Queues} = St) ->
    Purge = fun(_Rsrc, Q) -> drop_or_filter(Q, MRef) end,
    {noreply, St#st{ queues = maps:map(Purge, Queues) }};
handle_info(_Other, St) ->
    {noreply, St}.
terminate(_, _) ->
ok.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
%% Grant the resource to whoever is at the head of `Q' by sending it the
%% deferred `{ok, MRef}' reply. The entry is left in the queue: the head of
%% the queue is, by definition, the current holder. Does nothing for an
%% empty queue. Always returns `ok'.
maybe_dispatch_one(Q) ->
    case queue:is_empty(Q) of
        true ->
            ok;
        false ->
            {From, MRef} = queue:get(Q),
            gen_server:reply(From, {ok, MRef}),
            ok
    end.
%% Remove the entry identified by `MRef' from `Q'.
%%
%% If that entry is the head (the current holder), it is dropped and the
%% next waiter is granted the resource. Otherwise any entry carrying `MRef'
%% is filtered out of the waiting part of the queue. An empty queue is
%% returned unchanged.
drop_or_filter(Q, MRef) ->
    case queue:peek(Q) of
        empty ->
            Q;
        {value, {_, MRef}} ->
            Rest = queue:drop(Q),
            ok = maybe_dispatch_one(Rest),
            Rest;
        {value, _Other} ->
            KeepOthers = fun({_, M}) -> M =/= MRef end,
            queue:filter(KeepOthers, Q)
    end.
+111 -32
View File
@@ -1,11 +1,15 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(mrdb_select).
-export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
-export([ select/3 %% (Ref, MatchSpec, Limit)
, select/4 %% (Ref, MatchSpec, AccKeys, Limit)
, select_reverse/3
, select_reverse/4
, select/1 %% (Cont)
, fold/5 %% (Ref, Fun, Acc, MatchSpec, Limit)
, fold_reverse/5
, rdb_fold/5 %% (Ref, Fun, Acc, Prefix, Limit)
, rdb_fold_reverse/5 %% (Ref, Fun, Acc, Prefix, Limit)
]).
-export([continuation_info/2]).
@@ -25,18 +29,27 @@
, compiled_ms
, limit
, key_only = false % TODO: not used
, direction = forward % TODO: not used
, direction = forward
}).
select(Ref, MS, Limit) when is_map(Ref), is_list(MS) ->
select(Ref, MS, false, Limit).
select(Ref, MS, false, forward, Limit).
select(Ref, MS, AccKeys, Limit)
select(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, forward, Limit).
select_reverse(Ref, MS, Limit) ->
select(Ref, MS, false, reverse, Limit).
select_reverse(Ref, MS, AccKeys, Limit) ->
select(Ref, MS, AccKeys, reverse, Limit).
select(Ref, MS, AccKeys, Dir, Limit)
when is_map(Ref), is_list(MS), is_boolean(AccKeys) ->
Sel = mk_sel(Ref, MS, Limit),
Sel = mk_sel(Ref, MS, Dir, Limit),
mrdb:with_rdb_iterator(Ref, fun(I) -> i_select(I, Sel, AccKeys, []) end).
mk_sel(#{name := Tab} = Ref, MS, Limit) ->
mk_sel(#{name := Tab} = Ref, MS, Dir, Limit) ->
Keypat = keypat(MS, keypos(Tab), Ref),
#sel{tab = Tab,
ref = Ref,
@@ -44,6 +57,7 @@ mk_sel(#{name := Tab} = Ref, MS, Limit) ->
ms = MS,
compiled_ms = ets:match_spec_compile(MS),
key_only = needs_key_only(MS),
direction = Dir,
limit = Limit}.
select(Cont) ->
@@ -64,6 +78,12 @@ continuation_info_(direction, #sel{direction = Dir}) -> Dir;
continuation_info_(_, _) -> undefined.
fold(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, forward, Limit).
fold_reverse(Ref, Fun, Acc, MS, Limit) ->
do_fold(Ref, Fun, Acc, MS, reverse, Limit).
do_fold(Ref, Fun, Acc, MS, Dir, Limit) ->
{AccKeys, F} =
if is_function(Fun, 3) ->
{true, fun({K, Obj}, Acc1) ->
@@ -74,7 +94,7 @@ fold(Ref, Fun, Acc, MS, Limit) ->
true ->
mrdb:abort(invalid_fold_fun)
end,
fold_(select(Ref, MS, AccKeys, Limit), F, Acc).
fold_(select(Ref, MS, AccKeys, Dir, Limit), F, Acc).
fold_('$end_of_table', _, Acc) ->
Acc;
@@ -86,12 +106,43 @@ fold_({L, Cont}, Fun, Acc) ->
rdb_fold(Ref, Fun, Acc, Prefix, Limit) ->
mrdb:with_rdb_iterator(
Ref, fun(I) ->
MovRes = rocksdb:iterator_move(I, first(Ref)),
MovRes = fwd_init_seek(I, Prefix),
i_rdb_fold(MovRes, I, Prefix, Fun, Acc, Limit)
end).
first(#{vsn := 1}) -> <<?DATA_START>>;
first(_) -> first.
%% Folds Fun(Key, Value, Acc) over the raw key/value pairs whose key starts
%% with Prefix, walking the iterator backwards from the reverse seek position.
%% At most Limit matching pairs are visited (Limit may be `infinity').
rdb_fold_reverse(Ref, Fun, Acc, Prefix, Limit) ->
    FoldFromEnd =
        fun(Iter) ->
                Start = rev_init_seek(Iter, Prefix),
                i_rdb_fold_reverse(Start, Iter, Prefix, Fun, Acc, Limit)
        end,
    mrdb:with_rdb_iterator(Ref, FoldFromEnd).
%% Positions iterator I at the start point for a forward traversal of Pfx and
%% returns the result of the iterator move.
fwd_init_seek(I, Pfx) ->
    Target = fwd_init_seek_tgt(Pfx),
    rocksdb:iterator_move(I, Target).
%% Iterator action that starts a forward traversal: `first' when there is no
%% prefix, otherwise a seek to the prefix itself.
fwd_init_seek_tgt(Prefix) ->
    case Prefix of
        <<>> -> first;
        _    -> {seek, Prefix}
    end.
%% Positions iterator I at the start point for a reverse traversal of Pfx and
%% returns the result of the iterator move.
rev_init_seek(I, Pfx) ->
    Target = rev_init_seek_tgt(Pfx),
    rocksdb:iterator_move(I, Target).
%% Iterator action that starts a reverse traversal over all keys beginning
%% with Prefix.
%%
%% With no prefix we simply start at `last'. Otherwise we compute the smallest
%% same-width binary that is greater than every key carrying the prefix and
%% use `seek_for_prev' on it. `seek_for_prev' lands on the last key that is
%% =< the target, so the iterator stays valid even when the prefixed keys are
%% the very last keys in the table (a plain `seek' would find nothing there
%% and the traversal would end up empty). If the key found equals the target
%% it is not prefixed and is skipped by the callers' "K > Pfx" clause.
rev_init_seek_tgt(<<>>) -> last;
rev_init_seek_tgt(Prefix) ->
    case incr_prefix(Prefix) of
        last -> last;
        Pfx1 when is_binary(Pfx1) ->
            {seek_for_prev, Pfx1}
    end.

%% Increments Pfx, interpreted as a big-endian unsigned integer, keeping the
%% original byte width (leading zero bytes are preserved). Returns `last' when
%% the prefix is empty or when the increment overflows, i.e. all bytes are
%% 16#FF and no larger binary of the same width exists.
incr_prefix(<<>>) -> last;
incr_prefix(Pfx) when is_binary(Pfx) ->
    Bits = bit_size(Pfx),
    <<PfxI:Bits>> = Pfx,
    case PfxI + 1 of
        I1 when I1 bsr Bits =/= 0 -> last;   % carried out of the top byte
        I1 -> <<I1:Bits>>
    end.
i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
case is_prefix(Pfx, K) of
@@ -104,18 +155,35 @@ i_rdb_fold({ok, K, V}, I, Pfx, Fun, Acc, Limit) when Limit > 0 ->
i_rdb_fold(_, _, _, _, Acc, _) ->
Acc.
%% Reverse fold loop. For each {ok, Key, Val} iterator result while Limit > 0:
%%  - key carries the prefix: step back, apply Fun and decrement the limit;
%%  - key is not prefixed but sorts after the prefix: step back, i.e. skip it
%%    (we have not reached the prefixed range yet);
%%  - otherwise we are past the prefixed range: return the accumulator.
%% Any other iterator result, or an exhausted limit, ends the fold.
i_rdb_fold_reverse({ok, Key, Val}, Iter, Pfx, Fun, Acc0, Limit) when Limit > 0 ->
    case {is_prefix(Pfx, Key), Key > Pfx} of
        {true, _} ->
            Prev = rocksdb:iterator_move(Iter, prev),
            Acc1 = Fun(Key, Val, Acc0),
            i_rdb_fold_reverse(Prev, Iter, Pfx, Fun, Acc1, decr(Limit));
        {false, true} ->
            Prev = rocksdb:iterator_move(Iter, prev),
            i_rdb_fold_reverse(Prev, Iter, Pfx, Fun, Acc0, Limit);
        {false, false} ->
            Acc0
    end;
i_rdb_fold_reverse(_MoveRes, _Iter, _Pfx, _Fun, Acc, _Limit) ->
    Acc.
i_select(I, #sel{ keypat = Pfx
, compiled_ms = MS
, limit = Limit
, ref = #{vsn := Vsn, encoding := Enc} } = Sel, AccKeys, Acc) ->
StartKey = case {Pfx, Vsn, Enc} of
{<<>>, 1, {sext, _}} ->
<<?DATA_START>>;
{_, _, {term, _}} ->
<<>>;
_ ->
Pfx
end,
, direction = Dir
, ref = #{encoding := Enc} } = Sel0, AccKeys, Acc) ->
{StartKey, Sel} = case Enc of
{term, _} ->
%% No defined ordering - do forward select
{first, Sel0#sel{direction = forward}};
_ ->
SK = case Dir of
forward -> fwd_init_seek_tgt(Pfx);
reverse -> rev_init_seek_tgt(Pfx)
end,
{SK, Sel0}
end,
select_traverse(rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc).
@@ -193,7 +261,7 @@ map_vars([H|T], P) ->
map_vars([], _) ->
[].
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R, direction = Dir} = Sel,
AccKeys, Acc) ->
case is_prefix(Pfx, K) of
true ->
@@ -202,7 +270,7 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
case ets:match_spec_run([Rec], MS) of
[] ->
select_traverse(
rocksdb:iterator_move(I, next), Limit, Pfx, MS,
rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
[Match] ->
Acc1 = if AccKeys ->
@@ -212,6 +280,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end;
false when Dir == reverse, K > Pfx ->
select_traverse(rocksdb:iterator_move(I, prev), Limit, Pfx, MS,
I, Sel, AccKeys, Acc);
false when Limit == infinity ->
lists:reverse(Acc);
false ->
@@ -220,6 +291,9 @@ select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{ref = R} = Sel,
select_traverse({error, _}, Limit, _, _, _, _, _, Acc) ->
select_return(Limit, {lists:reverse(Acc), '$end_of_table'}).
%% Maps a traversal direction to the iterator step action.
next_or_prev(reverse) -> prev;
next_or_prev(forward) -> next.
select_return(infinity, {L, '$end_of_table'}) ->
L;
select_return(_, Ret) ->
@@ -239,29 +313,34 @@ decr(I) when is_integer(I) ->
decr(infinity) ->
infinity.
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref, direction = Dir} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc),
fun(sel) -> Sel;
(cont) ->
mrdb:with_rdb_iterator(
Ref,
fun(NewI) ->
select_traverse(iterator_next(NewI, K),
select_traverse(iterator_next(NewI, K, Dir),
Limit, Pfx, MS, NewI, Sel,
AccKeys, [])
end)
end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
traverse_continue(_K, Limit, Pfx, MS, I, #sel{direction = Dir} = Sel, AccKeys, Acc) ->
select_traverse(rocksdb:iterator_move(I, next_or_prev(Dir)), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) ->
case rocksdb:iterator_move(I, K) of
iterator_next(I, K, Dir) ->
case i_move(I, K, Dir) of
{ok, K, _} ->
rocksdb:iterator_move(I, next);
rocksdb:iterator_move(I, next_or_prev(Dir));
Other ->
Other
end.
%% Moves the iterator to key Key for the given direction: a plain seek to the
%% key when going forward, `seek_for_prev' when going in reverse.
i_move(Iter, Key, forward) ->
    rocksdb:iterator_move(Iter, Key);
i_move(Iter, Key, reverse) ->
    rocksdb:iterator_move(Iter, {seek_for_prev, Key}).
keypat([H|T], KeyPos, Ref) ->
keypat(T, KeyPos, Ref, keypat_pfx(H, KeyPos, Ref)).
+74
View File
@@ -0,0 +1,74 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
%% @doc Statistics API for the mnesia_rocksdb plugin
%%
%% Some counters are maintained for each active alias. Currently, the following
%% counters are supported:
%% * inner_retries
%% * outer_retries
%%
-module(mrdb_stats).
-export([new/0]).
-export([incr/3,
get/1,
get/2]).
-type alias() :: mnesia_rocksdb:alias().
-type db_ref() :: mrdb:db_ref().
-type counter() :: atom().
-type increment() :: integer().
-type counters() :: #{ counter() := integer() }.
%% Creates a fresh statistics structure: a `counters' array with one slot per
%% known counter, together with the name-to-index map used to address it.
new() ->
    Meta = ctr_meta(),
    Ref = counters:new(map_size(Meta), [write_concurrency]),
    #{ ref => Ref
     , meta => Meta }.
%% Maps each supported counter name to its index in the `counters' array.
%% new/0 sizes the array from this map, and the indices are passed as-is to
%% counters:get/2 and counters:add/3.
ctr_meta() ->
#{ inner_retries => 1
, outer_retries => 2 }.
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
%% @doc Increment `Ctr' counter for `Alias' with increment `N'.
%%
%% Note that the first argument may also be a `db_ref()' map,
%% corresponding to `mrdb:get_ref({admin, Alias})'.
%% The map must contain a `stats' entry with `ref' and `meta' keys, and
%% `Ctr' must be a key of the `meta' map.
%% @end
incr(Alias, Ctr, N) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
incr_(Ref, Meta, Ctr, N);
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
incr_(Ref, Meta, Ctr, N).
-spec get(alias() | db_ref(), counter()) -> integer().
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
%%
%% The first argument may also be a `db_ref()' map containing a `stats'
%% entry, as returned by `mrdb:get_ref({admin, Alias})'. `Ctr' must be a
%% key of the `meta' map stored in that entry.
%% @end
get(Alias, Ctr) when is_atom(Alias) ->
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
get_(Ref, Meta, Ctr);
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
get_(Ref, Meta, Ctr).
-spec get(alias() | db_ref()) -> counters().
%% @doc Fetches all known counters for `Alias', in the form of a map,
%% `#{Counter => Value}'.
%%
%% An atom argument is resolved through `mrdb:get_ref({admin, Alias})';
%% a map argument is used directly and must contain a `stats' entry.
%% @end
get(Alias) when is_atom(Alias) ->
get_(mrdb:get_ref({admin, Alias}));
get(Ref) when is_map(Ref) ->
get_(Ref).
%% Builds the `#{Counter => Value}' map by replacing every index in the
%% name-to-index map with the current value of that slot.
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
    maps:map(fun(_Name, Ix) -> counters:get(Ref, Ix) end, Meta).
%% Reads the slot that Meta assigns to counter Attr. An unknown Attr fails in
%% maps:get/2.
get_(Ref, Meta, Attr) ->
    counters:get(Ref, maps:get(Attr, Meta)).
%% Adds N to the slot that Meta assigns to counter Attr. An unknown Attr fails
%% in maps:get/2.
incr_(Ref, Meta, Attr, N) ->
    counters:add(Ref, maps:get(Attr, Meta), N).
+187 -15
View File
@@ -24,8 +24,12 @@
, mrdb_abort/1
, mrdb_two_procs/1
, mrdb_two_procs_tx_restart/1
, mrdb_two_procs_tx_inner_restart/1
, mrdb_two_procs_snap/1
, mrdb_three_procs/1
, create_counters/1
, update_counters/1
, restart_node/1
]).
-include_lib("common_test/include/ct.hrl").
@@ -41,7 +45,8 @@ all() ->
groups() ->
[
{all_tests, [sequence], [ {group, checks}
, {group, mrdb} ]}
, {group, mrdb}
, {group, counters}]}
%% , error_handling ]}
, {checks, [sequence], [ encoding_sext_attrs
, encoding_binary_binary
@@ -53,8 +58,13 @@ groups() ->
, mrdb_abort
, mrdb_two_procs
, mrdb_two_procs_tx_restart
, mrdb_two_procs_tx_inner_restart
, mrdb_two_procs_snap
, mrdb_three_procs ]}
, {counters, [sequence], [ create_counters
, update_counters
, restart_node
, update_counters ]}
].
@@ -267,7 +277,7 @@ mrdb_abort(Config) ->
Pre = mrdb:read(tx_abort, a),
D0 = get_dict(),
TRes = try mrdb:activity(
tx, rdb,
{tx, #{mnesia_compatible => true}}, rdb,
fun() ->
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
error(abort_here),
@@ -287,10 +297,17 @@ mrdb_abort(Config) ->
mrdb_two_procs(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
tr_flags(
{self(), [call, sos, p]},
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {mrdb, activity, x}], tr_opts()))).
mrdb_two_procs_(Config) ->
@@ -314,11 +331,16 @@ mrdb_two_procs_(Config) ->
Pre = mrdb:read(R, a),
go_ahead_other(POther),
await_other_down(POther, MRef, ?LINE),
%% The test proc is still inside the transaction, and POther
%% has terminated/committed. If we now read 'a', we should see
%% the value that POther wrote (no isolation).
[{R, a, 17}] = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
%% Our transaction should fail due to the resource conflict, and because
%% we're not using retries.
try mrdb:activity({tx, #{no_snapshot => true,
retries => 0}}, rdb, F1) of
ok -> error(unexpected)
@@ -339,6 +361,7 @@ mrdb_two_procs_tx_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
check_stats(rdb),
mrdb:insert(R, {R, a, 1}),
Pre = mrdb:read(R, a),
F0 = fun() ->
@@ -354,7 +377,7 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = [{R, a, 17}],
Att = get_attempt(),
Expected = case Att of
1 -> Pre;
0 -> Pre;
_ -> OtherWrite
end,
Expected = mrdb:read(R, a),
@@ -363,14 +386,104 @@ mrdb_two_procs_tx_restart_(Config) ->
OtherWrite = mrdb:read(R, a),
ok = mrdb:insert(R, {R, a, 18})
end,
go_ahead_other(1, POther),
go_ahead_other(0, POther),
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
dictionary_unchanged(Do0),
check_stats(rdb),
[{R, a, 18}] = mrdb:read(R, a),
delete_tabs(Created),
ok.
%% Test case entry point: runs mrdb_two_procs_tx_inner_restart_/1 under
%% tr_ct:with_trace/3, using the trace options from dbg_tr_opts/0.
mrdb_two_procs_tx_inner_restart(Config) ->
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
dbg_tr_opts()).
%% Scenario: the test process runs a transaction (F2) with
%% `retries => {1,1}' while two helper processes, P1 and P2, each run their
%% own transaction writing to key `a' and key `b' respectively. The helpers
%% are released step by step via go_ahead_other/2 from inside F2, depending
%% on the current attempt value, so that F2 is forced to restart. At the end
%% the test asserts that both keys hold F2's final value (18) and that the
%% inner_retries and outer_retries counters each grew by exactly 1.
mrdb_two_procs_tx_inner_restart_(Config) ->
R = ?FUNCTION_NAME,
Parent = self(),
Created = create_tabs([{R, []}], Config),
mrdb:insert(R, {R, a, 1}),
mrdb:insert(R, {R, b, 1}),
PreA = mrdb:read(R, a),
PreB = mrdb:read(R, b),
%% Baseline counter values, compared against the values after the run.
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
F0 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, a, 17}),
wait_for_other(Parent, ?LINE)
end,
F1 = fun() ->
wait_for_other(Parent, ?LINE),
ok = mrdb:insert(R, {R, b, 147}),
wait_for_other(Parent, ?LINE)
end,
%% Runs F as a `tx' activity in a monitored process; returns {Pid, MRef}.
Spawn = fun(F) ->
Res = spawn_opt(
fun() ->
ok = mrdb:activity(tx, rdb, F)
end, [monitor]),
Res
end,
{P1, MRef1} = Spawn(F0),
{P2, MRef2} = Spawn(F1),
F2 = fun() ->
PostA = [{R, a, 17}], % once F0 (P1) has committed
PostB = [{R, b, 147}], % once F1 (P2) has committed
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
BRes = mrdb:read(R, b),
Att = get_attempt(),
ct:log("Att = ~p", [Att]),
%% Att is 0 on the first run; the retry clauses below match on tuples.
case Att of
0 -> % This is the first run (no retry yet)
ARes = PreA,
BRes = PreB,
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
await_other_down(P1, MRef1, ?LINE),
PostA = mrdb:read(R, a); % now, P1's write should leak through here
{0, 1} -> % This is the first (outer) retry
go_ahead_other(0, P2), % Let P2 write
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
await_other_down(P2, MRef2, ?LINE),
PostA = mrdb:read(R, a), % now we should see writes from both P1
%% On OTP 23, the following step might fail due to timing, even
%% though the trace output looks as expected. Possibly some quirk
%% with leakage propagation time in rocksdb. Let's retry to be sure.
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
{1, 1} ->
PostA = mrdb:read(R, a),
PostB = mrdb:read(R, b),
ok
end,
mrdb:insert(R, {R, a, 18}),
mrdb:insert(R, {R, b, 18})
end,
Do0 = get_dict(),
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
check_stats(rdb),
dictionary_unchanged(Do0),
[{R, a, 18}] = mrdb:read(R, a),
[{R, b, 18}] = mrdb:read(R, b),
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
%% Tagged tuple so that a failure shows the observed {inner, outer} deltas.
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
delete_tabs(Created),
ok.
%% Polls Fun until it returns exactly Expected. try_until/2 allows 10
%% attempts; between attempts the caller sleeps for 100 ms. Returns `ok' on
%% success and raises error({badmatch, {Expected, Fun}}) once the attempts
%% are used up.
try_until(Expected, Fun) ->
    try_until(Expected, Fun, 10).

try_until(Expected, Fun, Left) when Left > 0 ->
    Matched = (Fun() =:= Expected),
    if
        Matched ->
            ok;
        true ->
            receive after 100 -> ok end,
            try_until(Expected, Fun, Left - 1)
    end;
try_until(Expected, Fun, _Left) ->
    error({badmatch, {Expected, Fun}}).
%
%% For testing purposes, we use side-effects inside the transactions
@@ -383,7 +496,7 @@ mrdb_two_procs_tx_restart_(Config) ->
%% attempt, and ignore the sync ops on retries.
%%
-define(IF_FIRST(N, Expr),
if N == 1 ->
if N == 0 ->
Expr;
true ->
ok
@@ -413,8 +526,8 @@ mrdb_two_procs_snap(Config) ->
go_ahead_other(Att, POther),
ARes = mrdb:read(R, a),
ARes = case Att of
1 -> Pre;
2 -> [{R, a, 17}]
0 -> Pre;
_ -> [{R, a, 17}]
end,
await_other_down(POther, MRef, ?LINE),
PreB = mrdb:read(R, b),
@@ -434,7 +547,7 @@ mrdb_two_procs_snap(Config) ->
%% We make sure that P2 commits before finishing the other two, and P3 and the
%% main thread sync, so as to maximize the contention for the retry lock.
mrdb_three_procs(Config) ->
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
mrdb_three_procs_(Config) ->
R = ?FUNCTION_NAME,
@@ -452,7 +565,7 @@ mrdb_three_procs_(Config) ->
spawn_opt(fun() ->
D0 = get_dict(),
do_when_p_allows(
1, Parent, ?LINE,
0, Parent, ?LINE,
fun() ->
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
end),
@@ -488,8 +601,8 @@ mrdb_three_procs_(Config) ->
fun() ->
Att = get_attempt(),
ARes = case Att of
1 -> [A0];
2 -> [A1]
0 -> [A0];
_ -> [A1]
end,
%% First, ensure that P2 tx is running
go_ahead_other(Att, P2),
@@ -519,6 +632,7 @@ tr_opts() ->
, {?MODULE, await_other_down, 3, x}
, {?MODULE, do_when_p_allows, 4, x}
, {?MODULE, allow_p, 3, x}
, {?MODULE, check_stats, 1, x}
]}.
light_tr_opts() ->
@@ -529,6 +643,23 @@ light_tr_opts() ->
, {mrdb, read, 2, x}
, {mrdb, activity, x} ], tr_opts())).
%% Verbose trace options for debugging the retry logic: starts from
%% tr_opts(), replaces its `mrdb' trace patterns with the list below (see
%% tr_patterns/3) and applies the given trace flags, including 'receive',
%% for the calling process via tr_flags/2.
dbg_tr_opts() ->
tr_flags(
{self(), [call, sos, p, 'receive']},
tr_patterns(
mrdb, [ {mrdb, insert, 2, x}
, {mrdb, read, 2, x}
, {mrdb, retry_activity, 3, x}
, {mrdb, try_f, 2, x}
, {mrdb, incr_attempt, 2, x}
, {mrdb, current_context, 0, x}
, {mrdb_mutex, do, 2, x}
, {mrdb_mutex_serializer, do, 2, x}
, {?MODULE, wait_for_other, 2, x}
, {?MODULE, go_ahead_other, 1, x}
, {?MODULE, try_until, 3, x}
, {mrdb, activity, x} ], tr_opts())).
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
Opts#{patterns => Ps ++ Pats1}.
@@ -542,12 +673,13 @@ wait_for_other(Parent, L) ->
wait_for_other(Att, Parent, L) ->
wait_for_other(Att, Parent, 1000, L).
wait_for_other(1, Parent, Timeout, L) ->
wait_for_other(0, Parent, Timeout, L) ->
MRef = monitor(process, Parent),
Parent ! {self(), ready},
receive
{Parent, cont} ->
demonitor(MRef),
Parent ! {self(), cont_ack},
ok;
{'DOWN', MRef, _, _, Reason} ->
ct:log("Parent died, Reason = ~p", [Reason]),
@@ -586,7 +718,13 @@ go_ahead_other(Att, POther, Timeout) ->
go_ahead_other_(POther, Timeout) ->
receive
{POther, ready} ->
POther ! {self(), cont}
POther ! {self(), cont},
receive
{POther, cont_ack} ->
ok
after Timeout ->
error(cont_ack_timeout)
end
after Timeout ->
error(go_ahead_timeout)
end.
@@ -619,6 +757,35 @@ get_attempt() ->
#{activity := #{attempt := Attempt}} = mrdb:current_context(),
Attempt.
%% Creates table `counters' and seeds two objects: c0 through a plain insert
%% and c1 through mrdb:update_counter/3 on a key that has not been written.
%% Both are then expected to read back with value 1.
create_counters(_Config) ->
create_tab(counters, []),
mrdb:insert(counters, {counters, c0, 1}),
mrdb:update_counter(counters, c1, 1),
[{counters, c0, 1}] = mrdb:read(counters, c0),
[{counters, c1, 1}] = mrdb:read(counters, c1),
ct:log("Created tab counters, with objs c0 (1) and c1 (1)", []),
ok.
%% Stops and restarts mnesia. In the `counters' group this case runs between
%% two update_counters runs, so the second run operates on data reloaded
%% after the restart. The result of mnesia:stop/0 is not checked; the start
%% must return `ok'.
restart_node(_Config) ->
mnesia:stop(),
ok = mnesia:start(),
ct:log("mnesia restarted", []),
ok.
%% Reads the current values of counters c0 and c1, increments each by 1 via
%% mrdb:update_counter/3 and verifies that both read back as previous + 1.
%% The case makes no assumption about the starting values, so it can be run
%% repeatedly (e.g. before and after a restart).
update_counters(_Config) ->
    [{counters, c0, C0Prev}] = mrdb:read(counters, c0),
    [{counters, c1, C1Prev}] = mrdb:read(counters, c1),
    %% Log label fixed: this used to read "co:" instead of "c0:".
    ct:log("c0: ~p, c1: ~p", [C0Prev, C1Prev]),
    ok = mrdb:update_counter(counters, c0, 1),
    ok = mrdb:update_counter(counters, c1, 1),
    ct:log("Incremented c0 and c1 by 1", []),
    C0 = C0Prev + 1,
    C1 = C1Prev + 1,
    [{counters, c0, C0}] = mrdb:read(counters, c0),
    [{counters, c1, C1}] = mrdb:read(counters, c1),
    ct:log("c0: ~p, c1: ~p", [C0, C1]),
    ok.
create_tabs(Tabs, Config) ->
Res = lists:map(fun create_tab/1, Tabs),
tr_ct:trace_checkpoint(?TABS_CREATED, Config),
@@ -645,3 +812,8 @@ dictionary_unchanged(Old) ->
, added := [] } = #{ deleted => Old -- New
, added => New -- Old },
ok.
%% Fetches all counters for Alias via mrdb_stats:get/1, writes them to the
%% test log and returns them, so callers can also match on the values.
check_stats(Alias) ->
Stats = mrdb_stats:get(Alias),
ct:log("Stats: ~p", [Stats]),
Stats.
+46 -2
View File
@@ -33,6 +33,7 @@
-export([
index_plugin_mgmt/1
, add_indexes/1
, delete_indexes/1
, create_bag_index/1
, create_ordered_index/1
, test_1_ram_copies/1
@@ -40,10 +41,12 @@
, fail_1_disc_only/1
, plugin_ram_copies1/1
, plugin_ram_copies2/1
, plugin_ram_copies3/1
, plugin_disc_copies/1
, fail_plugin_disc_only/1
, plugin_disc_copies_bag/1
, plugin_rdb_ordered/1
, plugin_rdb_none/1
, index_iterator/1
]).
@@ -73,10 +76,12 @@ run(Config) ->
{pfx},mnesia_rocksdb, ix_prefixes),
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
test_index_plugin(cfg([pl3, rdb, none], Config)),
index_plugin_mgmt(Config),
ok.
@@ -94,6 +99,7 @@ groups() ->
, create_ordered_index
, index_plugin_mgmt
, add_indexes
, delete_indexes
]}
, {access, [sequence], [
test_1_ram_copies
@@ -104,10 +110,12 @@ groups() ->
, {plugin, [sequence], [
plugin_ram_copies1
, plugin_ram_copies2
, plugin_ram_copies3
, plugin_disc_copies
, fail_plugin_disc_only
, plugin_disc_copies_bag
, plugin_rdb_ordered
, plugin_rdb_none
]}
].
@@ -177,11 +185,14 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
test(N, Type, T) ->
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
{attributes,[k,a,b,c]},
@@ -204,7 +215,7 @@ add_del_indexes() ->
test_index_plugin(Config) ->
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
{index, [{{pfx}, IxType}]}]),
{index, [ixtype(IxType)]}]),
mnesia:dirty_write({Tab, "foobar", "sentence"}),
mnesia:dirty_write({Tab, "yellow", "sensor"}),
mnesia:dirty_write({Tab, "truth", "white"}),
@@ -216,13 +227,27 @@ test_index_plugin(Config) ->
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx});
IxType == ordered ->
IxType == ordered; IxType == none ->
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
Tab, <<"foo">>, {pfx})
end,
if Type == rdb ->
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
[{Tab,"foobar","sentence"}] = mrdb:index_read(
Tab, <<"foo">>, {pfx});
true ->
ok
end.
%% Builds the index spec for the `{pfx}' plugin index: `bag' and `ordered'
%% are given as an explicit `{Index, Type}' pair, while `none' yields the bare
%% index name with no type.
ixtype(none)    -> {pfx};
ixtype(bag)     -> {{pfx}, bag};
ixtype(ordered) -> {{pfx}, ordered}.
create_bag_index(_Config) ->
{aborted, {combine_error, _, _}} =
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
@@ -239,6 +264,25 @@ add_indexes(_Config) ->
{atomic, ok} = mnesia:add_table_index(T, a),
ok.
%% Checks that an index can be deleted and re-added on a table whose
%% contents changed in between: after the index on `a' is dropped, object
%% `b' is deleted and `c' inserted; once the index is added again, folding
%% over it must reflect only the current objects (`a' and `c').
%% The fold accumulator prepends, so the expected lists are in the opposite
%% order of traversal.
delete_indexes(_Config) ->
T = ?TAB(t1),
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
ok = mrdb:insert(T, {T, a, 1, 1}),
ok = mrdb:insert(T, {T, b, 1, 2}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
{atomic, ok} = mnesia:del_table_index(T, a),
ok = mrdb:delete(T, b),
ok = mrdb:insert(T, {T, c, 2, 3}),
{atomic, ok} = mnesia:add_table_index(T, a),
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
ok.
%% Fold callback: prepends the {IndexKey, Object} pair to the accumulator.
ix_fold_acc(IxKey, Obj, Acc) ->
    Entry = {IxKey, Obj},
    [Entry | Acc].
index_plugin_mgmt(_Config) ->
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),
+2 -1
View File
@@ -149,8 +149,9 @@ create_migrateable_db(Config) ->
Config.
fill_tabs(Tabs) ->
%% Fill with more than 300, since that's the currently hard-coded chunk size
lists:foreach(fun(Tab) ->
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,3)]
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,500)]
end, Tabs).
create_tabs(Tabs, Config) ->
+1 -1
View File
@@ -1,6 +1,6 @@
-module(mrdb_bench).
-compile(export_all).
-compile([export_all, nowarn_export_all]).
init() ->
mnesia:delete_schema([node()]),
+226
View File
@@ -0,0 +1,226 @@
-module(mrdb_fold_SUITE).
-export([
all/0
, groups/0
, suite/0
, init_per_suite/1
, end_per_suite/1
, init_per_group/2
, end_per_group/2
, init_per_testcase/2
, end_per_testcase/2
]).
-export([
no_prefix_fwd_rdb_fold/1
, prefixed_fwd_rdb_fold/1
, no_prefix_rev_rdb_fold/1
, prefixed_rev_rdb_fold/1
, prefixed_rev_rdb_fold_max/1
, fwd_fold/1
, filtered_fwd_fold/1
, rev_fold/1
, filtered_rev_fold/1
, fwd_select/1
, rev_select/1
, select_cont/1
, rev_select_cont/1
]).
-include_lib("common_test/include/ct.hrl").
-record(r, {k, v}).
%% Common Test callback: no suite-level options.
suite() -> [].
%% Common Test callback: everything runs through the `all_tests' group.
all() ->
    TopGroup = {group, all_tests},
    [TopGroup].
%% Common Test callback. `all_tests' runs the three sub-groups in sequence:
%% `rdb_fold' (raw key/value folds), `fold' (match-spec folds) and `select'
%% (select and continuations). The `select' cases read the same table as the
%% `fold' cases (see tab_name/1).
groups() ->
[
{all_tests, [sequence], [ {group, rdb_fold}
, {group, fold}
, {group, select} ]}
, {rdb_fold, [sequence], [ no_prefix_fwd_rdb_fold
, prefixed_fwd_rdb_fold
, no_prefix_rev_rdb_fold
, prefixed_rev_rdb_fold
, prefixed_rev_rdb_fold_max ]}
, {fold, [sequence], [ fwd_fold
, filtered_fwd_fold
, rev_fold
, filtered_rev_fold ]}
, {select, [sequence], [ fwd_select
, rev_select
, select_cont
, rev_select_cont ]}
].
%% Stops any running mnesia and starts it again through
%% mnesia_rocksdb_tlib:start_mnesia(reset), which must return `ok'.
init_per_suite(Config) ->
mnesia:stop(),
ok = mnesia_rocksdb_tlib:start_mnesia(reset),
Config.
%% Stops mnesia when the suite is done; the result of the stop is ignored.
end_per_suite(_Config) ->
mnesia:stop(),
ok.
%% Makes sure the table used by the group exists and is populated before its
%% test cases run. The `select' group is included as well: its cases read
%% table `r' (see tab_name/1), and without this they would only work when the
%% `fold' group had happened to run first. mk_tab/1 accepts an already
%% existing table, so running several groups in sequence is still fine.
%% Other groups (e.g. `all_tests') need no setup.
init_per_group(G, Config) when G =:= rdb_fold; G =:= fold; G =:= select ->
    mk_tab(G),
    Config;
init_per_group(_, Config) ->
    Config.
%% Common Test callback: nothing to clean up per group.
end_per_group(_Group, _Config) -> ok.
%% Common Test callback: no per-testcase setup; Config is passed through.
init_per_testcase(_TestCase, Config) -> Config.
%% Common Test callback: nothing to clean up per test case.
end_per_testcase(_TestCase, _Config) -> ok.
%% Creates the table belonging to group G and fills it with test data. If the
%% table already exists it is left as it is (not refilled).
mk_tab(G) ->
T = tab_name(G),
case create_tab(T) of
{atomic, ok} -> fill_tab(T);
{aborted,{already_exists,T}} -> ok
end.
%% Table used by each test group: the raw-encoded table `t' for `rdb_fold',
%% the record table `r' for `fold' and `select'.
tab_name(rdb_fold) -> t;
tab_name(fold)     -> r;
tab_name(select)   -> r.
%% Creates table T as an `rdb' table on the local node, with the per-table
%% options from tab_opts/1. Returns the mnesia:create_table/2 result.
create_tab(T) ->
    mnesia:create_table(T, [{rdb, [node()]} | tab_opts(T)]).
%% Table options per table: `r' is an ordered_set with default encoding,
%% `t' sets the `mrdb_encoding' user property to `{raw, raw}'. The fixture
%% data for `t' (raw_objs/0) consists of binary keys and values.
tab_opts(r) ->
[{attributes, [k, v]}, {type, ordered_set}];
tab_opts(t) ->
[{attributes, [k, v]}, {user_properties, [{mrdb_encoding, {raw, raw}}]}].
%% Forward raw fold with an empty prefix visits every object. The
%% accumulator prepends, so the result is the fixture list reversed.
%% The `{Res, Res} = {Res, Expected}' form shows both values on mismatch.
no_prefix_fwd_rdb_fold(_Config) ->
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = lists:reverse(raw_objs()),
{Res, Res} = {Res, Expected}.
%% Forward fold over table `r' using mrdb:fold/3. The accumulator prepends,
%% so the result is the fixture list reversed.
fwd_fold(_Config) ->
Res = mrdb:fold(r, fun simple_acc/2, []),
Expected = lists:reverse(objs()),
{Res, Res} = {Res, Expected}.
%% Forward raw fold restricted to keys starting with <<"aa">>; only the
%% prefixed fixture objects are expected, in reversed (prepended) order.
prefixed_fwd_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = lists:reverse(prefixed_raw_objs(Pfx)),
{Res, Res} = {Res, Expected}.
%% Forward fold with a match spec selecting keys of the form {a,b,_}; the
%% expected result is the matching fixture objects in reversed order.
filtered_fwd_fold(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:fold(r, fun simple_acc/2, [], MS, infinity),
Expected = lists:reverse(filtered_objs(MS)),
{Res, Res} = {Res, Expected}.
%% Reverse raw fold with an empty prefix. Objects are visited last-to-first
%% and prepended, so the result equals the fixture list in its own order.
no_prefix_rev_rdb_fold(_Config) ->
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], <<>>, infinity),
Expected = raw_objs(),
{Res, Res} = {Res, Expected}.
%% Reverse fold over table `r' using mrdb:fold_reverse/3; with a prepending
%% accumulator the result equals the fixture list in its own order.
rev_fold(_Config) ->
Res = mrdb:fold_reverse(r, fun simple_acc/2, []),
Expected = objs(),
{Res, Res} = {Res, Expected}.
%% Reverse raw fold restricted to keys starting with <<"aa">>. Keys sorting
%% after the prefixed range (e.g. <<"ab">>) must be skipped, not returned.
prefixed_rev_rdb_fold(_Config) ->
Pfx = <<"aa">>,
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_raw_objs(Pfx),
{Res, Res} = {Res, Expected}.
%% Reverse fold with a match spec selecting keys of the form {a,b,_}; the
%% expected result is the matching fixture objects in fixture order.
filtered_rev_fold(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:fold_reverse(r, fun simple_acc/2, [], MS, infinity),
Expected = filtered_objs(MS),
{Res, Res} = {Res, Expected}.
%% Edge case for the reverse raw fold: the prefix <<255,255>> consists only
%% of 16#FF bytes, so it cannot be incremented to find a seek target beyond
%% the prefixed range.
prefixed_rev_rdb_fold_max(_Config) ->
Pfx = <<255,255>>,
Res = mrdb:rdb_fold_reverse(t, fun simple_rdb_acc/3, [], Pfx, infinity),
Expected = prefixed_raw_objs(Pfx),
{Res, Res} = {Res, Expected}.
%% Forward select with a match spec on keys {a,b,_}; the result is expected
%% in fixture order.
fwd_select(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:select(r, MS),
Expected = filtered_objs(MS),
{Res, Res} = {Res, Expected}.
%% Reverse select with a match spec on keys {a,b,_}; the result is expected
%% in reversed fixture order.
rev_select(_Config) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Res = mrdb:select_reverse(r, MS),
Expected = lists:reverse(filtered_objs(MS)),
{Res, Res} = {Res, Expected}.
%% Forward select with limit 1, followed through its continuations; each
%% step must yield the next expected object (see select_cont_loop/2).
%% NOTE(review): the argument is the Common Test config, despite its name.
select_cont(_Cont) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Expected = filtered_objs(MS),
select_cont_loop(mrdb:select(r, MS, 1), Expected).
%% Reverse select with limit 1, followed through its continuations; the
%% objects must arrive in reversed fixture order.
%% NOTE(review): the argument is the Common Test config, despite its name.
rev_select_cont(_Cont) ->
MS = [{#r{k = {a,b,'_'}, v = '_'}, [], ['$_']}],
Expected = lists:reverse(filtered_objs(MS)),
select_cont_loop(mrdb:select_reverse(r, MS, 1), Expected).
%% Walks a limit-1 select: every step must return exactly one object, equal
%% to the head of the expected list, plus a continuation that is resumed with
%% mrdb:select/1. The walk must end with an empty result and
%% '$end_of_table' exactly when the expected list is exhausted; anything
%% else fails with function_clause.
select_cont_loop({[Obj], Cont}, [Obj|Rest]) ->
select_cont_loop(mrdb:select(Cont), Rest);
select_cont_loop({[], '$end_of_table'}, []) ->
ok.
%% Fold callback for table `r': asserts that the object is an #r{} record
%% and prepends it to the accumulator.
simple_acc(#r{} = Obj, Acc) ->
[Obj | Acc].
%% Raw fold callback: prepends the {Key, Value} pair to the accumulator.
simple_rdb_acc(Key, Val, Acc) ->
    Pair = {Key, Val},
    [Pair | Acc].
%% Populates a test table with its fixture data: raw key/value pairs for
%% `t', #r{} records for `r'. Returns `ok'.
fill_tab(t) ->
    lists:foreach(fun({K, V}) -> mrdb:insert(t, {t, K, V}) end, raw_objs());
fill_tab(r) ->
    lists:foreach(fun(Obj) -> mrdb:insert(r, Obj) end, objs()).
%% The raw fixture objects whose key starts with Pfx, in fixture order.
prefixed_raw_objs(Pfx) ->
    HasPrefix = fun({Key, _Val}) -> is_prefix(Pfx, Key) end,
    lists:filter(HasPrefix, raw_objs()).
%% Fixture for table `t': {Key, Value} binaries listed in ascending key
%% order. The final <<255,255>> key has only 16#FF bytes and is the target of
%% prefixed_rev_rdb_fold_max/1.
raw_objs() ->
[ {<<"aa">> , <<"1">>}
, {<<"aa1">>, <<"2">>}
, {<<"ab">> , <<"3">>}
, {<<"ab1">>, <<"4">>}
, {<<255,255>>, <<"5">>} ].
%% Reference result for a select/fold with match spec MS: runs the compiled
%% match spec over the fixture objects directly, bypassing the database.
filtered_objs(MS) ->
    ets:match_spec_run(objs(), ets:match_spec_compile(MS)).
%% Fixture for table `r': #r{} records with 3-tuple keys, listed in
%% ascending key order. Two of them have keys of the form {a,b,_}, which is
%% what the match specs in the test cases select.
objs() ->
[ #r{k = {a,a,1}, v = 1}
, #r{k = {a,b,2}, v = 2}
, #r{k = {a,b,3}, v = 3}
, #r{k = {a,c,4}, v = 4}
, #r{k = {b,b,5}, v = 5}
].
%% copied from mrdb_select.erl
%% True when binary A is a leading segment of binary B (an empty A is a
%% prefix of every binary); false otherwise. Both arguments must be binaries.
is_prefix(A, B) when is_binary(A), is_binary(B) ->
    binary:longest_common_prefix([A, B]) =:= byte_size(A).