Formatting improvements

This commit is contained in:
Tino Breddin 2019-11-15 16:31:09 +01:00
parent 63bae181e3
commit a2f863804d

View File

@ -64,13 +64,13 @@
%% schema level callbacks %% schema level callbacks
-export([semantics/2, -export([semantics/2,
check_definition/4, check_definition/4,
create_table/3, create_table/3,
load_table/4, load_table/4,
close_table/2, close_table/2,
sync_close_table/2, sync_close_table/2,
delete_table/2, delete_table/2,
info/3]). info/3]).
%% table synch calls %% table synch calls
-export([sender_init/4, -export([sender_init/4,
@ -214,7 +214,6 @@ register(Alias) ->
default_alias() -> default_alias() ->
rocksdb_copies. rocksdb_copies.
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
%% DEBUG API %% DEBUG API
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------
@ -519,13 +518,13 @@ info(_Alias, Tab, memory) ->
end; end;
info(Alias, Tab, size) -> info(Alias, Tab, size) ->
case retrieve_size(Alias, Tab) of case retrieve_size(Alias, Tab) of
{ok, Size} -> {ok, Size} ->
if Size < 10000 -> ok; if Size < 10000 -> ok;
true -> size_warning(Alias, Tab) true -> size_warning(Alias, Tab)
end, end,
Size; Size;
Error -> Error ->
Error Error
end; end;
info(_Alias, Tab, Item) -> info(_Alias, Tab, Item) ->
case try_read_info(Tab, Item, undefined) of case try_read_info(Tab, Item, undefined) of
@ -537,10 +536,10 @@ info(_Alias, Tab, Item) ->
retrieve_size(_Alias, Tab) -> retrieve_size(_Alias, Tab) ->
case try_read_info(Tab, size, 0) of case try_read_info(Tab, size, 0) of
{ok, Size} -> {ok, Size} ->
{ok, Size}; {ok, Size};
Error -> Error ->
Error Error
end. end.
try_read_info(Tab, Item, Default) -> try_read_info(Tab, Item, Default) ->
@ -637,10 +636,10 @@ first(Alias, Tab) ->
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_first(I) -> i_first(I) ->
case ?rocksdb:iterator_move(I, <<?DATA_START>>) of case ?rocksdb:iterator_move(I, <<?DATA_START>>) of
{ok, First, _} -> {ok, First, _} ->
decode_key(First); decode_key(First);
_ -> _ ->
'$end_of_table' '$end_of_table'
end. end.
%% Not relevant for an ordered_set %% Not relevant for an ordered_set
@ -662,12 +661,12 @@ last(Alias, Tab) ->
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_last(I) -> i_last(I) ->
case ?rocksdb:iterator_move(I, last) of case ?rocksdb:iterator_move(I, last) of
{ok, << ?INFO_TAG, _/binary >>, _} -> {ok, << ?INFO_TAG, _/binary >>, _} ->
'$end_of_table'; '$end_of_table';
{ok, Last, _} -> {ok, Last, _} ->
decode_key(Last); decode_key(Last);
_ -> _ ->
'$end_of_table' '$end_of_table'
end. end.
%% Since we replace the key with [] in the record, we have to put it back %% Since we replace the key with [] in the record, we have to put it back
@ -676,33 +675,34 @@ lookup(Alias, Tab, Key) ->
Enc = encode_key(Key), Enc = encode_key(Key),
{Ref, Type} = call(Alias, Tab, get_ref), {Ref, Type} = call(Alias, Tab, get_ref),
case Type of case Type of
bag -> lookup_bag(Ref, Key, Enc, keypos(Tab)); bag ->
_ -> lookup_bag(Ref, Key, Enc, keypos(Tab));
case ?rocksdb:get(Ref, Enc, []) of _ ->
{ok, EncVal} -> case ?rocksdb:get(Ref, Enc, []) of
[setelement(keypos(Tab), decode_val(EncVal), Key)]; {ok, EncVal} ->
_ -> [setelement(keypos(Tab), decode_val(EncVal), Key)];
[] _ ->
end []
end
end. end.
lookup_bag(Ref, K, Enc, KP) -> lookup_bag(Ref, K, Enc, KP) ->
Sz = byte_size(Enc), Sz = byte_size(Enc),
with_iterator( with_iterator(
Ref, fun(I) -> Ref, fun(I) ->
lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc), lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc),
K, I, KP) K, I, KP)
end). end).
lookup_bag_(Sz, Enc, {ok, Enc, _}, K, I, KP) -> lookup_bag_(Sz, Enc, {ok, Enc, _}, K, I, KP) ->
lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP); lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP);
lookup_bag_(Sz, Enc, Res, K, I, KP) -> lookup_bag_(Sz, Enc, Res, K, I, KP) ->
case Res of case Res of
{ok, <<Enc:Sz/binary, _:?BAG_CNT>>, V} -> {ok, <<Enc:Sz/binary, _:?BAG_CNT>>, V} ->
[setelement(KP, decode_val(V), K)| [setelement(KP, decode_val(V), K)|
lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)]; lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)];
_ -> _ ->
[] []
end. end.
match_delete(Alias, Tab, Pat) when is_atom(Pat) -> match_delete(Alias, Tab, Pat) when is_atom(Pat) ->
@ -759,23 +759,23 @@ prev(Alias, Tab, Key0) ->
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_prev(I, Key) -> i_prev(I, Key) ->
case ?rocksdb:iterator_move(I, Key) of case ?rocksdb:iterator_move(I, Key) of
{ok, _, _} -> {ok, _, _} ->
i_move_to_prev(I, Key); i_move_to_prev(I, Key);
{error, invalid_iterator} -> {error, invalid_iterator} ->
i_last(I) i_last(I)
end. end.
%% PRIVATE ITERATOR %% PRIVATE ITERATOR
i_move_to_prev(I, Key) -> i_move_to_prev(I, Key) ->
case ?rocksdb:iterator_move(I, prev) of case ?rocksdb:iterator_move(I, prev) of
{ok, << ?INFO_TAG, _/binary >>, _} -> {ok, << ?INFO_TAG, _/binary >>, _} ->
'$end_of_table'; '$end_of_table';
{ok, Prev, _} when Prev < Key -> {ok, Prev, _} when Prev < Key ->
decode_key(Prev); decode_key(Prev);
{ok, _, _} -> {ok, _, _} ->
i_move_to_prev(I, Key); i_move_to_prev(I, Key);
_ -> _ ->
'$end_of_table' '$end_of_table'
end. end.
repair_continuation(Cont, _Ms) -> repair_continuation(Cont, _Ms) ->
@ -831,22 +831,22 @@ update_counter(Alias, Tab, C, Val) when is_integer(Val) ->
do_update_counter(C, Val, Ref, St) -> do_update_counter(C, Val, Ref, St) ->
Enc = encode_key(C), Enc = encode_key(C),
case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of
{ok, EncVal} -> {ok, EncVal} ->
case decode_val(EncVal) of case decode_val(EncVal) of
{_, _, Old} = Rec when is_integer(Old) -> {_, _, Old} = Rec when is_integer(Old) ->
Res = Old+Val, Res = Old+Val,
return_catch( return_catch(
fun() -> fun() ->
db_put(Ref, Enc, db_put(Ref, Enc,
encode_val( encode_val(
setelement(3, Rec, Res)), setelement(3, Rec, Res)),
[], St) [], St)
end); end);
_ -> _ ->
badarg badarg
end; end;
_ -> _ ->
badarg badarg
end. end.
%% PRIVATE %% PRIVATE
@ -1208,15 +1208,14 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St
do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) -> do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) ->
return_catch(fun() -> db_put(Ref, K, V, [], St) end); return_catch(fun() -> db_put(Ref, K, V, [], St) end);
do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
IsNew = IsNew = case ?rocksdb:get(Ref, K, []) of
case ?rocksdb:get(Ref, K, []) of {ok, _} ->
{ok, _} -> false;
false; _ ->
_ -> true
true end,
end,
case IsNew of case IsNew of
true -> true ->
return_catch( return_catch(
fun() -> fun() ->
NewSz = read_info(size, 0, Ets) + 1, NewSz = read_info(size, 0, Ets) + 1,
@ -1226,8 +1225,8 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
write, [Ref, L, []], St), % may throw write, [Ref, L, []], St), % may throw
ets_insert_info(Ets, size, NewSz) ets_insert_info(Ets, size, NewSz)
end); end);
false -> false ->
return_catch(fun() -> db_put(Ref, K, V, [], St) end) return_catch(fun() -> db_put(Ref, K, V, [], St) end)
end, end,
ok. ok.
@ -1235,9 +1234,9 @@ do_insert_bag(Ref, K, V, CurSz, St) ->
KSz = byte_size(K), KSz = byte_size(K),
with_iterator( with_iterator(
Ref, fun(I) -> Ref, fun(I) ->
do_insert_bag_( do_insert_bag_(
KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St)
end). end).
%% There's a potential access pattern that would force counters to %% There's a potential access pattern that would force counters to
@ -1245,21 +1244,21 @@ do_insert_bag(Ref, K, V, CurSz, St) ->
%% with compaction. TODO. %% with compaction. TODO.
do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG ->
case Res of case Res of
{ok, <<K:Sz/binary, _:?BAG_CNT>>, V} -> {ok, <<K:Sz/binary, _:?BAG_CNT>>, V} ->
%% object exists %% object exists
TSz; TSz;
{ok, <<K:Sz/binary, N:?BAG_CNT>>, _} -> {ok, <<K:Sz/binary, N:?BAG_CNT>>, _} ->
do_insert_bag_( do_insert_bag_(
Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St);
_ when TSz =:= false -> _ when TSz =:= false ->
Key = <<K/binary, (Prev+1):?BAG_CNT>>, Key = <<K/binary, (Prev+1):?BAG_CNT>>,
db_put(Ref, Key, V, [], St); db_put(Ref, Key, V, [], St);
_ -> _ ->
NewSz = TSz + 1, NewSz = TSz + 1,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
Key = <<K/binary, (Prev+1):?BAG_CNT>>, Key = <<K/binary, (Prev+1):?BAG_CNT>>,
db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St),
NewSz NewSz
end. end.
%% server-side part %% server-side part
@ -1279,7 +1278,7 @@ do_delete(Key, #st{ref = Ref, maintain_size = false} = St) ->
do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of
{ok, _} -> {ok, _} ->
return_catch( return_catch(
fun() -> fun() ->
NewSz = CurSz -1, NewSz = CurSz -1,
@ -1287,85 +1286,84 @@ do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) ->
ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St),
ets_insert_info(Ets, size, NewSz) ets_insert_info(Ets, size, NewSz)
end); end);
not_found -> not_found ->
false false
end. end.
do_delete_bag(Sz, Key, Ref, TSz, St) -> do_delete_bag(Sz, Key, Ref, TSz, St) ->
Found = Found = with_iterator(
with_iterator( Ref, fun(I) ->
Ref, fun(I) -> do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key),
do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key), Ref, I)
Ref, I) end),
end),
case {Found, TSz} of case {Found, TSz} of
{[], _} -> {[], _} ->
TSz; TSz;
{_, false} -> {_, false} ->
db_write(Ref, [{delete, K} || K <- Found], [], St); db_write(Ref, [{delete, K} || K <- Found], [], St);
{_, _} -> {_, _} ->
N = length(Found), N = length(Found),
NewSz = TSz - N, NewSz = TSz - N,
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
db_write(Ref, [{put, Ki, Vi} | db_write(Ref, [{put, Ki, Vi} |
[{delete, K} || K <- Found]], [], St), [{delete, K} || K <- Found]], [], St),
NewSz NewSz
end. end.
do_delete_bag_(Sz, K, Res, Ref, I) -> do_delete_bag_(Sz, K, Res, Ref, I) ->
case Res of case Res of
{ok, K, _} -> {ok, K, _} ->
do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next),
Ref, I); Ref, I);
{ok, <<K:Sz/binary, _:?BAG_CNT>> = Key, _} -> {ok, <<K:Sz/binary, _:?BAG_CNT>> = Key, _} ->
[Key | [Key |
do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next),
Ref, I)]; Ref, I)];
_ -> _ ->
[] []
end. end.
do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type, do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type,
maintain_size = MaintainSize} = St) -> maintain_size = MaintainSize} = St) ->
Fun = fun(_, Key, Acc) -> [Key|Acc] end, Fun = fun(_, Key, Acc) -> [Key|Acc] end,
Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30), Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30),
case {Keys, MaintainSize} of case {Keys, MaintainSize} of
{[], _} -> {[], _} ->
ok; ok;
{_, false} -> {_, false} ->
db_write(Ref, [{delete, K} || K <- Keys], [], St), db_write(Ref, [{delete, K} || K <- Keys], [], St),
ok; ok;
{_, true} -> {_, true} ->
CurSz = read_info(size, 0, Ets), CurSz = read_info(size, 0, Ets),
NewSz = max(CurSz - length(Keys), 0), NewSz = max(CurSz - length(Keys), 0),
{Ki, Vi} = info_obj(size, NewSz), {Ki, Vi} = info_obj(size, NewSz),
db_write(Ref, [{put, Ki, Vi} | db_write(Ref, [{put, Ki, Vi} |
[{delete, K} || K <- Keys]], [], St), [{delete, K} || K <- Keys]], [], St),
ets_insert_info(Ets, size, NewSz), ets_insert_info(Ets, size, NewSz),
ok ok
end. end.
recover_size_info(#st{ ref = Ref recover_size_info(#st{ ref = Ref
, tab = Tab , tab = Tab
, type = Type , type = Type
, maintain_size = MaintainSize , maintain_size = MaintainSize
} = St) -> } = St) ->
%% TODO: shall_update_size_info is obsolete, remove %% TODO: shall_update_size_info is obsolete, remove
case shall_update_size_info(Tab) of case shall_update_size_info(Tab) of
true -> true ->
Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end, Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end,
0, [{'_',[],['$_']}], 3), 0, [{'_',[],['$_']}], 3),
write_info_(size, Sz, St); write_info_(size, Sz, St);
false -> false ->
case MaintainSize of case MaintainSize of
true -> true ->
%% info initialized by rocksdb_to_ets/2 %% info initialized by rocksdb_to_ets/2
%% TODO: if there is no stored size, recompute it %% TODO: if there is no stored size, recompute it
ignore; ignore;
false -> false ->
%% size is not maintained, ensure it's marked accordingly %% size is not maintained, ensure it's marked accordingly
delete_info_(size, St) delete_info_(size, St)
end end
end, end,
St. St.
@ -1453,15 +1451,14 @@ do_select(Ref, Tab, _Type, MS, AccKeys, Limit) when is_boolean(AccKeys) ->
i_do_select(I, #sel{keypat = Pfx, i_do_select(I, #sel{keypat = Pfx,
compiled_ms = MS, compiled_ms = MS,
limit = Limit} = Sel, AccKeys, Acc) -> limit = Limit} = Sel, AccKeys, Acc) ->
StartKey = StartKey = case Pfx of
case Pfx of <<>> ->
<<>> -> <<?DATA_START>>;
<<?DATA_START>>; _ ->
_ -> Pfx
Pfx end,
end,
select_traverse(?rocksdb:iterator_move(I, StartKey), Limit, select_traverse(?rocksdb:iterator_move(I, StartKey), Limit,
Pfx, MS, I, Sel, AccKeys, Acc). Pfx, MS, I, Sel, AccKeys, Acc).
needs_key_only([{HP,_,Body}]) -> needs_key_only([{HP,_,Body}]) ->
BodyVars = lists:flatmap(fun extract_vars/1, Body), BodyVars = lists:flatmap(fun extract_vars/1, Body),
@ -1525,18 +1522,18 @@ map_vars([], _) ->
select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{tab = Tab} = Sel, select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{tab = Tab} = Sel,
AccKeys, Acc) -> AccKeys, Acc) ->
case is_prefix(Pfx, K) of case is_prefix(Pfx, K) of
true -> true ->
Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)), Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)),
case ets:match_spec_run([Rec], MS) of case ets:match_spec_run([Rec], MS) of
[] -> [] ->
select_traverse( select_traverse(
?rocksdb:iterator_move(I, next), Limit, Pfx, MS, ?rocksdb:iterator_move(I, next), Limit, Pfx, MS,
I, Sel, AccKeys, Acc); I, Sel, AccKeys, Acc);
[Match] -> [Match] ->
Acc1 = if AccKeys -> Acc1 = if AccKeys ->
[{K, Match}|Acc]; [{K, Match}|Acc];
true -> true ->
[Match|Acc] [Match|Acc]
end, end,
traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1)
end; end;
@ -1563,22 +1560,22 @@ decr(infinity) ->
traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) ->
{lists:reverse(Acc), {lists:reverse(Acc),
fun() -> fun() ->
with_iterator(Ref, with_iterator(Ref,
fun(NewI) -> fun(NewI) ->
select_traverse(iterator_next(NewI, K), select_traverse(iterator_next(NewI, K),
Limit, Pfx, MS, NewI, Sel, Limit, Pfx, MS, NewI, Sel,
AccKeys, []) AccKeys, [])
end) end)
end}; end};
traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) ->
select_traverse(?rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). select_traverse(?rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc).
iterator_next(I, K) -> iterator_next(I, K) ->
case ?rocksdb:iterator_move(I, K) of case ?rocksdb:iterator_move(I, K) of
{ok, K, _} -> {ok, K, _} ->
?rocksdb:iterator_move(I, next); ?rocksdb:iterator_move(I, next);
Other -> Other ->
Other Other
end. end.
keypat([H|T], KeyPos) -> keypat([H|T], KeyPos) ->
@ -1668,9 +1665,9 @@ valid_mnesia_op(Op) ->
; Op==warning ; Op==warning
; Op==error ; Op==error
; Op==fatal -> ; Op==fatal ->
true; true;
true -> true ->
false false
end. end.
%% ---------------------------------------------------------------------------- %% ----------------------------------------------------------------------------