Migration chunk handling was broken. Add progress reporting support
This commit is contained in:
parent
7fa3c2a58d
commit
c4f7b7ac02
@ -18,7 +18,8 @@
|
||||
, clear_table/1
|
||||
]).
|
||||
|
||||
-export([ migrate_standalone/2 ]).
|
||||
-export([ migrate_standalone/2
|
||||
, migrate_standalone/3 ]).
|
||||
|
||||
-export([ start_link/0
|
||||
, init/1
|
||||
@ -296,7 +297,14 @@ write_table_property(Alias, Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
|
||||
call(Alias, {write_table_property, Tab, Prop}).
|
||||
|
||||
migrate_standalone(Alias, Tabs) ->
|
||||
call(Alias, {migrate, Tabs}).
|
||||
migrate_standalone(Alias, Tabs, undefined).
|
||||
|
||||
migrate_standalone(Alias, Tabs, Rpt0) ->
|
||||
Rpt = case Rpt0 of
|
||||
undefined -> undefined;
|
||||
To -> #{to => To, tag => migrate_standalone}
|
||||
end,
|
||||
call(Alias, {migrate, Tabs, Rpt}).
|
||||
|
||||
-spec call(alias() | [], req()) -> no_return() | any().
|
||||
call(Alias, Req) ->
|
||||
@ -320,6 +328,7 @@ init([]) ->
|
||||
_ = maybe_initial_meta(), %% bootstrap pt
|
||||
Opts = default_opts(),
|
||||
process_flag(trap_exit, true),
|
||||
ensure_cf_cache(),
|
||||
mnesia:subscribe({table, schema, simple}),
|
||||
{ok, recover_state(#st{default_opts = Opts})}.
|
||||
|
||||
@ -588,10 +597,10 @@ handle_req(Alias, {write_table_property, Tab, Prop}, Backend, St) ->
|
||||
_ ->
|
||||
{reply, {error, not_found}, St}
|
||||
end;
|
||||
handle_req(Alias, {migrate, Tabs0}, Backend, St) ->
|
||||
case prepare_migration(Alias, Tabs0, St) of
|
||||
handle_req(Alias, {migrate, Tabs0, Rpt}, Backend, St) ->
|
||||
case prepare_migration(Alias, Tabs0, Rpt, St) of
|
||||
{ok, Tabs} ->
|
||||
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, St),
|
||||
{Res, St1} = do_migrate_tabs(Alias, Tabs, Backend, Rpt, St),
|
||||
{reply, Res, St1};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, St}
|
||||
@ -772,7 +781,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
||||
?log(debug, "will create ~p as standalone and migrate", [Name]),
|
||||
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
|
||||
{ok, OldTRec, _} ->
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, St);
|
||||
?log(info, "Migrating ~p to column family", [Name]),
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
|
||||
_Other ->
|
||||
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St)
|
||||
end
|
||||
@ -781,11 +791,11 @@ create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
|
||||
{Exists, MP} = table_exists_as_standalone(Name),
|
||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
|
||||
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, St) ->
|
||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
|
||||
?log(debug, "Migrate to cf (~p)", [Name]),
|
||||
{ok, NewCf, St1} = create_table_as_cf(
|
||||
Alias, Name, TRec#{db_ref => DbRef}, St),
|
||||
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, St1),
|
||||
{ok, St2} = migrate_standalone_to_cf(OldTRec, NewCf, Rpt, St1),
|
||||
{ok, NewCf, St2}.
|
||||
|
||||
%% Return {Migrate, MP} iff table exists standalone; just false if it doesn't
|
||||
@ -802,17 +812,33 @@ should_we_migrate_standalone(#{name := Name}) ->
|
||||
false
|
||||
end.
|
||||
|
||||
prepare_migration(Alias, Tabs, St) ->
|
||||
prepare_migration(Alias, Tabs, Rpt, St) ->
|
||||
Res = lists:map(fun(T) ->
|
||||
prepare_migration_(Alias, T, St)
|
||||
end, Tabs),
|
||||
Res1 = add_related_tabs(Res, maps:get(Alias, St#st.backends), Alias, St),
|
||||
case [E || {error, _} = E <- Res1] of
|
||||
[] -> {ok, Res1};
|
||||
[] ->
|
||||
rpt(Rpt, "Will migrate ~p~n", [[T || {T,_,_} <- Res1]]),
|
||||
{ok, Res1};
|
||||
[_|_] = Errors ->
|
||||
rpt(Rpt, "Errors encountered: ~p~n", [Errors]),
|
||||
{error, Errors}
|
||||
end.
|
||||
|
||||
rpt(Rpt, Fmt, Args) ->
|
||||
rpt(Rpt, erlang:system_time(millisecond), Fmt, Args).
|
||||
|
||||
rpt(undefined, _, _, _) -> ok;
|
||||
rpt(#{to := Rpt} = R, Time, Fmt, Args) ->
|
||||
Rpt ! {mnesia_rocksdb, report, R#{time => Time, fmt => Fmt, args => Args}},
|
||||
ok.
|
||||
|
||||
maybe_progress(#{to := To}, C) when C rem 100000 =:= 0 ->
|
||||
To ! {mnesia_rocksdb, report, progress};
|
||||
maybe_progress(_, _) ->
|
||||
ok.
|
||||
|
||||
add_related_tabs(Ts, Backend, Alias, St) ->
|
||||
lists:flatmap(
|
||||
fun({error,_} = E) -> [E];
|
||||
@ -839,29 +865,31 @@ prepare_migration_(Alias, T, #st{} = St) ->
|
||||
{error, {no_such_table, TName}}
|
||||
end.
|
||||
|
||||
do_migrate_tabs(Alias, Tabs, Backend, St) ->
|
||||
do_migrate_tabs(Alias, Tabs, Backend, Rpt, St) ->
|
||||
lists:mapfoldl(fun(T, St1) ->
|
||||
do_migrate_table(Alias, T, Backend, St1)
|
||||
do_migrate_table(Alias, T, Backend, Rpt, St1)
|
||||
end, St, Tabs).
|
||||
|
||||
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, St) when is_map(TRec0) ->
|
||||
do_migrate_table(Alias, {Name, OldTRec, TRec0}, Backend, Rpt, St) when is_map(TRec0) ->
|
||||
T0 = erlang:system_time(millisecond),
|
||||
rpt(Rpt, T0, "Migrate ~p~n", [Name]),
|
||||
TRec = maps:without([encoding, vsn], TRec0),
|
||||
maybe_write_user_props(TRec),
|
||||
{ok, CF, St1} = create_cf_and_migrate(Alias, Name, OldTRec,
|
||||
TRec, Backend, St),
|
||||
TRec, Backend, Rpt, St),
|
||||
put_pt(Name, CF),
|
||||
T1 = erlang:system_time(millisecond),
|
||||
rpt(Rpt, T1, "~nDone (~p)~n", [Name]),
|
||||
Time = T1 - T0,
|
||||
io:fwrite("~p migrated, ~p ms~n", [Name, Time]),
|
||||
{{Name, {ok, Time}}, St1}.
|
||||
|
||||
migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
|
||||
#st{standalone = Ts} = St) ->
|
||||
Rpt, #st{standalone = Ts} = St) ->
|
||||
ChunkSz = chunk_size(TRec),
|
||||
KeyPos = mnesia_rocksdb_lib:keypos(T),
|
||||
migrate_to_cf(mrdb:select(OldTRec, [{'_',[],['$_']}], ChunkSz),
|
||||
TRec, OldTRec, KeyPos),
|
||||
TRec, OldTRec, KeyPos, set_count(0, Rpt)),
|
||||
case maps:is_key({Alias,T}, Ts)
|
||||
andalso table_is_empty(OldTRec) of
|
||||
true ->
|
||||
@ -871,27 +899,36 @@ migrate_standalone_to_cf(OldTRec, #{name := T, alias := Alias} = TRec,
|
||||
{ok, St}
|
||||
end.
|
||||
|
||||
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos) ->
|
||||
mrdb:as_batch(
|
||||
migrate_to_cf({L, Cont}, Cf, DbRec, KeyPos, Rpt) ->
|
||||
Count0 = get_count(Rpt),
|
||||
Count = mrdb:as_batch(
|
||||
Cf,
|
||||
fun(New) ->
|
||||
mrdb:as_batch(
|
||||
DbRec,
|
||||
fun(Old) ->
|
||||
lists:foreach(
|
||||
fun(Obj) ->
|
||||
lists:foldl(
|
||||
fun(Obj, C) ->
|
||||
mrdb:insert(New, Obj),
|
||||
mrdb:delete(Old, element(KeyPos,Obj))
|
||||
end, L)
|
||||
mrdb:delete(Old, element(KeyPos,Obj)),
|
||||
maybe_progress(Rpt, C),
|
||||
C + 1
|
||||
end, Count0, L)
|
||||
end)
|
||||
end),
|
||||
migrate_to_cf(cont(Cont), Cf, DbRec, KeyPos);
|
||||
migrate_to_cf('$end_of_table', _, _, _) ->
|
||||
migrate_to_cf(mrdb_select:select(Cont), Cf, DbRec, KeyPos, set_count(Count, Rpt));
|
||||
migrate_to_cf('$end_of_table', _, _, _, _) ->
|
||||
ok.
|
||||
|
||||
cont('$end_of_table' = E) -> E;
|
||||
cont(F) when is_function(F,0) ->
|
||||
F().
|
||||
get_count(undefined) ->
|
||||
0;
|
||||
get_count(R) when is_map(R) ->
|
||||
maps:get({count}, R, 0).
|
||||
|
||||
set_count(_, undefined) ->
|
||||
undefined;
|
||||
set_count(Count, R) when is_map(R) ->
|
||||
R#{{count} => Count}.
|
||||
|
||||
chunk_size(_) ->
|
||||
300.
|
||||
@ -1251,7 +1288,7 @@ rocksdb_opts_from_trec(TRec) ->
|
||||
|
||||
create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
|
||||
CfName = tab_to_cf_name(Name),
|
||||
case rocksdb:create_column_family(DbRef, CfName, cfopts()) of
|
||||
case create_column_family(DbRef, CfName, cfopts(), R) of
|
||||
{ok, CfH} ->
|
||||
R1 = check_version_and_encoding(R#{ cf_handle => CfH
|
||||
, type => column_family }),
|
||||
@ -1260,6 +1297,82 @@ create_table_as_cf(Alias, Name, #{db_ref := DbRef} = R, St) ->
|
||||
Error
|
||||
end.
|
||||
|
||||
create_column_family(DbRef, CfName, CfOpts, R) ->
|
||||
Res = case column_family_exists(CfName, R) of
|
||||
true ->
|
||||
case find_active_cf_handle(DbRef, CfName) of
|
||||
error ->
|
||||
{error, {no_handle_for_existing_cf, CfName}};
|
||||
{ok, _} = Ok ->
|
||||
Ok
|
||||
end;
|
||||
false ->
|
||||
rocksdb:create_column_family(DbRef, CfName, CfOpts)
|
||||
end,
|
||||
maybe_note_active_cf(Res, CfName),
|
||||
Res.
|
||||
|
||||
column_family_exists(CfName, #{mountpoint := MP}) ->
|
||||
case rocksdb:list_column_families(MP, []) of
|
||||
{ok, CFs} ->
|
||||
lists:member(CfName, CFs);
|
||||
_ ->
|
||||
false
|
||||
end;
|
||||
column_family_exists(CfName, #{alias := Alias}) ->
|
||||
case get_ref({admin, Alias}, error) of
|
||||
error ->
|
||||
false;
|
||||
Adm ->
|
||||
column_family_exists(CfName, Adm)
|
||||
end.
|
||||
|
||||
%% Column family handle caching ======================================================
|
||||
%%
|
||||
%% At least as far as I can tell, there is no way to query erlang-rocksdb for currently
|
||||
%% active column family handles. This can become an issue e.g. during table migration
|
||||
%% from standalone to column families, where the meta structures aren't updated until
|
||||
%% after completed migration. A transient error during migration should be addressable
|
||||
%% by simply retrying, but if the CF has already been created, and we've lost the handle,
|
||||
%% there is no easy way to get it back. Unfortunately, if we start caching CFs, we also
|
||||
%% need to garbage collect them.
|
||||
%%
|
||||
|
||||
-define(CFH_CACHE, mnesia_rocksdb_cf_handle_cache).
|
||||
|
||||
ensure_cf_cache() ->
|
||||
case ets:info(?CFH_CACHE, name) of
|
||||
undefined ->
|
||||
ets:new(?CFH_CACHE, [ordered_set, public, named_table]);
|
||||
_ ->
|
||||
true
|
||||
end.
|
||||
|
||||
maybe_note_active_cf({ok, CfH}, CfName) ->
|
||||
ets:insert(?CFH_CACHE, {{CfName, CfH}});
|
||||
maybe_note_active_cf(_, _) ->
|
||||
false.
|
||||
|
||||
find_active_cf_handle(DbRef, CfName) ->
|
||||
Candidates = ets:select(?CFH_CACHE, [{{{CfName,'$1'}}, [], ['$1']}]),
|
||||
lists:foldl(fun(CfH, Acc) -> check_cfh(DbRef, CfH, CfName, Acc) end, error, Candidates).
|
||||
|
||||
check_cfh(DbRef, CfH, CfName, Acc) ->
|
||||
case rocksdb:iterator(DbRef, CfH, []) of
|
||||
{ok, I} ->
|
||||
rocksdb:iterator_close(I),
|
||||
{ok, CfH};
|
||||
{error, _} ->
|
||||
ets:delete(?CFH_CACHE, {CfName, CfH}),
|
||||
Acc
|
||||
end.
|
||||
|
||||
drop_cached_cf(CfName, CfH) ->
|
||||
ets:delete(?CFH_CACHE, {CfName, CfH}).
|
||||
|
||||
%%
|
||||
%% ===================================================================================
|
||||
|
||||
do_prep_close(Name, Backend, St) ->
|
||||
RelTabs = get_related_resources(Name, Backend),
|
||||
erase_pt_list([Name | RelTabs]),
|
||||
@ -1302,6 +1415,7 @@ do_delete_table(Alias, Name, Backend, #st{} = St) ->
|
||||
case Where of
|
||||
#{db_ref := DbRef, cf_handle := CfH, type := column_family} ->
|
||||
rocksdb:drop_column_family(DbRef, CfH),
|
||||
drop_cached_cf(tab_to_cf_name(Name), CfH),
|
||||
rocksdb:destroy_column_family(DbRef, CfH),
|
||||
{ok, delete_cf(Alias, Name, St)};
|
||||
#{type := standalone} = R ->
|
||||
|
@ -149,8 +149,9 @@ create_migrateable_db(Config) ->
|
||||
Config.
|
||||
|
||||
fill_tabs(Tabs) ->
|
||||
%% Fill with more than 300, since that's the currently hard-coded chunk size
|
||||
lists:foreach(fun(Tab) ->
|
||||
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,3)]
|
||||
[mrdb:insert(Tab, {Tab, X, a}) || X <- lists:seq(1,500)]
|
||||
end, Tabs).
|
||||
|
||||
create_tabs(Tabs, Config) ->
|
||||
|
Loading…
x
Reference in New Issue
Block a user