diff --git a/src/mnesia_rocksdb_admin.erl b/src/mnesia_rocksdb_admin.erl index 5c1cd43..aa4662a 100644 --- a/src/mnesia_rocksdb_admin.erl +++ b/src/mnesia_rocksdb_admin.erl @@ -326,7 +326,6 @@ call(Alias, Req, Timeout) -> end. start_link() -> - mrdb_mutex:ensure_tab(), gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> diff --git a/src/mnesia_rocksdb_sup.erl b/src/mnesia_rocksdb_sup.erl index 1b1706a..4d33130 100644 --- a/src/mnesia_rocksdb_sup.erl +++ b/src/mnesia_rocksdb_sup.erl @@ -42,4 +42,5 @@ start_link() -> %% =================================================================== init([]) -> - {ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }. + {ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker) + , ?CHILD(mnesia_rocksdb_params, worker)]} }. diff --git a/src/mrdb_mutex.erl b/src/mrdb_mutex.erl index 97f67eb..8f5777e 100644 --- a/src/mrdb_mutex.erl +++ b/src/mrdb_mutex.erl @@ -3,10 +3,6 @@ -export([ do/2 ]). --export([ ensure_tab/0 ]). - --define(LOCK_TAB, ?MODULE). - -include_lib("eunit/include/eunit.hrl"). %% We use a duplicate_bag ets table as a lock queue, @@ -26,66 +22,12 @@ %% of serialization of requests. do(Rsrc, F) when is_function(F, 0) -> - ets:insert(?LOCK_TAB, {Rsrc, self()}), - case have_lock(Rsrc) of - true -> - try F() - after - release(Rsrc) - end; - false -> - busy_wait(Rsrc, 5000) + {ok, Ref} = mrdb_mutex_serializer:wait(Rsrc), + try F() + after + mrdb_mutex_serializer:done(Rsrc, Ref) end. -have_lock(Rsrc) -> - case ets:lookup(?LOCK_TAB, Rsrc) of - [{_, P}|_] -> - P =:= self(); - [] -> - false - end. - -release(Rsrc) -> - ets:delete_object(?LOCK_TAB, {Rsrc, self()}). - - -%% The busy-wait function makes use of the fact that we can read a timer to find out -%% if it still has time remaining. This reduces the need for selective receive, looking -%% for a timeout message. We yield, then retry the claim op. Yielding at least used to -%% also be necessary for the `read_timer/1` value to refresh. -%% -busy_wait(Rsrc, Timeout) -> - Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}), - do_wait(Rsrc, Ref). - -do_wait(Rsrc, Ref) -> - erlang:yield(), - case erlang:read_timer(Ref) of - false -> - erlang:cancel_timer(Ref), - error(lock_wait_timeout); - _ -> - case have_lock(Rsrc) of - true -> - erlang:cancel_timer(Ref), - ok; - false -> - do_wait(Rsrc, Ref) - end - end. - -%% Called by the process holding the ets table. -ensure_tab() -> - case ets:info(?LOCK_TAB, name) of - undefined -> - ets:new(?LOCK_TAB, [duplicate_bag, public, named_table, - {read_concurrency, true}, - {write_concurrency, true}]); - _ -> - true - end. - - -ifdef(TEST). mutex_test_() -> @@ -97,19 +39,47 @@ mutex_test_() -> ]}. setup() -> - ensure_tab(). + case whereis(mrdb_mutex_serializer) of + undefined -> + {ok, Pid} = mrdb_mutex_serializer:start_link(), + Pid; + Pid -> + Pid + end. -cleanup(_) -> - ets:delete(?LOCK_TAB). +cleanup(Pid) -> + unlink(Pid), + exit(Pid, kill). swarm_do() -> - K = ?LINE, + Rsrc = ?LINE, + Pid = spawn(fun() -> collect([]) end), + L = lists:seq(1, 1000), + Evens = [X || X <- L, is_even(X)], Pids = [spawn_monitor(fun() -> - write_evens(my_rsrc, K, N) - end) || N <- lists:seq(1,25)], + send_even(Rsrc, N, Pid) + end) || N <- lists:seq(1,1000)], await_pids(Pids), - Written = ets:lookup(?LOCK_TAB, K), - true = lists:all(fun is_even/1, [X || {_, X} <- Written]). + Results = fetch(Pid), + {incorrect_results, []} = {incorrect_results, Results -- Evens}, + {missing_correct_results, []} = {missing_correct_results, Evens -- Results}, + ok. + +collect(Acc) -> + receive + {_, result, N} -> + collect([N|Acc]); + {From, fetch} -> + From ! {fetch_reply, Acc}, + done + end. + +fetch(Pid) -> + Pid ! {self(), fetch}, + receive + {fetch_reply, Result} -> + Result + end. is_even(N) -> (N rem 2) =:= 0. @@ -124,11 +94,11 @@ await_pids([{_, MRef}|Pids]) -> await_pids([]) -> ok. -write_evens(Rsrc, K, N) -> +send_even(Rsrc, N, Pid) -> do(Rsrc, fun() -> case is_even(N) of true -> - ets:insert(?LOCK_TAB, {K, N}); + Pid ! {self(), result, N}; false -> exit(not_even) end diff --git a/src/mrdb_mutex_serializer.erl b/src/mrdb_mutex_serializer.erl new file mode 100644 index 0000000..eb2e5ac --- /dev/null +++ b/src/mrdb_mutex_serializer.erl @@ -0,0 +1,98 @@ +-module(mrdb_mutex_serializer). + +-behavior(gen_server). + +-export([wait/1, + done/2]). + +-export([start_link/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(st, { queues = #{} + , empty_q = queue:new() }). %% perhaps silly optimization + +wait(Rsrc) -> + gen_server:call(?MODULE, {wait, Rsrc}, infinity). + +done(Rsrc, Ref) -> + gen_server:cast(?MODULE, {done, Rsrc, Ref}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #st{}}. + +handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues + , empty_q = NewQ } = St) -> + MRef = erlang:monitor(process, Pid), + Q0 = maps:get(Rsrc, Queues, NewQ), + WasEmpty = queue:is_empty(Q0), + Q1 = queue:in({From, MRef}, Q0), + St1 = St#st{ queues = Queues#{Rsrc => Q1} }, + case WasEmpty of + true -> + {reply, {ok, MRef}, St1}; + false -> + {noreply, St1} + end. + +handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) -> + case maps:find(Rsrc, Queues) of + {ok, Q} -> + case queue:out(Q) of + {{value, {_From, MRef}}, Q1} -> + erlang:demonitor(MRef), + ok = maybe_dispatch_one(Q1), + {noreply, St#st{ queues = Queues#{Rsrc := Q1} }}; + {_, _} -> + %% Not the lock holder + {noreply, St} + end; + error -> + {noreply, St} + end. + +handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) -> + Queues1 = + maps:map( + fun(_Rsrc, Q) -> + drop_or_filter(Q, MRef) + end, Queues), + {noreply, St#st{ queues = Queues1 }}; +handle_info(_, St) -> + {noreply, St}. + +terminate(_, _) -> + ok. + +code_change(_FromVsn, St, _Extra) -> + {ok, St}. + +%% In this function, we don't actually pop +maybe_dispatch_one(Q) -> + case queue:peek(Q) of + empty -> + ok; + {value, {From, MRef}} -> + gen_server:reply(From, {ok, MRef}), + ok + end. + +drop_or_filter(Q, MRef) -> + case queue:peek(Q) of + {value, {_, MRef}} -> + Q1 = queue:drop(Q), + ok = maybe_dispatch_one(Q1), + Q1; + {value, _Other} -> + queue:filter(fun({_,M}) -> M =/= MRef end, Q); + empty -> + Q + end.