Use a serializing mutex

This commit is contained in:
Ulf Wiger 2022-10-24 14:38:43 +02:00
parent 4489e5d743
commit 7c729bd932

View File

@ -7,40 +7,48 @@
-define(LOCK_TAB, ?MODULE).
%% We use a wrapping ets counter (default: 0) as a form of semaphor.
%% The claim operation is done using an atomic list of two updates:
%% first, incrementing with 0 - this returns the previous value
%% then, incrementing with 1, but wrapping at 1, ensuring that we get 1 back,
%% regardless of previous value. This means that if [0,1] is returned, the resource
%% was not locked previously; if [1,1] is returned, it was.
-include_lib("eunit/include/eunit.hrl").
%% We use a duplicate_bag ets table as a lock queue,
%% relying on the characteristic that a lookup on a key (the resource name)
%% returns the objects in the order in which they were inserted.
%% We try to claim the lock by inserting our own pid under the Rrsc key, then
%% checking which pid is at the head of the list. If it's our pid, we have the
%% lock, and proceed with calling our fun, then delecting our table entry.
%% If another pid is at the head of the list, we busy-wait on the table.
%%
%% Releasing the resource is done by deleting the resource. If we just decrement,
%% we will end up with lingering unlocked resources, so we might as well delete.
%% Either operation is atomic, and the claim op creates the object if it's missing.
%% Another, perhaps cheaper, way of implementing a mutex would be to use a counter
%% object, but we also care about avoiding starvation, and this way, we get a form
%% of serialization of requests.
do(Rsrc, F) when is_function(F, 0) ->
true = claim(Rsrc),
try F()
after
release(Rsrc)
ets:insert(?LOCK_TAB, {Rsrc, self()}),
case have_lock(Rsrc) of
true ->
try F()
after
release(Rsrc)
end;
false ->
busy_wait(Rsrc, 5000)
end.
claim(Rsrc) ->
case claim_(Rsrc) of
true -> true;
false -> busy_wait(Rsrc, 1000)
end.
claim_(Rsrc) ->
case ets:update_counter(?LOCK_TAB, Rsrc, [{2, 0},
{2, 1, 1, 1}], {Rsrc, 0}) of
[0, 1] ->
%% have lock
true;
[1, 1] ->
have_lock(Rsrc) ->
case ets:lookup(?LOCK_TAB, Rsrc) of
[{_, P}|_] ->
P =:= self();
[] ->
false
end.
release(Rsrc) ->
ets:delete_object(?LOCK_TAB, {Rsrc, self()}).
%% The busy-wait function makes use of the fact that we can read a timer to find out
%% if it still has time remaining. This reduces the need for selective receive, looking
%% for a timeout message. We yield, then retry the claim op. Yielding at least used to
@ -57,7 +65,7 @@ do_wait(Rsrc, Ref) ->
erlang:cancel_timer(Ref),
error(lock_wait_timeout);
_ ->
case claim_(Rsrc) of
case have_lock(Rsrc) of
true ->
erlang:cancel_timer(Ref),
ok;
@ -66,16 +74,64 @@ do_wait(Rsrc, Ref) ->
end
end.
release(Rsrc) ->
ets:delete(?LOCK_TAB, Rsrc),
ok.
%% Called by the process holding the ets table.
ensure_tab() ->
case ets:info(?LOCK_TAB, name) of
undefined ->
ets:new(?LOCK_TAB, [set, public, named_table, {write_concurrency, true}]);
ets:new(?LOCK_TAB, [duplicate_bag, public, named_table,
{read_concurrency, true},
{write_concurrency, true}]);
_ ->
true
end.
-ifdef(TEST).
mutex_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
{"Check that all operations complete", fun swarm_do/0}
]}.
setup() ->
ensure_tab().
cleanup(_) ->
ets:delete(?LOCK_TAB).
swarm_do() ->
K = ?LINE,
Pids = [spawn_monitor(fun() ->
write_evens(my_rsrc, K, N)
end) || N <- lists:seq(1,25)],
await_pids(Pids),
Written = ets:lookup(?LOCK_TAB, K),
true = lists:all(fun is_even/1, [X || {_, X} <- Written]).
is_even(N) ->
(N rem 2) =:= 0.
await_pids([{_, MRef}|Pids]) ->
receive
{'DOWN', MRef, _, _, _} ->
await_pids(Pids)
after 10000 ->
error(timeout)
end;
await_pids([]) ->
ok.
write_evens(Rsrc, K, N) ->
do(Rsrc, fun() ->
case is_even(N) of
true ->
ets:insert(?LOCK_TAB, {K, N});
false ->
exit(not_even)
end
end).
-endif.