Mutex server with fifo queues

This commit is contained in:
Ulf Wiger 2022-10-25 10:17:35 +02:00
parent 7c729bd932
commit 95abe4e36e
4 changed files with 142 additions and 74 deletions

View File

@ -326,7 +326,6 @@ call(Alias, Req, Timeout) ->
end. end.
start_link() -> start_link() ->
mrdb_mutex:ensure_tab(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) -> init([]) ->

View File

@ -42,4 +42,5 @@ start_link() ->
%% =================================================================== %% ===================================================================
init([]) -> init([]) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(mnesia_rocksdb_params, worker)]} }. {ok, { {one_for_one, 5, 10}, [ ?CHILD(mrdb_mutex_serializer, worker)
, ?CHILD(mnesia_rocksdb_params, worker)]} }.

View File

@ -3,10 +3,6 @@
-export([ do/2 ]). -export([ do/2 ]).
-export([ ensure_tab/0 ]).
-define(LOCK_TAB, ?MODULE).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
%% We use a duplicate_bag ets table as a lock queue, %% We use a duplicate_bag ets table as a lock queue,
@ -26,66 +22,12 @@
%% of serialization of requests. %% of serialization of requests.
do(Rsrc, F) when is_function(F, 0) -> do(Rsrc, F) when is_function(F, 0) ->
ets:insert(?LOCK_TAB, {Rsrc, self()}), {ok, Ref} = mrdb_mutex_serializer:wait(Rsrc),
case have_lock(Rsrc) of try F()
true -> after
try F() mrdb_mutex_serializer:done(Rsrc, Ref)
after
release(Rsrc)
end;
false ->
busy_wait(Rsrc, 5000)
end. end.
have_lock(Rsrc) ->
case ets:lookup(?LOCK_TAB, Rsrc) of
[{_, P}|_] ->
P =:= self();
[] ->
false
end.
release(Rsrc) ->
ets:delete_object(?LOCK_TAB, {Rsrc, self()}).
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
%% also be necessary for the `read_timer/1` value to refresh.
%%
busy_wait(Rsrc, Timeout) ->
Ref = erlang:send_after(Timeout, self(), {claim, Rsrc}),
do_wait(Rsrc, Ref).
do_wait(Rsrc, Ref) ->
erlang:yield(),
case erlang:read_timer(Ref) of
false ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case have_lock(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
false ->
do_wait(Rsrc, Ref)
end
end.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [duplicate_bag, public, named_table,
{read_concurrency, true},
{write_concurrency, true}]);
_ ->
true
end.
-ifdef(TEST). -ifdef(TEST).
mutex_test_() -> mutex_test_() ->
@ -97,19 +39,47 @@ mutex_test_() ->
]}. ]}.
setup() -> setup() ->
ensure_tab(). case whereis(mrdb_mutex_serializer) of
undefined ->
{ok, Pid} = mrdb_mutex_serializer:start_link(),
Pid;
Pid ->
Pid
end.
cleanup(_) -> cleanup(Pid) ->
ets:delete(?LOCK_TAB). unlink(Pid),
exit(Pid, kill).
swarm_do() -> swarm_do() ->
K = ?LINE, Rsrc = ?LINE,
Pid = spawn(fun() -> collect([]) end),
L = lists:seq(1, 1000),
Evens = [X || X <- L, is_even(X)],
Pids = [spawn_monitor(fun() -> Pids = [spawn_monitor(fun() ->
write_evens(my_rsrc, K, N) send_even(Rsrc, N, Pid)
end) || N <- lists:seq(1,25)], end) || N <- lists:seq(1,1000)],
await_pids(Pids), await_pids(Pids),
Written = ets:lookup(?LOCK_TAB, K), Results = fetch(Pid),
true = lists:all(fun is_even/1, [X || {_, X} <- Written]). {incorrect_results, []} = {incorrect_results, Results -- Evens},
{missing_correct_results, []} = {missing_correct_results, Evens -- Results},
ok.
collect(Acc) ->
receive
{_, result, N} ->
collect([N|Acc]);
{From, fetch} ->
From ! {fetch_reply, Acc},
done
end.
fetch(Pid) ->
Pid ! {self(), fetch},
receive
{fetch_reply, Result} ->
Result
end.
is_even(N) -> is_even(N) ->
(N rem 2) =:= 0. (N rem 2) =:= 0.
@ -124,11 +94,11 @@ await_pids([{_, MRef}|Pids]) ->
await_pids([]) -> await_pids([]) ->
ok. ok.
write_evens(Rsrc, K, N) -> send_even(Rsrc, N, Pid) ->
do(Rsrc, fun() -> do(Rsrc, fun() ->
case is_even(N) of case is_even(N) of
true -> true ->
ets:insert(?LOCK_TAB, {K, N}); Pid ! {self(), result, N};
false -> false ->
exit(not_even) exit(not_even)
end end

View File

@ -0,0 +1,98 @@
-module(mrdb_mutex_serializer).
-behavior(gen_server).
-export([wait/1,
done/2]).
-export([start_link/0]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(st, { queues = #{}
, empty_q = queue:new() }). %% perhaps silly optimization
wait(Rsrc) ->
gen_server:call(?MODULE, {wait, Rsrc}, infinity).
done(Rsrc, Ref) ->
gen_server:cast(?MODULE, {done, Rsrc, Ref}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({wait, Rsrc}, {Pid, _} = From, #st{ queues = Queues
, empty_q = NewQ } = St) ->
MRef = erlang:monitor(process, Pid),
Q0 = maps:get(Rsrc, Queues, NewQ),
WasEmpty = queue:is_empty(Q0),
Q1 = queue:in({From, MRef}, Q0),
St1 = St#st{ queues = Queues#{Rsrc => Q1} },
case WasEmpty of
true ->
{reply, {ok, MRef}, St1};
false ->
{noreply, St1}
end.
handle_cast({done, Rsrc, MRef}, #st{ queues = Queues } = St) ->
case maps:find(Rsrc, Queues) of
{ok, Q} ->
case queue:out(Q) of
{{value, {_From, MRef}}, Q1} ->
erlang:demonitor(MRef),
ok = maybe_dispatch_one(Q1),
{noreply, St#st{ queues = Queues#{Rsrc := Q1} }};
{_, _} ->
%% Not the lock holder
{noreply, St}
end;
error ->
{noreply, St}
end.
handle_info({'DOWN', MRef, process, _, _}, #st{queues = Queues} = St) ->
Queues1 =
maps:map(
fun(_Rsrc, Q) ->
drop_or_filter(Q, MRef)
end, Queues),
{noreply, St#st{ queues = Queues1 }};
handle_info(_, St) ->
{noreply, St}.
terminate(_, _) ->
ok.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
%% In this function, we don't actually pop
maybe_dispatch_one(Q) ->
case queue:peek(Q) of
empty ->
ok;
{value, {From, MRef}} ->
gen_server:reply(From, {ok, MRef}),
ok
end.
drop_or_filter(Q, MRef) ->
case queue:peek(Q) of
{value, {_, MRef}} ->
Q1 = queue:drop(Q),
ok = maybe_dispatch_one(Q1),
Q1;
{value, _Other} ->
queue:filter(fun({_,M}) -> M =/= MRef end, Q);
empty ->
Q
end.