Remove legacy support for vsn 1 standalone tabs
This commit is contained in:
parent
d8b6ab788e
commit
323fcea7a7
@ -262,18 +262,9 @@ i_show_table(_, _, 0, _) ->
|
|||||||
i_show_table(I, Move, Limit, Ref) ->
|
i_show_table(I, Move, Limit, Ref) ->
|
||||||
case rocksdb:iterator_move(I, Move) of
|
case rocksdb:iterator_move(I, Move) of
|
||||||
{ok, EncKey, EncVal} ->
|
{ok, EncKey, EncVal} ->
|
||||||
{Type,Val} =
|
K = decode_key(EncKey, Ref),
|
||||||
case EncKey of
|
Val = decode_val(EncVal, K, Ref),
|
||||||
<< ?INFO_TAG, K/binary >> ->
|
io:fwrite("~p~n", [Val]),
|
||||||
K1 = decode_key(K, Ref),
|
|
||||||
V = decode_val(EncVal, K1, Ref),
|
|
||||||
{info,V};
|
|
||||||
_ ->
|
|
||||||
K = decode_key(EncKey, Ref),
|
|
||||||
V = decode_val(EncVal, K, Ref),
|
|
||||||
{data,V}
|
|
||||||
end,
|
|
||||||
io:fwrite("~p: ~p~n", [Type, Val]),
|
|
||||||
i_show_table(I, next, Limit-1, Ref);
|
i_show_table(I, next, Limit-1, Ref);
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
|
@ -250,31 +250,6 @@ get_info_res(Res, Default) ->
|
|||||||
error(E)
|
error(E)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Admin info: metadata written by the admin proc to keep track of
|
|
||||||
%% the derived status of tables (such as detected version and encoding
|
|
||||||
%% of existing standalone tables.)
|
|
||||||
%%
|
|
||||||
write_admin_info(K, V, Alias, Name) ->
|
|
||||||
mrdb:rdb_put(get_ref({admin, Alias}),
|
|
||||||
admin_info_key(K, Name),
|
|
||||||
term_to_binary(V)).
|
|
||||||
|
|
||||||
read_admin_info(K, Alias, Name) ->
|
|
||||||
EncK = admin_info_key(K, Name),
|
|
||||||
case mrdb:rdb_get(get_ref({admin,Alias}), EncK) of
|
|
||||||
{ok, Bin} ->
|
|
||||||
{ok, binary_to_term(Bin)};
|
|
||||||
_ ->
|
|
||||||
error
|
|
||||||
end.
|
|
||||||
|
|
||||||
delete_admin_info(K, Alias, Name) ->
|
|
||||||
EncK = admin_info_key(K, Name),
|
|
||||||
mrdb:rdb_delete(get_ref({admin, Alias}), EncK).
|
|
||||||
|
|
||||||
admin_info_key(K, Name) ->
|
|
||||||
mnesia_rocksdb_lib:encode_key({admin_info, Name, K}, sext).
|
|
||||||
|
|
||||||
%% Table metadata info maintained by users
|
%% Table metadata info maintained by users
|
||||||
%%
|
%%
|
||||||
write_info(Alias, Tab, K, V) ->
|
write_info(Alias, Tab, K, V) ->
|
||||||
@ -790,10 +765,10 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
|||||||
false ->
|
false ->
|
||||||
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
|
create_table_as_cf(Alias, Name, TRec#{db_ref => DbRef}, St);
|
||||||
{false, MP} ->
|
{false, MP} ->
|
||||||
create_table_as_standalone(Alias, Name, true, MP, TRec, St);
|
create_table_as_standalone(Alias, Name, MP, TRec, St);
|
||||||
{true, MP} ->
|
{true, MP} ->
|
||||||
?log(debug, "will create ~p as standalone and migrate", [Name]),
|
?log(debug, "will create ~p as standalone and migrate", [Name]),
|
||||||
case create_table_as_standalone(Alias, Name, false, MP, TRec, St) of
|
case create_table_as_standalone(Alias, Name, MP, TRec, St) of
|
||||||
{ok, OldTRec, _} ->
|
{ok, OldTRec, _} ->
|
||||||
?log(info, "Migrating ~p to column family", [Name]),
|
?log(info, "Migrating ~p to column family", [Name]),
|
||||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
|
create_cf_and_migrate(Alias, Name, OldTRec, TRec, Backend, undefined, St);
|
||||||
@ -802,8 +777,8 @@ create_table_from_trec(Alias, Name, #{type := column_family} = TRec,
|
|||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
|
create_table_from_trec(Alias, Name, #{type := standalone} = TRec, _, St) ->
|
||||||
{Exists, MP} = table_exists_as_standalone(Name),
|
{_, MP} = table_exists_as_standalone(Name),
|
||||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St).
|
create_table_as_standalone(Alias, Name, MP, TRec, St).
|
||||||
|
|
||||||
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
|
create_cf_and_migrate(Alias, Name, OldTRec, TRec, #{db_ref := DbRef}, Rpt, St) ->
|
||||||
?log(debug, "Migrate to cf (~p)", [Name]),
|
?log(debug, "Migrate to cf (~p)", [Name]),
|
||||||
@ -1083,24 +1058,12 @@ table_exists_as_standalone(Name) ->
|
|||||||
end,
|
end,
|
||||||
{Exists, MP}.
|
{Exists, MP}.
|
||||||
|
|
||||||
create_table_as_standalone(Alias, Name, Exists, MP, TRec, St) ->
|
create_table_as_standalone(Alias, Name, MP, TRec, St) ->
|
||||||
case create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) of
|
|
||||||
{ok, #{ type := standalone, vsn := Vsn1
|
|
||||||
, encoding := Enc1}
|
|
||||||
, _St1} = Ok ->
|
|
||||||
write_admin_info(standalone_vsn_and_enc, {Vsn1, Enc1},
|
|
||||||
Alias, Name),
|
|
||||||
Ok;
|
|
||||||
Other ->
|
|
||||||
Other
|
|
||||||
end.
|
|
||||||
|
|
||||||
create_table_as_standalone_(Alias, Name, Exists, MP, TRec, St) ->
|
|
||||||
Vsn = check_version(TRec),
|
Vsn = check_version(TRec),
|
||||||
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
|
TRec1 = TRec#{vsn => Vsn, encoding => get_encoding(Vsn, TRec)},
|
||||||
do_open_standalone(true, Alias, Name, Exists, MP, TRec1, St).
|
do_open_standalone(true, Alias, Name, MP, TRec1, St).
|
||||||
|
|
||||||
do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
|
do_open_standalone(CreateIfMissing, Alias, Name, MP, TRec0,
|
||||||
#st{standalone = Ts} = St) ->
|
#st{standalone = Ts} = St) ->
|
||||||
Opts = rocksdb_opts_from_trec(TRec0),
|
Opts = rocksdb_opts_from_trec(TRec0),
|
||||||
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
|
case open_db_(MP, Alias, Opts, [], CreateIfMissing) of
|
||||||
@ -1110,97 +1073,12 @@ do_open_standalone(CreateIfMissing, Alias, Name, Exists, MP, TRec0,
|
|||||||
DbRec1 = DbRec#{ cfs => CfNames,
|
DbRec1 = DbRec#{ cfs => CfNames,
|
||||||
mountpoint => MP },
|
mountpoint => MP },
|
||||||
TRec = maps:merge(TRec0, DbRec#{type => standalone}),
|
TRec = maps:merge(TRec0, DbRec#{type => standalone}),
|
||||||
TRec1 = guess_table_vsn_and_encoding(Exists, TRec),
|
{ok, TRec, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
|
||||||
{ok, TRec1, St#st{standalone = Ts#{{Alias, Name} => DbRec1}}};
|
|
||||||
{error, _} = Err ->
|
{error, _} = Err ->
|
||||||
?log(debug, "open_db error: ~p", [Err]),
|
?log(debug, "open_db error: ~p", [Err]),
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% When opening a standalone table, chances are it's a legacy table
|
|
||||||
%% where legacy encoding is already in place. We try to read the
|
|
||||||
%% first object and apply legacy encoding. If successful, we set
|
|
||||||
%% legacy encoding in the TRec. If we migrate the data to a column
|
|
||||||
%% family, we should apply the defined encoding for the cf.
|
|
||||||
%%
|
|
||||||
%% The first object can either be an info object (in the legacy case)
|
|
||||||
%% or a data object, with a sext-encoded key, and a term_to_binary-
|
|
||||||
%% encoded object as value, where the key position is set to [].
|
|
||||||
%% The info objects would be sext-encoded key + term-encoded value.
|
|
||||||
guess_table_vsn_and_encoding(false, TRec) ->
|
|
||||||
TRec;
|
|
||||||
guess_table_vsn_and_encoding(true, #{properties := #{attributes := As},
|
|
||||||
alias := Alias, name := Name} = R) ->
|
|
||||||
case read_admin_info(standalone_vsn_and_enc, Alias, Name) of
|
|
||||||
{ok, {V, E}} ->
|
|
||||||
R#{vsn => V, encoding => E};
|
|
||||||
error ->
|
|
||||||
R1 = set_default_guess(R),
|
|
||||||
mrdb:with_rdb_iterator(
|
|
||||||
R1, fun(I) ->
|
|
||||||
guess_table_vsn_and_encoding_(
|
|
||||||
mrdb:rdb_iterator_move(I, first), I, As, R1)
|
|
||||||
end)
|
|
||||||
end.
|
|
||||||
|
|
||||||
set_default_guess(#{type := standalone} = R) ->
|
|
||||||
case application:get_env(mnesia_rocksdb, standalone_default_vsn, ?VSN) of
|
|
||||||
1 ->
|
|
||||||
error({unsupported_vsn, 1});
|
|
||||||
V ->
|
|
||||||
R#{vsn => V}
|
|
||||||
end.
|
|
||||||
|
|
||||||
guess_table_vsn_and_encoding_({ok, K, V}, _I, As, R) ->
|
|
||||||
Arity = length(As) + 1,
|
|
||||||
case K of
|
|
||||||
<<?INFO_TAG, EncK/binary>> ->
|
|
||||||
try _ = {mnesia_rocksdb_lib:decode(EncK, sext),
|
|
||||||
mnesia_rocksdb_lib:decode(V, term)},
|
|
||||||
%% This is a vsn 1 standalone table
|
|
||||||
R#{vsn => 1, encoding => {sext, {object, term}}}
|
|
||||||
catch
|
|
||||||
error:_ ->
|
|
||||||
R
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
Enc = guess_obj_encoding(K, V, Arity),
|
|
||||||
R#{encoding => Enc}
|
|
||||||
end;
|
|
||||||
guess_table_vsn_and_encoding_(_Other, _, _, R) ->
|
|
||||||
R.
|
|
||||||
|
|
||||||
guess_obj_encoding(K, V, Arity) ->
|
|
||||||
{guess_key_encoding(K), guess_val_encoding(V, Arity)}.
|
|
||||||
|
|
||||||
guess_encoding(Bin) ->
|
|
||||||
try {sext, sext:decode(Bin)}
|
|
||||||
catch
|
|
||||||
error:_ ->
|
|
||||||
try {term, binary_to_term(Bin)}
|
|
||||||
catch
|
|
||||||
error:_ -> raw
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
guess_key_encoding(Bin) ->
|
|
||||||
case guess_encoding(Bin) of
|
|
||||||
raw -> raw;
|
|
||||||
{Enc, _} -> Enc
|
|
||||||
end.
|
|
||||||
|
|
||||||
guess_val_encoding(Bin, Arity) ->
|
|
||||||
case guess_encoding(Bin) of
|
|
||||||
raw -> {value, raw};
|
|
||||||
{Enc, Term} ->
|
|
||||||
if is_tuple(Term), size(Term) == Arity,
|
|
||||||
element(2, Term) == [] ->
|
|
||||||
{object, Enc};
|
|
||||||
true ->
|
|
||||||
{value, Enc}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% This is slightly different from `rocksdb:is_empty/1`, since it allows
|
%% This is slightly different from `rocksdb:is_empty/1`, since it allows
|
||||||
%% for the presence of some metadata, and still considers it empty if there
|
%% for the presence of some metadata, and still considers it empty if there
|
||||||
%% is no user data.
|
%% is no user data.
|
||||||
@ -1542,7 +1420,6 @@ close_and_delete_standalone(#{alias := Alias,
|
|||||||
case get_table_mountpoint(Alias, Name, St) of
|
case get_table_mountpoint(Alias, Name, St) of
|
||||||
{ok, MP} ->
|
{ok, MP} ->
|
||||||
close_and_delete(DbRef, MP),
|
close_and_delete(DbRef, MP),
|
||||||
delete_admin_info(standalone_vsn_and_enc, Alias, Name),
|
|
||||||
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
|
St#st{standalone = maps:remove({Alias,Name}, St#st.standalone)};
|
||||||
error ->
|
error ->
|
||||||
St
|
St
|
||||||
|
39
src/mrdb.erl
39
src/mrdb.erl
@ -1384,15 +1384,7 @@ valid_limit(L) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
write_info(Tab, K, V) ->
|
write_info(Tab, K, V) ->
|
||||||
R = ensure_ref(Tab),
|
#{alias := Alias} = ensure_ref(Tab),
|
||||||
Alias = case R of
|
|
||||||
#{type := standalone, vsn := 1, alias := A} = TRef ->
|
|
||||||
%% Also write on legacy info format
|
|
||||||
write_info_standalone(TRef, K, V),
|
|
||||||
A;
|
|
||||||
#{alias := A} ->
|
|
||||||
A
|
|
||||||
end,
|
|
||||||
write_info_(ensure_ref({admin, Alias}), Tab, K, V).
|
write_info_(ensure_ref({admin, Alias}), Tab, K, V).
|
||||||
|
|
||||||
write_info_(#{} = R, Tab, K, V) ->
|
write_info_(#{} = R, Tab, K, V) ->
|
||||||
@ -1409,13 +1401,8 @@ read_info(Tab, K) ->
|
|||||||
read_info(Tab, K, Default) when K==size; K==memory ->
|
read_info(Tab, K, Default) when K==size; K==memory ->
|
||||||
read_direct_info_(ensure_ref(Tab), K, Default);
|
read_direct_info_(ensure_ref(Tab), K, Default);
|
||||||
read_info(Tab, K, Default) ->
|
read_info(Tab, K, Default) ->
|
||||||
#{alias := Alias} = R = ensure_ref(Tab),
|
#{alias := Alias} = ensure_ref(Tab),
|
||||||
case R of
|
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default).
|
||||||
#{type := standalone, vsn := 1} = TRef ->
|
|
||||||
read_info_standalone(TRef, K, Default);
|
|
||||||
#{alias := Alias} ->
|
|
||||||
mnesia_rocksdb_admin:read_info(Alias, Tab, K, Default)
|
|
||||||
end.
|
|
||||||
|
|
||||||
read_direct_info_(R, memory, _Def) ->
|
read_direct_info_(R, memory, _Def) ->
|
||||||
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
|
get_property(R, <<"rocksdb.total-sst-files-size">>, integer, 0);
|
||||||
@ -1439,26 +1426,6 @@ get_property(#{db_ref := R, cf_handle := CfH}, Prop, Type, Default) ->
|
|||||||
%%rocksdb_boolean(<<"1">>) -> true;
|
%%rocksdb_boolean(<<"1">>) -> true;
|
||||||
%%rocksdb_boolean(<<"0">>) -> false.
|
%%rocksdb_boolean(<<"0">>) -> false.
|
||||||
|
|
||||||
write_info_standalone(#{} = R, K, V) ->
|
|
||||||
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
|
|
||||||
EncV = term_to_binary(V),
|
|
||||||
rdb_put(R, EncK, EncV, write_opts(R, [])).
|
|
||||||
|
|
||||||
read_info_standalone(#{} = R, K, Default) ->
|
|
||||||
EncK = <<?INFO_TAG, (encode_key(K, sext))/binary>>,
|
|
||||||
get_info_res(rdb_get(R, EncK, read_opts(R, [])), Default).
|
|
||||||
|
|
||||||
get_info_res(Res, Default) ->
|
|
||||||
case Res of
|
|
||||||
not_found ->
|
|
||||||
Default;
|
|
||||||
{ok, Bin} ->
|
|
||||||
%% no fancy tricks when encoding/decoding info values
|
|
||||||
binary_to_term(Bin);
|
|
||||||
{error, E} ->
|
|
||||||
error(E)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% insert_bag_v2(Ref, K, V, Opts) ->
|
%% insert_bag_v2(Ref, K, V, Opts) ->
|
||||||
%% rdb_merge(Ref, K, {list_append, [V]}
|
%% rdb_merge(Ref, K, {list_append, [V]}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user