Compare commits
58 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 56d78768d9 | |||
| a768d8008f | |||
| 9cea41cb9a | |||
| d42055c7ac | |||
| 0ae7ecd41d | |||
| ece9db2b09 | |||
| 0e4382d5f7 | |||
| d9d82d7ead | |||
| 34285da6d8 | |||
| cb123ee28a | |||
| 996ae82717 | |||
| d6b86524fd | |||
| 296813ef7b | |||
| 16d0533acf | |||
| 1d6ae82c6d | |||
| 71ebaf2d66 | |||
| f2b6116d31 | |||
| 33ee7929d4 | |||
| 78669bb8bf | |||
| 346decee6e | |||
| 4da0fce567 | |||
| e7d42f9500 | |||
| 699a7d5920 | |||
| 57f02078bc | |||
| 791cec41db | |||
| 6eff62df6d | |||
| 2e430fc5eb | |||
| e76a01f8c4 | |||
| 4d0a78612a | |||
| eeb6aff242 | |||
| f11e16e29f | |||
| 0aecf5ef01 | |||
| 465a220bfe | |||
| 19140c738b | |||
| bed66b2998 | |||
| 3635eac717 | |||
| 95abe4e36e | |||
| 7c729bd932 | |||
| 4489e5d743 | |||
| d1a6bf22d5 | |||
| b65e82ed71 | |||
| ee9e7eac67 | |||
| ce2be519b4 | |||
| 8073a0daa5 | |||
| ab15b7f399 | |||
| a75f6e0c43 | |||
| 1340bb2050 | |||
| d1177b6ad4 | |||
| b908998e6b | |||
| dfc0125800 | |||
| 296abb23bb | |||
| 7057f4dcbd | |||
| c4235be94a | |||
| 302aa1252b | |||
| fde2e1194e | |||
| c4f7b7ac02 | |||
| 226a3b8e91 | |||
| 73784fe765 |
+23
-3
@@ -2,15 +2,35 @@ version: 2.1
|
||||
|
||||
executors:
|
||||
aebuilder:
|
||||
parameters:
|
||||
otp:
|
||||
type: string
|
||||
docker:
|
||||
- image: aeternity/builder
|
||||
- image: aeternity/builder:focal-<< parameters.otp >>
|
||||
user: builder
|
||||
environment:
|
||||
ERLANG_ROCKSDB_BUILDOPTS: "-j2"
|
||||
ERLANG_ROCKSDB_OPTS: "-DWITH_SYSTEM_ROCKSDB=ON -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_BZ2=ON -DWITH_ZSTD=ON"
|
||||
|
||||
|
||||
jobs:
|
||||
build:
|
||||
executor: aebuilder
|
||||
build_and_test:
|
||||
parameters:
|
||||
otp:
|
||||
type: string
|
||||
executor:
|
||||
name: aebuilder
|
||||
otp: << parameters.otp >>
|
||||
steps:
|
||||
- checkout
|
||||
- run: make test
|
||||
- store_artifacts:
|
||||
path: _build/test/logs
|
||||
|
||||
workflows:
|
||||
commit:
|
||||
jobs:
|
||||
- build_and_test:
|
||||
matrix:
|
||||
parameters:
|
||||
otp: ["otp24", "otp25", "otp26"]
|
||||
|
||||
@@ -80,7 +80,7 @@ RocksDB supports a number of customization options. These can be specified
|
||||
by providing a `{Key, Value}` list named `rocksdb_opts` under `user_properties`,
|
||||
for example:
|
||||
|
||||
```
|
||||
```erlang
|
||||
mnesia:create_table(foo, [{rocksdb_copies, [node()]},
|
||||
...
|
||||
{user_properties,
|
||||
@@ -93,7 +93,7 @@ for information on configuration parameters. Also see the section below on handl
|
||||
|
||||
The default configuration for tables in `mnesia_rocksdb` is:
|
||||
|
||||
```
|
||||
```erlang
|
||||
default_open_opts() ->
|
||||
[ {create_if_missing, true}
|
||||
, {cache_size,
|
||||
@@ -131,6 +131,18 @@ depend on having an up to date size count at all times, you need to maintain
|
||||
it yourself. If you only need the size occasionally, you may traverse the
|
||||
table to count the elements.
|
||||
|
||||
When `mrdb` transactions abort, they will return a stacktrace caught
|
||||
from within the transaction fun, giving much better debugging info.
|
||||
This is different from how mnesia does it.
|
||||
|
||||
If behavior closer to mnesia's abort returns are needed, say, for backwards
|
||||
compatibility, this can be controlled by setting the environment variable
|
||||
`-mnesia_rocksdb mnesia_compatible_aborts true`, or by adding a transaction
|
||||
option, e.g. `mrdb:activity({tx, #{mnesia_compatible => true}}, fun() ... end)`.
|
||||
For really performance-critical transactions which may abort often, it might
|
||||
make a difference to set this option to `true`, since there is a cost involved
|
||||
in producing stacktraces.
|
||||
|
||||
|
||||
### <a name="Mnesia_backend_plugins">Mnesia backend plugins</a> ###
|
||||
|
||||
@@ -195,7 +207,7 @@ our example. It returns a list of index terms.
|
||||
|
||||
Given the following index plugin implementation:
|
||||
|
||||
```
|
||||
```erlang
|
||||
-module(words).
|
||||
-export([words_f/3]).
|
||||
|
||||
@@ -212,7 +224,7 @@ words_(_) ->
|
||||
|
||||
We can register the plugin and use it in table definitions:
|
||||
|
||||
```
|
||||
```erlang
|
||||
Eshell V12.1.3 (abort with ^G)
|
||||
1> mnesia:start().
|
||||
ok
|
||||
@@ -228,7 +240,7 @@ as an exported function along the node's code path.
|
||||
|
||||
To see what happens when we insert an object, we can turn on call trace.
|
||||
|
||||
```
|
||||
```erlang
|
||||
4> dbg:tracer().
|
||||
{ok,<0.108.0>}
|
||||
5> dbg:tp(words, x).
|
||||
|
||||
+2
-1
@@ -2,4 +2,5 @@
|
||||
{application,mnesia_rocksdb}.
|
||||
{modules,[mnesia_rocksdb,mnesia_rocksdb_admin,mnesia_rocksdb_app,
|
||||
mnesia_rocksdb_lib,mnesia_rocksdb_params,mnesia_rocksdb_sup,
|
||||
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,mrdb_select]}.
|
||||
mnesia_rocksdb_tuning,mrdb,mrdb_index,mrdb_mutex,
|
||||
mrdb_mutex_serializer,mrdb_select,mrdb_stats]}.
|
||||
|
||||
+6
-2
@@ -4,8 +4,8 @@
|
||||
{deps,
|
||||
[
|
||||
{sext, "1.8.0"},
|
||||
{rocksdb, {git, "https://gitlab.com/seanhinde/erlang-rocksdb.git", {ref,"9ae37839"}}},
|
||||
{hut, "1.3.0"}
|
||||
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb.git", {ref,"d695c6e"}}},
|
||||
{hut, "1.4.0"}
|
||||
]}.
|
||||
|
||||
{xref_checks, [
|
||||
@@ -14,6 +14,10 @@
|
||||
deprecated_function_calls
|
||||
]}.
|
||||
|
||||
{dialyzer, [{plt_apps, all_deps},
|
||||
{base_plt_apps, [erts, kernel, stdlib, mnesia ]}
|
||||
]}.
|
||||
|
||||
{profiles,
|
||||
[
|
||||
{test,
|
||||
|
||||
@@ -1,11 +1,4 @@
|
||||
%% -*- erlang-mode -*-
|
||||
case os:getenv("ERLANG_ROCKSDB_OPTS") of
|
||||
false ->
|
||||
true = os:putenv("ERLANG_ROCKSDB_OPTS", "-DWITH_BUNDLE_LZ4=ON");
|
||||
_ ->
|
||||
%% If manually set, we assume it's throught through
|
||||
skip
|
||||
end.
|
||||
case os:getenv("DEBUG") of
|
||||
"true" ->
|
||||
Opts = proplists:get_value(erl_opts, CONFIG, []),
|
||||
|
||||
+5
-5
@@ -1,15 +1,15 @@
|
||||
{"1.2.0",
|
||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
|
||||
[{<<"hut">>,{pkg,<<"hut">>,<<"1.4.0">>},0},
|
||||
{<<"rocksdb">>,
|
||||
{git,"https://gitlab.com/seanhinde/erlang-rocksdb.git",
|
||||
{ref,"9ae378391ffc94200bde24efcd7a4921eba688d0"}},
|
||||
{git,"https://github.com/emqx/erlang-rocksdb.git",
|
||||
{ref,"d695c6ee9dd27bfe492ed4e24c72ad20ab0d770b"}},
|
||||
0},
|
||||
{<<"sext">>,{pkg,<<"sext">>,<<"1.8.0">>},0}]}.
|
||||
[
|
||||
{pkg_hash,[
|
||||
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
|
||||
{<<"hut">>, <<"7A1238EC00F95C9EC75412587EE11AC652ECA308A7F4B8CC9629746D579D6CF0">>},
|
||||
{<<"sext">>, <<"90A95B889F5C781B70BBCF44278B763148E313C376B60D87CE664CB1C1DD29B5">>}]},
|
||||
{pkg_hash_ext,[
|
||||
{<<"hut">>, <<"7E15D28555D8A1F2B5A3A931EC120AF0753E4853A4C66053DB354F35BF9AB563">>},
|
||||
{<<"hut">>, <<"7AF8704B9BAE98A336F70D9560FC3C97F15665265FA603DBD05352E63D6EBB03">>},
|
||||
{<<"sext">>, <<"BC6016CB8690BAF677EACACFE6E7CADFEC8DC7E286CBBED762F6CD55B0678E73">>}]}
|
||||
].
|
||||
|
||||
+12
-10
@@ -346,14 +346,11 @@ semantics(_Alias, index_fun) -> fun index_f/4;
|
||||
semantics(_Alias, _) -> undefined.
|
||||
|
||||
is_index_consistent(Alias, {Tab, index, PosInfo}) ->
|
||||
case info(Alias, Tab, {index_consistent, PosInfo}) of
|
||||
true -> true;
|
||||
_ -> false
|
||||
end.
|
||||
mnesia_rocksdb_admin:read_info(Alias, Tab, {index_consistent, PosInfo}, false).
|
||||
|
||||
index_is_consistent(_Alias, {Tab, index, PosInfo}, Bool)
|
||||
index_is_consistent(Alias, {Tab, index, PosInfo}, Bool)
|
||||
when is_boolean(Bool) ->
|
||||
mrdb:write_info(Tab, {index_consistent, PosInfo}, Bool).
|
||||
mnesia_rocksdb_admin:write_info(Alias, Tab, {index_consistent, PosInfo}, Bool).
|
||||
|
||||
|
||||
%% PRIVATE FUN
|
||||
@@ -454,8 +451,13 @@ close_table(Alias, Tab) ->
|
||||
error ->
|
||||
ok;
|
||||
_ ->
|
||||
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
|
||||
close_table_(Alias, Tab)
|
||||
case get(mnesia_dumper_dets) of
|
||||
undefined ->
|
||||
ok = mnesia_rocksdb_admin:prep_close(Alias, Tab),
|
||||
close_table_(Alias, Tab);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end.
|
||||
|
||||
close_table_(Alias, Tab) ->
|
||||
@@ -798,7 +800,7 @@ handle_call({create_table, Tab, Props}, _From,
|
||||
exit:{aborted, Error} ->
|
||||
{reply, {aborted, Error}, St}
|
||||
end;
|
||||
handle_call({load_table, _LoadReason, Props}, _From,
|
||||
handle_call({load_table, _LoadReason, Props}, _,
|
||||
#st{alias = Alias, tab = Tab} = St) ->
|
||||
{ok, _Ref} = mnesia_rocksdb_admin:load_table(Alias, Tab, Props),
|
||||
{reply, ok, St#st{status = active}};
|
||||
@@ -826,7 +828,7 @@ handle_call({delete, Key}, _From, St) ->
|
||||
handle_call({match_delete, Pat}, _From, #st{tab = Tab} = St) ->
|
||||
Res = mrdb:match_delete(get_ref(Tab), Pat),
|
||||
{reply, Res, St};
|
||||
handle_call(close_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
||||
handle_call(close_table, _, #st{alias = Alias, tab = Tab} = St) ->
|
||||
_ = mnesia_rocksdb_admin:close_table(Alias, Tab),
|
||||
{reply, ok, St#st{status = undefined}};
|
||||
handle_call(delete_table, _From, #st{alias = Alias, tab = Tab} = St) ->
|
||||
|
||||
+208
-46
@@ -18,7 +18,8 @@
|
||||
, clear_table/1
|
||||
]).
|
||||
|
||||
-export([ migrate_standalone/2 ]).
|
||||
-export([ migrate_standalone/2
|
||||
, migrate_standalone/3 ]).
|
||||
|
||||
-export([ start_link/0
|
||||
, init/1
|
||||
@@ -32,6 +33,7 @@
|
||||
, read_info/2 %% (Alias, Tab)
|
||||
, read_info/4 %% (Alias, Tab, Key, Default)
|
||||
, write_info/4 %% (Alias, Tab, Key, Value)
|
||||
, delete_info/3 %% (Alias, Tab, Key)
|
||||
, write_table_property/3 %% (Alias, Tab, Property)
|
||||
]).
|
||||
|
||||
@@ -66,6 +68,8 @@
|
||||
|
||||
-type cf() :: mrdb:db_ref().
|
||||
|
||||
-type rpt() :: undefined | map().
|
||||
|
||||
-type req() :: {create_table, table(), properties()}
|
||||
| {delete_table, table()}
|
||||
| {load_table, table(), properties()}
|
||||
@@ -74,7 +78,7 @@
|
||||
| {add_aliases, [alias()]}
|
||||
| {write_table_property, tabname(), tuple()}
|
||||
| {remove_aliases, [alias()]}
|
||||
| {migrate, [{tabname(), map()}]}
|
||||
| {migrate, [{tabname(), map()}], rpt()}
|
||||
| {prep_close, table()}
|
||||
| {close_table, table()}
|
||||
| {clear_table, table() | cf() }.
|
||||
@@ -277,9 +281,20 @@ write_info(Alias, Tab, K, V) ->
|
||||
write_info_(get_ref({admin, Alias}), Tab, K, V).
|
||||
|
||||
write_info_(Ref, Tab, K, V) ->
|
||||
write_info_encv(Ref, Tab, K, term_to_binary(V)).
|
||||
|
||||
write_info_encv(Ref, Tab, K, V) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info,Tab,K}, sext),
|
||||
maybe_write_standalone_info(Ref, K, V),
|
||||
mrdb:rdb_put(Ref, EncK, term_to_binary(V), []).
|
||||
mrdb:rdb_put(Ref, EncK, V, []).
|
||||
|
||||
delete_info(Alias, Tab, K) ->
|
||||
delete_info_(get_ref({admin, Alias}), Tab, K).
|
||||
|
||||
delete_info_(Ref, Tab, K) ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key({info, Tab, K}, sext),
|
||||
maybe_delete_standalone_info(Ref, K),
|
||||
mrdb:rdb_delete(Ref, EncK, []).
|
||||
|
||||
maybe_write_standalone_info(Ref, K, V) ->
|
||||
case Ref of
|
||||
@@ -292,11 +307,28 @@ maybe_write_standalone_info(Ref, K, V) ->
|
||||
ok
|
||||
end.
|
||||
|
||||
maybe_delete_standalone_info(Ref, K) ->
|
||||
case Ref of
|
||||
#{type := standalone, vsn := 1, db_ref := DbRef} ->
|
||||
EncK = mnesia_rocksdb_lib:encode_key(K, sext),
|
||||
Key = <<?INFO_TAG, EncK/binary>>,
|
||||
rocksdb:delete(DbRef, Key, []);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
|
||||
call(Alias, {write_table_property, Tab, Prop}).
|
||||
|
||||
migrate_standalone(Alias, Tabs) ->
|
||||
call(Alias, {migrate, Tabs}).
|
||||
migrate_standalone(Alias, Tabs, undefined).
|
||||
|
||||
migrate_standalone(Alias, Tabs, Rpt0) ->
|
||||
Rpt = case Rpt0 of
|
||||
undefined -> undefined;
|
||||
To -> #{to => To, tag => migrate_standalone}
|
||||
end,
|
||||
call(Alias, {migrate, Tabs, Rpt}).
|
||||
|
||||
-spec call(alias() | [], req()) -> no_return() | any().
|
||||
call(Alias, Req) ->
|
||||
@@ -313,13 +345,13 @@ call(Alias, Req, Timeout) ->
|
||||
end.
|
||||
|
||||
start_link() ->
|
||||
mrdb_mutex:ensure_tab(),
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
_ = maybe_initial_meta(), %% bootstrap pt
|
||||
Opts = default_opts(),
|
||||
process_flag(trap_exit, true),
|
||||
ensure_cf_cache(),
|
||||
mnesia:subscribe({table, schema, simple}),
|
||||
{ok, recover_state(#st{default_opts = Opts})}.
|
||||
|
||||
@@ -394,12 +426,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs
|
||||
%% We need to store the persistent ref explicitly here,
|
||||
%% since mnesia knows nothing of our admin table.
|
||||
AdminTab = {admin, Alias},
|
||||
Stats = mrdb_stats:new(),
|
||||
CfI = update_cf_info(AdminTab, #{ status => open
|
||||
, name => AdminTab
|
||||
, vsn => ?VSN
|
||||
, encoding => {sext,{value,term}}
|
||||
, attr_pos => #{key => 1,
|
||||
value => 2}
|
||||
, stats => Stats
|
||||
, mountpoint => MP
|
||||
, properties =>
|
||||
#{ attributes => [key, val]
|
||||
@@ -499,12 +533,17 @@ intersection(A, B) ->
|
||||
|
||||
-spec handle_req(alias(), req(), backend(), st()) -> gen_server_reply().
|
||||
handle_req(Alias, {create_table, Name, Props}, Backend, St) ->
|
||||
case create_trec(Alias, Name, Props, Backend, St) of
|
||||
{ok, NewCf} ->
|
||||
St1 = update_cf(Alias, Name, NewCf, St),
|
||||
{reply, {ok, NewCf}, St1};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
case find_cf(Alias, Name, Backend, St) of
|
||||
{ok, TRec} ->
|
||||
{reply, {ok, TRec}, St};
|
||||
error ->
|
||||
case create_trec(Alias, Name, Props, Backend, St) of
|
||||
{ok, NewCf} ->
|
||||
St1 = update_cf(Alias, Name, NewCf, St),
|
||||
{reply, {ok, NewCf}, maybe_update_main(Alias, Name, create, St1)};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
end
|
||||
end;
|
||||
handle_req(Alias, {load_table, Name, Props}, Backend, St) ->
|
||||
try
|
||||
@@ -588,10 +627,10 @@ handle_req(Alias, {write_table_property, Tab, Prop}, Backend, St) ->
|
||||
_ ->
|
||||
{reply, {error, not_found}, St}
|
||||
end;
|
||||
handle_req(Alias, {migrate, Tabs0}, Backend, St) ->
|
||||
case prepare_migration(Alias, Tabs0, St) of
|
||||
handle_req(Alias, {migrate, Tabs0, Rpt}, Backend, St) ->
|
||||
case prepare_migration(Alias, Tabs0, Rpt, St) of
|
||||
{ok, Tabs} ->
|
||||
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, St),
|
||||
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, Rpt, St),
|
||||
{reply, Res, St1};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
@@ -617,8 +656,14 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
|
||||
case {Op, lists:member(I, Index)} of
|
||||
{delete, true} ->
|
||||
CfM1 = CfM#{properties => Props#{index => Index -- [I]}},
|
||||
delete_info(Alias, Main, {index_consistent, I}),
|
||||
maybe_update_pt(Main, CfM1),
|
||||
update_cf(Alias, Main, CfM1, St);
|
||||
{create, _} ->
|
||||
%% Due to a previous bug, this marker might linger
|
||||
%% In any case, it mustn't be there for a newly created index
|
||||
delete_info(Alias, Main, {index_consistent, I}),
|
||||
St;
|
||||
_ ->
|
||||
St
|
||||
end;
|
||||
@@ -629,6 +674,7 @@ maybe_update_main(Alias, {Main, index, I}, Op, St) ->
|
||||
maybe_update_main(_, _, _, St) ->
|
||||
St.
|
||||
|
||||
|
||||
%% The pt may not have been created yet. If so, don't do it here.
|
||||
maybe_update_pt(Name, Ref) ->
|
||||
case get_pt(Name, error) of
|
||||
@@ -772,7 +818,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
||||
?log(debug, "will create ~p as standalone and migrate", [Name]),
|
||||
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
|
||||
{ok, OldTRec, _} ->
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, St);
|
||||
?log(info, "Migrating ~p to column family", [Name]),
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
|
||||
_Other ->
|
||||
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St)
|
||||
end
|
||||
@@ -781,11 +828,11 @@ create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
|
||||
{Exists, MP} = table_exists_as_standalone(Name),
|
||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
|
||||
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, St) ->
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
|
||||
?log(debug, "Migrate to cf (~p)", [Name]),
|
||||
{ok, NewCf, St1} = create_table_as_cf(
|
||||
Alias, Name, TRec#{db_ref => DbRef}, St),
|
||||
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, St1),
|
||||
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, Rpt, St1),
|
||||
{ok, NewCf, St2}.
|
||||
|
||||
%% Return {Migrate, MP} iff table exists standalone; just false if it doesn't
|
||||
@@ -802,17 +849,33 @@ should_we_migrate_standalone(#{name := Name}) ->
|
||||
false
|
||||
end.
|
||||
|
||||
prepare_migration(Alias, Tabs, St) ->
|
||||
prepare_migration(Alias, Tabs, Rpt, St) ->
|
||||
Res = lists:map(fun(T) ->
|
||||
prepare_migration_(Alias, T, St)
|
||||
end, Tabs),
|
||||
Res1 = add_related_tabs(Res, maps:get(Alias, St#st.backends), Alias, St),
|
||||
case [E || {error, _} = E <- Res1] of
|
||||
[] -> {ok, Res1};
|
||||
[] ->
|
||||
rpt(Rpt, "Will migrate ~p~n", [[T || {T,_,_} <- Res1]]),
|
||||
{ok, Res1};
|
||||
[_|_] = Errors ->
|
||||
rpt(Rpt, "Errors encountered: ~p~n", [Errors]),
|
||||
{error, Errors}
|
||||
end.
|
||||
|
||||
rpt(Rpt, Fmt, Args) ->
|
||||
rpt(Rpt, erlang:system_time(millisecond), Fmt, Args).
|
||||
|
||||
rpt(undefined, _, _, _) -> ok;
|
||||
rpt(#{to := Rpt} = R, Time, Fmt, Args) ->
|
||||
Rpt ! {mnesia_rocksdb, report, R#{time => Time, fmt => Fmt, args => Args}},
|
||||
ok.
|
||||
|
||||
maybe_progress(#{to := To}, C) when C rem 100000 =:= 0 ->
|
||||
To ! {mnesia_rocksdb, report, progress};
|
||||
maybe_progress(_, _) ->
|
||||
ok.
|
||||
|
||||
add_related_tabs(Ts, Backend, Alias, St) ->
|
||||
lists:flatmap(
|
||||
fun({error,_} = E) -> [E];
|
||||
@@ -839,29 +902,31 @@ prepare_migration_(Alias, T, #st{} = St) ->
|
||||
{error, {no_such_table, TName}}
|
||||
end.
|
||||
|
||||
do_migrate_tabs(Alias, Tabs, Backend, St) ->
|
||||
do_migrate_tabs(Alias, Tabs, Backend, Rpt, St) ->
|
||||
lists:mapfoldl(fun(T, St1) ->
|
||||
do_migrate_table(Alias, T, Backend, St1)
|
||||
do_migrate_table(Alias, T, Backend, Rpt, St1)
|
||||
end, St, Tabs).
|
||||
|
||||
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, St) when is_map(TRec0) ->
|
||||
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, Rpt, St) when is_map(TRec0) ->
|
||||
T0 = erlang:system_time(millisecond),
|
||||
rpt(Rpt, T0, "Migrate ~p~n", [Name]),
|
||||
TRec = maps:without([encoding, vsn], TRec0),
|
||||
maybe_write_user_props(TRec),
|
||||
{ok, CF, St1} = create_cf_and_migrate(Alias, Name, OldTRec,
|
||||
TRec, Backend, St),
|
||||
TRec, Backend, Rpt, St),
|
||||
put_pt(Name, CF),
|
||||
T1 = erlang:system_time(millisecond),
|
||||
rpt(Rpt, T1, "~nDone (~p)~n", [Name]),
|
||||
Time = T1 - T0,
|
||||
io:fwrite("~p migrated, ~p ms~n", [Name, Time]),
|
||||
{{Name, {ok, Time}}, St1}.
|
||||
|
||||
migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
|
||||
#st{standalone = Ts} = St) ->
|
||||
Rpt, #st{standalone = Ts} = St) ->
|
||||
ChunkSz = chunk_size(TRec),
|
||||
KeyPos = mnesia_rocksdb_lib:keypos(T),
|
||||
migrate_to_cf(mrdb:select(OldTRec, [{'_',[],['$_']}], ChunkSz),
|
||||
TRec, OldTRec, KeyPos),
|
||||
TRec, OldTRec, KeyPos, set_count(0, Rpt)),
|
||||
case maps:is_key({Alias,T}, Ts)
|
||||
andalso table_is_empty(OldTRec) of
|
||||
true ->
|
||||
@@ -871,27 +936,36 @@ migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
|
||||
{ok, St}
|
||||
end.
|
||||
|
||||
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos) ->
|
||||
mrdb:as_batch(
|
||||
Cf,
|
||||
fun(New) ->
|
||||
mrdb:as_batch(
|
||||
DbRec,
|
||||
fun(Old) ->
|
||||
lists:foreach(
|
||||
fun(Obj) ->
|
||||
mrdb:insert(New, Obj),
|
||||
mrdb:delete(Old, element(KeyPos,Obj))
|
||||
end, L)
|
||||
end)
|
||||
end),
|
||||
migrate_to_cf(cont(Cont), Cf, DbRec, KeyPos);
|
||||
migrate_to_cf('$end_of_table', _, _, _) ->
|
||||
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos, Rpt) ->
|
||||
Count0 = get_count(Rpt),
|
||||
Count = mrdb:as_batch(
|
||||
Cf,
|
||||
fun(New) ->
|
||||
mrdb:as_batch(
|
||||
DbRec,
|
||||
fun(Old) ->
|
||||
lists:foldl(
|
||||
fun(Obj, C) ->
|
||||
mrdb:insert(New, Obj),
|
||||
mrdb:delete(Old, element(KeyPos,Obj)),
|
||||
maybe_progress(Rpt, C),
|
||||
C + 1
|
||||
end, Count0, L)
|
||||
end)
|
||||
end),
|
||||
migrate_to_cf(mrdb_select:select(Cont), Cf, DbRec, KeyPos, set_count(Count, Rpt));
|
||||
migrate_to_cf('$end_of_table', _, _, _, _) ->
|
||||
ok.
|
||||
|
||||
cont('$end_of_table' = E) -> E;
|
||||
cont(F) when is_function(F,0) ->
|
||||
F().
|
||||
get_count(undefined) ->
|
||||
0;
|
||||
get_count(R) when is_map(R) ->
|
||||
maps:get({count}, R, 0).
|
||||
|
||||
set_count(_, undefined) ->
|
||||
undefined;
|
||||
set_count(Count, R) when is_map(R) ->
|
||||
R#{{count} => Count}.
|
||||
|
||||
chunk_size(_) ->
|
||||
300.
|
||||
@@ -1184,7 +1258,18 @@ load_info_(Res, I, ARef, Tab) ->
|
||||
DecK = mnesia_rocksdb_lib:decode_key(K),
|
||||
case read_info_(ARef, Tab, DecK, undefined) of
|
||||
undefined ->
|
||||
write_info_(ARef, Tab, DecK, V);
|
||||
write_info_encv(ARef, Tab, DecK, V);
|
||||
<<131,_/binary>> = Value ->
|
||||
%% Due to a previous bug, info values could be double-encoded with binary_to_term()
|
||||
try binary_to_term(Value) of
|
||||
_DecVal ->
|
||||
%% We haven't been storing erlang-term encoded data as info,
|
||||
%% so assume this is double-encoded and correct
|
||||
write_info_encv(ARef, Tab, DecK, Value)
|
||||
catch
|
||||
error:_ ->
|
||||
skip
|
||||
end;
|
||||
_ ->
|
||||
skip
|
||||
end,
|
||||
@@ -1251,7 +1336,7 @@ rocksdb_opts_from_trec(TRec) ->
|
||||
|
||||
create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
|
||||
CfName = tab_to_cf_name(Name),
|
||||
case rocksdb:create_column_family(DbRef, CfName, cfopts()) of
|
||||
case create_column_family(DbRef, CfName, cfopts(), R) of
|
||||
{ok, CfH} ->
|
||||
R1 = check_version_and_encoding(R#{ cf_handle => CfH
|
||||
, type => column_family }),
|
||||
@@ -1260,6 +1345,82 @@ create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
|
||||
Error
|
||||
end.
|
||||
|
||||
create_column_family(DbRef, CfName, CfOpts, R) ->
|
||||
Res = case column_family_exists(CfName, R) of
|
||||
true ->
|
||||
case find_active_cf_handle(DbRef, CfName) of
|
||||
error ->
|
||||
{error, {no_handle_for_existing_cf, CfName}};
|
||||
{ok, _} = Ok ->
|
||||
Ok
|
||||
end;
|
||||
false ->
|
||||
rocksdb:create_column_family(DbRef, CfName, CfOpts)
|
||||
end,
|
||||
maybe_note_active_cf(Res, CfName),
|
||||
Res.
|
||||
|
||||
column_family_exists(CfName, #{mountpoint := MP}) ->
|
||||
case rocksdb:list_column_families(MP, []) of
|
||||
{ok, CFs} ->
|
||||
lists:member(CfName, CFs);
|
||||
_ ->
|
||||
false
|
||||
end;
|
||||
column_family_exists(CfName, #{alias := Alias}) ->
|
||||
case get_ref({admin, Alias}, error) of
|
||||
error ->
|
||||
false;
|
||||
Adm ->
|
||||
column_family_exists(CfName, Adm)
|
||||
end.
|
||||
|
||||
%% Column family handle caching ======================================================
|
||||
%%
|
||||
%% At least as far as I can tell, there is no way to query erlang-rocksdb for currently
|
||||
%% active column family handles. This can become an issue e.g. during table migration
|
||||
%% from standalone to column families, where the meta structures aren't updated until
|
||||
%% after completed migration. A transient error during migration should be addressable
|
||||
%% by simply retrying, but if the CF has already been created, and we've lost the handle,
|
||||
%% there is no easy way to get it back. Unfortunately, if we start caching CFs, we also
|
||||
%% need to garbage collect them.
|
||||
%%
|
||||
|
||||
-define(CFH_CACHE, mnesia_rocksdb_cf_handle_cache).
|
||||
|
||||
ensure_cf_cache() ->
|
||||
case ets:info(?CFH_CACHE, name) of
|
||||
undefined ->
|
||||
ets:new(?CFH_CACHE, [ordered_set, public, named_table]);
|
||||
_ ->
|
||||
true
|
||||
end.
|
||||
|
||||
maybe_note_active_cf({ok, CfH}, CfName) ->
|
||||
ets:insert(?CFH_CACHE, {{CfName, CfH}});
|
||||
maybe_note_active_cf(_, _) ->
|
||||
false.
|
||||
|
||||
find_active_cf_handle(DbRef, CfName) ->
|
||||
Candidates = ets:select(?CFH_CACHE, [{{{CfName,'$1'}}, [], ['$1']}]),
|
||||
lists:foldl(fun(CfH, Acc) -> check_cfh(DbRef, CfH, CfName, Acc) end, error, Candidates).
|
||||
|
||||
check_cfh(DbRef, CfH, CfName, Acc) ->
|
||||
case rocksdb:iterator(DbRef, CfH, []) of
|
||||
{ok, I} ->
|
||||
rocksdb:iterator_close(I),
|
||||
{ok, CfH};
|
||||
{error, _} ->
|
||||
ets:delete(?CFH_CACHE, {CfName, CfH}),
|
||||
Acc
|
||||
end.
|
||||
|
||||
drop_cached_cf(CfName, CfH) ->
|
||||
ets:delete(?CFH_CACHE, {CfName, CfH}).
|
||||
|
||||
%%
|
||||
%% ===================================================================================
|
||||
|
||||
do_prep_close(Name, Backend, St) ->
|
||||
RelTabs = get_related_resources(Name, Backend),
|
||||
erase_pt_list([Name | RelTabs]),
|
||||
@@ -1302,6 +1463,7 @@ do_delete_table(Alias, Name, Backend, #st{} = St) ->
|
||||
case Where of
|
||||
#{db_ref := DbRef, cf_handle := CfH, type := column_family} ->
|
||||
rocksdb:drop_column_family(DbRef, CfH),
|
||||
drop_cached_cf(tab_to_cf_name(Name), CfH),
|
||||
rocksdb:destroy_column_family(DbRef, CfH),
|
||||
{ok, delete_cf(Alias, Name, St)};
|
||||
#{type := standalone} = R ->
|
||||
|
||||
@@ -42,4 +42,5 @@ start_link() ->
|
||||
%% ===================================================================
|
||||
|
||||
init([]) ->
|
||||
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }.
|
||||
{ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
|
||||
, ?CHILD(mnesia_rocksdb_params, worker)]} }.
|
||||
|
||||
+128
-49
@@ -105,6 +105,8 @@
|
||||
-include("mnesia_rocksdb.hrl").
|
||||
-include("mnesia_rocksdb_int.hrl").
|
||||
|
||||
-define(BATCH_REF_DUMMY, '$mrdb_batch_ref_dummy').
|
||||
|
||||
-type tab_name() :: atom().
|
||||
-type alias() :: atom().
|
||||
-type admin_tab() :: {admin, alias()}.
|
||||
@@ -115,7 +117,9 @@
|
||||
| index()
|
||||
| retainer().
|
||||
|
||||
-type retries() :: non_neg_integer().
|
||||
-type inner() :: non_neg_integer().
|
||||
-type outer() :: non_neg_integer().
|
||||
-type retries() :: outer() | {inner(), outer()}.
|
||||
|
||||
%% activity type 'ets' makes no sense in this context
|
||||
-type mnesia_activity_type() :: transaction
|
||||
@@ -141,7 +145,7 @@
|
||||
|
||||
-type tx_activity() :: #{ type := 'tx'
|
||||
, handle := tx_handle()
|
||||
, attempt := non_neg_integer() }.
|
||||
, attempt := 'undefined' | retries() }.
|
||||
-type batch_activity() :: #{ type := 'batch'
|
||||
, handle := batch_handle() }.
|
||||
-type activity() :: tx_activity() | batch_activity().
|
||||
@@ -246,7 +250,7 @@ release_snapshot(SHandle) ->
|
||||
%% <li> `{tx, TxOpts}' - A `rocksdb' transaction with sligth modifications</li>
|
||||
%% <li> `batch' - A `rocksdb' batch operation</li>
|
||||
%% </ul>
|
||||
%%
|
||||
%%
|
||||
%% By default, transactions are combined with a snapshot with 1 retry.
|
||||
%% The snapshot ensures that writes from concurrent transactions don't leak into the transaction context.
|
||||
%% A transaction will be retried if it detects that the commit set conflicts with recent changes.
|
||||
@@ -254,6 +258,14 @@ release_snapshot(SHandle) ->
|
||||
%% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
|
||||
%% the commit set. It will then be re-run again, until the retry count is exhausted.
|
||||
%%
|
||||
%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a
|
||||
%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by
|
||||
%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
|
||||
%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last
|
||||
%% in line) if there are outer retries remaining. When all retries are exhaused, the activity aborts
|
||||
%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first
|
||||
%% attempt, but only on outer retries (the first retry is always an outer retry).
|
||||
%%
|
||||
%% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'.
|
||||
%%
|
||||
%% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and
|
||||
@@ -269,41 +281,92 @@ activity(Type, Alias, F) ->
|
||||
#{ alias => Alias
|
||||
, db_ref => DbRef }, TxCtxt);
|
||||
batch ->
|
||||
Batch = get_batch_(DbRef),
|
||||
Batch = init_batch_ref(DbRef),
|
||||
#{ activity => #{ type => batch
|
||||
, handle => Batch }
|
||||
, alias => Alias
|
||||
, db_ref => DbRef }
|
||||
end,
|
||||
do_activity(F, Alias, Ctxt, false).
|
||||
do_activity(F, Alias, Ctxt).
|
||||
|
||||
do_activity(F, Alias, Ctxt, WithLock) ->
|
||||
try run_f(F, Ctxt, WithLock, Alias) of
|
||||
Res ->
|
||||
try commit_and_pop(Res)
|
||||
catch
|
||||
throw:{?MODULE, busy} ->
|
||||
do_activity(F, Alias, Ctxt, true)
|
||||
end
|
||||
do_activity(F, Alias, Ctxt) ->
|
||||
try try_f(F, Ctxt)
|
||||
catch
|
||||
throw:{?MODULE, busy} ->
|
||||
retry_activity(F, Alias, Ctxt)
|
||||
end.
|
||||
|
||||
try_f(F, Ctxt) ->
|
||||
try_f(mnesia_compatible_aborts(Ctxt), F, Ctxt).
|
||||
|
||||
try_f(false, F, Ctxt) ->
|
||||
try run_f(F, Ctxt) of
|
||||
Res ->
|
||||
commit_and_pop(Res)
|
||||
catch
|
||||
throw:Something ->
|
||||
abort_and_pop(throw, Something);
|
||||
Cat:Err:T ->
|
||||
%% Without capturing the stacktract here,
|
||||
%% debugging gets pretty difficult. Incompatible with mnesia, though.
|
||||
abort_and_pop(Cat, {Err, T})
|
||||
end;
|
||||
try_f(true, F, Ctxt) ->
|
||||
try run_f(F, Ctxt) of
|
||||
Res ->
|
||||
commit_and_pop(Res)
|
||||
catch
|
||||
throw:Something ->
|
||||
abort_and_pop(throw, Something);
|
||||
Cat:Err ->
|
||||
%% Without capturing the stacktract here,
|
||||
%% debugging gets pretty difficult
|
||||
abort_and_pop(Cat, Err)
|
||||
end.
|
||||
|
||||
run_f(F, Ctxt, false, _) ->
|
||||
push_ctxt(Ctxt),
|
||||
F();
|
||||
run_f(F, Ctxt, true, Alias) ->
|
||||
mrdb_mutex:do(
|
||||
Alias,
|
||||
fun() ->
|
||||
push_ctxt(incr_attempt(Ctxt)),
|
||||
F()
|
||||
end).
|
||||
|
||||
incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) ->
|
||||
run_f(F, Ctxt) ->
|
||||
push_ctxt(Ctxt),
|
||||
F().
|
||||
|
||||
incr_attempt(0, {_,O}) when O > 0 ->
|
||||
{outer, {0,1}};
|
||||
incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O),
|
||||
is_integer(Ri), is_integer(Ro) ->
|
||||
if I < Ri -> {inner, {I+1, O}};
|
||||
O < Ro -> {outer, {0, O+1}};
|
||||
true ->
|
||||
error
|
||||
end;
|
||||
incr_attempt(_, _) ->
|
||||
error.
|
||||
|
||||
retry_activity(F, Alias, #{activity := #{ type := Type
|
||||
, attempt := A
|
||||
, retries := R} = Act} = Ctxt) ->
|
||||
case incr_attempt(A, R) of
|
||||
{RetryCtxt, A1} ->
|
||||
Act1 = Act#{attempt := A1},
|
||||
Ctxt1 = Ctxt#{activity := Act1},
|
||||
try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1))
|
||||
catch
|
||||
throw:{?MODULE, busy} ->
|
||||
retry_activity(F, Alias, Ctxt1)
|
||||
end;
|
||||
error ->
|
||||
return_abort(Type, error, retry_limit)
|
||||
end.
|
||||
|
||||
retry_activity_(inner, F, Alias, Ctxt) ->
|
||||
mrdb_stats:incr(Alias, inner_retries, 1),
|
||||
try_f(F, Ctxt);
|
||||
retry_activity_(outer, F, Alias, Ctxt) ->
|
||||
mrdb_stats:incr(Alias, outer_retries, 1),
|
||||
mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end).
|
||||
|
||||
restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) ->
|
||||
{ok, TxH} = rdb_transaction(DbRef, []),
|
||||
Act1 = Act#{attempt := A+1, handle := TxH},
|
||||
Act1 = Act#{handle := TxH},
|
||||
C1 = C#{ activity := Act1 },
|
||||
case maps:is_key(snapshot, C) of
|
||||
true ->
|
||||
@@ -368,7 +431,7 @@ apply_tx_opts(Opts0) when is_map(Opts0) ->
|
||||
check_tx_opts(maps:merge(default_tx_opts(), Opts0)).
|
||||
|
||||
check_tx_opts(Opts) ->
|
||||
case maps:without([no_snapshot, retries], Opts) of
|
||||
case maps:without([no_snapshot, retries, mnesia_compatible], Opts) of
|
||||
Other when map_size(Other) > 0 ->
|
||||
abort({invalid_tx_opts, maps:keys(Other)});
|
||||
_ ->
|
||||
@@ -376,10 +439,14 @@ check_tx_opts(Opts) ->
|
||||
end.
|
||||
|
||||
check_retries(#{retries := Retries} = Opts) ->
|
||||
if is_integer(Retries), Retries >= 0 ->
|
||||
Opts;
|
||||
true ->
|
||||
error({invalid_tx_option, {retries, Retries}})
|
||||
case Retries of
|
||||
_ when is_integer(Retries), Retries >= 0 ->
|
||||
Opts#{retries := {0, Retries}};
|
||||
{Inner, Outer} when is_integer(Inner), is_integer(Outer),
|
||||
Inner >= 0, Outer >= 0 ->
|
||||
Opts;
|
||||
_ ->
|
||||
error({invalid_tx_option, {retries, Retries}})
|
||||
end.
|
||||
|
||||
check_nosnap(#{no_snapshot := NoSnap} = Opts) ->
|
||||
@@ -394,7 +461,7 @@ create_tx(Opts, DbRef) ->
|
||||
{ok, TxH} = rdb_transaction(DbRef, []),
|
||||
Opts#{activity => maps:merge(Opts, #{ type => tx
|
||||
, handle => TxH
|
||||
, attempt => 1})}.
|
||||
, attempt => 0 })}.
|
||||
|
||||
maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) ->
|
||||
case NoSnap of
|
||||
@@ -414,8 +481,7 @@ commit_and_pop(Res) ->
|
||||
Res;
|
||||
{error, {error, "Resource busy" ++ _ = Busy}} ->
|
||||
case A of
|
||||
#{retries := Retries, attempt := Att}
|
||||
when Att =< Retries ->
|
||||
#{retries := {I,O}} when I > 0; O > 0 ->
|
||||
throw({?MODULE, busy});
|
||||
_ ->
|
||||
error({error, Busy})
|
||||
@@ -477,6 +543,11 @@ re_throw(Cat, Err) ->
|
||||
mnesia_compatible_aborts() ->
|
||||
mnesia_rocksdb_admin:get_cached_env(mnesia_compatible_aborts, false).
|
||||
|
||||
mnesia_compatible_aborts(#{activity := #{mnesia_compatible := Bool}}) ->
|
||||
Bool;
|
||||
mnesia_compatible_aborts(_) ->
|
||||
mnesia_compatible_aborts().
|
||||
|
||||
fix_error({aborted, Err}) ->
|
||||
Err;
|
||||
fix_error(Err) ->
|
||||
@@ -486,7 +557,7 @@ rdb_transaction(DbRef, Opts) ->
|
||||
rocksdb:transaction(DbRef, Opts).
|
||||
|
||||
rdb_transaction_commit_and_pop(H) ->
|
||||
try rdb_transaction_commit(H)
|
||||
try rdb_transaction_commit(H)
|
||||
after
|
||||
pop_ctxt()
|
||||
end.
|
||||
@@ -507,10 +578,13 @@ rdb_write_batch_and_pop(BatchRef, C) ->
|
||||
pop_ctxt()
|
||||
end.
|
||||
|
||||
rdb_release_batch(?BATCH_REF_DUMMY) ->
|
||||
ok;
|
||||
rdb_release_batch(H) ->
|
||||
rocksdb:release_batch(H).
|
||||
|
||||
%% @doc Aborts an ongoing {@link activity/2}
|
||||
-spec abort(_) -> no_return().
|
||||
abort(Reason) ->
|
||||
case mnesia_compatible_aborts() of
|
||||
true ->
|
||||
@@ -529,7 +603,7 @@ new_tx(#{activity := _}, _) ->
|
||||
new_tx(Tab, Opts) ->
|
||||
#{db_ref := DbRef} = R = ensure_ref(Tab),
|
||||
{ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)),
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 1}}.
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 0}}.
|
||||
|
||||
-spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref().
|
||||
tx_ref(Tab, TxH) ->
|
||||
@@ -539,7 +613,7 @@ tx_ref(Tab, TxH) ->
|
||||
#{activity := #{type := tx, handle := OtherTxH}} ->
|
||||
error({tx_handle_conflict, OtherTxH});
|
||||
R ->
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 1}}
|
||||
R#{activity => #{type => tx, handle => TxH, attempt => 0}}
|
||||
end.
|
||||
|
||||
-spec tx_commit(tx_handle() | db_ref()) -> ok.
|
||||
@@ -798,7 +872,7 @@ update_index_do_bag(Ixs, Name, R, Key, Obj, Opts) ->
|
||||
not_found
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
update_index_do([{_Pos,ordered} = Ix|Ixs], Name, R, Key, Obj, Rest, Opts) ->
|
||||
Tab = {Name, index, Ix},
|
||||
#{ix_vals_f := IxValsF} = IRef = ensure_ref(Tab, R),
|
||||
@@ -889,7 +963,7 @@ index_read_(#{name := Main, semantics := Sem} = Ref, Val, Ix) ->
|
||||
_ when is_atom(Ix) ->
|
||||
{attr_pos(Ix, Ref), ordered};
|
||||
{_} ->
|
||||
Ix;
|
||||
{Ix, ordered};
|
||||
_ when is_integer(Ix) ->
|
||||
{Ix, ordered}
|
||||
end,
|
||||
@@ -970,7 +1044,7 @@ alias_of(Tab) ->
|
||||
%% and when releasing, all batches are released. This will not ensure
|
||||
%% atomicity, but there is no way in rocksdb to achieve atomicity
|
||||
%% across db instances. At least, data should end up where you expect.
|
||||
%%
|
||||
%%
|
||||
%% @end
|
||||
-spec as_batch(ref_or_tab(), fun( (db_ref()) -> Res )) -> Res.
|
||||
as_batch(Tab, F) ->
|
||||
@@ -1010,18 +1084,20 @@ get_batch(#{db_ref := DbRef, batch := BatchRef}) ->
|
||||
get_batch(_) ->
|
||||
{error, badarg}.
|
||||
|
||||
get_batch_(DbRef) ->
|
||||
init_batch_ref(DbRef) ->
|
||||
Ref = make_ref(),
|
||||
{ok, Batch} = rdb_batch(),
|
||||
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}),
|
||||
pdict_put({mrdb_batch, Ref}, #{DbRef => ?BATCH_REF_DUMMY}),
|
||||
Ref.
|
||||
|
||||
get_batch_(DbRef) -> Ref = make_ref(), {ok, Batch} = rdb_batch(),
|
||||
pdict_put({mrdb_batch, Ref}, #{DbRef => Batch}), Ref.
|
||||
|
||||
get_batch_(DbRef, BatchRef) ->
|
||||
Key = batch_ref_key(BatchRef),
|
||||
case pdict_get(Key) of
|
||||
undefined ->
|
||||
error(stale_batch_ref);
|
||||
#{DbRef := Batch} ->
|
||||
#{DbRef := Batch} when Batch =/= ?BATCH_REF_DUMMY ->
|
||||
Batch;
|
||||
Map ->
|
||||
{ok, Batch} = rdb_batch(),
|
||||
@@ -1050,7 +1126,9 @@ write_batches(BatchRef, Opts) ->
|
||||
pdict_erase(Key),
|
||||
ret_batch_write_acc(
|
||||
maps:fold(
|
||||
fun(DbRef, Batch, Acc) ->
|
||||
fun(_, ?BATCH_REF_DUMMY, Acc) ->
|
||||
Acc;
|
||||
(DbRef, Batch, Acc) ->
|
||||
case rocksdb:write_batch(DbRef, Batch, Opts) of
|
||||
ok ->
|
||||
Acc;
|
||||
@@ -1269,7 +1347,7 @@ rdb_fold(Tab, Fun, Acc, Prefix, Limit) when is_function(Fun, 3)
|
||||
true = valid_limit(Limit),
|
||||
mrdb_select:rdb_fold(ensure_ref(Tab), Fun, Acc, Prefix, Limit).
|
||||
|
||||
valid_limit(L) ->
|
||||
valid_limit(L) ->
|
||||
case L of
|
||||
infinity ->
|
||||
true;
|
||||
@@ -1494,8 +1572,9 @@ rdb_iterator(R) -> rdb_iterator(R, []).
|
||||
rdb_iterator(R, Opts) ->
|
||||
rdb_iterator_(R, read_opts(R, Opts)).
|
||||
|
||||
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
|
||||
rocksdb:transaction_iterator(DbRef, TxH, CfH, ROpts);
|
||||
rdb_iterator_(#{cf_handle := CfH, activity := #{type := tx, handle := TxH}}, ROpts) ->
|
||||
%% Note: versions before 1.8.0 expected DbRef as first argument
|
||||
rocksdb:transaction_iterator(TxH, CfH, ROpts);
|
||||
rdb_iterator_(#{db_ref := DbRef, cf_handle := CfH}, ROpts) ->
|
||||
rocksdb:iterator(DbRef, CfH, ROpts).
|
||||
|
||||
@@ -1506,11 +1585,11 @@ rdb_merge_(#{db_ref := DbRef, cf_handle := CfH}, K, Op, WOpts) ->
|
||||
rocksdb:merge(DbRef, CfH, K, Op, WOpts).
|
||||
|
||||
write_opts(#{write_opts := Os}, Opts) -> Os ++ Opts;
|
||||
write_opts(_, Opts) ->
|
||||
write_opts(_, Opts) ->
|
||||
Opts.
|
||||
|
||||
read_opts(#{read_opts := Os}, Opts) -> Os ++ Opts;
|
||||
read_opts(_, Opts) ->
|
||||
read_opts(_, Opts) ->
|
||||
Opts.
|
||||
|
||||
-define(EOT, '$end_of_table').
|
||||
|
||||
+42
-3
@@ -6,11 +6,14 @@
|
||||
, iterator_move/2
|
||||
, iterator/2
|
||||
, iterator_close/1
|
||||
, fold/4
|
||||
, rev_fold/4
|
||||
, index_ref/2
|
||||
]).
|
||||
|
||||
-record(mrdb_ix_iter, { i :: mrdb:iterator()
|
||||
-record(mrdb_ix_iter, { i :: mrdb:mrdb_iterator()
|
||||
, type = set :: set | bag
|
||||
, sub :: mrdb:ref() | pid()
|
||||
, sub :: pid() | mrdb:db_ref()
|
||||
}).
|
||||
|
||||
-type ix_iterator() :: #mrdb_ix_iter{}.
|
||||
@@ -19,7 +22,7 @@
|
||||
|
||||
-type object() :: tuple().
|
||||
|
||||
-record(subst, { i :: mrdb:iterator()
|
||||
-record(subst, { i :: mrdb:mrdb_iterator()
|
||||
, vals_f
|
||||
, cur
|
||||
, mref }).
|
||||
@@ -62,6 +65,41 @@ iterator(Tab, IxPos) ->
|
||||
iterator_move(#mrdb_ix_iter{type = set} = IxI, Dir) -> iterator_move_set(IxI, Dir);
|
||||
iterator_move(#mrdb_ix_iter{type = bag} = IxI, Dir) -> iterator_move_bag(IxI, Dir).
|
||||
|
||||
-spec fold(mrdb:ref_or_tab(), mrdb:index_position(),
|
||||
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
|
||||
when Acc :: any().
|
||||
%% Folds over the index table corresponding to Tab and IxPos.
|
||||
%% The fold fun is called with the index key and the corresponding object,
|
||||
%% or [] if there is no such object.
|
||||
fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
|
||||
fold_(Tab, IxPos, first, next, FoldFun, Acc).
|
||||
|
||||
-spec rev_fold(mrdb:ref_or_tab(), mrdb:index_position(),
|
||||
fun( (index_value(), object() | [], Acc) -> Acc ), Acc) -> Acc
|
||||
when Acc :: any().
|
||||
%% Like fold/4 above, but folds over the index in the reverse order.
|
||||
rev_fold(Tab, IxPos, FoldFun, Acc) when is_function(FoldFun, 3) ->
|
||||
fold_(Tab, IxPos, last, prev, FoldFun, Acc).
|
||||
|
||||
fold_(Tab, IxPos, Start, Dir, FoldFun, Acc) ->
|
||||
with_iterator(
|
||||
Tab, IxPos,
|
||||
fun(I) ->
|
||||
iter_fold(I, Start, Dir, FoldFun, Acc)
|
||||
end).
|
||||
|
||||
iter_fold(I, Start, Dir, Fun, Acc) ->
|
||||
iter_fold_(iterator_move(I, Start), I, Dir, Fun, Acc).
|
||||
|
||||
iter_fold_({ok, IxVal, Obj}, I, Dir, Fun, Acc) ->
|
||||
iter_fold_(iterator_move(I, Dir), I, Dir, Fun, Fun(IxVal, Obj, Acc));
|
||||
iter_fold_({error, _}, _, _, _, Acc) ->
|
||||
Acc.
|
||||
|
||||
index_ref(Tab, Pos) ->
|
||||
TRef = mrdb:ensure_ref(Tab),
|
||||
ensure_index_ref(Pos, TRef).
|
||||
|
||||
iterator_move_set(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
case mrdb:iterator_move(I, Dir) of
|
||||
{ok, {{FKey, PKey}}} ->
|
||||
@@ -83,6 +121,7 @@ iterator_move_bag(#mrdb_ix_iter{i = I, sub = Sub}, Dir) ->
|
||||
Other
|
||||
end.
|
||||
|
||||
-spec opt_read(mrdb:ref_or_tab(), Key :: any()) -> any().
|
||||
opt_read(R, Key) ->
|
||||
case mrdb:read(R, Key, []) of
|
||||
[Obj] ->
|
||||
|
||||
+92
-62
@@ -3,79 +3,109 @@
|
||||
|
||||
-export([ do/2 ]).
|
||||
|
||||
-export([ ensure_tab/0 ]).
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-endif.
|
||||
|
||||
-define(LOCK_TAB, ?MODULE).
|
||||
|
||||
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
|
||||
%% The claim operation is done using an atomic list of two updates:
|
||||
%% first, incrementing with 0 - this returns the previous value
|
||||
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
|
||||
%% regardless of previous value. This means that if [0,1] is returned, the resource
|
||||
%% was not locked previously; if [1,1] is returned, it was.
|
||||
%% We use a gen_server-based FIFO queue (one queue per alias) to manage the
|
||||
%% critical section.
|
||||
%%
|
||||
%% Releasing the resource is done by deleting the resource. If we just decrement,
|
||||
%% we will end up with lingering unlocked resources, so we might as well delete.
|
||||
%% Either operation is atomic, and the claim op creates the object if it's missing.
|
||||
%% Releasing the resource is done by notifying the server.
|
||||
%%
|
||||
%% Previous implementations tested:
|
||||
%% * A counter (in ets), incremented atomically first with 0, then 1. This lets
|
||||
%% the caller know if the 'semaphore' was 1 or zero before. If it was already
|
||||
%% 1, busy-loop over the counter until it reads as a transition from 0 to 1.
|
||||
%% This is a pretty simple implementation, but it lacks ordering, and appeared
|
||||
%% to sometimes lead to starvation.
|
||||
%% * A duplicate_bag ets table: These are defined such that insertion order is
|
||||
%% preserved. Thus, they can serve as a mutex with FIFO characteristics.
|
||||
%% Unfortunately, performance plummeted when tested with > 25 concurrent
|
||||
%% processes. While this is likely a very high number, given that we're talking
|
||||
%% about contention in a system using optimistic concurrency, it's never good
|
||||
%% if performance falls off a cliff.
|
||||
|
||||
do(Rsrc, F) when is_function(F, 0) ->
|
||||
true = claim(Rsrc),
|
||||
{ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
|
||||
try F()
|
||||
after
|
||||
release(Rsrc)
|
||||
mrdb_mutex_serializer:done(Rsrc, Ref)
|
||||
end.
|
||||
|
||||
claim(Rsrc) ->
|
||||
case claim_(Rsrc) of
|
||||
true -> true;
|
||||
false -> busy_wait(Rsrc, 1000)
|
||||
-ifdef(TEST).
|
||||
|
||||
mutex_test_() ->
|
||||
{foreach,
|
||||
fun setup/0,
|
||||
fun cleanup/1,
|
||||
[
|
||||
{"Check that all operations complete", fun swarm_do/0}
|
||||
]}.
|
||||
|
||||
setup() ->
|
||||
case whereis(mrdb_mutex_serializer) of
|
||||
undefined ->
|
||||
{ok, Pid} = mrdb_mutex_serializer:start_link(),
|
||||
Pid;
|
||||
Pid ->
|
||||
Pid
|
||||
end.
|
||||
|
||||
claim_(Rsrc) ->
|
||||
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
|
||||
{2, 1, 1, 1}], {Rsrc, 0}) of
|
||||
[0, 1] ->
|
||||
%% have lock
|
||||
true;
|
||||
[1, 1] ->
|
||||
false
|
||||
end.
|
||||
cleanup(Pid) ->
|
||||
unlink(Pid),
|
||||
exit(Pid, kill).
|
||||
|
||||
%% The busy-wait function makes use of the fact that we can read a timer to find out
|
||||
%% if it still has time remaining. This reduces the need for selective receive, looking
|
||||
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
|
||||
%% also be necessary for the `read_timer/1` value to refresh.
|
||||
%%
|
||||
busy_wait(Rsrc, Timeout) ->
|
||||
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
|
||||
do_wait(Rsrc, Ref).
|
||||
|
||||
do_wait(Rsrc, Ref) ->
|
||||
erlang:yield(),
|
||||
case erlang:read_timer(Ref) of
|
||||
false ->
|
||||
erlang:cancel_timer(Ref),
|
||||
error(lock_wait_timeout);
|
||||
_ ->
|
||||
case claim_(Rsrc) of
|
||||
true ->
|
||||
erlang:cancel_timer(Ref),
|
||||
ok;
|
||||
false ->
|
||||
do_wait(Rsrc, Ref)
|
||||
end
|
||||
end.
|
||||
|
||||
release(Rsrc) ->
|
||||
ets:delete(?LOCK_TAB, Rsrc),
|
||||
swarm_do() ->
|
||||
Rsrc = ?LINE,
|
||||
Pid = spawn(fun() -> collect([]) end),
|
||||
L = lists:seq(1, 1000),
|
||||
Evens = [X || X <- L, is_even(X)],
|
||||
Pids = [spawn_monitor(fun() ->
|
||||
send_even(Rsrc, N, Pid)
|
||||
end) || N <- lists:seq(1,1000)],
|
||||
await_pids(Pids),
|
||||
Results = fetch(Pid),
|
||||
{incorrect_results, []} = {incorrect_results, Results -- Evens},
|
||||
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
|
||||
ok.
|
||||
|
||||
|
||||
%% Called by the process holding the ets table.
|
||||
ensure_tab() ->
|
||||
case ets:info(?LOCK_TAB, name) of
|
||||
undefined ->
|
||||
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
|
||||
_ ->
|
||||
true
|
||||
collect(Acc) ->
|
||||
receive
|
||||
{_, result, N} ->
|
||||
collect([N|Acc]);
|
||||
{From, fetch} ->
|
||||
From ! {fetch_reply, Acc},
|
||||
done
|
||||
end.
|
||||
|
||||
fetch(Pid) ->
|
||||
Pid ! {self(), fetch},
|
||||
receive
|
||||
{fetch_reply, Result} ->
|
||||
Result
|
||||
end.
|
||||
|
||||
is_even(N) ->
|
||||
(N rem 2) =:= 0.
|
||||
|
||||
await_pids([{_, MRef}|Pids]) ->
|
||||
receive
|
||||
{'DOWN', MRef, _, _, _} ->
|
||||
await_pids(Pids)
|
||||
after 10000 ->
|
||||
error(timeout)
|
||||
end;
|
||||
await_pids([]) ->
|
||||
ok.
|
||||
|
||||
send_even(Rsrc, N, Pid) ->
|
||||
do(Rsrc, fun() ->
|
||||
case is_even(N) of
|
||||
true ->
|
||||
Pid ! {self(), result, N};
|
||||
false ->
|
||||
exit(not_even)
|
||||
end
|
||||
end).
|
||||
|
||||
-endif.
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
-module(mrdb_mutex_serializer).
|
||||
|
||||
-behavior(gen_server).
|
||||
|
||||
-export([wait/1,
|
||||
done/2]).
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-record(st, { queues = #{}
|
||||
, empty_q = queue:new() }). %% perhaps silly optimization
|
||||
|
||||
wait(Rsrc) ->
|
||||
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
|
||||
|
||||
done(Rsrc, Ref) ->
|
||||
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
{ok, #st{}}.
|
||||
|
||||
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
|
||||
, empty_q = NewQ } = St) ->
|
||||
MRef = erlang:monitor(process, Pid),
|
||||
Q0 = maps:get(Rsrc, Queues, NewQ),
|
||||
WasEmpty = queue:is_empty(Q0),
|
||||
Q1 = queue:in({From, MRef}, Q0),
|
||||
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
|
||||
case WasEmpty of
|
||||
true ->
|
||||
{reply, {ok, MRef}, St1};
|
||||
false ->
|
||||
{noreply, St1}
|
||||
end.
|
||||
|
||||
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
|
||||
case maps:find(Rsrc, Queues) of
|
||||
{ok, Q} ->
|
||||
case queue:out(Q) of
|
||||
{{value, {_From, MRef}}, Q1} ->
|
||||
erlang:demonitor(MRef),
|
||||
ok = maybe_dispatch_one(Q1),
|
||||
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
|
||||
{_, _} ->
|
||||
%% Not the lock holder
|
||||
{noreply, St}
|
||||
end;
|
||||
error ->
|
||||
{noreply, St}
|
||||
end.
|
||||
|
||||
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
|
||||
Queues1 =
|
||||
maps:map(
|
||||
fun(_Rsrc, Q) ->
|
||||
drop_or_filter(Q, MRef)
|
||||
end, Queues),
|
||||
{noreply, St#st{ queues = Queues1 }};
|
||||
handle_info(_, St) ->
|
||||
{noreply, St}.
|
||||
|
||||
terminate(_, _) ->
|
||||
ok.
|
||||
|
||||
code_change(_FromVsn, St, _Extra) ->
|
||||
{ok, St}.
|
||||
|
||||
%% In this function, we don't actually pop
|
||||
maybe_dispatch_one(Q) ->
|
||||
case queue:peek(Q) of
|
||||
empty ->
|
||||
ok;
|
||||
{value, {From, MRef}} ->
|
||||
gen_server:reply(From, {ok, MRef}),
|
||||
ok
|
||||
end.
|
||||
|
||||
drop_or_filter(Q, MRef) ->
|
||||
case queue:peek(Q) of
|
||||
{value, {_, MRef}} ->
|
||||
Q1 = queue:drop(Q),
|
||||
ok = maybe_dispatch_one(Q1),
|
||||
Q1;
|
||||
{value, _Other} ->
|
||||
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
|
||||
empty ->
|
||||
Q
|
||||
end.
|
||||
@@ -0,0 +1,74 @@
|
||||
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
|
||||
%% @doc Statistics API for the mnesia_rocksdb plugin
|
||||
%%
|
||||
%% Some counters are maintained for each active alias. Currently, the following
|
||||
%% counters are supported:
|
||||
%% * inner_retries
|
||||
%% * outer_retries
|
||||
%%
|
||||
-module(mrdb_stats).
|
||||
|
||||
-export([new/0]).
|
||||
|
||||
-export([incr/3,
|
||||
get/1,
|
||||
get/2]).
|
||||
|
||||
-type alias() :: mnesia_rocksdb:alias().
|
||||
-type db_ref() :: mrdb:db_ref().
|
||||
-type counter() :: atom().
|
||||
-type increment() :: integer().
|
||||
|
||||
-type counters() :: #{ counter() := integer() }.
|
||||
|
||||
new() ->
|
||||
#{ ref => counters:new(map_size(ctr_meta()), [write_concurrency])
|
||||
, meta => ctr_meta()}.
|
||||
|
||||
ctr_meta() ->
|
||||
#{ inner_retries => 1
|
||||
, outer_retries => 2 }.
|
||||
|
||||
-spec incr(alias() | db_ref(), counter(), increment()) -> ok.
|
||||
%% @doc Increment `Ctr' counter for `Alias` with increment `N'.
|
||||
%%
|
||||
%% Note that the first argument may also be a `db_ref()' map,
|
||||
%% corresponding to `mrdb:get_ref({admin, Alias})'.
|
||||
%% @end
|
||||
incr(Alias, Ctr, N) when is_atom(Alias) ->
|
||||
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
|
||||
incr_(Ref, Meta, Ctr, N);
|
||||
incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) ->
|
||||
incr_(Ref, Meta, Ctr, N).
|
||||
|
||||
-spec get(alias() | db_ref(), counter()) -> integer().
|
||||
%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'.
|
||||
%% @end
|
||||
get(Alias, Ctr) when is_atom(Alias) ->
|
||||
#{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}),
|
||||
get_(Ref, Meta, Ctr);
|
||||
get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) ->
|
||||
get_(Ref, Meta, Ctr).
|
||||
|
||||
-spec get(alias() | db_ref()) -> counters().
|
||||
%% @doc Fetches all known counters for `Alias', in the form of a map,
|
||||
%% `#{Counter => Value}'.
|
||||
%% @end
|
||||
get(Alias) when is_atom(Alias) ->
|
||||
get_(mrdb:get_ref({admin, Alias}));
|
||||
get(Ref) when is_map(Ref) ->
|
||||
get_(Ref).
|
||||
|
||||
get_(#{stats := #{ref := Ref, meta := Meta}}) ->
|
||||
lists:foldl(
|
||||
fun({K, P}, M) ->
|
||||
M#{K := counters:get(Ref, P)}
|
||||
end, Meta, maps:to_list(Meta)).
|
||||
|
||||
get_(Ref, Meta, Attr) ->
|
||||
Ix = maps:get(Attr, Meta),
|
||||
counters:get(Ref, Ix).
|
||||
|
||||
incr_(Ref, Meta, Attr, N) ->
|
||||
Ix = maps:get(Attr, Meta),
|
||||
counters:add(Ref, Ix, N).
|
||||
+149
-14
@@ -24,6 +24,7 @@
|
||||
, mrdb_abort/1
|
||||
, mrdb_two_procs/1
|
||||
, mrdb_two_procs_tx_restart/1
|
||||
, mrdb_two_procs_tx_inner_restart/1
|
||||
, mrdb_two_procs_snap/1
|
||||
, mrdb_three_procs/1
|
||||
]).
|
||||
@@ -53,6 +54,7 @@ groups() ->
|
||||
, mrdb_abort
|
||||
, mrdb_two_procs
|
||||
, mrdb_two_procs_tx_restart
|
||||
, mrdb_two_procs_tx_inner_restart
|
||||
, mrdb_two_procs_snap
|
||||
, mrdb_three_procs ]}
|
||||
].
|
||||
@@ -267,7 +269,7 @@ mrdb_abort(Config) ->
|
||||
Pre = mrdb:read(tx_abort, a),
|
||||
D0 = get_dict(),
|
||||
TRes = try mrdb:activity(
|
||||
tx, rdb,
|
||||
{tx, #{mnesia_compatible => true}}, rdb,
|
||||
fun() ->
|
||||
[{tx_abort, a, N}] = mrdb:read(tx_abort, a),
|
||||
error(abort_here),
|
||||
@@ -287,10 +289,17 @@ mrdb_abort(Config) ->
|
||||
mrdb_two_procs(Config) ->
|
||||
tr_ct:with_trace(fun mrdb_two_procs_/1, Config,
|
||||
tr_flags(
|
||||
{self(), [call, sos, p]},
|
||||
{self(), [call, sos, p, 'receive']},
|
||||
tr_patterns(
|
||||
mrdb, [ {mrdb, insert, 2, x}
|
||||
, {mrdb, read, 2, x}
|
||||
, {mrdb, retry_activity, 3, x}
|
||||
, {mrdb, try_f, 2, x}
|
||||
, {mrdb, incr_attempt, 2, x}
|
||||
, {mrdb_mutex, do, 2, x}
|
||||
, {mrdb_mutex_serializer, do, 2, x}
|
||||
, {?MODULE, wait_for_other, 2, x}
|
||||
, {?MODULE, go_ahead_other, 1, x}
|
||||
, {mrdb, activity, x}], tr_opts()))).
|
||||
|
||||
mrdb_two_procs_(Config) ->
|
||||
@@ -314,11 +323,16 @@ mrdb_two_procs_(Config) ->
|
||||
Pre = mrdb:read(R, a),
|
||||
go_ahead_other(POther),
|
||||
await_other_down(POther, MRef, ?LINE),
|
||||
%% The test proc is still inside the transaction, and POther
|
||||
%% has terminated/committed. If we now read 'a', we should see
|
||||
%% the value that POther wrote (no isolation).
|
||||
[{R, a, 17}] = mrdb:read(R, a),
|
||||
ok = mrdb:insert(R, {R, a, 18})
|
||||
end,
|
||||
go_ahead_other(1, POther),
|
||||
go_ahead_other(0, POther),
|
||||
Do0 = get_dict(),
|
||||
%% Our transaction should fail due to the resource conflict, and because
|
||||
%% we're not using retries.
|
||||
try mrdb:activity({tx, #{no_snapshot => true,
|
||||
retries => 0}}, rdb, F1) of
|
||||
ok -> error(unexpected)
|
||||
@@ -339,6 +353,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
R = ?FUNCTION_NAME,
|
||||
Parent = self(),
|
||||
Created = create_tabs([{R, []}], Config),
|
||||
check_stats(rdb),
|
||||
mrdb:insert(R, {R, a, 1}),
|
||||
Pre = mrdb:read(R, a),
|
||||
F0 = fun() ->
|
||||
@@ -354,7 +369,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
OtherWrite = [{R, a, 17}],
|
||||
Att = get_attempt(),
|
||||
Expected = case Att of
|
||||
1 -> Pre;
|
||||
0 -> Pre;
|
||||
_ -> OtherWrite
|
||||
end,
|
||||
Expected = mrdb:read(R, a),
|
||||
@@ -363,14 +378,104 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
OtherWrite = mrdb:read(R, a),
|
||||
ok = mrdb:insert(R, {R, a, 18})
|
||||
end,
|
||||
go_ahead_other(1, POther),
|
||||
go_ahead_other(0, POther),
|
||||
Do0 = get_dict(),
|
||||
mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1),
|
||||
dictionary_unchanged(Do0),
|
||||
check_stats(rdb),
|
||||
[{R, a, 18}] = mrdb:read(R, a),
|
||||
delete_tabs(Created),
|
||||
ok.
|
||||
|
||||
mrdb_two_procs_tx_inner_restart(Config) ->
|
||||
tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config,
|
||||
dbg_tr_opts()).
|
||||
|
||||
mrdb_two_procs_tx_inner_restart_(Config) ->
|
||||
R = ?FUNCTION_NAME,
|
||||
Parent = self(),
|
||||
Created = create_tabs([{R, []}], Config),
|
||||
mrdb:insert(R, {R, a, 1}),
|
||||
mrdb:insert(R, {R, b, 1}),
|
||||
PreA = mrdb:read(R, a),
|
||||
PreB = mrdb:read(R, b),
|
||||
#{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb),
|
||||
F0 = fun() ->
|
||||
wait_for_other(Parent, ?LINE),
|
||||
ok = mrdb:insert(R, {R, a, 17}),
|
||||
wait_for_other(Parent, ?LINE)
|
||||
end,
|
||||
F1 = fun() ->
|
||||
wait_for_other(Parent, ?LINE),
|
||||
ok = mrdb:insert(R, {R, b, 147}),
|
||||
wait_for_other(Parent, ?LINE)
|
||||
end,
|
||||
|
||||
Spawn = fun(F) ->
|
||||
Res = spawn_opt(
|
||||
fun() ->
|
||||
ok = mrdb:activity(tx, rdb, F)
|
||||
end, [monitor]),
|
||||
Res
|
||||
end,
|
||||
{P1, MRef1} = Spawn(F0),
|
||||
{P2, MRef2} = Spawn(F1),
|
||||
F2 = fun() ->
|
||||
PostA = [{R, a, 17}], % once F0 (P1) has committed
|
||||
PostB = [{R, b, 147}], % once F1 (P2) has committed
|
||||
ARes = mrdb:read(R, a), % We need to read first to establish a pre-image
|
||||
BRes = mrdb:read(R, b),
|
||||
Att = get_attempt(),
|
||||
ct:log("Att = ~p", [Att]),
|
||||
case Att of
|
||||
0 -> % This is the first run (no retry yet)
|
||||
ARes = PreA,
|
||||
BRes = PreB,
|
||||
go_ahead_other(0, P1), % Having our pre-image, now let P1 write
|
||||
go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal)
|
||||
await_other_down(P1, MRef1, ?LINE),
|
||||
PostA = mrdb:read(R, a); % now, P1's write should leak through here
|
||||
{0, 1} -> % This is the first (outer) retry
|
||||
go_ahead_other(0, P2), % Let P2 write
|
||||
go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal)
|
||||
await_other_down(P2, MRef2, ?LINE),
|
||||
PostA = mrdb:read(R, a), % now we should see writes from both P1
|
||||
%% On OTP 23, the following step might fail due to timing, even
|
||||
%% though the trace output looks as expected. Possibly some quirk
|
||||
%% with leakage propagation time in rocksdb. Let's retry to be sure.
|
||||
ok = try_until(PostB, fun() -> mrdb:read(R, b) end); % ... and P2
|
||||
{1, 1} ->
|
||||
PostA = mrdb:read(R, a),
|
||||
PostB = mrdb:read(R, b),
|
||||
ok
|
||||
end,
|
||||
mrdb:insert(R, {R, a, 18}),
|
||||
mrdb:insert(R, {R, b, 18})
|
||||
end,
|
||||
Do0 = get_dict(),
|
||||
mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2),
|
||||
check_stats(rdb),
|
||||
dictionary_unchanged(Do0),
|
||||
[{R, a, 18}] = mrdb:read(R, a),
|
||||
[{R, b, 18}] = mrdb:read(R, b),
|
||||
#{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb),
|
||||
{restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}},
|
||||
delete_tabs(Created),
|
||||
ok.
|
||||
|
||||
try_until(Result, F) ->
|
||||
try_until(Result, F, 10).
|
||||
|
||||
try_until(Result, F, N) when N > 0 ->
|
||||
case F() of
|
||||
Result ->
|
||||
ok;
|
||||
_ ->
|
||||
receive after 100 -> ok end,
|
||||
try_until(Result, F, N-1)
|
||||
end;
|
||||
try_until(Result, F, _) ->
|
||||
error({badmatch, {Result, F}}).
|
||||
|
||||
%
|
||||
%% For testing purposes, we use side-effects inside the transactions
|
||||
@@ -383,7 +488,7 @@ mrdb_two_procs_tx_restart_(Config) ->
|
||||
%% attempt, and ignore the sync ops on retries.
|
||||
%%
|
||||
-define(IF_FIRST(N, Expr),
|
||||
if N == 1 ->
|
||||
if N == 0 ->
|
||||
Expr;
|
||||
true ->
|
||||
ok
|
||||
@@ -413,8 +518,8 @@ mrdb_two_procs_snap(Config) ->
|
||||
go_ahead_other(Att, POther),
|
||||
ARes = mrdb:read(R, a),
|
||||
ARes = case Att of
|
||||
1 -> Pre;
|
||||
2 -> [{R, a, 17}]
|
||||
0 -> Pre;
|
||||
_ -> [{R, a, 17}]
|
||||
end,
|
||||
await_other_down(POther, MRef, ?LINE),
|
||||
PreB = mrdb:read(R, b),
|
||||
@@ -434,7 +539,7 @@ mrdb_two_procs_snap(Config) ->
|
||||
%% We make sure that P2 commits before finishing the other two, and P3 and the
|
||||
%% main thread sync, so as to maximize the contention for the retry lock.
|
||||
mrdb_three_procs(Config) ->
|
||||
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()).
|
||||
tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()).
|
||||
|
||||
mrdb_three_procs_(Config) ->
|
||||
R = ?FUNCTION_NAME,
|
||||
@@ -452,7 +557,7 @@ mrdb_three_procs_(Config) ->
|
||||
spawn_opt(fun() ->
|
||||
D0 = get_dict(),
|
||||
do_when_p_allows(
|
||||
1, Parent, ?LINE,
|
||||
0, Parent, ?LINE,
|
||||
fun() ->
|
||||
ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1)
|
||||
end),
|
||||
@@ -488,8 +593,8 @@ mrdb_three_procs_(Config) ->
|
||||
fun() ->
|
||||
Att = get_attempt(),
|
||||
ARes = case Att of
|
||||
1 -> [A0];
|
||||
2 -> [A1]
|
||||
0 -> [A0];
|
||||
_ -> [A1]
|
||||
end,
|
||||
%% First, ensure that P2 tx is running
|
||||
go_ahead_other(Att, P2),
|
||||
@@ -519,6 +624,7 @@ tr_opts() ->
|
||||
, {?MODULE, await_other_down, 3, x}
|
||||
, {?MODULE, do_when_p_allows, 4, x}
|
||||
, {?MODULE, allow_p, 3, x}
|
||||
, {?MODULE, check_stats, 1, x}
|
||||
]}.
|
||||
|
||||
light_tr_opts() ->
|
||||
@@ -529,6 +635,23 @@ light_tr_opts() ->
|
||||
, {mrdb, read, 2, x}
|
||||
, {mrdb, activity, x} ], tr_opts())).
|
||||
|
||||
dbg_tr_opts() ->
|
||||
tr_flags(
|
||||
{self(), [call, sos, p, 'receive']},
|
||||
tr_patterns(
|
||||
mrdb, [ {mrdb, insert, 2, x}
|
||||
, {mrdb, read, 2, x}
|
||||
, {mrdb, retry_activity, 3, x}
|
||||
, {mrdb, try_f, 2, x}
|
||||
, {mrdb, incr_attempt, 2, x}
|
||||
, {mrdb, current_context, 0, x}
|
||||
, {mrdb_mutex, do, 2, x}
|
||||
, {mrdb_mutex_serializer, do, 2, x}
|
||||
, {?MODULE, wait_for_other, 2, x}
|
||||
, {?MODULE, go_ahead_other, 1, x}
|
||||
, {?MODULE, try_until, 3, x}
|
||||
, {mrdb, activity, x} ], tr_opts())).
|
||||
|
||||
tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) ->
|
||||
Pats1 = [P || P <- Pats, element(1,P) =/= Mod],
|
||||
Opts#{patterns => Ps ++ Pats1}.
|
||||
@@ -542,12 +665,13 @@ wait_for_other(Parent, L) ->
|
||||
wait_for_other(Att, Parent, L) ->
|
||||
wait_for_other(Att, Parent, 1000, L).
|
||||
|
||||
wait_for_other(1, Parent, Timeout, L) ->
|
||||
wait_for_other(0, Parent, Timeout, L) ->
|
||||
MRef = monitor(process, Parent),
|
||||
Parent ! {self(), ready},
|
||||
receive
|
||||
{Parent, cont} ->
|
||||
demonitor(MRef),
|
||||
Parent ! {self(), cont_ack},
|
||||
ok;
|
||||
{'DOWN', MRef, _, _, Reason} ->
|
||||
ct:log("Parent died, Reason = ~p", [Reason]),
|
||||
@@ -586,7 +710,13 @@ go_ahead_other(Att, POther, Timeout) ->
|
||||
go_ahead_other_(POther, Timeout) ->
|
||||
receive
|
||||
{POther, ready} ->
|
||||
POther ! {self(), cont}
|
||||
POther ! {self(), cont},
|
||||
receive
|
||||
{POther, cont_ack} ->
|
||||
ok
|
||||
after Timeout ->
|
||||
error(cont_ack_timeout)
|
||||
end
|
||||
after Timeout ->
|
||||
error(go_ahead_timeout)
|
||||
end.
|
||||
@@ -645,3 +775,8 @@ dictionary_unchanged(Old) ->
|
||||
, added := [] } = #{ deleted => Old -- New
|
||||
, added => New -- Old },
|
||||
ok.
|
||||
|
||||
check_stats(Alias) ->
|
||||
Stats = mrdb_stats:get(Alias),
|
||||
ct:log("Stats: ~p", [Stats]),
|
||||
Stats.
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
-export([
|
||||
index_plugin_mgmt/1
|
||||
, add_indexes/1
|
||||
, delete_indexes/1
|
||||
, create_bag_index/1
|
||||
, create_ordered_index/1
|
||||
, test_1_ram_copies/1
|
||||
@@ -40,10 +41,12 @@
|
||||
, fail_1_disc_only/1
|
||||
, plugin_ram_copies1/1
|
||||
, plugin_ram_copies2/1
|
||||
, plugin_ram_copies3/1
|
||||
, plugin_disc_copies/1
|
||||
, fail_plugin_disc_only/1
|
||||
, plugin_disc_copies_bag/1
|
||||
, plugin_rdb_ordered/1
|
||||
, plugin_rdb_none/1
|
||||
, index_iterator/1
|
||||
]).
|
||||
|
||||
@@ -73,10 +76,12 @@ run(Config) ->
|
||||
{pfx},mnesia_rocksdb, ix_prefixes),
|
||||
test_index_plugin(cfg([pr1, ram_copies, ordered], Config)),
|
||||
test_index_plugin(cfg([pr2, ram_copies, bag], Config)),
|
||||
test_index_plugin(cfg([pr3, ram_copies, none], Config)),
|
||||
test_index_plugin(cfg([pd1, disc_copies, ordered], Config)),
|
||||
fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Config)]),
|
||||
test_index_plugin(cfg([pd2, disc_copies, bag], Config)),
|
||||
test_index_plugin(cfg([pl2, rdb, ordered], Config)),
|
||||
test_index_plugin(cfg([pl3, rdb, none], Config)),
|
||||
index_plugin_mgmt(Config),
|
||||
ok.
|
||||
|
||||
@@ -94,6 +99,7 @@ groups() ->
|
||||
, create_ordered_index
|
||||
, index_plugin_mgmt
|
||||
, add_indexes
|
||||
, delete_indexes
|
||||
]}
|
||||
, {access, [sequence], [
|
||||
test_1_ram_copies
|
||||
@@ -104,10 +110,12 @@ groups() ->
|
||||
, {plugin, [sequence], [
|
||||
plugin_ram_copies1
|
||||
, plugin_ram_copies2
|
||||
, plugin_ram_copies3
|
||||
, plugin_disc_copies
|
||||
, fail_plugin_disc_only
|
||||
, plugin_disc_copies_bag
|
||||
, plugin_rdb_ordered
|
||||
, plugin_rdb_none
|
||||
]}
|
||||
].
|
||||
|
||||
@@ -177,11 +185,14 @@ fail_1_disc_only( _Cfg) -> fail(test, [1, disc_only_copies, do1]).
|
||||
|
||||
plugin_ram_copies1(Cfg) -> test_index_plugin(cfg([pr1, ram_copies, ordered], Cfg)).
|
||||
plugin_ram_copies2(Cfg) -> test_index_plugin(cfg([pr2, ram_copies, bag], Cfg)).
|
||||
plugin_ram_copies3(Cfg) -> test_index_plugin(cfg([pr3, ram_copies, none], Cfg)).
|
||||
plugin_disc_copies(Cfg) -> test_index_plugin(cfg([pd1, disc_copies, ordered], Cfg)).
|
||||
fail_plugin_disc_only(Cfg) -> fail(test_index_plugin, [cfg([pd2, disc_only_copies, ordered], Cfg)]).
|
||||
plugin_disc_copies_bag(Cfg) -> test_index_plugin(cfg([pd2, disc_copies, bag], Cfg)).
|
||||
plugin_rdb_ordered(Cfg) -> test_index_plugin(cfg([pl2, rdb, ordered], Cfg)).
|
||||
|
||||
plugin_rdb_none(Cfg) -> test_index_plugin(cfg([pl3, rdb, none], Cfg)).
|
||||
|
||||
test(N, Type, T) ->
|
||||
{atomic, ok} = mnesia:create_table(T, [{Type,[node()]},
|
||||
{attributes,[k,a,b,c]},
|
||||
@@ -204,7 +215,7 @@ add_del_indexes() ->
|
||||
test_index_plugin(Config) ->
|
||||
#{tab := Tab, type := Type, ixtype := IxType} = cfg(Config),
|
||||
{atomic, ok} = mnesia:create_table(Tab, [{Type, [node()]},
|
||||
{index, [{{pfx}, IxType}]}]),
|
||||
{index, [ixtype(IxType)]}]),
|
||||
mnesia:dirty_write({Tab, "foobar", "sentence"}),
|
||||
mnesia:dirty_write({Tab, "yellow", "sensor"}),
|
||||
mnesia:dirty_write({Tab, "truth", "white"}),
|
||||
@@ -216,13 +227,27 @@ test_index_plugin(Config) ->
|
||||
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
||||
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
||||
Tab, <<"foo">>, {pfx});
|
||||
IxType == ordered ->
|
||||
IxType == ordered; IxType == none ->
|
||||
Res1 = lists:sort(mnesia:dirty_index_read(Tab,<<"sen">>, {pfx})),
|
||||
Res2 = lists:sort(mnesia:dirty_index_read(Tab,<<"whi">>, {pfx})),
|
||||
[{Tab,"foobar","sentence"}] = mnesia:dirty_index_read(
|
||||
Tab, <<"foo">>, {pfx})
|
||||
end,
|
||||
if Type == rdb ->
|
||||
Res1 = lists:sort(mrdb:index_read(Tab,<<"sen">>, {pfx})),
|
||||
Res2 = lists:sort(mrdb:index_read(Tab,<<"whi">>, {pfx})),
|
||||
[{Tab,"foobar","sentence"}] = mrdb:index_read(
|
||||
Tab, <<"foo">>, {pfx});
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
ixtype(T) when T==bag;
|
||||
T==ordered ->
|
||||
{{pfx}, T};
|
||||
ixtype(none) ->
|
||||
{pfx}.
|
||||
|
||||
create_bag_index(_Config) ->
|
||||
{aborted, {combine_error, _, _}} =
|
||||
mnesia:create_table(bi, [{rdb, [node()]}, {index, [{val, bag}]}]),
|
||||
@@ -239,6 +264,25 @@ add_indexes(_Config) ->
|
||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||
ok.
|
||||
|
||||
delete_indexes(_Config) ->
|
||||
T = ?TAB(t1),
|
||||
{atomic, ok} = mnesia:create_table(T, [{rdb, [node()]}, {attributes, [k, a, b]}]),
|
||||
ok = mrdb:insert(T, {T, a, 1, 1}),
|
||||
ok = mrdb:insert(T, {T, b, 1, 2}),
|
||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||
[{1, {T,a,1,1}}, {1, {T,b,1,2}}] =
|
||||
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
|
||||
{atomic, ok} = mnesia:del_table_index(T, a),
|
||||
ok = mrdb:delete(T, b),
|
||||
ok = mrdb:insert(T, {T, c, 2, 3}),
|
||||
{atomic, ok} = mnesia:add_table_index(T, a),
|
||||
[{1, {T,a,1,1}}, {2, {T,c,2,3}}] =
|
||||
mrdb_index:rev_fold(T, a, fun ix_fold_acc/3, []),
|
||||
ok.
|
||||
|
||||
ix_fold_acc(K, V, Acc) ->
|
||||
[{K, V} | Acc].
|
||||
|
||||
index_plugin_mgmt(_Config) ->
|
||||
{aborted,_} = mnesia:create_table(x, [{index,[{unknown}]}]),
|
||||
{aborted,_} = mnesia:create_table(x, [{index,[{{unknown},bag}]}]),
|
||||
|
||||
@@ -149,8 +149,9 @@ create_migrateable_db(Config) ->
|
||||
Config.
|
||||
|
||||
fill_tabs(Tabs) ->
|
||||
%% Fill with more than 300, since that's the currently hard-coded chunk size
|
||||
lists:foreach(fun(Tab) ->
|
||||
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,3)]
|
||||
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,500)]
|
||||
end, Tabs).
|
||||
|
||||
create_tabs(Tabs, Config) ->
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
-module(mrdb_bench).
|
||||
|
||||
-compile(export_all).
|
||||
-compile([export_all, nowarn_export_all]).
|
||||
|
||||
init() ->
|
||||
mnesia:delete_schema([node()]),
|
||||
|
||||
Reference in New Issue
Block a user