5 Commits

Author SHA1 Message Date
Ulf Wiger 302aa1252b Merge pull request #26 from aeternity/uw-fix-db-migration
Migration chunk handling was broken. Add progress reporting support
2022-07-13 12:50:38 +02:00
Ulf Wiger fde2e1194e Fix type to satisfy Dialyzer 2022-07-13 11:35:40 +02:00
Ulf Wiger c4f7b7ac02 Migration chunk handling was broken. Add progress reporting support 2022-07-13 11:19:47 +02:00
Ulf Wiger 226a3b8e91 Merge pull request #25 from shahryarjb/patch-1
Add Erlang flag for README block code
2022-07-11 16:15:13 +02:00
Shahryar Tavakkoli 73784fe765 Add Erlang flag for README block code 2022-07-11 18:35:45 +04:30
3 changed files with 160 additions and 43 deletions
+5 -5
View File
@@ -80,7 +80,7 @@ RocksDB supports a number of customization options. These can be specified
by providing a `{Key, Value}` list named `rocksdb_opts` under `user_properties`, by providing a `{Key, Value}` list named `rocksdb_opts` under `user_properties`,
for example: for example:
``` ```erlang
mnesia:create_table(foo, [{rocksdb_copies, [node()]}, mnesia:create_table(foo, [{rocksdb_copies, [node()]},
... ...
{user_properties, {user_properties,
@@ -93,7 +93,7 @@ for information on configuration parameters. Also see the section below on handl
The default configuration for tables in `mnesia_rocksdb` is: The default configuration for tables in `mnesia_rocksdb` is:
``` ```erlang
default_open_opts() -> default_open_opts() ->
[ {create_if_missing, true} [ {create_if_missing, true}
, {cache_size, , {cache_size,
@@ -195,7 +195,7 @@ our example. It returns a list of index terms.
Given the following index plugin implementation: Given the following index plugin implementation:
``` ```erlang
-module(words). -module(words).
-export([words_f/3]). -export([words_f/3]).
@@ -212,7 +212,7 @@ words_(_) ->
We can register the plugin and use it in table definitions: We can register the plugin and use it in table definitions:
``` ```erlang
Eshell V12.1.3 (abort with ^G) Eshell V12.1.3 (abort with ^G)
1> mnesia:start(). 1> mnesia:start().
ok ok
@@ -228,7 +228,7 @@ as an exported function along the node's code path.
To see what happens when we insert an object, we can turn on call trace. To see what happens when we insert an object, we can turn on call trace.
``` ```erlang
4> dbg:tracer(). 4> dbg:tracer().
{ok,<0.108.0>} {ok,<0.108.0>}
5> dbg:tp(words, x). 5> dbg:tp(words, x).
+153 -37
View File
@@ -18,7 +18,8 @@
, clear_table/1 , clear_table/1
]). ]).
-export([ migrate_standalone/2 ]). -export([ migrate_standalone/2
, migrate_standalone/3 ]).
-export([ start_link/0 -export([ start_link/0
, init/1 , init/1
@@ -66,6 +67,8 @@
-type cf() :: mrdb:db_ref(). -type cf() :: mrdb:db_ref().
-type rpt() :: undefined | map().
-type req() :: {create_table, table(), properties()} -type req() :: {create_table, table(), properties()}
| {delete_table, table()} | {delete_table, table()}
| {load_table, table(), properties()} | {load_table, table(), properties()}
@@ -74,7 +77,7 @@
| {add_aliases, [alias()]} | {add_aliases, [alias()]}
| {write_table_property, tabname(), tuple()} | {write_table_property, tabname(), tuple()}
| {remove_aliases, [alias()]} | {remove_aliases, [alias()]}
| {migrate, [{tabname(), map()}]} | {migrate, [{tabname(), map()}], rpt()}
| {prep_close, table()} | {prep_close, table()}
| {close_table, table()} | {close_table, table()}
| {clear_table, table() | cf() }. | {clear_table, table() | cf() }.
@@ -296,7 +299,14 @@ write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
call(Alias, {write_table_property, Tab, Prop}). call(Alias, {write_table_property, Tab, Prop}).
migrate_standalone(Alias, Tabs) -> migrate_standalone(Alias, Tabs) ->
call(Alias, {migrate, Tabs}). migrate_standalone(Alias, Tabs, undefined).
%% Issue a `{migrate, Tabs, Rpt}' request for `Alias' through call/2.
%% `Rpt0' is either `undefined' (no reporting) or a destination for report
%% messages; a destination is wrapped in a map tagged `migrate_standalone'
%% before it is placed in the request.
migrate_standalone(Alias, Tabs, Rpt0) ->
    call(Alias, {migrate, Tabs, migrate_report_spec(Rpt0)}).

%% Build the report descriptor carried by a `migrate' request.
migrate_report_spec(undefined) ->
    undefined;
migrate_report_spec(To) ->
    #{to => To, tag => migrate_standalone}.
-spec call(alias() | [], req()) -> no_return() | any(). -spec call(alias() | [], req()) -> no_return() | any().
call(Alias, Req) -> call(Alias, Req) ->
@@ -320,6 +330,7 @@ init([]) ->
_ = maybe_initial_meta(), %% bootstrap pt _ = maybe_initial_meta(), %% bootstrap pt
Opts = default_opts(), Opts = default_opts(),
process_flag(trap_exit, true), process_flag(trap_exit, true),
ensure_cf_cache(),
mnesia:subscribe({table, schema, simple}), mnesia:subscribe({table, schema, simple}),
{ok, recover_state(#st{default_opts = Opts})}. {ok, recover_state(#st{default_opts = Opts})}.
@@ -588,10 +599,10 @@ handle_req(Alias, {write_table_property, Tab, Prop}, Backend, St) ->
_ -> _ ->
{reply, {error, not_found}, St} {reply, {error, not_found}, St}
end; end;
handle_req(Alias, {migrate, Tabs0}, Backend, St) -> handle_req(Alias, {migrate, Tabs0, Rpt}, Backend, St) ->
case prepare_migration(Alias, Tabs0, St) of case prepare_migration(Alias, Tabs0, Rpt, St) of
{ok, Tabs} -> {ok, Tabs} ->
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, St), {Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, Rpt, St),
{reply, Res, St1}; {reply, Res, St1};
{error, _} = Error -> {error, _} = Error ->
{reply, Error, St} {reply, Error, St}
@@ -772,7 +783,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
?log(debug, "will create ~p as standalone and migrate", [Name]), ?log(debug, "will create ~p as standalone and migrate", [Name]),
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
{ok, OldTRec, _} -> {ok, OldTRec, _} ->
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, St); ?log(info, "Migrating ~p to column family", [Name]),
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
_Other -> _Other ->
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St) create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St)
end end
@@ -781,11 +793,11 @@ create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
{Exists, MP} = table_exists_as_standalone(Name), {Exists, MP} = table_exists_as_standalone(Name),
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St). create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, St) -> create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
?log(debug, "Migrate to cf (~p)", [Name]), ?log(debug, "Migrate to cf (~p)", [Name]),
{ok, NewCf, St1} = create_table_as_cf( {ok, NewCf, St1} = create_table_as_cf(
Alias, Name, TRec#{db_ref => DbRef}, St), Alias, Name, TRec#{db_ref => DbRef}, St),
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, St1), {ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, Rpt, St1),
{ok, NewCf, St2}. {ok, NewCf, St2}.
%% Return {Migrate, MP} iff table exists standalone; just false if it doesn't %% Return {Migrate, MP} iff table exists standalone; just false if it doesn't
@@ -802,17 +814,33 @@ should_we_migrate_standalone(#{name := Name}) ->
false false
end. end.
prepare_migration(Alias, Tabs, St) -> prepare_migration(Alias, Tabs, Rpt, St) ->
Res = lists:map(fun(T) -> Res = lists:map(fun(T) ->
prepare_migration_(Alias, T, St) prepare_migration_(Alias, T, St)
end, Tabs), end, Tabs),
Res1 = add_related_tabs(Res, maps:get(Alias, St#st.backends), Alias, St), Res1 = add_related_tabs(Res, maps:get(Alias, St#st.backends), Alias, St),
case [E || {error, _} = E <- Res1] of case [E || {error, _} = E <- Res1] of
[] -> {ok, Res1}; [] ->
rpt(Rpt, "Will migrate ~p~n", [[T || {T,_,_} <- Res1]]),
{ok, Res1};
[_|_] = Errors -> [_|_] = Errors ->
rpt(Rpt, "Errors encountered: ~p~n", [Errors]),
{error, Errors} {error, Errors}
end. end.
%% Emit a report stamped with the current system time in milliseconds.
%% Delegates to rpt/4, which is a no-op when reporting is `undefined'.
rpt(Rpt, Fmt, Args) ->
    Now = erlang:system_time(millisecond),
    rpt(Rpt, Now, Fmt, Args).

%% Emit a report with an explicit timestamp.
%% The report map is extended with `time', `fmt' and `args' and sent to the
%% term stored under its `to' key as `{mnesia_rocksdb, report, Map}'.
%% Always returns `ok'.
rpt(undefined, _Time, _Fmt, _Args) ->
    ok;
rpt(#{to := Dest} = Report, Time, Fmt, Args) ->
    Info = Report#{time => Time, fmt => Fmt, args => Args},
    Dest ! {mnesia_rocksdb, report, Info},
    ok.
%% Send a `{mnesia_rocksdb, report, progress}' message to the report
%% destination whenever the running count is a multiple of 100000
%% (which includes 0). In that case the sent message is returned;
%% in every other case (no `to' key, or a non-matching count) the
%% result is `ok'.
maybe_progress(Rpt, Count) ->
    case Rpt of
        #{to := Dest} when Count rem 100000 =:= 0 ->
            erlang:send(Dest, {mnesia_rocksdb, report, progress});
        _ ->
            ok
    end.
add_related_tabs(Ts, Backend, Alias, St) -> add_related_tabs(Ts, Backend, Alias, St) ->
lists:flatmap( lists:flatmap(
fun({error,_} = E) -> [E]; fun({error,_} = E) -> [E];
@@ -839,29 +867,31 @@ prepare_migration_(Alias, T, #st{} = St) ->
{error, {no_such_table, TName}} {error, {no_such_table, TName}}
end. end.
do_migrate_tabs(Alias, Tabs, Backend, St) -> do_migrate_tabs(Alias, Tabs, Backend, Rpt, St) ->
lists:mapfoldl(fun(T, St1) -> lists:mapfoldl(fun(T, St1) ->
do_migrate_table(Alias, T, Backend, St1) do_migrate_table(Alias, T, Backend, Rpt, St1)
end, St, Tabs). end, St, Tabs).
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, St) when is_map(TRec0) -> do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, Rpt, St) when is_map(TRec0) ->
T0 = erlang:system_time(millisecond), T0 = erlang:system_time(millisecond),
rpt(Rpt, T0, "Migrate ~p~n", [Name]),
TRec = maps:without([encoding, vsn], TRec0), TRec = maps:without([encoding, vsn], TRec0),
maybe_write_user_props(TRec), maybe_write_user_props(TRec),
{ok, CF, St1} = create_cf_and_migrate(Alias, Name, OldTRec, {ok, CF, St1} = create_cf_and_migrate(Alias, Name, OldTRec,
TRec, Backend, St), TRec, Backend, Rpt, St),
put_pt(Name, CF), put_pt(Name, CF),
T1 = erlang:system_time(millisecond), T1 = erlang:system_time(millisecond),
rpt(Rpt, T1, "~nDone (~p)~n", [Name]),
Time = T1 - T0, Time = T1 - T0,
io:fwrite("~p migrated, ~p ms~n", [Name, Time]), io:fwrite("~p migrated, ~p ms~n", [Name, Time]),
{{Name, {ok, Time}}, St1}. {{Name, {ok, Time}}, St1}.
migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec, migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
#st{standalone = Ts} = St) -> Rpt, #st{standalone = Ts} = St) ->
ChunkSz = chunk_size(TRec), ChunkSz = chunk_size(TRec),
KeyPos = mnesia_rocksdb_lib:keypos(T), KeyPos = mnesia_rocksdb_lib:keypos(T),
migrate_to_cf(mrdb:select(OldTRec, [{'_',[],['$_']}], ChunkSz), migrate_to_cf(mrdb:select(OldTRec, [{'_',[],['$_']}], ChunkSz),
TRec, OldTRec, KeyPos), TRec, OldTRec, KeyPos, set_count(0, Rpt)),
case maps:is_key({Alias,T}, Ts) case maps:is_key({Alias,T}, Ts)
andalso table_is_empty(OldTRec) of andalso table_is_empty(OldTRec) of
true -> true ->
@@ -871,27 +901,36 @@ migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
{ok, St} {ok, St}
end. end.
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos) -> migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos, Rpt) ->
mrdb:as_batch( Count0 = get_count(Rpt),
Cf, Count = mrdb:as_batch(
fun(New) -> Cf,
mrdb:as_batch( fun(New) ->
DbRec, mrdb:as_batch(
fun(Old) -> DbRec,
lists:foreach( fun(Old) ->
fun(Obj) -> lists:foldl(
mrdb:insert(New, Obj), fun(Obj, C) ->
mrdb:delete(Old, element(KeyPos,Obj)) mrdb:insert(New, Obj),
end, L) mrdb:delete(Old, element(KeyPos,Obj)),
end) maybe_progress(Rpt, C),
end), C + 1
migrate_to_cf(cont(Cont), Cf, DbRec, KeyPos); end, Count0, L)
migrate_to_cf('$end_of_table', _, _, _) -> end)
end),
migrate_to_cf(mrdb_select:select(Cont), Cf, DbRec, KeyPos, set_count(Count, Rpt));
migrate_to_cf('$end_of_table', _, _, _, _) ->
ok. ok.
cont('$end_of_table' = E) -> E; get_count(undefined) ->
cont(F) when is_function(F,0) -> 0;
F(). get_count(R) when is_map(R) ->
maps:get({count}, R, 0).
%% Store the running object count in the report map under the `{count}' key.
%% When reporting is disabled (`undefined') there is nothing to update and
%% `undefined' is returned unchanged.
set_count(_Count, undefined) ->
    undefined;
set_count(Count, Rpt) when is_map(Rpt) ->
    maps:put({count}, Count, Rpt).
chunk_size(_) -> chunk_size(_) ->
300. 300.
@@ -1251,7 +1290,7 @@ rocksdb_opts_from_trec(TRec) ->
create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) -> create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
CfName = tab_to_cf_name(Name), CfName = tab_to_cf_name(Name),
case rocksdb:create_column_family(DbRef, CfName, cfopts()) of case create_column_family(DbRef, CfName, cfopts(), R) of
{ok, CfH} -> {ok, CfH} ->
R1 = check_version_and_encoding(R#{ cf_handle => CfH R1 = check_version_and_encoding(R#{ cf_handle => CfH
, type => column_family }), , type => column_family }),
@@ -1260,6 +1299,82 @@ create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
Error Error
end. end.
%% Obtain a handle for the column family `CfName'.
%% If column_family_exists/2 reports that the family is already present, the
%% handle is looked up with find_active_cf_handle/2 and
%% `{error, {no_handle_for_existing_cf, CfName}}' is returned when none is
%% found. Otherwise the family is created through
%% rocksdb:create_column_family/3.
%% The result is passed to maybe_note_active_cf/2 and then returned as-is.
create_column_family(DbRef, CfName, CfOpts, R) ->
    Exists = column_family_exists(CfName, R),
    Result = open_or_create_cf(Exists, DbRef, CfName, CfOpts),
    maybe_note_active_cf(Result, CfName),
    Result.

%% Reuse the handle of an existing column family, or create a new family.
open_or_create_cf(false, DbRef, CfName, CfOpts) ->
    rocksdb:create_column_family(DbRef, CfName, CfOpts);
open_or_create_cf(true, DbRef, CfName, _CfOpts) ->
    case find_active_cf_handle(DbRef, CfName) of
        {ok, _CfH} = Found ->
            Found;
        error ->
            {error, {no_handle_for_existing_cf, CfName}}
    end.
%% Tell whether `CfName' is among the column families of a database.
%% With a `mountpoint' in the map, the families are listed via
%% rocksdb:list_column_families/2; any result other than `{ok, Names}'
%% counts as "does not exist".
%% With only an `alias', the reference registered under `{admin, Alias}' is
%% fetched with get_ref/2 and the check is repeated on it; a missing
%% reference also yields `false'.
%% NOTE(review): the alias clause presumably relies on the admin reference
%% carrying a `mountpoint' key - confirm against get_ref/2.
column_family_exists(CfName, #{mountpoint := MountPoint}) ->
    Listing = rocksdb:list_column_families(MountPoint, []),
    case Listing of
        {ok, Names} ->
            lists:member(CfName, Names);
        _NotListed ->
            false
    end;
column_family_exists(CfName, #{alias := Alias}) ->
    AdminRef = get_ref({admin, Alias}, error),
    AdminRef =/= error andalso column_family_exists(CfName, AdminRef).
%% Column family handle caching ======================================================
%%
%% At least as far as I can tell, there is no way to query erlang-rocksdb for currently
%% active column family handles. This can become an issue e.g. during table migration
%% from standalone to column families, where the meta structures aren't updated until
%% after completed migration. A transient error during migration should be addressable
%% by simply retrying, but if the CF has already been created, and we've lost the handle,
%% there is no easy way to get it back. Unfortunately, if we start caching CFs, we also
%% need to garbage collect them.
%%
-define(CFH_CACHE, mnesia_rocksdb_cf_handle_cache).

%% Make sure the column family handle cache table exists.
%% Creates a public, named `ordered_set' ETS table when ets:info/2 reports
%% that no table with that name is present. Returns the result of ets:new/2
%% on creation and `true' when the table was already there.
ensure_cf_cache() ->
    AlreadyThere = ets:info(?CFH_CACHE, name) =/= undefined,
    AlreadyThere orelse ets:new(?CFH_CACHE, [ordered_set, public, named_table]).
%% Remember a freshly obtained column family handle in the handle cache.
%% Only `{ok, CfH}' results are recorded, keyed on `{CfName, CfH}'; the
%% return value is then that of ets:insert/2. Any other result is ignored
%% and `false' is returned.
maybe_note_active_cf(Res, CfName) ->
    case Res of
        {ok, CfH} ->
            ets:insert(?CFH_CACHE, {{CfName, CfH}});
        _ ->
            false
    end.
%% Look for a cached handle of `CfName' that check_cfh/4 accepts.
%% Every cached candidate for the name is passed through check_cfh/4,
%% starting from `error'; the fold result is `{ok, CfH}' or `error'.
find_active_cf_handle(DbRef, CfName) ->
    MatchSpec = [{{{CfName, '$1'}}, [], ['$1']}],
    Probe = fun(Handle, Best) -> check_cfh(DbRef, Handle, CfName, Best) end,
    lists:foldl(Probe, error, ets:select(?CFH_CACHE, MatchSpec)).
%% Probe a cached column family handle by opening an iterator on it.
%% If the iterator opens, it is closed again and `{ok, CfH}' is returned.
%% If rocksdb reports an error, the `{CfName, CfH}' entry is removed from
%% the handle cache and the accumulator `Acc' is returned unchanged.
check_cfh(DbRef, CfH, CfName, Acc) ->
    case rocksdb:iterator(DbRef, CfH, []) of
        {error, _Reason} ->
            ets:delete(?CFH_CACHE, {CfName, CfH}),
            Acc;
        {ok, Iter} ->
            rocksdb:iterator_close(Iter),
            {ok, CfH}
    end.
%% Remove the `{CfName, CfH}' entry from the column family handle cache.
%% Returns the result of ets:delete/2.
drop_cached_cf(CfName, CfH) ->
    CacheKey = {CfName, CfH},
    ets:delete(?CFH_CACHE, CacheKey).
%%
%% ===================================================================================
do_prep_close(Name, Backend, St) -> do_prep_close(Name, Backend, St) ->
RelTabs = get_related_resources(Name, Backend), RelTabs = get_related_resources(Name, Backend),
erase_pt_list([Name | RelTabs]), erase_pt_list([Name | RelTabs]),
@@ -1302,6 +1417,7 @@ do_delete_table(Alias, Name, Backend, #st{} = St) ->
case Where of case Where of
#{db_ref := DbRef, cf_handle := CfH, type := column_family} -> #{db_ref := DbRef, cf_handle := CfH, type := column_family} ->
rocksdb:drop_column_family(DbRef, CfH), rocksdb:drop_column_family(DbRef, CfH),
drop_cached_cf(tab_to_cf_name(Name), CfH),
rocksdb:destroy_column_family(DbRef, CfH), rocksdb:destroy_column_family(DbRef, CfH),
{ok, delete_cf(Alias, Name, St)}; {ok, delete_cf(Alias, Name, St)};
#{type := standalone} = R -> #{type := standalone} = R ->
+2 -1
View File
@@ -149,8 +149,9 @@ create_migrateable_db(Config) ->
Config. Config.
fill_tabs(Tabs) -> fill_tabs(Tabs) ->
%% Fill with more than 300, since that's the currently hard-coded chunk size
lists:foreach(fun(Tab) -> lists:foreach(fun(Tab) ->
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,3)] [mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,500)]
end, Tabs). end, Tabs).
create_tabs(Tabs, Config) -> create_tabs(Tabs, Config) ->