diff --git a/src/mnesia_rocksdb.erl b/src/mnesia_rocksdb.erl index 02bb474..df58cea 100644 --- a/src/mnesia_rocksdb.erl +++ b/src/mnesia_rocksdb.erl @@ -64,13 +64,13 @@ %% schema level callbacks -export([semantics/2, - check_definition/4, - create_table/3, - load_table/4, - close_table/2, - sync_close_table/2, - delete_table/2, - info/3]). + check_definition/4, + create_table/3, + load_table/4, + close_table/2, + sync_close_table/2, + delete_table/2, + info/3]). %% table synch calls -export([sender_init/4, @@ -214,7 +214,6 @@ register(Alias) -> default_alias() -> rocksdb_copies. - %% ---------------------------------------------------------------------------- %% DEBUG API %% ---------------------------------------------------------------------------- @@ -519,13 +518,13 @@ info(_Alias, Tab, memory) -> end; info(Alias, Tab, size) -> case retrieve_size(Alias, Tab) of - {ok, Size} -> - if Size < 10000 -> ok; - true -> size_warning(Alias, Tab) - end, - Size; - Error -> - Error + {ok, Size} -> + if Size < 10000 -> ok; + true -> size_warning(Alias, Tab) + end, + Size; + Error -> + Error end; info(_Alias, Tab, Item) -> case try_read_info(Tab, Item, undefined) of @@ -537,10 +536,10 @@ info(_Alias, Tab, Item) -> retrieve_size(_Alias, Tab) -> case try_read_info(Tab, size, 0) of - {ok, Size} -> - {ok, Size}; - Error -> - Error + {ok, Size} -> + {ok, Size}; + Error -> + Error end. try_read_info(Tab, Item, Default) -> @@ -637,10 +636,10 @@ first(Alias, Tab) -> %% PRIVATE ITERATOR i_first(I) -> case ?rocksdb:iterator_move(I, <>) of - {ok, First, _} -> - decode_key(First); - _ -> - '$end_of_table' + {ok, First, _} -> + decode_key(First); + _ -> + '$end_of_table' end. %% Not relevant for an ordered_set @@ -662,12 +661,12 @@ last(Alias, Tab) -> %% PRIVATE ITERATOR i_last(I) -> case ?rocksdb:iterator_move(I, last) of - {ok, << ?INFO_TAG, _/binary >>, _} -> - '$end_of_table'; - {ok, Last, _} -> - decode_key(Last); - _ -> - '$end_of_table' + {ok, << ?INFO_TAG, _/binary >>, _} -> + '$end_of_table'; + {ok, Last, _} -> + decode_key(Last); + _ -> + '$end_of_table' end. %% Since we replace the key with [] in the record, we have to put it back @@ -676,33 +675,34 @@ lookup(Alias, Tab, Key) -> Enc = encode_key(Key), {Ref, Type} = call(Alias, Tab, get_ref), case Type of - bag -> lookup_bag(Ref, Key, Enc, keypos(Tab)); - _ -> - case ?rocksdb:get(Ref, Enc, []) of - {ok, EncVal} -> - [setelement(keypos(Tab), decode_val(EncVal), Key)]; - _ -> - [] - end + bag -> + lookup_bag(Ref, Key, Enc, keypos(Tab)); + _ -> + case ?rocksdb:get(Ref, Enc, []) of + {ok, EncVal} -> + [setelement(keypos(Tab), decode_val(EncVal), Key)]; + _ -> + [] + end end. lookup_bag(Ref, K, Enc, KP) -> Sz = byte_size(Enc), with_iterator( Ref, fun(I) -> - lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc), - K, I, KP) - end). + lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, Enc), + K, I, KP) + end). lookup_bag_(Sz, Enc, {ok, Enc, _}, K, I, KP) -> lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP); lookup_bag_(Sz, Enc, Res, K, I, KP) -> case Res of - {ok, <>, V} -> - [setelement(KP, decode_val(V), K)| - lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)]; - _ -> - [] + {ok, <>, V} -> + [setelement(KP, decode_val(V), K)| + lookup_bag_(Sz, Enc, ?rocksdb:iterator_move(I, next), K, I, KP)]; + _ -> + [] end. match_delete(Alias, Tab, Pat) when is_atom(Pat) -> @@ -759,23 +759,23 @@ prev(Alias, Tab, Key0) -> %% PRIVATE ITERATOR i_prev(I, Key) -> case ?rocksdb:iterator_move(I, Key) of - {ok, _, _} -> - i_move_to_prev(I, Key); - {error, invalid_iterator} -> - i_last(I) + {ok, _, _} -> + i_move_to_prev(I, Key); + {error, invalid_iterator} -> + i_last(I) end. %% PRIVATE ITERATOR i_move_to_prev(I, Key) -> case ?rocksdb:iterator_move(I, prev) of - {ok, << ?INFO_TAG, _/binary >>, _} -> - '$end_of_table'; - {ok, Prev, _} when Prev < Key -> - decode_key(Prev); - {ok, _, _} -> - i_move_to_prev(I, Key); - _ -> - '$end_of_table' + {ok, << ?INFO_TAG, _/binary >>, _} -> + '$end_of_table'; + {ok, Prev, _} when Prev < Key -> + decode_key(Prev); + {ok, _, _} -> + i_move_to_prev(I, Key); + _ -> + '$end_of_table' end. repair_continuation(Cont, _Ms) -> @@ -831,22 +831,22 @@ update_counter(Alias, Tab, C, Val) when is_integer(Val) -> do_update_counter(C, Val, Ref, St) -> Enc = encode_key(C), case ?rocksdb:get(Ref, Enc, [{fill_cache, true}]) of - {ok, EncVal} -> - case decode_val(EncVal) of - {_, _, Old} = Rec when is_integer(Old) -> - Res = Old+Val, - return_catch( + {ok, EncVal} -> + case decode_val(EncVal) of + {_, _, Old} = Rec when is_integer(Old) -> + Res = Old+Val, + return_catch( fun() -> db_put(Ref, Enc, encode_val( setelement(3, Rec, Res)), [], St) end); - _ -> - badarg - end; - _ -> - badarg + _ -> + badarg + end; + _ -> + badarg end. %% PRIVATE @@ -1208,15 +1208,14 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, type = bag, maintain_size = true} = St do_insert(K, V, #st{ref = Ref, maintain_size = false} = St) -> return_catch(fun() -> db_put(Ref, K, V, [], St) end); do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> - IsNew = - case ?rocksdb:get(Ref, K, []) of - {ok, _} -> - false; - _ -> - true - end, + IsNew = case ?rocksdb:get(Ref, K, []) of + {ok, _} -> + false; + _ -> + true + end, case IsNew of - true -> + true -> return_catch( fun() -> NewSz = read_info(size, 0, Ets) + 1, @@ -1226,8 +1225,8 @@ do_insert(K, V, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> write, [Ref, L, []], St), % may throw ets_insert_info(Ets, size, NewSz) end); - false -> - return_catch(fun() -> db_put(Ref, K, V, [], St) end) + false -> + return_catch(fun() -> db_put(Ref, K, V, [], St) end) end, ok. @@ -1235,9 +1234,9 @@ do_insert_bag(Ref, K, V, CurSz, St) -> KSz = byte_size(K), with_iterator( Ref, fun(I) -> - do_insert_bag_( - KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) - end). + do_insert_bag_( + KSz, K, ?rocksdb:iterator_move(I, K), I, V, 0, Ref, CurSz, St) + end). %% There's a potential access pattern that would force counters to @@ -1245,21 +1244,21 @@ do_insert_bag(Ref, K, V, CurSz, St) -> %% with compaction. TODO. do_insert_bag_(Sz, K, Res, I, V, Prev, Ref, TSz, St) when Prev < ?MAX_BAG -> case Res of - {ok, <>, V} -> - %% object exists - TSz; - {ok, <>, _} -> - do_insert_bag_( - Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); - _ when TSz =:= false -> - Key = <>, - db_put(Ref, Key, V, [], St); - _ -> - NewSz = TSz + 1, - {Ki, Vi} = info_obj(size, NewSz), - Key = <>, - db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), - NewSz + {ok, <>, V} -> + %% object exists + TSz; + {ok, <>, _} -> + do_insert_bag_( + Sz, K, ?rocksdb:iterator_move(I, next), I, V, N, Ref, TSz, St); + _ when TSz =:= false -> + Key = <>, + db_put(Ref, Key, V, [], St); + _ -> + NewSz = TSz + 1, + {Ki, Vi} = info_obj(size, NewSz), + Key = <>, + db_write(Ref, [{put, Ki, Vi}, {put, Key, V}], [], St), + NewSz end. %% server-side part @@ -1279,7 +1278,7 @@ do_delete(Key, #st{ref = Ref, maintain_size = false} = St) -> do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> CurSz = read_info(size, 0, Ets), case ?rocksdb:get(Ref, Key, [{fill_cache,true}]) of - {ok, _} -> + {ok, _} -> return_catch( fun() -> NewSz = CurSz -1, @@ -1287,85 +1286,84 @@ do_delete(Key, #st{ets = Ets, ref = Ref, maintain_size = true} = St) -> ok = db_write(Ref, [{delete, Key}, {put, Ki, Vi}], [], St), ets_insert_info(Ets, size, NewSz) end); - not_found -> - false + not_found -> + false end. do_delete_bag(Sz, Key, Ref, TSz, St) -> - Found = - with_iterator( - Ref, fun(I) -> - do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key), - Ref, I) - end), + Found = with_iterator( + Ref, fun(I) -> + do_delete_bag_(Sz, Key, ?rocksdb:iterator_move(I, Key), + Ref, I) + end), case {Found, TSz} of - {[], _} -> - TSz; - {_, false} -> - db_write(Ref, [{delete, K} || K <- Found], [], St); - {_, _} -> - N = length(Found), - NewSz = TSz - N, - {Ki, Vi} = info_obj(size, NewSz), - db_write(Ref, [{put, Ki, Vi} | + {[], _} -> + TSz; + {_, false} -> + db_write(Ref, [{delete, K} || K <- Found], [], St); + {_, _} -> + N = length(Found), + NewSz = TSz - N, + {Ki, Vi} = info_obj(size, NewSz), + db_write(Ref, [{put, Ki, Vi} | [{delete, K} || K <- Found]], [], St), - NewSz + NewSz end. do_delete_bag_(Sz, K, Res, Ref, I) -> case Res of - {ok, K, _} -> - do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), - Ref, I); - {ok, <> = Key, _} -> - [Key | - do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), - Ref, I)]; - _ -> - [] + {ok, K, _} -> + do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), + Ref, I); + {ok, <> = Key, _} -> + [Key | + do_delete_bag_(Sz, K, ?rocksdb:iterator_move(I, next), + Ref, I)]; + _ -> + [] end. do_match_delete(Pat, #st{ets = Ets, ref = Ref, tab = Tab, type = Type, - maintain_size = MaintainSize} = St) -> + maintain_size = MaintainSize} = St) -> Fun = fun(_, Key, Acc) -> [Key|Acc] end, Keys = do_fold(Ref, Tab, Type, Fun, [], [{Pat,[],['$_']}], 30), case {Keys, MaintainSize} of - {[], _} -> - ok; - {_, false} -> - db_write(Ref, [{delete, K} || K <- Keys], [], St), - ok; - {_, true} -> - CurSz = read_info(size, 0, Ets), - NewSz = max(CurSz - length(Keys), 0), - {Ki, Vi} = info_obj(size, NewSz), - db_write(Ref, [{put, Ki, Vi} | + {[], _} -> + ok; + {_, false} -> + db_write(Ref, [{delete, K} || K <- Keys], [], St), + ok; + {_, true} -> + CurSz = read_info(size, 0, Ets), + NewSz = max(CurSz - length(Keys), 0), + {Ki, Vi} = info_obj(size, NewSz), + db_write(Ref, [{put, Ki, Vi} | [{delete, K} || K <- Keys]], [], St), - ets_insert_info(Ets, size, NewSz), - ok + ets_insert_info(Ets, size, NewSz), + ok end. recover_size_info(#st{ ref = Ref - , tab = Tab - , type = Type - , maintain_size = MaintainSize - } = St) -> + , tab = Tab + , type = Type + , maintain_size = MaintainSize + } = St) -> %% TODO: shall_update_size_info is obsolete, remove case shall_update_size_info(Tab) of - true -> - Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end, - 0, [{'_',[],['$_']}], 3), - write_info_(size, Sz, St); - false -> - case MaintainSize of - true -> - %% info initialized by rocksdb_to_ets/2 - %% TODO: if there is no stored size, recompute it - ignore; - false -> - %% size is not maintained, ensure it's marked accordingly - delete_info_(size, St) - end + true -> + Sz = do_fold(Ref, Tab, Type, fun(_, Acc) -> Acc+1 end, + 0, [{'_',[],['$_']}], 3), + write_info_(size, Sz, St); + false -> + case MaintainSize of + true -> + %% info initialized by rocksdb_to_ets/2 + %% TODO: if there is no stored size, recompute it + ignore; + false -> + %% size is not maintained, ensure it's marked accordingly + delete_info_(size, St) + end end, St. @@ -1453,15 +1451,14 @@ do_select(Ref, Tab, _Type, MS, AccKeys, Limit) when is_boolean(AccKeys) -> i_do_select(I, #sel{keypat = Pfx, compiled_ms = MS, limit = Limit} = Sel, AccKeys, Acc) -> - StartKey = - case Pfx of - <<>> -> - <>; - _ -> - Pfx - end, + StartKey = case Pfx of + <<>> -> + <>; + _ -> + Pfx + end, select_traverse(?rocksdb:iterator_move(I, StartKey), Limit, - Pfx, MS, I, Sel, AccKeys, Acc). + Pfx, MS, I, Sel, AccKeys, Acc). needs_key_only([{HP,_,Body}]) -> BodyVars = lists:flatmap(fun extract_vars/1, Body), @@ -1525,18 +1522,18 @@ map_vars([], _) -> select_traverse({ok, K, V}, Limit, Pfx, MS, I, #sel{tab = Tab} = Sel, AccKeys, Acc) -> case is_prefix(Pfx, K) of - true -> - Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)), - case ets:match_spec_run([Rec], MS) of - [] -> - select_traverse( - ?rocksdb:iterator_move(I, next), Limit, Pfx, MS, - I, Sel, AccKeys, Acc); - [Match] -> + true -> + Rec = setelement(keypos(Tab), decode_val(V), decode_key(K)), + case ets:match_spec_run([Rec], MS) of + [] -> + select_traverse( + ?rocksdb:iterator_move(I, next), Limit, Pfx, MS, + I, Sel, AccKeys, Acc); + [Match] -> Acc1 = if AccKeys -> - [{K, Match}|Acc]; + [{K, Match}|Acc]; true -> - [Match|Acc] + [Match|Acc] end, traverse_continue(K, decr(Limit), Pfx, MS, I, Sel, AccKeys, Acc1) end; @@ -1563,22 +1560,22 @@ decr(infinity) -> traverse_continue(K, 0, Pfx, MS, _I, #sel{limit = Limit, ref = Ref} = Sel, AccKeys, Acc) -> {lists:reverse(Acc), fun() -> - with_iterator(Ref, - fun(NewI) -> + with_iterator(Ref, + fun(NewI) -> select_traverse(iterator_next(NewI, K), Limit, Pfx, MS, NewI, Sel, AccKeys, []) - end) + end) end}; traverse_continue(_K, Limit, Pfx, MS, I, Sel, AccKeys, Acc) -> select_traverse(?rocksdb:iterator_move(I, next), Limit, Pfx, MS, I, Sel, AccKeys, Acc). iterator_next(I, K) -> case ?rocksdb:iterator_move(I, K) of - {ok, K, _} -> - ?rocksdb:iterator_move(I, next); - Other -> - Other + {ok, K, _} -> + ?rocksdb:iterator_move(I, next); + Other -> + Other end. keypat([H|T], KeyPos) -> @@ -1668,9 +1665,9 @@ valid_mnesia_op(Op) -> ; Op==warning ; Op==error ; Op==fatal -> - true; + true; true -> - false + false end. %% ----------------------------------------------------------------------------