%% File: gmhive_client/src/gmhc_watchdog.erl (168 lines, 4.4 KiB, Erlang)

-module(gmhc_watchdog).
-behavior(gen_server).
-export([ watch/2
, unwatch/0
, ping/0 ]).
-export([ note_started/2
, remove_restart_info/1
, get_restart_info/0 ]).
-export([ start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3 ]).
%% gen_server state: `services' is a map from each watched pid to its
%% #svc{} bookkeeping record (see add_watch/4).
-record(st, {services = #{}}).
%% Bookkeeping for one watched process.
%%   n        - timer expiries left before the process is killed; decremented
%%              on each timeout and reset to n0 on ping
%%   n0       - the initial value of n, as supplied when the watch was added
%%   interval - length of the ping timer, passed to erlang:start_timer/3
%%   tref     - reference of the currently running timer; it is `undefined'
%%              between record construction and the first start_timer/2 call,
%%              and while restart_timer/2 swaps timers, so the declared type
%%              must admit `undefined'
%%   mref     - monitor reference for the watched process
-record(svc, { n = 5           :: pos_integer()
             , n0 = 5          :: pos_integer()
             , interval = 5000 :: pos_integer()
             , tref            :: reference() | undefined
             , mref            :: reference()
             }).
-include_lib("kernel/include/logger.hrl").
%% @doc Register the calling process with the watchdog.
%% `Interval' is the ping timer length handed to erlang:start_timer/3 and
%% `N' is the number of timer expiries allowed before the caller is killed.
%% Blocks until the watchdog server has replied.
watch(Interval, N) ->
    Request = {watch, self(), Interval, N},
    gen_server:call(?MODULE, Request).
%% @doc Asynchronously ask the watchdog to stop watching the calling process.
unwatch() ->
    Caller = self(),
    gen_server:cast(?MODULE, {unwatch, Caller}).
%% @doc Asynchronously tell the watchdog that the calling process is alive,
%% which resets its timeout countdown if it is being watched.
ping() ->
    Caller = self(),
    gen_server:cast(?MODULE, {ping, Caller}).
%% @doc Record `Info' under key `Id' in the restart info kept by the watchdog.
%% Blocks until the watchdog server has replied.
note_started(Id, Info) ->
    Request = {note_started, Id, Info},
    gen_server:call(?MODULE, Request).
%% @doc Fetch the restart info map currently stored by the watchdog.
get_restart_info() ->
    Request = get_restart_info,
    gen_server:call(?MODULE, Request).
%% @doc Remove the given ids from the stored restart info.
%% An empty list is answered locally with `ok', without contacting the server.
remove_restart_info(IDs) ->
    case IDs of
        [] ->
            ok;
        _ ->
            gen_server:call(?MODULE, {remove_restart_info, IDs})
    end.
%% @doc Start the watchdog server, linked to the caller and registered
%% locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% gen_server callback: start with no watched processes.
init([]) ->
    InitialState = #st{},
    {ok, InitialState}.
%% gen_server callback for synchronous requests.
%%   {note_started, Id, Info}    - store Info under Id in the restart info
%%   {remove_restart_info, IDs}  - drop the given ids from the restart info
%%   get_restart_info            - reply with the stored restart info
%%   {watch, Pid, Interval, N}   - start watching Pid
%% Anything else is answered with {error, unknown_method}.
handle_call({note_started, Id, Info}, _From, State) ->
    update_pt(Id, Info),
    {reply, ok, State};
handle_call({remove_restart_info, IDs}, _From, State) ->
    remove_restart_info_(IDs),
    {reply, ok, State};
handle_call(get_restart_info, _From, State) ->
    RestartInfo = get_pt(),
    {reply, RestartInfo, State};
handle_call({watch, Pid, Interval, N}, _From, State) ->
    Watching = add_watch(Pid, Interval, N, State),
    {reply, ok, Watching};
handle_call(_Req, _From, State) ->
    {reply, {error, unknown_method}, State}.
%% gen_server callback for asynchronous requests.
%%   {ping, Pid}    - reset the countdown for Pid
%%   {unwatch, Pid} - stop watching Pid
%% Unknown casts are logged at debug level and otherwise ignored.
handle_cast({ping, Pid}, State) ->
    Refreshed = reset_watch(Pid, State),
    {noreply, Refreshed};
handle_cast({unwatch, Pid}, State) ->
    Pruned = delete_watch(Pid, State),
    {noreply, Pruned};
handle_cast(Msg, State) ->
    ?LOG_DEBUG("Unknown cast: ~p", [Msg]),
    {noreply, State}.
%% gen_server callback for raw messages.
%%   {timeout, TRef, Pid}         - a ping timer started by start_timer/2 expired
%%   {'DOWN', _, process, Pid, _} - a monitored process terminated
%% Any other message is logged at debug level and dropped.
handle_info({timeout, TRef, Pid}, State) ->
    ?LOG_INFO("Timeout for pid ~p", [Pid]),
    AfterTimeout = ping_timeout(Pid, TRef, State),
    {noreply, AfterTimeout};
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
    Pruned = delete_watch(Pid, State),
    {noreply, Pruned};
handle_info(Msg, State) ->
    ?LOG_DEBUG("Unknown msg: ~p", [Msg]),
    {noreply, State}.
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% Start watching Pid: monitor it, start its ping timer and store the
%% resulting #svc{} record in the state.
%%   Pid      - process to watch
%%   Interval - ping timer length, passed on to erlang:start_timer/3
%%   N        - number of timer expiries allowed before Pid is killed
%% Returns the updated state.
add_watch(Pid, Interval, N, S0) ->
    %% If Pid is already watched, overwriting its map entry would orphan the
    %% previous timer and monitor. Tear the old watch down first; this is a
    %% no-op when Pid is not currently watched.
    #st{services = Svcs} = S = delete_watch(Pid, S0),
    MRef = erlang:monitor(process, Pid),
    Svc0 = #svc{ interval = Interval
               , mref = MRef
               , n = N
               , n0 = N },
    Svc = start_timer(Pid, Svc0),
    S#st{services = Svcs#{Pid => Svc}}.
%% A ping arrived from Pid: restore its countdown to the initial value and
%% restart its timer. The state is returned untouched if Pid is not watched.
reset_watch(Pid, #st{services = Watched} = State) ->
    case maps:find(Pid, Watched) of
        error ->
            State;
        {ok, #svc{n0 = Initial} = Current} ->
            Rearmed = restart_timer(Pid, Current#svc{n = Initial}),
            State#st{services = maps:update(Pid, Rearmed, Watched)}
    end.
%% Stop watching Pid: cancel its timer, remove its monitor and drop it from
%% the state. The state is returned untouched if Pid is not watched.
delete_watch(Pid, #st{services = Watched} = State) ->
    case maps:take(Pid, Watched) of
        {#svc{tref = TRef, mref = MRef}, Remaining} ->
            erlang:cancel_timer(TRef),
            erlang:demonitor(MRef),
            State#st{services = Remaining};
        error ->
            State
    end.
%% Handle an expired ping timer for Pid.
%% The timeout only counts if TRef is the timer currently stored for Pid;
%% a mismatching reference is logged and ignored. A matching timeout
%% decrements the countdown: when it reaches zero (or below) Pid is killed
%% and forgotten, otherwise a new timer is started with the reduced count.
%% Timeouts for pids that are not watched leave the state unchanged.
ping_timeout(Pid, TRef, #st{services = Watched} = State) ->
    case maps:find(Pid, Watched) of
        {ok, #svc{n = Left, tref = TRef} = Entry} ->
            case Left - 1 of
                Remaining when Remaining =< 0 ->
                    ?LOG_ERROR("Will exit Pid ~p", [Pid]),
                    exit(Pid, kill),
                    State#st{services = maps:remove(Pid, Watched)};
                Remaining ->
                    Rearmed = restart_timer(Pid, Entry#svc{n = Remaining}),
                    State#st{services = Watched#{Pid := Rearmed}}
            end;
        {ok, _Stale} ->
            ?LOG_DEBUG("Timeout didn't match TRef - ignoring", []),
            State;
        _ ->
            State
    end.
%% Start a timer of the record's interval that will deliver
%% {timeout, TRef, Pid} to this process, and store its reference in the record.
start_timer(Pid, #svc{interval = Millis} = Svc) ->
    Svc#svc{tref = erlang:start_timer(Millis, self(), Pid)}.
%% Cancel the record's current timer and start a fresh one for Pid.
restart_timer(Pid, #svc{tref = OldTRef} = Svc) ->
    erlang:cancel_timer(OldTRef),
    Cleared = Svc#svc{tref = undefined},
    start_timer(Pid, Cleared).
%% Store Info under Id in the persisted restart info map, adding the key or
%% replacing its previous value.
update_pt(Id, Info) ->
    Updated = maps:put(Id, Info, get_pt()),
    put_pt(Updated).
%% Drop the given ids from the persisted restart info map.
%% The persistent term is only rewritten when the map actually changed.
remove_restart_info_(IDs) ->
    Current = get_pt(),
    case maps:without(IDs, Current) of
        Pruned when Pruned =:= Current ->
            ok;
        Pruned ->
            put_pt(Pruned)
    end.
%% Read the restart info from persistent_term, defaulting to an empty map.
get_pt() ->
    Key = pt_key(),
    persistent_term:get(Key, #{}).
%% Write the restart info to persistent_term.
put_pt(Pt) ->
    Key = pt_key(),
    persistent_term:put(Key, Pt).
%% Key under which the restart info is stored in persistent_term.
pt_key() ->
    Tag = restart_info,
    {?MODULE, Tag}.