From 3635eac71701b7f368c6338834d9f5c4e8ad74dd Mon Sep 17 00:00:00 2001
From: Ulf Wiger Behaviours: gen_server, mnesia_backend_type. rocksdb storage backend for Mnesia. alias() = atom() data_tab() = atom() error() = {error, any()} index_info() = {index_pos(), index_type()} index_pos() = integer() | {atom()} index_tab() = {data_tab(), index, index_info()} index_type() = ordered retainer_name() = any() retainer_tab() = {data_tab(), retainer, retainer_name()} table() = data_tab() | index_tab() | retainer_tab() table_type() = set | ordered_set | bag add_aliases(Aliases) -> any() check_definition(Alias, Tab, Nodes, Props) -> any() close_table(Alias, Tab) -> any() code_change(FromVsn, St, Extra) -> any() create_schema(Nodes) -> any() create_schema(Nodes, Aliases) -> any() create_table(Alias, Tab, Props) -> any() decode_key(Key) -> any() decode_key(Key, Metadata) -> any() decode_val(Val) -> any() decode_val(Val, Key, Metadata) -> any() default_alias() -> any() delete(Alias, Tab, Key) -> any() delete_table(Alias, Tab) -> any() encode_key(Key) -> any() encode_key(Key, Metadata) -> any() encode_val(Val) -> any() encode_val(Val, Metadata) -> any() first(Alias, Tab) -> any() fixtable(Alias, Tab, Bool) -> any() handle_call(M, From, St) -> any() handle_cast(X1, St) -> any() handle_info(EXIT, St) -> any() index_is_consistent(Alias, X2, Bool) -> any() info(Alias, Tab, Item) -> any() init(X1) -> any() init_backend() -> any() Called by mnesia_schema in order to intialize the backend This is called when the backend is registered with the first alias, or ... See OTP issue #425 (16 Feb 2021). This callback is supposed to be called
+before first use of the backend, but unfortunately, it is only called at
+mnesia startup and when a backend module is registered MORE THAN ONCE.
+This means we need to handle this function being called multiple times. The bug has been fixed as of OTP 24.0-rc3 If processes need to be started, this can be done using
+
+
+Module mnesia_rocksdb
+rocksdb storage backend for Mnesia.
+
+Description
Data Types
+
+alias()
+data_tab()
+error()
+index_info()
+index_pos()
+index_tab()
+index_type()
+retainer_name()
+retainer_tab()
+table()
+table_type()
+Function Index
+Function Details
+
+add_aliases/1
+check_definition/4
+close_table/2
+code_change/3
+create_schema/1
+create_schema/2
+create_table/3
+decode_key/1
+decode_key/2
+decode_val/1
+decode_val/3
+default_alias/0
+delete/3
+delete_table/2
+encode_key/1
+encode_key/2
+encode_val/1
+encode_val/2
+first/2
+fixtable/3
+handle_call/3
+handle_cast/2
+handle_info/2
+index_is_consistent/3
+info/3
+init/1
+init_backend/0
+mnesia_ext_sup:start_proc(Name, Mod, F, Args [, Opts])
+where Opts are parameters for the supervised child:restart
(default: transient
)
+ * shutdown
(default: 120000
)
+ * type
(default: worker
)
+ * modules
(default: [Mod]
)
insert(Alias, Tab, Obj) -> any()
++
is_index_consistent(Alias, X2) -> any()
++
ix_listvals(Tab, Pos, Obj) -> any()
++
ix_prefixes(Tab, Pos, Obj) -> any()
++
last(Alias, Tab) -> any()
++
load_table(Alias, Tab, LoadReason, Props) -> any()
++
lookup(Alias, Tab, Key) -> any()
++
match_delete(Alias, Tab, Pat) -> any()
++
next(Alias, Tab, Key) -> any()
++
prev(Alias, Tab, Key) -> any()
++
real_suffixes() -> any()
++
receive_data(Data, Alias, Tab, Sender, State) -> any()
++
receive_done(Alias, Tab, Sender, State) -> any()
++
receiver_first_message(Pid, Msg, Alias, Tab) -> any()
++
register() -> {ok, alias()} | {error, term()}
+
Equivalent to register(rocksdb_copies).
+ + +Convenience function for registering a mnesia_rocksdb backend plugin
+ + The function used to register a plugin ismnesia_schema:add_backend_type(Alias, Module)
+ where Module
implements a backend_type behavior. Alias
is an atom, and is used
+ in the same way as ram_copies
etc. The default alias is rocksdb_copies
.
+
+remove_aliases(Aliases) -> any()
++
repair_continuation(Cont, Ms) -> any()
++
select(Cont) -> any()
++
select(Alias, Tab, Ms) -> any()
++
select(Alias, IxTab, Ms, Limit) -> any()
++
semantics(Alias, X2) -> any()
++
sender_handle_info(Msg, Alias, Tab, ReceiverPid, Cont) -> any()
++
sender_init(Alias, Tab, RemoteStorage, Pid) -> any()
++
show_table(Tab) -> any()
++
A debug function that shows the rocksdb table content
+ +show_table(Tab, Limit) -> any()
++
slot(Alias, Tab, Pos) -> any()
++
start_proc(Alias, Tab, Type, ProcName, Props, RdbOpts) -> any()
++
sync_close_table(Alias, Tab) -> any()
++
terminate(Reason, St) -> any()
++
tmp_suffixes() -> any()
++
update_counter(Alias, Tab, C, Val) -> any()
++
validate_key(Alias, Tab, RecName, Arity, Type, Key) -> any()
++
validate_record(Alias, Tab, RecName, Arity, Type, Obj) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mnesia_rocksdb_admin.html b/doc/mnesia_rocksdb_admin.html new file mode 100644 index 0000000..82dd924 --- /dev/null +++ b/doc/mnesia_rocksdb_admin.html @@ -0,0 +1,294 @@ + + + + +Behaviours: gen_server.
+ +alias() = atom()
+ + +backend() = #{db_ref := db_ref(), cf_info := #{table() := cf()}}
+ + +cf() = mrdb:db_ref()
+ + +db_ref() = rocksdb:db_handle()
+ + +gen_server_noreply() = {noreply, st()} | {stop, reason(), st()}
+ + +gen_server_reply() = {reply, reply(), st()} | {stop, reason(), reply(), st()}
+ + +properties() = [{atom(), any()}]
+ + +reason() = any()
+ + +reply() = any()
+ + +req() = {create_table, table(), properties()} | {delete_table, table()} | {load_table, table(), properties()} | {related_resources, table()} | {get_ref, table()} | {add_aliases, [alias()]} | {write_table_property, tabname(), tuple()} | {remove_aliases, [alias()]} | {migrate, [{tabname(), map()}], rpt()} | {prep_close, table()} | {close_table, table()} | {clear_table, table() | cf()}
+ + +rpt() = undefined | map()
+ + +st() = #st{backends = #{alias() => backend()}, standalone = #{{alias(), table()} := cf()}, default_opts = [{atom(), term()}]}
+ + +table() = tabname() | {admin, alias()} | {tabname(), index, any()} | {tabname(), retainer, any()}
+ + +tabname() = atom()
+ + +add_aliases(Aliases) -> any()
++
clear_table(Name) -> any()
++
close_table(Alias, Name) -> any()
++
code_change(FromVsn, St, Extra) -> any()
++
create_table(Alias, Name, Props) -> any()
++
ensure_started() -> ok
+
get_cached_env(Key, Default) -> any()
++
get_ref(Name) -> any()
++
get_ref(Name, Default) -> any()
++
handle_call(Req::{alias(), req()}, From::any(), St::st()) -> gen_server_reply()
+
handle_cast(Msg::any(), St::st()) -> gen_server_noreply()
+
handle_info(Msg::any(), St::st()) -> gen_server_noreply()
+
init(X1) -> any()
++
load_table(Alias, Name, Props) -> any()
++
meta() -> any()
++
migrate_standalone(Alias, Tabs) -> any()
++
migrate_standalone(Alias, Tabs, Rpt0) -> any()
++
prep_close(Alias, Tab) -> any()
++
read_info(TRec) -> any()
++
read_info(Alias, Tab) -> any()
++
read_info(Alias, Tab, K, Default) -> any()
++
related_resources(Alias, Name) -> any()
++
remove_aliases(Aliases) -> any()
++
request_ref(Alias, Name) -> any()
++
set_and_cache_env(Key, Value) -> any()
++
start_link() -> any()
++
terminate(X1, St) -> any()
++
write_info(Alias, Tab, K, V) -> any()
++
write_table_property(Alias, Tab, Prop) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mnesia_rocksdb_app.html b/doc/mnesia_rocksdb_app.html new file mode 100644 index 0000000..bd763b9 --- /dev/null +++ b/doc/mnesia_rocksdb_app.html @@ -0,0 +1,40 @@ + + + + +Behaviours: application.
+ +start/2 | |
stop/1 |
start(StartType, StartArgs) -> any()
++
stop(State) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mnesia_rocksdb_lib.html b/doc/mnesia_rocksdb_lib.html new file mode 100644 index 0000000..5f6a873 --- /dev/null +++ b/doc/mnesia_rocksdb_lib.html @@ -0,0 +1,181 @@ + + + + +check_encoding(Encoding, Attributes) -> any()
++
create_mountpoint(Tab) -> any()
++
data_mountpoint(Tab) -> any()
++
decode(Val, X2) -> any()
++
decode_key(CodedKey::binary()) -> any()
+
decode_key(CodedKey, Enc) -> any()
++
decode_val(CodedVal::binary()) -> any()
+
decode_val(CodedVal, K, Ref) -> any()
++
default_encoding(X1, Type, As) -> any()
++
delete(Ref, K, Opts) -> any()
++
encode(Value, X2) -> any()
++
encode_key(Key::any()) -> binary()
+
encode_key(Key, X2) -> any()
++
encode_val(Val::any()) -> binary()
+
encode_val(Val, Enc) -> any()
++
keypos(Tab) -> any()
++
open_rocksdb(MPd, RdbOpts, CFs) -> any()
++
put(Ref, K, V, Opts) -> any()
++
tabname(Tab) -> any()
++
valid_key_type(X1, Key) -> any()
++
valid_obj_type(X1, Obj) -> any()
++
write(X1, L, Opts) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mnesia_rocksdb_params.html b/doc/mnesia_rocksdb_params.html new file mode 100644 index 0000000..087b8ae --- /dev/null +++ b/doc/mnesia_rocksdb_params.html @@ -0,0 +1,96 @@ + + + + +Behaviours: gen_server.
+ +code_change/3 | |
delete/1 | |
handle_call/3 | |
handle_cast/2 | |
handle_info/2 | |
init/1 | |
lookup/2 | |
start_link/0 | |
store/2 | |
terminate/2 |
code_change(X1, S, X3) -> any()
++
delete(Tab) -> any()
++
handle_call(X1, X2, S) -> any()
++
handle_cast(X1, S) -> any()
++
handle_info(X1, S) -> any()
++
init(X1) -> any()
++
lookup(Tab, Default) -> any()
++
start_link() -> any()
++
store(Tab, Params) -> any()
++
terminate(X1, X2) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mnesia_rocksdb_sup.html b/doc/mnesia_rocksdb_sup.html new file mode 100644 index 0000000..4a4d957 --- /dev/null +++ b/doc/mnesia_rocksdb_sup.html @@ -0,0 +1,40 @@ + + + + +Behaviours: supervisor.
+ +init/1 | |
start_link/0 |
init(X1) -> any()
++
start_link() -> any()
++
Generated by EDoc
+ + diff --git a/doc/mnesia_rocksdb_tuning.html b/doc/mnesia_rocksdb_tuning.html new file mode 100644 index 0000000..1f31903 --- /dev/null +++ b/doc/mnesia_rocksdb_tuning.html @@ -0,0 +1,151 @@ + + + + +cache(X1) -> any()
++
calc_sizes() -> any()
++
calc_sizes(D) -> any()
++
count_rdb_tabs() -> any()
++
count_rdb_tabs(Db) -> any()
++
default(X1) -> any()
++
describe_env() -> any()
++
get_avail_ram() -> any()
++
get_maxfiles() -> any()
++
get_maxfiles(X1) -> any()
++
ideal_max_files() -> any()
++
ideal_max_files(D) -> any()
++
max_files(X1) -> any()
++
rdb_indexes() -> any()
++
rdb_indexes(Db) -> any()
++
rdb_tabs() -> any()
++
rdb_tabs(Db) -> any()
++
write_buffer(X1) -> any()
++
Generated by EDoc
+ + diff --git a/doc/modules-frame.html b/doc/modules-frame.html new file mode 100644 index 0000000..bebffb0 --- /dev/null +++ b/doc/modules-frame.html @@ -0,0 +1,24 @@ + + + +Mid-level access API for Mnesia-managed rocksdb tables
+ +This module implements access functions for the mnesia_rocksdb +backend plugin. The functions are designed to also support +direct access to rocksdb with little overhead. Such direct +access will maintain existing indexes, but not support +replication.
+ +Each table has a metadata structure stored as a persistent +term for fast access. The structure of the metadata is as +follows:
+ +#{ name := <Logical table name> + , db_ref := <Rocksdb database Ref> + , cf_handle := <Rocksdb column family handle> + , activity := Ongoing batch or transaction, if any (map()) + , attr_pos := #{AttrName := Pos} + , mode := <Set to 'mnesia' for mnesia access flows> + , properties := <Mnesia table props in map format> + , type := column_family | standalone + }+ +
Helper functions like as_batch(Ref, fun(R) -> ... end)
and
+ with_iterator(Ref, fun(I) -> ... end)
add some extra
+ convenience on top of the rocksdb
API.
mrdb
updates will
+ not be replicated.
+activity() = tx_activity() | batch_activity()
+ + +activity_type() = mrdb_activity_type() | mnesia_activity_type()
+ + +admin_tab() = {admin, alias()}
+ + +alias() = atom()
+ + +attr_pos() = #{atom() := pos()}
+ + +batch_activity() = #{type := batch, handle := batch_handle()}
+ + +batch_handle() = rocksdb:batch_handle()
+ + +cf_handle() = rocksdb:cf_handle()
+ + +db_handle() = rocksdb:db_handle()
+ + +db_ref() = #{name => table(), alias => atom(), vsn => non_neg_integer(), db_ref := db_handle(), cf_handle := cf_handle(), semantics := semantics(), encoding := encoding(), attr_pos := attr_pos(), type := column_family | standalone, status := open | closed | pre_existing, properties := properties(), mode => mnesia, ix_vals_f => fun((tuple()) -> [any()]), activity => activity(), term() => term()}
+ + +encoding() = raw | sext | term | {key_encoding(), val_encoding()}
+ + +error() = {error, any()}
+ + +index() = {tab_name(), index, any()}
+ + +index_position() = atom() | pos()
+ + +inner() = non_neg_integer()
+ + +iterator_action() = first | last | next | prev | binary() | {seek, binary()} | {seek_for_prev, binary()}
+ + +itr_handle() = rocksdb:itr_handle()
+ + +key() = any()
+ + +key_encoding() = raw | sext | term
+ + +mnesia_activity_type() = transaction | sync_transaction | async_dirty | sync_dirty
+ + +mrdb_activity_type() = tx | {tx, tx_options()} | batch
+ + +mrdb_iterator() = #mrdb_iter{i = itr_handle(), ref = db_ref()}
+ + +obj() = tuple()
+ + +outer() = non_neg_integer()
+ + +pos() = non_neg_integer()
+ + +properties() = #{record_name := atom(), attributes := [atom()], index := [{pos(), bag | ordered}]}
+ + +read_options() = [{verify_checksums, boolean()} | {fill_cache, boolean()} | {iterate_upper_bound, binary()} | {iterate_lower_bound, binary()} | {tailing, boolean()} | {total_order_seek, boolean()} | {prefix_same_as_start, boolean()} | {snapshot, snapshot_handle()}]
+ + +ref_or_tab() = table() | db_ref()
+ + +retainer() = {tab_name(), retainer, any()}
+ + +retries() = outer() | {inner(), outer()}
+ + +semantics() = bag | set
+ + +snapshot_handle() = rocksdb:snapshot_handle()
+ + +tab_name() = atom()
+ + +table() = atom() | admin_tab() | index() | retainer()
+ + +tx_activity() = #{type := tx, handle := tx_handle(), attempt := undefined | retries()}
+ + +tx_handle() = rocksdb:transaction_handle()
+ + +tx_options() = #{retries => retries(), no_snapshot => boolean()}
+ + +val_encoding() = {value | object, term | raw} | raw
+ + +write_options() = [{sync, boolean()} | {disable_wal, boolean()} | {ignore_missing_column_families, boolean()} | {no_slowdown, boolean()} | {low_pri, boolean()}]
+ + +abort(Reason) -> any()
++
Aborts an ongoing activity/2
activity(Type::activity_type(), Alias::alias(), F::fun(() -> Res)) -> Res
+
Run an activity (similar to //mnesia/mnesia:activity/2
).
transaction
- An optimistic rocksdb
transaction{tx, TxOpts}
- A rocksdb
transaction with sligth modificationsbatch
- A rocksdb
batch operationBy default, transactions are combined with a snapshot with 1 retry.
+ The snapshot ensures that writes from concurrent transactions don't leak into the transaction context.
+ A transaction will be retried if it detects that the commit set conflicts with recent changes.
+ A mutex is used to ensure that only one of potentially conflicting mrdb
transactions is run at a time.
+The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with
+the commit set. It will then be re-run again, until the retry count is exhausted.
For finer-grained retries, it's possible to set retries => {Inner, Outer}
. Setting the retries to a
+ single number, Retries
, is analogous to {0, Retries}`. Each outer retry requests a
mutex lock' by
+ waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries
+ as specified by Inner
. If these fail, the activity again goes to the FIFO queue (ending up last
+ in line) if there are outer retries remaining. When all retries are exhaused, the activity aborts
+ with retry_limit
. Note that transactions, being optimistic, do not request a lock on the first
+attempt, but only on outer retries (the first retry is always an outer retry).
Valid TxOpts
are #{no_snapshot => boolean(), retries => retries()}
.
tx | transaction | sync_transaction
are synonyms, and
+ batch | async_dirty | sync_dirty
are synonyms.
+
+alias_of(Tab::ref_or_tab()) -> alias()
+
Returns the alias of a given table or table reference.
+ +as_batch(Tab::ref_or_tab(), F::fun((db_ref()) -> Res)) -> Res
+
Creates a rocksdb
batch context and executes the fun F
in it.
as_batch(Tab, F, Opts) -> any()
++
as as_batch/2
, but with the ability to pass Opts
to rocksdb:write_batch/2
batch_write(Tab, L) -> any()
++
batch_write(Tab, L, Opts) -> any()
++
clear_table(Tab) -> any()
++
current_context() -> any()
++
delete(Tab::ref_or_tab(), Key::key()) -> ok
+
delete(Tab::ref_or_tab(), Key::key(), Opts::write_options()) -> ok
+
delete_object(Tab, Obj) -> any()
++
delete_object(Tab, Obj, Opts) -> any()
++
ensure_ref(R::ref_or_tab()) -> db_ref()
+
ensure_ref(Ref, R) -> any()
++
first(Tab::ref_or_tab()) -> key() | '$end_of_table'
+
first(Tab::ref_or_tab(), Opts::read_options()) -> key() | '$end_of_table'
+
fold(Tab, Fun, Acc) -> any()
++
fold(Tab, Fun, Acc, MatchSpec) -> any()
++
fold(Tab, Fun, Acc, MatchSpec, Limit) -> any()
++
get_batch(X1) -> any()
++
index_read(Tab, Val, Ix) -> any()
++
insert(Tab::ref_or_tab(), Obj::obj()) -> ok
+
insert(Tab::ref_or_tab(), Obj0::obj(), Opts::write_options()) -> ok
+
iterator(Tab::ref_or_tab()) -> {ok, mrdb_iterator()} | {error, term()}
+
iterator(Tab::ref_or_tab(), Opts::read_options()) -> {ok, mrdb_iterator()} | {error, term()}
+
iterator_close(Mrdb_iter::mrdb_iterator()) -> ok
+
iterator_move(Mrdb_iter::mrdb_iterator(), Dir::iterator_action()) -> {ok, tuple()} | {error, any()}
+
last(Tab::ref_or_tab()) -> key() | '$end_of_table'
+
last(Tab::ref_or_tab(), Opts::read_options()) -> key() | '$end_of_table'
+
match_delete(Tab, Pat) -> any()
++
new_tx(Tab::ref_or_tab(), Opts::write_options()) -> db_ref()
+
next(Tab::ref_or_tab(), K::key()) -> key() | '$end_of_table'
+
next(Tab::ref_or_tab(), K::key(), Opts::read_options()) -> key() | '$end_of_table'
+
prev(Tab::ref_or_tab(), K::key()) -> key() | '$end_of_table'
+
prev(Tab::ref_or_tab(), K::key(), Opts::read_options()) -> key() | '$end_of_table'
+
rdb_delete(R, K) -> any()
++
rdb_delete(R, K, Opts) -> any()
++
rdb_fold(Tab, Fun, Acc, Prefix) -> any()
++
rdb_fold(Tab, Fun, Acc, Prefix, Limit) -> any()
++
rdb_get(R, K) -> any()
++
rdb_get(R, K, Opts) -> any()
++
rdb_iterator(R) -> any()
++
rdb_iterator(R, Opts) -> any()
++
rdb_iterator_move(I, Dir) -> any()
++
rdb_put(R, K, V) -> any()
++
rdb_put(R, K, V, Opts) -> any()
++
read(Tab, Key) -> any()
++
read(Tab, Key, Opts) -> any()
++
read_info(Tab) -> any()
++
read_info(Tab, K) -> any()
++
release_snapshot(SHandle::snapshot_handle()) -> ok | error()
+
release a snapshot created by snapshot/1
.
select(Cont) -> any()
++
select(Tab, Pat) -> any()
++
select(Tab, Pat, Limit) -> any()
++
snapshot(Name::alias() | ref_or_tab()) -> {ok, snapshot_handle()} | error()
+
Create a snapshot of the database instance associated with the +table reference, table name or alias.
+ + Snapshots provide consistent read-only views over the entire state of the key-value store. + +tx_commit(TxH::tx_handle() | db_ref()) -> ok
+
tx_ref(Tab::ref_or_tab() | db_ref() | db_ref(), TxH::tx_handle()) -> db_ref()
+
update_counter(Tab, C, Val) -> any()
++
update_counter(Tab, C, Val, Opts) -> any()
++
with_iterator(Tab::ref_or_tab(), Fun::fun((mrdb_iterator()) -> Res)) -> Res
+
with_iterator(Tab::ref_or_tab(), Fun::fun((mrdb_iterator()) -> Res), Opts::read_options()) -> Res
+
with_rdb_iterator(Tab::ref_or_tab(), Fun::fun((itr_handle()) -> Res)) -> Res
+
with_rdb_iterator(Tab::ref_or_tab(), Fun::fun((itr_handle()) -> Res), Opts::read_options()) -> Res
+
write_info(Tab, K, V) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mrdb_index.html b/doc/mrdb_index.html new file mode 100644 index 0000000..7603753 --- /dev/null +++ b/doc/mrdb_index.html @@ -0,0 +1,71 @@ + + + + +index_value() = any()
+ + +iterator_action() = mrdb:iterator_action()
+ + +ix_iterator() = #mrdb_ix_iter{i = mrdb:iterator(), type = set | bag, sub = mrdb:ref() | pid()}
+ + +object() = tuple()
+ + +iterator/2 | |
iterator_close/1 | |
iterator_move/2 | |
with_iterator/3 |
iterator(Tab::mrdb:ref_or_tab(), IxPos::mrdb:index_position()) -> {ok, ix_iterator()} | {error, term()}
+
iterator_close(Mrdb_ix_iter::ix_iterator()) -> ok
+
iterator_move(Mrdb_ix_iter::ix_iterator(), Dir::iterator_action()) -> {ok, index_value(), object()} | {error, term()}
+
with_iterator(Tab::mrdb:ref_or_tab(), IxPos::mrdb:index_position(), Fun::fun((ix_iterator()) -> Res)) -> Res
+
Generated by EDoc
+ + diff --git a/doc/mrdb_mutex.html b/doc/mrdb_mutex.html new file mode 100644 index 0000000..cb6e597 --- /dev/null +++ b/doc/mrdb_mutex.html @@ -0,0 +1,32 @@ + + + + +do/2 |
do(Rsrc, F) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mrdb_mutex_serializer.html b/doc/mrdb_mutex_serializer.html new file mode 100644 index 0000000..8e07aa4 --- /dev/null +++ b/doc/mrdb_mutex_serializer.html @@ -0,0 +1,88 @@ + + + + +code_change/3 | |
done/2 | |
handle_call/3 | |
handle_cast/2 | |
handle_info/2 | |
init/1 | |
start_link/0 | |
terminate/2 | |
wait/1 |
code_change(FromVsn, St, Extra) -> any()
++
done(Rsrc, Ref) -> any()
++
handle_call(X1, From, St) -> any()
++
handle_cast(X1, St) -> any()
++
handle_info(X1, St) -> any()
++
init(X1) -> any()
++
start_link() -> any()
++
terminate(X1, X2) -> any()
++
wait(Rsrc) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mrdb_select.html b/doc/mrdb_select.html new file mode 100644 index 0000000..9acf856 --- /dev/null +++ b/doc/mrdb_select.html @@ -0,0 +1,67 @@ + + + + +continuation_info/2 | |
fold/5 | |
rdb_fold/5 | |
select/1 | |
select/3 | |
select/4 |
continuation_info(Item, C) -> any()
++
fold(Ref, Fun, Acc, MS, Limit) -> any()
++
rdb_fold(Ref, Fun, Acc, Prefix, Limit) -> any()
++
select(Cont) -> any()
++
select(Ref, MS, Limit) -> any()
++
select(Ref, MS, AccKeys, Limit) -> any()
++
Generated by EDoc
+ + diff --git a/doc/mrdb_stats.html b/doc/mrdb_stats.html new file mode 100644 index 0000000..b4220fd --- /dev/null +++ b/doc/mrdb_stats.html @@ -0,0 +1,87 @@ + + + + +Statistics API for the mnesia_rocksdb plugin
+ + Some counters are maintained for each active alias. Currently, the following + counters are supported: + * inner_retries + * outer_retries + +alias() = mnesia_rocksdb:alias()
+ + +counter() = atom()
+ + +counters() = #{counter() := integer()}
+ + +db_ref() = mrdb:db_ref()
+ + +increment() = integer()
+ + +get/1 | Fetches all known counters for Alias , in the form of a map,
+ #{Counter => Value} . |
get/2 | Fetches the integer value of the known counter Ctr for Alias . |
incr/3 | Increment Ctr counter for Alias` with increment `N . |
new/0 |
get(Alias::alias() | db_ref()) -> counters()
+
Fetches all known counters for Alias
, in the form of a map,
+ #{Counter => Value}
.
Fetches the integer value of the known counter Ctr
for Alias
.
incr(Alias::alias() | db_ref(), Ctr::counter(), N::increment()) -> ok
+
Increment Ctr
counter for Alias` with increment `N
.
db_ref()
map,
+ corresponding to mrdb:get_ref({admin, Alias})
.
+
+new() -> any()
++
Generated by EDoc
+ + diff --git a/doc/overview-summary.html b/doc/overview-summary.html new file mode 100644 index 0000000..328774c --- /dev/null +++ b/doc/overview-summary.html @@ -0,0 +1,254 @@ + + + + +Copyright © 2013-21 Klarna AB
+Authors: Ulf Wiger (ulf@wiger.net).
+ + +The Mnesia DBMS, part of Erlang/OTP, supports 'backend plugins', making
+it possible to utilize more capable key-value stores than the dets
+module (limited to 2 GB per table). Unfortunately, this support is
+undocumented. Below, some informal documentation for the plugin system
+is provided.
Call mnesia_rocksdb:register()
immediately after
+starting mnesia.
Put {rocksdb_copies, [node()]}
into the table definitions of
+tables you want to be in RocksDB.
RocksDB tables support efficient selects on prefix keys.
+ +The backend uses the sext
module (see
+https://github.com/uwiger/sext) for mapping between Erlang terms and the
+binary data stored in the tables. This provides two useful properties:
{x, '_'}
is a prefix for keys {x, a}
,
+ {x, b}
and so on.This means that a prefix key identifies the start of the sequence of +entries whose keys match the prefix. The backend uses this to optimize +selects on prefix keys.
+ +### Customization
+ +RocksDB supports a number of customization options. These can be specified
+by providing a {Key, Value}
list named rocksdb_opts
under user_properties
,
+for example:
mnesia:create_table(foo, [{rocksdb_copies, [node()]}, + ... + {user_properties, + [{rocksdb_opts, [{max_open_files, 1024}]}] + }])+ +
Consult the RocksDB documentation +for information on configuration parameters. Also see the section below on handling write errors.
+ +The default configuration for tables inmnesia_rocksdb
is:
+default_open_opts() -> + [ {create_if_missing, true} + , {cache_size, + list_to_integer(get_env_default("ROCKSDB_CACHE_SIZE", "32212254"))} + , {block_size, 1024} + , {max_open_files, 100} + , {write_buffer_size, + list_to_integer(get_env_default( + "ROCKSDB_WRITE_BUFFER_SIZE", "4194304"))} + , {compression, + list_to_atom(get_env_default("ROCKSDB_COMPRESSION", "true"))} + , {use_bloomfilter, true} + ].+ +
It is also possible, for larger databases, to produce a tuning parameter file.
+This is experimental, and mostly copied from mnesia_leveldb
. Consult the
+source code in mnesia_rocksdb_tuning.erl
and mnesia_rocksdb_params.erl
.
+Contributions are welcome.
Avoid placing bag
tables in RocksDB. Although they work, each write
+requires additional reads, causing substantial runtime overheads. There
+are better ways to represent and process bag data (see above about
+prefix keys).
The mnesia:table_info(T, size)
call always returns zero for RocksDB
+tables. RocksDB itself does not track the number of elements in a table, and
+although it is possible to make the mnesia_rocksdb
backend maintain a size
+counter, it incurs a high runtime overhead for writes and deletes since it
+forces them to first do a read to check the existence of the key. If you
+depend on having an up to date size count at all times, you need to maintain
+it yourself. If you only need the size occasionally, you may traverse the
+table to count the elements.
Mnesia was initially designed to be a RAM-only DBMS, and Erlang's
+ets
tables were developed for this purpose. In order to support
+persistence, e.g. for configuration data, a disk-based version of ets
+(called dets
) was created. The dets
API mimicks the ets
API,
+and dets
is quite convenient and fast for (nowadays) small datasets.
+However, using a 32-bit bucket system, it is limited to 2GB of data.
+It also doesn't support ordered sets. When used in Mnesia, dets-based
+tables are called disc_only_copies
.
To circumvent these limitations, another table type, called disc_copies
+was added. This is a combination of ets
and disk_log
, where Mnesia
+periodically snapshots the ets
data to a log file on disk, and meanwhile
+maintains a log of updates, which can be applied at startup. These tables
+are quite performant (especially on read access), but all data is kept in
+RAM, which can become a serious limitation.
A backend plugin system was proposed by Ulf Wiger in 2016, and further +developed with Klarna's support, to finally become included in OTP 19. +Klarna uses a LevelDb backend, but Aeternity, in 2017, instead chose +to implement a Rocksdb backend plugin.
+ +As backend plugins were added on a long-since legacy-stable Mnesia,
+they had to conform to the existing code structure. For this reason,
+the plugin callbacks hook into the already present low-level access
+API in the mnesia_lib
module. As a consequence, backend plugins have
+the same access semantics and granularity as ets
and dets
. This
+isn't much of a disadvantage for key-value stores like LevelDb and RocksDB,
+but a more serious issue is that the update part of this API is called
+on after the point of no return. That is, Mnesia does not expect
+these updates to fail, and has no recourse if they do. As an aside,
+this could also happen if a disc_only_copies
table exceeds the 2 GB
+limit (mnesia will not check it, and dets
will not complain, but simply
+drop the update.)
When adding support for backend plugins, index plugins were also added. Unfortunately, they remain undocumented.
+ +An index plugin can be added in one of two ways:
+ +{index_plugins, [{Name, Module, Function}]}
options.mnesia_schema:add_index_plugin(Name, Module, Function)
Name
must be an atom wrapped as a 1-tuple, e.g. {words}
.
The plugin callback is called as Module:Function(Table, Pos, Obj)
, where Pos=={words}
in
+our example. It returns a list of index terms.
Example
+ +Given the following index plugin implementation:
+ +-module(words). +-export([words_f/3]). + +words_f(_,_,Obj) when is_tuple(Obj) -> + words_(tuple_to_list(Obj)). + +words_(Str) when is_binary(Str) -> + string:lexemes(Str, [$\s, $\n, [$\r,$\n]]); +words_(L) when is_list(L) -> + lists:flatmap(fun words_/1, L); +words_(_) -> + [].+ +
We can register the plugin and use it in table definitions:
+ +Eshell V12.1.3 (abort with ^G) +1> mnesia:start(). +ok +2> mnesia_schema:add_index_plugin({words}, words, words_f). +{atomic,ok} +3> mnesia:create_table(i, [{index, [{words}]}]). +{atomic,ok}+ +
Note that in this case, we had neither a backend plugin, nor even a persistent schema. +Index plugins can be used with all table types. The registered indexing function (arity 3) must exist +as an exported function along the node's code path.
+ +To see what happens when we insert an object, we can turn on call trace.
+ +4> dbg:tracer(). +{ok,<0.108.0>} +5> dbg:tp(words, x). +{ok,[{matched,nonode@nohost,3},{saved,x}]} +6> dbg:p(all,[c]). +{ok,[{matched,nonode@nohost,60}]} +7> mnesia:dirty_write({i,<<"one two">>, [<<"three">>, <<"four">>]}). +(<0.84.0>) call words:words_f(i,{words},{i,<<"one two">>,[<<"three">>,<<"four">>]}) +(<0.84.0>) returned from words:words_f/3 -> [<<"one">>,<<"two">>,<<"three">>, + <<"four">>] +(<0.84.0>) call words:words_f(i,{words},{i,<<"one two">>,[<<"three">>,<<"four">>]}) +(<0.84.0>) returned from words:words_f/3 -> [<<"one">>,<<"two">>,<<"three">>, + <<"four">>] +ok +8> dbg:ctp('_'), dbg:stop(). +ok +9> mnesia:dirty_index_read(i, <<"one">>, {words}). +[{i,<<"one two">>,[<<"three">>,<<"four">>]}]+ +
(The fact that the indexing function is called twice, seems like a performance bug.)
+ +We can observe that the indexing callback is able to operate on the whole object. +It needs to be side-effect free and efficient, since it will be called at least once for each update +(if an old object exists in the table, the indexing function will be called on it too, before it is +replaced by the new object.)
+ +Generated by EDoc
+ + diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index aa4662a..b2be125 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -407,12 +407,14 @@ try_load_admin_db(Alias, AliasOpts, #st{ backends = Bs %% We need to store the persistent ref explicitly here, %% since mnesia knows nothing of our admin table. AdminTab = {admin, Alias}, + Stats = mrdb_stats:new(), CfI = update_cf_info(AdminTab, #{ status => open , name => AdminTab , vsn => ?VSN , encoding => {sext,{value,term}} , attr_pos => #{key => 1, value => 2} + , stats => Stats , mountpoint => MP , properties => #{ attributes => [key, val] diff --git a/src/mrdb.erl b/src/mrdb.erl index 4b982a6..99a2a81 100644 --- a/src/mrdb.erl +++ b/src/mrdb.erl @@ -117,7 +117,9 @@ | index() | retainer(). --type retries() :: non_neg_integer(). +-type inner() :: non_neg_integer(). +-type outer() :: non_neg_integer(). +-type retries() :: outer() | {inner(), outer()}. %% activity type 'ets' makes no sense in this context -type mnesia_activity_type() :: transaction @@ -143,7 +145,7 @@ -type tx_activity() :: #{ type := 'tx' , handle := tx_handle() - , attempt := non_neg_integer() }. + , attempt := 'undefined' | retries() }. -type batch_activity() :: #{ type := 'batch' , handle := batch_handle() }. -type activity() :: tx_activity() | batch_activity(). @@ -256,6 +258,14 @@ release_snapshot(SHandle) -> %% The re-run transaction may still fail, if new transactions, or non-transaction writes interfere with %% the commit set. It will then be re-run again, until the retry count is exhausted. %% +%% For finer-grained retries, it's possible to set `retries => {Inner, Outer}'. Setting the retries to a +%% single number, `Retries', is analogous to `{0, Retries}`. Each outer retry requests a 'mutex lock' by +%% waiting in a FIFO queue. Once it receives the lock, it will try the activity once + as many retries +%% as specified by `Inner'. If these fail, the activity again goes to the FIFO queue (ending up last +%% in line) if there are outer retries remaining. When all retries are exhaused, the activity aborts +%% with `retry_limit'. Note that transactions, being optimistic, do not request a lock on the first +%% attempt, but only on outer retries (the first retry is always an outer retry). +%% %% Valid `TxOpts' are `#{no_snapshot => boolean(), retries => retries()}'. %% %% To simplify code adaptation, `tx | transaction | sync_transaction' are synonyms, and @@ -277,32 +287,66 @@ activity(Type, Alias, F) -> , alias => Alias , db_ref => DbRef } end, - do_activity(F, Alias, Ctxt, false). + do_activity(F, Alias, Ctxt). -do_activity(F, Alias, Ctxt, WithLock) -> - try run_f(F, Ctxt, WithLock, Alias) of +do_activity(F, Alias, Ctxt) -> + try try_f(F, Ctxt) + catch + throw:{?MODULE, busy} -> + retry_activity(F, Alias, Ctxt) + end. + +try_f(F, Ctxt) -> + try run_f(F, Ctxt) of Res -> - try commit_and_pop(Res) - catch - throw:{?MODULE, busy} -> - do_activity(F, Alias, Ctxt, true) - end + commit_and_pop(Res) catch Cat:Err -> abort_and_pop(Cat, Err) end. --spec run_f(_, #{'activity':=#{'handle':=_, 'type':='batch' | 'tx', 'attempt'=>1, 'no_snapshot'=>boolean(), 'retries'=>non_neg_integer(), _=>_}, 'alias':=_, 'db_ref':=_, 'no_snapshot'=>boolean(), 'retries'=>non_neg_integer(), _=>_}, boolean(), _) -> any(). -run_f(F, Ctxt, false, _) -> +run_f(F, Ctxt) -> push_ctxt(Ctxt), - F(); -run_f(F, Ctxt, true, Alias) -> - push_ctxt(incr_attempt(Ctxt)), - mrdb_mutex:do(Alias, F). + F(). -incr_attempt(#{ activity := #{type := tx, attempt := A} = Act, db_ref := DbRef } = C) -> +incr_attempt(0, {_,O}) when O > 0 -> + {outer, {0,1}}; +incr_attempt({I,O}, {Ri,Ro}) when is_integer(I), is_integer(O), + is_integer(Ri), is_integer(Ro) -> + if I < Ri -> {inner, {I+1, O}}; + O < Ro -> {outer, {0, O+1}}; + true -> + error + end; +incr_attempt(_, _) -> + error. + +retry_activity(F, Alias, #{activity := #{ type := Type + , attempt := A + , retries := R} = Act} = Ctxt) -> + case incr_attempt(A, R) of + {RetryCtxt, A1} -> + Act1 = Act#{attempt := A1}, + Ctxt1 = Ctxt#{activity := Act1}, + try retry_activity_(RetryCtxt, F, Alias, restart_ctxt(Ctxt1)) + catch + throw:{?MODULE, busy} -> + retry_activity(F, Alias, Ctxt1) + end; + error -> + return_abort(Type, error, retry_limit) + end. + +retry_activity_(inner, F, Alias, Ctxt) -> + mrdb_stats:incr(Alias, inner_retries, 1), + try_f(F, Ctxt); +retry_activity_(outer, F, Alias, Ctxt) -> + mrdb_stats:incr(Alias, outer_retries, 1), + mrdb_mutex:do(Alias, fun() -> try_f(F, Ctxt) end). + +restart_ctxt(#{ activity := #{type := tx} = Act, db_ref := DbRef } = C) -> {ok, TxH} = rdb_transaction(DbRef, []), - Act1 = Act#{attempt := A+1, handle := TxH}, + Act1 = Act#{handle := TxH}, C1 = C#{ activity := Act1 }, case maps:is_key(snapshot, C) of true -> @@ -375,10 +419,14 @@ check_tx_opts(Opts) -> end. check_retries(#{retries := Retries} = Opts) -> - if is_integer(Retries), Retries >= 0 -> - Opts; - true -> - error({invalid_tx_option, {retries, Retries}}) + case Retries of + _ when is_integer(Retries), Retries >= 0 -> + Opts#{retries := {0, Retries}}; + {Inner, Outer} when is_integer(Inner), is_integer(Outer), + Inner >= 0, Outer >= 0 -> + Opts; + _ -> + error({invalid_tx_option, {retries, Retries}}) end. check_nosnap(#{no_snapshot := NoSnap} = Opts) -> @@ -393,7 +441,7 @@ create_tx(Opts, DbRef) -> {ok, TxH} = rdb_transaction(DbRef, []), Opts#{activity => maps:merge(Opts, #{ type => tx , handle => TxH - , attempt => 1})}. + , attempt => 0 })}. maybe_snapshot(#{no_snapshot := NoSnap} = Opts, DbRef) -> case NoSnap of @@ -413,8 +461,7 @@ commit_and_pop(Res) -> Res; {error, {error, "Resource busy" ++ _ = Busy}} -> case A of - #{retries := Retries, attempt := Att} - when Att =< Retries -> + #{retries := {I,O}} when I > 0; O > 0 -> throw({?MODULE, busy}); _ -> error({error, Busy}) @@ -530,7 +577,7 @@ new_tx(#{activity := _}, _) -> new_tx(Tab, Opts) -> #{db_ref := DbRef} = R = ensure_ref(Tab), {ok, TxH} = rdb_transaction(DbRef, write_opts(R, Opts)), - R#{activity => #{type => tx, handle => TxH, attempt => 1}}. + R#{activity => #{type => tx, handle => TxH, attempt => 0}}. -spec tx_ref(ref_or_tab() | db_ref() | db_ref(), tx_handle()) -> db_ref(). tx_ref(Tab, TxH) -> @@ -540,7 +587,7 @@ tx_ref(Tab, TxH) -> #{activity := #{type := tx, handle := OtherTxH}} -> error({tx_handle_conflict, OtherTxH}); R -> - R#{activity => #{type => tx, handle => TxH, attempt => 1}} + R#{activity => #{type => tx, handle => TxH, attempt => 0}} end. -spec tx_commit(tx_handle() | db_ref()) -> ok. diff --git a/src/mrdb_stats.erl b/src/mrdb_stats.erl new file mode 100644 index 0000000..78c5d3d --- /dev/null +++ b/src/mrdb_stats.erl @@ -0,0 +1,74 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% @doc Statistics API for the mnesia_rocksdb plugin +%% +%% Some counters are maintained for each active alias. Currently, the following +%% counters are supported: +%% * inner_retries +%% * outer_retries +%% +-module(mrdb_stats). + +-export([new/0]). + +-export([incr/3, + get/1, + get/2]). + +-type alias() :: mnesia_rocksdb:alias(). +-type db_ref() :: mrdb:db_ref(). +-type counter() :: atom(). +-type increment() :: integer(). + +-type counters() :: #{ counter() := integer() }. + +new() -> + #{ ref => counters:new(map_size(ctr_meta()), [write_concurrency]) + , meta => ctr_meta()}. + +ctr_meta() -> + #{ inner_retries => 1 + , outer_retries => 2 }. + +-spec incr(alias() | db_ref(), counter(), increment()) -> ok. +%% @doc Increment `Ctr' counter for `Alias` with increment `N'. +%% +%% Note that the first argument may also be a `db_ref()' map, +%% corresponding to `mrdb:get_ref({admin, Alias})'. +%% @end +incr(Alias, Ctr, N) when is_atom(Alias) -> + #{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}), + incr_(Ref, Meta, Ctr, N); +incr(#{stats := #{ref := Ref, meta := Meta}}, Ctr, N) -> + incr_(Ref, Meta, Ctr, N). + +-spec get(alias() | db_ref(), counter()) -> integer(). +%% @doc Fetches the integer value of the known counter `Ctr' for `Alias'. +%% @end +get(Alias, Ctr) when is_atom(Alias) -> + #{stats := #{ref := Ref, meta := Meta}} = mrdb:get_ref({admin, Alias}), + get_(Ref, Meta, Ctr); +get(#{stats := #{ref := Ref, meta := Meta}}, Ctr) -> + get_(Ref, Meta, Ctr). + +-spec get(alias() | db_ref()) -> counters(). +%% @doc Fetches all known counters for `Alias', in the form of a map, +%% `#{Counter => Value}'. +%% @end +get(Alias) when is_atom(Alias) -> + get_(mrdb:get_ref({admin, Alias})); +get(Ref) when is_map(Ref) -> + get_(Ref). + +get_(#{stats := #{ref := Ref, meta := Meta}}) -> + lists:foldl( + fun({K, P}, M) -> + M#{K := counters:get(Ref, P)} + end, Meta, maps:to_list(Meta)). + +get_(Ref, Meta, Attr) -> + Ix = maps:get(Attr, Meta), + counters:get(Ref, Ix). + +incr_(Ref, Meta, Attr, N) -> + Ix = maps:get(Attr, Meta), + counters:add(Ref, Ix, N). diff --git a/test/mnesia_rocksdb_SUITE.erl b/test/mnesia_rocksdb_SUITE.erl index 5c4d1eb..3927646 100644 --- a/test/mnesia_rocksdb_SUITE.erl +++ b/test/mnesia_rocksdb_SUITE.erl @@ -24,6 +24,7 @@ , mrdb_abort/1 , mrdb_two_procs/1 , mrdb_two_procs_tx_restart/1 + , mrdb_two_procs_tx_inner_restart/1 , mrdb_two_procs_snap/1 , mrdb_three_procs/1 ]). @@ -53,6 +54,7 @@ groups() -> , mrdb_abort , mrdb_two_procs , mrdb_two_procs_tx_restart + , mrdb_two_procs_tx_inner_restart , mrdb_two_procs_snap , mrdb_three_procs ]} ]. @@ -287,10 +289,17 @@ mrdb_abort(Config) -> mrdb_two_procs(Config) -> tr_ct:with_trace(fun mrdb_two_procs_/1, Config, tr_flags( - {self(), [call, sos, p]}, + {self(), [call, sos, p, 'receive']}, tr_patterns( mrdb, [ {mrdb, insert, 2, x} , {mrdb, read, 2, x} + , {mrdb, retry_activity, 3, x} + , {mrdb, try_f, 2, x} + , {mrdb, incr_attempt, 2, x} + , {mrdb_mutex, do, 2, x} + , {mrdb_mutex_serializer, do, 2, x} + , {?MODULE, wait_for_other, 2, x} + , {?MODULE, go_ahead_other, 1, x} , {mrdb, activity, x}], tr_opts()))). mrdb_two_procs_(Config) -> @@ -314,11 +323,16 @@ mrdb_two_procs_(Config) -> Pre = mrdb:read(R, a), go_ahead_other(POther), await_other_down(POther, MRef, ?LINE), + %% The test proc is still inside the transaction, and POther + %% has terminated/committed. If we now read 'a', we should see + %% the value that POther wrote (no isolation). [{R, a, 17}] = mrdb:read(R, a), ok = mrdb:insert(R, {R, a, 18}) end, - go_ahead_other(1, POther), + go_ahead_other(0, POther), Do0 = get_dict(), + %% Our transaction should fail due to the resource conflict, and because + %% we're not using retries. try mrdb:activity({tx, #{no_snapshot => true, retries => 0}}, rdb, F1) of ok -> error(unexpected) @@ -339,6 +353,7 @@ mrdb_two_procs_tx_restart_(Config) -> R = ?FUNCTION_NAME, Parent = self(), Created = create_tabs([{R, []}], Config), + check_stats(rdb), mrdb:insert(R, {R, a, 1}), Pre = mrdb:read(R, a), F0 = fun() -> @@ -354,7 +369,7 @@ mrdb_two_procs_tx_restart_(Config) -> OtherWrite = [{R, a, 17}], Att = get_attempt(), Expected = case Att of - 1 -> Pre; + 0 -> Pre; _ -> OtherWrite end, Expected = mrdb:read(R, a), @@ -363,14 +378,88 @@ mrdb_two_procs_tx_restart_(Config) -> OtherWrite = mrdb:read(R, a), ok = mrdb:insert(R, {R, a, 18}) end, - go_ahead_other(1, POther), + go_ahead_other(0, POther), Do0 = get_dict(), mrdb:activity({tx, #{no_snapshot => true}}, rdb, F1), dictionary_unchanged(Do0), + check_stats(rdb), [{R, a, 18}] = mrdb:read(R, a), delete_tabs(Created), ok. +mrdb_two_procs_tx_inner_restart(Config) -> + tr_ct:with_trace(fun mrdb_two_procs_tx_inner_restart_/1, Config, + dbg_tr_opts()). + +mrdb_two_procs_tx_inner_restart_(Config) -> + R = ?FUNCTION_NAME, + Parent = self(), + Created = create_tabs([{R, []}], Config), + mrdb:insert(R, {R, a, 1}), + mrdb:insert(R, {R, b, 1}), + PreA = mrdb:read(R, a), + PreB = mrdb:read(R, b), + #{inner_retries := Ri0, outer_retries := Ro0} = check_stats(rdb), + F0 = fun() -> + wait_for_other(Parent, ?LINE), + ok = mrdb:insert(R, {R, a, 17}), + wait_for_other(Parent, ?LINE) + end, + F1 = fun() -> + wait_for_other(Parent, ?LINE), + ok = mrdb:insert(R, {R, b, 147}), + wait_for_other(Parent, ?LINE) + end, + + Spawn = fun(F) -> + Res = spawn_opt( + fun() -> + ok = mrdb:activity(tx, rdb, F) + end, [monitor]), + Res + end, + {P1, MRef1} = Spawn(F0), + {P2, MRef2} = Spawn(F1), + F2 = fun() -> + PostA = [{R, a, 17}], % once F0 (P1) has committed + PostB = [{R, b, 147}], % once F1 (P2) has committed + ARes = mrdb:read(R, a), % We need to read first to establish a pre-image + BRes = mrdb:read(R, b), + Att = get_attempt(), + ct:log("Att = ~p", [Att]), + case Att of + 0 -> % This is the first run (no retry yet) + ARes = PreA, + BRes = PreB, + go_ahead_other(0, P1), % Having our pre-image, now let P1 write + go_ahead_other(0, P1), % Let P1 commit, then await DOWN (normal) + await_other_down(P1, MRef1, ?LINE), + PostA = mrdb:read(R, a); % now, P1's write should leak through here + {0, 1} -> % This is the first (outer) retry + go_ahead_other(0, P2), % Let P2 write + go_ahead_other(0, P2), % Let P2 commit, then await DOWN (normal) + await_other_down(P2, MRef2, ?LINE), + PostA = mrdb:read(R, a), % now we should see writes from both P1 + PostB = mrdb:read(R, b); % ... and P2 + {1, 1} -> + PostA = mrdb:read(R, a), + PostB = mrdb:read(R, b), + ok + end, + mrdb:insert(R, {R, a, 18}), + mrdb:insert(R, {R, b, 18}) + end, + Do0 = get_dict(), + mrdb:activity({tx, #{no_snapshot => true, retries => {1,1}}}, rdb, F2), + check_stats(rdb), + dictionary_unchanged(Do0), + [{R, a, 18}] = mrdb:read(R, a), + [{R, b, 18}] = mrdb:read(R, b), + #{inner_retries := Ri1, outer_retries := Ro1} = check_stats(rdb), + {restarts, {1, 1}} = {restarts, {Ri1 - Ri0, Ro1 - Ro0}}, + delete_tabs(Created), + ok. + % %% For testing purposes, we use side-effects inside the transactions @@ -383,7 +472,7 @@ mrdb_two_procs_tx_restart_(Config) -> %% attempt, and ignore the sync ops on retries. %% -define(IF_FIRST(N, Expr), - if N == 1 -> + if N == 0 -> Expr; true -> ok @@ -413,8 +502,8 @@ mrdb_two_procs_snap(Config) -> go_ahead_other(Att, POther), ARes = mrdb:read(R, a), ARes = case Att of - 1 -> Pre; - 2 -> [{R, a, 17}] + 0 -> Pre; + _ -> [{R, a, 17}] end, await_other_down(POther, MRef, ?LINE), PreB = mrdb:read(R, b), @@ -434,7 +523,7 @@ mrdb_two_procs_snap(Config) -> %% We make sure that P2 commits before finishing the other two, and P3 and the %% main thread sync, so as to maximize the contention for the retry lock. mrdb_three_procs(Config) -> - tr_ct:with_trace(fun mrdb_three_procs_/1, Config, light_tr_opts()). + tr_ct:with_trace(fun mrdb_three_procs_/1, Config, dbg_tr_opts()). mrdb_three_procs_(Config) -> R = ?FUNCTION_NAME, @@ -452,7 +541,7 @@ mrdb_three_procs_(Config) -> spawn_opt(fun() -> D0 = get_dict(), do_when_p_allows( - 1, Parent, ?LINE, + 0, Parent, ?LINE, fun() -> ok = mrdb:activity({tx,#{retries => 0}}, rdb, F1) end), @@ -488,8 +577,8 @@ mrdb_three_procs_(Config) -> fun() -> Att = get_attempt(), ARes = case Att of - 1 -> [A0]; - 2 -> [A1] + 0 -> [A0]; + _ -> [A1] end, %% First, ensure that P2 tx is running go_ahead_other(Att, P2), @@ -519,6 +608,7 @@ tr_opts() -> , {?MODULE, await_other_down, 3, x} , {?MODULE, do_when_p_allows, 4, x} , {?MODULE, allow_p, 3, x} + , {?MODULE, check_stats, 1, x} ]}. light_tr_opts() -> @@ -529,6 +619,22 @@ light_tr_opts() -> , {mrdb, read, 2, x} , {mrdb, activity, x} ], tr_opts())). +dbg_tr_opts() -> + tr_flags( + {self(), [call, sos, p, 'receive']}, + tr_patterns( + mrdb, [ {mrdb, insert, 2, x} + , {mrdb, read, 2, x} + , {mrdb, retry_activity, 3, x} + , {mrdb, try_f, 2, x} + , {mrdb, incr_attempt, 2, x} + , {mrdb, current_context, 0, x} + , {mrdb_mutex, do, 2, x} + , {mrdb_mutex_serializer, do, 2, x} + , {?MODULE, wait_for_other, 2, x} + , {?MODULE, go_ahead_other, 1, x} + , {mrdb, activity, x} ], tr_opts())). + tr_patterns(Mod, Ps, #{patterns := Pats} = Opts) -> Pats1 = [P || P <- Pats, element(1,P) =/= Mod], Opts#{patterns => Ps ++ Pats1}. @@ -542,12 +648,13 @@ wait_for_other(Parent, L) -> wait_for_other(Att, Parent, L) -> wait_for_other(Att, Parent, 1000, L). -wait_for_other(1, Parent, Timeout, L) -> +wait_for_other(0, Parent, Timeout, L) -> MRef = monitor(process, Parent), Parent ! {self(), ready}, receive {Parent, cont} -> demonitor(MRef), + Parent ! {self(), cont_ack}, ok; {'DOWN', MRef, _, _, Reason} -> ct:log("Parent died, Reason = ~p", [Reason]), @@ -586,7 +693,13 @@ go_ahead_other(Att, POther, Timeout) -> go_ahead_other_(POther, Timeout) -> receive {POther, ready} -> - POther ! {self(), cont} + POther ! {self(), cont}, + receive + {POther, cont_ack} -> + ok + after Timeout -> + error(cont_ack_timeout) + end after Timeout -> error(go_ahead_timeout) end. @@ -645,3 +758,8 @@ dictionary_unchanged(Old) -> , added := [] } = #{ deleted => Old -- New , added => New -- Old }, ok. + +check_stats(Alias) -> + Stats = mrdb_stats:get(Alias), + ct:log("Stats: ~p", [Stats]), + Stats.