diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index e39563c..01045d5 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -18,7 +18,8 @@ , clear_table/1 ]). --export([ migrate_standalone/2 ]). +-export([ migrate_standalone/2 + , migrate_standalone/3 ]). -export([ start_link/0 , init/1 @@ -296,7 +297,14 @@ write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 -> call(Alias, {write_table_property, Tab, Prop}). migrate_standalone(Alias, Tabs) -> - call(Alias, {migrate, Tabs}). + migrate_standalone(Alias, Tabs, undefined). + +migrate_standalone(Alias, Tabs, Rpt0) -> + Rpt = case Rpt0 of + undefined -> undefined; + To -> #{to => To, tag => migrate_standalone} + end, + call(Alias, {migrate, Tabs, Rpt}). -spec call(alias() | [], req()) -> no_return() | any(). call(Alias, Req) -> @@ -320,6 +328,7 @@ init([]) -> _ = maybe_initial_meta(), %% bootstrap pt Opts = default_opts(), process_flag(trap_exit, true), + ensure_cf_cache(), mnesia:subscribe({table, schema, simple}), {ok, recover_state(#st{default_opts = Opts})}. @@ -588,10 +597,10 @@ handle_req(Alias, {write_table_property, Tab, Prop}, Backend, St) -> _ -> {reply, {error, not_found}, St} end; -handle_req(Alias, {migrate, Tabs0}, Backend, St) -> - case prepare_migration(Alias, Tabs0, St) of +handle_req(Alias, {migrate, Tabs0, Rpt}, Backend, St) -> + case prepare_migration(Alias, Tabs0, Rpt, St) of {ok, Tabs} -> - {Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, St), + {Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, Rpt, St), {reply, Res, St1}; {error, _} = Error -> {reply, Error, St} @@ -772,7 +781,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec, ?log(debug, "will create ~p as standalone and migrate", [Name]), case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of {ok, OldTRec, _} -> - create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, St); + ?log(info, "Migrating ~p to column family", [Name]), + create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St); _Other -> create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St) end @@ -781,11 +791,11 @@ create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) -> {Exists, MP} = table_exists_as_standalone(Name), create_table_as_standalone(Alias, Name, Exists, MP, TRec, St). -create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, St) -> +create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) -> ?log(debug, "Migrate to cf (~p)", [Name]), {ok, NewCf, St1} = create_table_as_cf( Alias, Name, TRec#{db_ref => DbRef}, St), - {ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, St1), + {ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, Rpt, St1), {ok, NewCf, St2}. %% Return {Migrate, MP} iff table exists standalone; just false if it doesn't @@ -802,17 +812,33 @@ should_we_migrate_standalone(#{name := Name}) -> false end. -prepare_migration(Alias, Tabs, St) -> +prepare_migration(Alias, Tabs, Rpt, St) -> Res = lists:map(fun(T) -> prepare_migration_(Alias, T, St) end, Tabs), Res1 = add_related_tabs(Res, maps:get(Alias, St#st.backends), Alias, St), case [E || {error, _} = E <- Res1] of - [] -> {ok, Res1}; + [] -> + rpt(Rpt, "Will migrate ~p~n", [[T || {T,_,_} <- Res1]]), + {ok, Res1}; [_|_] = Errors -> + rpt(Rpt, "Errors encountered: ~p~n", [Errors]), {error, Errors} end. +rpt(Rpt, Fmt, Args) -> + rpt(Rpt, erlang:system_time(millisecond), Fmt, Args). + +rpt(undefined, _, _, _) -> ok; +rpt(#{to := Rpt} = R, Time, Fmt, Args) -> + Rpt ! {mnesia_rocksdb, report, R#{time => Time, fmt => Fmt, args => Args}}, + ok. + +maybe_progress(#{to := To}, C) when C rem 100000 =:= 0 -> + To ! {mnesia_rocksdb, report, progress}; +maybe_progress(_, _) -> + ok. + add_related_tabs(Ts, Backend, Alias, St) -> lists:flatmap( fun({error,_} = E) -> [E]; @@ -839,29 +865,31 @@ prepare_migration_(Alias, T, #st{} = St) -> {error, {no_such_table, TName}} end. -do_migrate_tabs(Alias, Tabs, Backend, St) -> +do_migrate_tabs(Alias, Tabs, Backend, Rpt, St) -> lists:mapfoldl(fun(T, St1) -> - do_migrate_table(Alias, T, Backend, St1) + do_migrate_table(Alias, T, Backend, Rpt, St1) end, St, Tabs). -do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, St) when is_map(TRec0) -> +do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, Rpt, St) when is_map(TRec0) -> T0 = erlang:system_time(millisecond), + rpt(Rpt, T0, "Migrate ~p~n", [Name]), TRec = maps:without([encoding, vsn], TRec0), maybe_write_user_props(TRec), {ok, CF, St1} = create_cf_and_migrate(Alias, Name, OldTRec, - TRec, Backend, St), + TRec, Backend, Rpt, St), put_pt(Name, CF), T1 = erlang:system_time(millisecond), + rpt(Rpt, T1, "~nDone (~p)~n", [Name]), Time = T1 - T0, io:fwrite("~p migrated, ~p ms~n", [Name, Time]), {{Name, {ok, Time}}, St1}. migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec, - #st{standalone = Ts} = St) -> + Rpt, #st{standalone = Ts} = St) -> ChunkSz = chunk_size(TRec), KeyPos = mnesia_rocksdb_lib:keypos(T), migrate_to_cf(mrdb:select(OldTRec, [{'_',[],['$_']}], ChunkSz), - TRec, OldTRec, KeyPos), + TRec, OldTRec, KeyPos, set_count(0, Rpt)), case maps:is_key({Alias,T}, Ts) andalso table_is_empty(OldTRec) of true -> @@ -871,27 +899,36 @@ migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec, {ok, St} end. -migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos) -> - mrdb:as_batch( - Cf, - fun(New) -> - mrdb:as_batch( - DbRec, - fun(Old) -> - lists:foreach( - fun(Obj) -> - mrdb:insert(New, Obj), - mrdb:delete(Old, element(KeyPos,Obj)) - end, L) - end) - end), - migrate_to_cf(cont(Cont), Cf, DbRec, KeyPos); -migrate_to_cf('$end_of_table', _, _, _) -> +migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos, Rpt) -> + Count0 = get_count(Rpt), + Count = mrdb:as_batch( + Cf, + fun(New) -> + mrdb:as_batch( + DbRec, + fun(Old) -> + lists:foldl( + fun(Obj, C) -> + mrdb:insert(New, Obj), + mrdb:delete(Old, element(KeyPos,Obj)), + maybe_progress(Rpt, C), + C + 1 + end, Count0, L) + end) + end), + migrate_to_cf(mrdb_select:select(Cont), Cf, DbRec, KeyPos, set_count(Count, Rpt)); +migrate_to_cf('$end_of_table', _, _, _, _) -> ok. -cont('$end_of_table' = E) -> E; -cont(F) when is_function(F,0) -> - F(). +get_count(undefined) -> + 0; +get_count(R) when is_map(R) -> + maps:get({count}, R, 0). + +set_count(_, undefined) -> + undefined; +set_count(Count, R) when is_map(R) -> + R#{{count} => Count}. chunk_size(_) -> 300. @@ -1251,7 +1288,7 @@ rocksdb_opts_from_trec(TRec) -> create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) -> CfName = tab_to_cf_name(Name), - case rocksdb:create_column_family(DbRef, CfName, cfopts()) of + case create_column_family(DbRef, CfName, cfopts(), R) of {ok, CfH} -> R1 = check_version_and_encoding(R#{ cf_handle => CfH , type => column_family }), @@ -1260,6 +1297,82 @@ create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) -> Error end. +create_column_family(DbRef, CfName, CfOpts, R) -> + Res = case column_family_exists(CfName, R) of + true -> + case find_active_cf_handle(DbRef, CfName) of + error -> + {error, {no_handle_for_existing_cf, CfName}}; + {ok, _} = Ok -> + Ok + end; + false -> + rocksdb:create_column_family(DbRef, CfName, CfOpts) + end, + maybe_note_active_cf(Res, CfName), + Res. + +column_family_exists(CfName, #{mountpoint := MP}) -> + case rocksdb:list_column_families(MP, []) of + {ok, CFs} -> + lists:member(CfName, CFs); + _ -> + false + end; +column_family_exists(CfName, #{alias := Alias}) -> + case get_ref({admin, Alias}, error) of + error -> + false; + Adm -> + column_family_exists(CfName, Adm) + end. + +%% Column family handle caching ====================================================== +%% +%% At least as far as I can tell, there is no way to query erlang-rocksdb for currently +%% active column family handles. This can become an issue e.g. during table migration +%% from standalone to column families, where the meta structures aren't updated until +%% after completed migration. A transient error during migration should be addressable +%% by simply retrying, but if the CF has already been created, and we've lost the handle, +%% there is no easy way to get it back. Unfortunately, if we start caching CFs, we also +%% need to garbage collect them. +%% + +-define(CFH_CACHE, mnesia_rocksdb_cf_handle_cache). + +ensure_cf_cache() -> + case ets:info(?CFH_CACHE, name) of + undefined -> + ets:new(?CFH_CACHE, [ordered_set, public, named_table]); + _ -> + true + end. + +maybe_note_active_cf({ok, CfH}, CfName) -> + ets:insert(?CFH_CACHE, {{CfName, CfH}}); +maybe_note_active_cf(_, _) -> + false. + +find_active_cf_handle(DbRef, CfName) -> + Candidates = ets:select(?CFH_CACHE, [{{{CfName,'$1'}}, [], ['$1']}]), + lists:foldl(fun(CfH, Acc) -> check_cfh(DbRef, CfH, CfName, Acc) end, error, Candidates). + +check_cfh(DbRef, CfH, CfName, Acc) -> + case rocksdb:iterator(DbRef, CfH, []) of + {ok, I} -> + rocksdb:iterator_close(I), + {ok, CfH}; + {error, _} -> + ets:delete(?CFH_CACHE, {CfName, CfH}), + Acc + end. + +drop_cached_cf(CfName, CfH) -> + ets:delete(?CFH_CACHE, {CfName, CfH}). + +%% +%% =================================================================================== + do_prep_close(Name, Backend, St) -> RelTabs = get_related_resources(Name, Backend), erase_pt_list([Name | RelTabs]), @@ -1302,6 +1415,7 @@ do_delete_table(Alias, Name, Backend, #st{} = St) -> case Where of #{db_ref := DbRef, cf_handle := CfH, type := column_family} -> rocksdb:drop_column_family(DbRef, CfH), + drop_cached_cf(tab_to_cf_name(Name), CfH), rocksdb:destroy_column_family(DbRef, CfH), {ok, delete_cf(Alias, Name, St)}; #{type := standalone} = R -> diff --git a/test/mnesia_rocksdb_migration_SUITE.erl b/test/mnesia_rocksdb_migration_SUITE.erl index b218e29..7e81cc2 100644 --- a/test/mnesia_rocksdb_migration_SUITE.erl +++ b/test/mnesia_rocksdb_migration_SUITE.erl @@ -149,8 +149,9 @@ create_migrateable_db(Config) -> Config. fill_tabs(Tabs) -> + %% Fill with more than 300, since that's the currently hard-coded chunk size lists:foreach(fun(Tab) -> - [mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,3)] + [mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,500)] end, Tabs). create_tabs(Tabs, Config) ->