Ulf Wiger d5dafb5b7e Refactor to support column families, direct rocksdb access
Expose low-level helpers, fix dialyzer warnings

WIP column families and mrdb API

Basic functionality in place

started adding documentation

remove doc/ from .gitignore

add doc/* files

recognize pre-existing tabs at startup

wip: most of the functionality in place (not yet merge ops)

wip: adding transaction support

wip: add transaction test case (currently dumps core)

First draft, mnesia plugin user guide

Fix note formatting

WIP working on indexing

Index iterators, dialyzer, xref fixes

open db with optimistic transactions

Use rocksdb-1.7.0

Use seanhinde rocksdb patch, enable rollback

Call the right transaction_get() function

WIP add 'snap_tx' activity type

tx restart using mrdb_mutex

Fix test suite sync bugs

WIP instrumented for debugging

WIP working on migration test case

Add migration test suite

Migration works, subscribe to schema changes

WIP fix batch handling

Manage separate batches per db_ref

Add mrdb:fold/3

Add some docs, erlang_ls config

Use seanhinde's rocksdb vsn
2022-03-25 15:48:19 +01:00

26 KiB

Module mrdb

Mid-level access API for Mnesia-managed rocksdb tables.

Description

This module implements access functions for the mnesia_rocksdb backend plugin. The functions are designed to also support direct access to rocksdb with little overhead. Such direct access will maintain existing indexes, but not support replication.

Each table has a metadata structure stored as a persistent term for fast access. The structure of the metadata is as follows:

  #{ name       := <Logical table name>
   , db_ref     := <Rocksdb database Ref>
   , cf_handle  := <Rocksdb column family handle>
   , batch      := <Batch reference, if any>
   , tx_handle  := <Rocksdb transaction handle, if any>
   , attr_pos   := #{AttrName := Pos}
   , mode       := <Set to 'mnesia' for mnesia access flows>
   , properties := <Mnesia table props in map format>
   , type       := column_family | standalone
   }

Helper functions like as_batch(Ref, fun(R) -> ... end) and with_iterator(Ref, fun(I) -> ... end) add some extra convenience on top of the rocksdb API.

Note that no automatic provision exists to manage concurrent updates via mnesia AND direct access to this API. It's advisable to use ONE primary mode of access. If replication is used, the mnesia API will support this, but direct mrdb updates will not be replicated.

Data Types

activity_type()


activity_type() = mrdb_activity_type() | mnesia_activity_type()

admin_tab()


admin_tab() = {admin, alias()}

alias()


alias() = atom()

attr_pos()


attr_pos() = #{atom() => pos()}

batch_handle()


batch_handle() = rocksdb:batch_handle()

cf_handle()


cf_handle() = rocksdb:cf_handle()

db_handle()


db_handle() = rocksdb:db_handle()

db_ref()


db_ref() = #{name => table(), alias => atom(), vsn => non_neg_integer(), db_ref => db_handle(), cf_handle => cf_handle(), semantics => semantics(), encoding => encoding(), attr_pos => attr_pos(), type => column_family | standalone, status => open | closed | pre_existing, properties => properties(), mode => mnesia, ix_vals_f => fun((tuple()) -> [any()]), batch => batch_handle(), tx_handle => tx_handle(), term() => term()}

encoding()


encoding() = raw | sext | term | {key_encoding(), val_encoding()}

error()


error() = {error, any()}

index()


index() = {tab_name(), index, any()}

index_position()


index_position() = atom() | pos()

iterator_action()


iterator_action() = first | last | next | prev | binary() | {seek, binary()} | {seek_for_prev, binary()}

itr_handle()


itr_handle() = rocksdb:itr_handle()

key()


key() = any()

key_encoding()


key_encoding() = raw | sext | term

mnesia_activity_type()


mnesia_activity_type() = transaction | sync_transaction | async_dirty | sync_dirty

mrdb_activity_type()


mrdb_activity_type() = tx | {tx, tx_options()} | batch

mrdb_iterator()


mrdb_iterator() = #mrdb_iter{i = itr_handle(), ref = db_ref()}

obj()


obj() = tuple()

pos()


pos() = non_neg_integer()

properties()


properties() = #{record_name => atom(), attributes => [atom()], index => [{pos(), bag | ordered}]}

read_options()


read_options() = [{verify_checksums, boolean()} | {fill_cache, boolean()} | {iterate_upper_bound, binary()} | {iterate_lower_bound, binary()} | {tailing, boolean()} | {total_order_seek, boolean()} | {prefix_same_as_start, boolean()} | {snapshot, snapshot_handle()}]

ref_or_tab()


ref_or_tab() = table() | db_ref()

retainer()


retainer() = {tab_name(), retainer, any()}

retries()


retries() = non_neg_integer()

semantics()


semantics() = bag | set

snapshot_handle()


snapshot_handle() = rocksdb:snapshot_handle()

tab_name()


tab_name() = atom()

table()


table() = atom() | admin_tab() | index() | retainer()

tx_handle()


tx_handle() = rocksdb:transaction_handle()

tx_options()


tx_options() = #{retries => retries(), no_snapshot => boolean()}

val_encoding()


val_encoding() = {value | object, term | raw} | raw

write_options()


write_options() = [{sync, boolean()} | {disable_wal, boolean()} | {ignore_missing_column_families, boolean()} | {no_slowdown, boolean()} | {low_pri, boolean()}]

Function Index

abort/1 - Aborts an ongoing activity/3.
activity/3 - Run an activity (similar to mnesia:activity/2).
alias_of/1 - Returns the alias of a given table or table reference.
as_batch/2 - Creates a rocksdb batch context and executes the fun F in it.
as_batch/3 - Like as_batch/2, but with the ability to pass Opts to rocksdb:write_batch/2.
batch_write/2
batch_write/3
current_context/0
delete/2
delete/3
delete_object/2
delete_object/3
ensure_ref/1
ensure_ref/2
first/1
first/2
fold/3
fold/4
fold/5
get_batch/1
get_ref/1
index_read/3
insert/2
insert/3
iterator/1
iterator/2
iterator_close/1
iterator_move/2
last/1
last/2
match_delete/2
new_tx/1
new_tx/2
next/2
next/3
prev/2
prev/3
rdb_delete/2
rdb_delete/3
rdb_fold/4
rdb_fold/5
rdb_get/2
rdb_get/3
rdb_iterator/1
rdb_iterator/2
rdb_iterator_move/2
rdb_put/3
rdb_put/4
read/2
read/3
read_info/1
read_info/2
release_snapshot/1 - Release a snapshot created by snapshot/1.
select/1
select/2
select/3
snapshot/1 - Create a snapshot of the database instance associated with the table reference, table name or alias.
tx_commit/1
tx_ref/2
update_counter/3
update_counter/4
with_iterator/2
with_iterator/3
with_rdb_iterator/2
with_rdb_iterator/3
write_info/3

Function Details

abort/1

abort(Reason) -> any()

Aborts an ongoing activity/3.

activity/3


activity(Type::activity_type(), Alias::alias(), F::fun(() -> Res)) -> Res

Run an activity (similar to mnesia:activity/2).

Supported activity types are:

  • transaction - An optimistic rocksdb transaction

  • {tx, TxOpts} - A rocksdb transaction with slight modifications

  • batch - A rocksdb batch operation

By default, transactions are combined with a snapshot, and with 1 retry. The snapshot ensures that writes from concurrent transactions don't leak into the transaction context. A transaction will be retried if it detects that the commit set conflicts with recent changes. A mutex is used to ensure that only one of potentially conflicting mrdb transactions is run at a time. The re-run transaction may still fail if new transactions or non-transaction writes interfere with the commit set. It will then be re-run again, until the retry count is exhausted.

Valid TxOpts are #{no_snapshot => boolean(), retries => retries()}.

To simplify code adaptation, tx | transaction | sync_transaction are synonyms, and batch | async_dirty | sync_dirty are synonyms.

alias_of/1


alias_of(Tab::ref_or_tab()) -> alias()

Returns the alias of a given table or table reference.

as_batch/2


as_batch(Tab::ref_or_tab(), F::fun((db_ref()) -> Res)) -> Res

Creates a rocksdb batch context and executes the fun F in it.

Rocksdb batches aren't tied to a specific DbRef until written. This can cause surprising problems if we're juggling multiple rocksdb instances (as we do if we have standalone tables). At the time of writing, all objects end up in the DbRef the batch is written to, albeit not necessarily in the intended column family. This will probably change, but no failure mode is really acceptable. The code below ensures that separate batches are created for each DbRef, under a unique reference stored in the pdict. When writing, all batches are written separately to the corresponding DbRef, and when releasing, all batches are released. This will not ensure atomicity, but there is no way in rocksdb to achieve atomicity across db instances. At least, data should end up where you expect.

as_batch/3

as_batch(Tab, F, Opts) -> any()

Like as_batch/2, but with the ability to pass Opts to rocksdb:write_batch/2.

batch_write/2

batch_write(Tab, L) -> any()

batch_write/3

batch_write(Tab, L, Opts) -> any()

current_context/0

current_context() -> any()

delete/2


delete(Tab::ref_or_tab(), Key::key()) -> ok

delete/3


delete(Tab::ref_or_tab(), Key::key(), Opts::write_options()) -> ok

delete_object/2

delete_object(Tab, Obj) -> any()

delete_object/3

delete_object(Tab, Obj, Opts) -> any()

ensure_ref/1


ensure_ref(Ref::ref_or_tab()) -> db_ref()

ensure_ref/2

ensure_ref(Ref, R) -> any()

first/1


first(Tab::ref_or_tab()) -> key() | '$end_of_table'

first/2


first(Tab::ref_or_tab(), Opts::read_options()) -> key() | '$end_of_table'

fold/3

fold(Tab, Fun, Acc) -> any()

fold/4

fold(Tab, Fun, Acc, MatchSpec) -> any()

fold/5

fold(Tab, Fun, Acc, MatchSpec, Limit) -> any()

get_batch/1

get_batch(X1) -> any()

get_ref/1


get_ref(Tab::table()) -> db_ref()

index_read/3

index_read(Tab, Val, Ix) -> any()

insert/2


insert(Tab::ref_or_tab(), Obj::obj()) -> ok

insert/3


insert(Tab::ref_or_tab(), Obj0::obj(), Opts::write_options()) -> ok

iterator/1


iterator(Tab::ref_or_tab()) -> {ok, mrdb_iterator()} | {error, term()}

iterator/2


iterator(Tab::ref_or_tab(), Opts::read_options()) -> {ok, mrdb_iterator()} | {error, term()}

iterator_close/1


iterator_close(Mrdb_iter::mrdb_iterator()) -> ok

iterator_move/2


iterator_move(Mrdb_iter::mrdb_iterator(), Dir::iterator_action()) -> {ok, tuple()} | {error, any()}

last/1


last(Tab::ref_or_tab()) -> key() | '$end_of_table'

last/2


last(Tab::ref_or_tab(), Opts::read_options()) -> key() | '$end_of_table'

match_delete/2

match_delete(Tab, Pat) -> any()

new_tx/1


new_tx(Tab::table() | db_ref()) -> db_ref()

new_tx/2


new_tx(Tab::ref_or_tab(), Opts::write_options()) -> db_ref()

next/2


next(Tab::ref_or_tab(), K::key()) -> key() | '$end_of_table'

next/3


next(Tab::ref_or_tab(), K::key(), Opts::read_options()) -> key() | '$end_of_table'

prev/2


prev(Tab::ref_or_tab(), K::key()) -> key() | '$end_of_table'

prev/3


prev(Tab::ref_or_tab(), K::key(), Opts::read_options()) -> key() | '$end_of_table'

rdb_delete/2

rdb_delete(R, K) -> any()

rdb_delete/3

rdb_delete(R, K, Opts) -> any()

rdb_fold/4

rdb_fold(Tab, Fun, Acc, Prefix) -> any()

rdb_fold/5

rdb_fold(Tab, Fun, Acc, Prefix, Limit) -> any()

rdb_get/2

rdb_get(R, K) -> any()

rdb_get/3

rdb_get(R, K, Opts) -> any()

rdb_iterator/1

rdb_iterator(R) -> any()

rdb_iterator/2

rdb_iterator(R, Opts) -> any()

rdb_iterator_move/2

rdb_iterator_move(I, Dir) -> any()

rdb_put/3

rdb_put(R, K, V) -> any()

rdb_put/4

rdb_put(R, K, V, Opts) -> any()

read/2

read(Tab, Key) -> any()

read/3

read(Tab, Key, Opts) -> any()

read_info/1

read_info(Tab) -> any()

read_info/2

read_info(Tab, K) -> any()

release_snapshot/1


release_snapshot(SHandle::snapshot_handle()) -> ok | error()

Release a snapshot created by snapshot/1.

select/1

select(Cont) -> any()

select/2

select(Tab, Pat) -> any()

select/3

select(Tab, Pat, Limit) -> any()

snapshot/1


snapshot(Name::alias() | ref_or_tab()) -> {ok, snapshot_handle()} | error()

Create a snapshot of the database instance associated with the table reference, table name or alias.

Snapshots provide consistent read-only views over the entire state of the key-value store.

tx_commit/1


tx_commit(TxH::tx_handle() | db_ref()) -> ok

tx_ref/2


tx_ref(Tab::ref_or_tab(), TxH::tx_handle()) -> db_ref()

update_counter/3

update_counter(Tab, C, Val) -> any()

update_counter/4

update_counter(Tab, C, Val, Opts) -> any()

with_iterator/2


with_iterator(Tab::ref_or_tab(), Fun::fun((mrdb_iterator()) -> Res)) -> Res

with_iterator/3


with_iterator(Tab::ref_or_tab(), Fun::fun((mrdb_iterator()) -> Res), Opts::read_options()) -> Res

with_rdb_iterator/2


with_rdb_iterator(Tab::ref_or_tab(), Fun::fun((itr_handle()) -> Res)) -> Res

with_rdb_iterator/3


with_rdb_iterator(Tab::ref_or_tab(), Fun::fun((itr_handle()) -> Res), Opts::read_options()) -> Res

write_info/3

write_info(Tab, K, V) -> any()