Fix restart logic and add watchdog
This commit is contained in:
parent
847ffc810a
commit
1f6066705c
@ -1,6 +1,6 @@
|
|||||||
{application,gmhive_client,
|
{application,gmhive_client,
|
||||||
[{description,"Gajumaru Hive Client"},
|
[{description,"Gajumaru Hive Client"},
|
||||||
{vsn,"0.8.3"},
|
{vsn,"0.9.0"},
|
||||||
{registered,[]},
|
{registered,[]},
|
||||||
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
|
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
|
||||||
gmconfig,gmhive_protocol,gmhive_worker]},
|
gmconfig,gmhive_protocol,gmhive_worker]},
|
||||||
|
|||||||
@ -258,7 +258,6 @@ handle_info({timeout, _, recache}, #st{opts = Opts, econn = EConn} = S0) ->
|
|||||||
false ->
|
false ->
|
||||||
{noreply, S};
|
{noreply, S};
|
||||||
true ->
|
true ->
|
||||||
?LOG_NOTICE("Recaching eureka info", []),
|
|
||||||
cache_eureka_info(Opts),
|
cache_eureka_info(Opts),
|
||||||
{noreply, ensure_recache_timer(S)}
|
{noreply, ensure_recache_timer(S)}
|
||||||
end;
|
end;
|
||||||
@ -409,6 +408,7 @@ notify_connected(#st{id = Id, awaiting_connect = Waiters, opts = Opts} = S) ->
|
|||||||
[gen_server:reply(From, ok) || {From, _} <- Waiters],
|
[gen_server:reply(From, ok) || {From, _} <- Waiters],
|
||||||
gmhc_handler:pool_connected(S#st.id, S#st.opts),
|
gmhc_handler:pool_connected(S#st.id, S#st.opts),
|
||||||
cache_eureka_info(Opts),
|
cache_eureka_info(Opts),
|
||||||
|
gmhc_connectors_sup:add_restart_info(Id, Opts),
|
||||||
ensure_recache_timer(S#st{awaiting_connect = []}).
|
ensure_recache_timer(S#st{awaiting_connect = []}).
|
||||||
|
|
||||||
ensure_recache_timer(#st{recache_timer = T} = S) ->
|
ensure_recache_timer(#st{recache_timer = T} = S) ->
|
||||||
|
|||||||
@ -6,10 +6,23 @@
|
|||||||
, init/1 ]).
|
, init/1 ]).
|
||||||
|
|
||||||
-export([ start_first_connector/0
|
-export([ start_first_connector/0
|
||||||
, start_connector/1]).
|
, start_connector/1
|
||||||
|
, add_restart_info/2 ]).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of
|
||||||
|
{ok, _} = Ok ->
|
||||||
|
RI = gmhc_watchdog:get_restart_info(),
|
||||||
|
maps:foreach(fun restart_connector/2, RI),
|
||||||
|
Ok;
|
||||||
|
Other ->
|
||||||
|
Other
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Callback for maps:foreach/2 over the stored restart info.
%% Entries keyed `{?MODULE, _}' are started again via start_connector/1 with
%% their stored options; every other entry is ignored and yields `ok'.
restart_connector(Key, Opts) ->
    case Key of
        {?MODULE, _} ->
            start_connector(Opts);
        _ ->
            ok
    end.
|
||||||
|
|
||||||
start_first_connector() ->
|
start_first_connector() ->
|
||||||
start_connector(#{}).
|
start_connector(#{}).
|
||||||
@ -21,6 +34,9 @@ start_connector(Opts0) ->
|
|||||||
end,
|
end,
|
||||||
supervisor:start_child(?MODULE, [Opts]).
|
supervisor:start_child(?MODULE, [Opts]).
|
||||||
|
|
||||||
|
%% Record the options of connector `Id' with the watchdog, under the key
%% `{?MODULE, Id}'. Returns whatever gmhc_watchdog:note_started/2 returns.
add_restart_info(Id, Opts) ->
    Key = {?MODULE, Id},
    gmhc_watchdog:note_started(Key, Opts).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Mod = gmhc_connector,
|
Mod = gmhc_connector,
|
||||||
SupFlags = #{ strategy => simple_one_for_one
|
SupFlags = #{ strategy => simple_one_for_one
|
||||||
|
|||||||
@ -62,7 +62,6 @@ cache_good_address(#{host := _,
|
|||||||
ok ->
|
ok ->
|
||||||
case file:write_file(CacheF, term_to_binary(ToCache)) of
|
case file:write_file(CacheF, term_to_binary(ToCache)) of
|
||||||
ok ->
|
ok ->
|
||||||
?LOG_DEBUG("Cached eureka info in: ~p", [CacheF]),
|
|
||||||
{ok, ToCache};
|
{ok, ToCache};
|
||||||
{error, _} = Err ->
|
{error, _} = Err ->
|
||||||
?LOG_DEBUG("Couldn't cache eureka in ~p: ~p", [CacheF, Err]),
|
?LOG_DEBUG("Couldn't cache eureka in ~p: ~p", [CacheF, Err]),
|
||||||
|
|||||||
@ -92,7 +92,9 @@ handle_call({connected, Id, Type}, {Pid,_}, #st{connected = Conn} = S) ->
|
|||||||
monitor ->
|
monitor ->
|
||||||
stop_workers(S1#st.workers), % shouldn't be any running
|
stop_workers(S1#st.workers), % shouldn't be any running
|
||||||
S1#st{workers = [], working = false};
|
S1#st{workers = [], working = false};
|
||||||
worker -> S1#st{working = true}
|
worker ->
|
||||||
|
gmhc_watchdog:watch(5*60_000, 1), %% 5 minutes, one-shot
|
||||||
|
S1#st{working = true}
|
||||||
end,
|
end,
|
||||||
{reply, ok, S2};
|
{reply, ok, S2};
|
||||||
handle_call(_Req, _From, S) ->
|
handle_call(_Req, _From, S) ->
|
||||||
@ -126,6 +128,7 @@ handle_cast({from_pool, #{via := Connector,
|
|||||||
handle_cast({disconnected, Id}, #st{connected = Conn} = S) ->
|
handle_cast({disconnected, Id}, #st{connected = Conn} = S) ->
|
||||||
?LOG_DEBUG("disconnected: ~p", [Id]),
|
?LOG_DEBUG("disconnected: ~p", [Id]),
|
||||||
Conn1 = maps:remove(Id, Conn),
|
Conn1 = maps:remove(Id, Conn),
|
||||||
|
gmhc_watchdog:unwatch(),
|
||||||
S1 = if map_size(Conn1) == 0 ->
|
S1 = if map_size(Conn1) == 0 ->
|
||||||
Ws = stop_workers(S#st.workers),
|
Ws = stop_workers(S#st.workers),
|
||||||
S#st{connected = Conn1, workers = Ws};
|
S#st{connected = Conn1, workers = Ws};
|
||||||
@ -224,6 +227,7 @@ maybe_request_nonces(S) ->
|
|||||||
nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) ->
|
nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) ->
|
||||||
case Seq == Seq0 of
|
case Seq == Seq0 of
|
||||||
true ->
|
true ->
|
||||||
|
wd_ping(),
|
||||||
#st{candidate = Cand} = S,
|
#st{candidate = Cand} = S,
|
||||||
S#st{candidate = Cand#{nonces => Nonces}};
|
S#st{candidate = Cand#{nonces => Nonces}};
|
||||||
false ->
|
false ->
|
||||||
@ -247,6 +251,7 @@ retry_timeout(Floor, Range) ->
|
|||||||
|
|
||||||
handle_worker_result({worker_result, Result}, W, S) ->
|
handle_worker_result({worker_result, Result}, W, S) ->
|
||||||
%% ?LOG_DEBUG("worker result: ~p", [Result]),
|
%% ?LOG_DEBUG("worker result: ~p", [Result]),
|
||||||
|
wd_ping(),
|
||||||
case Result of
|
case Result of
|
||||||
{solutions, Solutions} ->
|
{solutions, Solutions} ->
|
||||||
{Cont, S1} = report_solutions_(Solutions, W, S),
|
{Cont, S1} = report_solutions_(Solutions, W, S),
|
||||||
@ -438,3 +443,6 @@ worker_result(Pid, Result) ->
|
|||||||
decode_candidate_hash(#{candidate := C} = Cand) ->
|
decode_candidate_hash(#{candidate := C} = Cand) ->
|
||||||
{ok, Hash} = gmser_api_encoder:safe_decode(bytearray, C),
|
{ok, Hash} = gmser_api_encoder:safe_decode(bytearray, C),
|
||||||
Cand#{candidate := Hash}.
|
Cand#{candidate := Hash}.
|
||||||
|
|
||||||
|
%% Local shorthand: send a ping to the watchdog on behalf of the calling
%% process (delegates to gmhc_watchdog:ping/0).
wd_ping() ->
    gmhc_watchdog:ping().
|
||||||
|
|||||||
@ -15,10 +15,11 @@ start_link() ->
|
|||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ChildSpecs = [ worker(gmhc_server)
|
ChildSpecs = [ worker(gmhc_watchdog)
|
||||||
|
, worker(gmhc_server)
|
||||||
, worker(gmhc_handler)
|
, worker(gmhc_handler)
|
||||||
, supervisor(gmhc_connectors_sup) ],
|
, supervisor(gmhc_connectors_sup) ],
|
||||||
SupFlags = #{ strategy => one_for_one
|
SupFlags = #{ strategy => rest_for_one
|
||||||
, intensity => 1
|
, intensity => 1
|
||||||
, period => 5
|
, period => 5
|
||||||
, auto_shutdown => never },
|
, auto_shutdown => never },
|
||||||
|
|||||||
146
src/gmhc_watchdog.erl
Normal file
146
src/gmhc_watchdog.erl
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
-module(gmhc_watchdog).
|
||||||
|
-behavior(gen_server).
|
||||||
|
|
||||||
|
-export([ watch/2
|
||||||
|
, unwatch/0
|
||||||
|
, ping/0 ]).
|
||||||
|
|
||||||
|
-export([ note_started/2
|
||||||
|
, get_restart_info/0 ]).
|
||||||
|
|
||||||
|
-export([ start_link/0
|
||||||
|
, init/1
|
||||||
|
, handle_call/3
|
||||||
|
, handle_cast/2
|
||||||
|
, handle_info/2
|
||||||
|
, terminate/2
|
||||||
|
, code_change/3 ]).
|
||||||
|
|
||||||
|
%% Server state: `services' maps each watched pid to its #svc{} entry.
-record(st, {services = #{}}).

%% Per-pid watch entry.
%%   n        - timeouts remaining before the watched pid is killed
%%   n0       - value `n' is reset to when a ping arrives
%%   interval - timer length in milliseconds (given to erlang:start_timer/3)
%%   tref     - reference of the currently running timer
%%   mref     - monitor reference for the watched pid
%% `tref' and `mref' have no default value, and restart_timer/2 explicitly
%% sets `tref' to `undefined', so both types must admit `undefined'.
-record(svc, { n = 5           :: pos_integer()
             , n0 = 5          :: pos_integer()
             , interval = 5000 :: pos_integer()
             , tref            :: reference() | undefined
             , mref            :: reference() | undefined
             }).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
|
||||||
|
%% Register the calling process with the watchdog.
%% Sends a synchronous `{watch, Pid, Interval, N}' request to the process
%% registered as ?MODULE and returns its reply. `Interval' is used as the
%% timer length and `N' as the number of timeouts tolerated before the
%% watched process is killed.
watch(Interval, N) ->
    Request = {watch, self(), Interval, N},
    gen_server:call(?MODULE, Request).
|
||||||
|
|
||||||
|
unwatch() ->
|
||||||
|
gen_server:cast({unwatch, self()}).
|
||||||
|
|
||||||
|
%% Tell the watchdog that the calling process is still making progress.
%% Asynchronous: casts `{ping, Pid}' to the process registered as ?MODULE.
ping() ->
    Msg = {ping, self()},
    gen_server:cast(?MODULE, Msg).
|
||||||
|
|
||||||
|
%% Store `Info' under `Id' in the restart info kept by the watchdog.
%% Synchronous call to the process registered as ?MODULE.
note_started(Id, Info) ->
    Request = {note_started, Id, Info},
    gen_server:call(?MODULE, Request).
|
||||||
|
|
||||||
|
%% Fetch the restart info map (Id => Info) from the watchdog.
%% Synchronous call to the process registered as ?MODULE.
get_restart_info() ->
    Request = get_restart_info,
    gen_server:call(?MODULE, Request).
|
||||||
|
|
||||||
|
%% Start the watchdog gen_server, linked to the caller and registered
%% locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%% gen_server callback: start with an empty state (no watched services).
init([]) ->
    InitialState = #st{},
    {ok, InitialState}.
|
||||||
|
|
||||||
|
%% gen_server callback for synchronous requests:
%%   {note_started, Id, Info}  - store Info under Id in the restart info
%%   get_restart_info          - reply with the stored restart info
%%   {watch, Pid, Interval, N} - start watching Pid
%% Anything else is answered with `{error, unknown_method}'.
handle_call(Request, _From, S) ->
    case Request of
        {note_started, Id, Info} ->
            update_pt(Id, Info),
            {reply, ok, S};
        get_restart_info ->
            {reply, get_pt(), S};
        {watch, Pid, Interval, N} ->
            {reply, ok, add_watch(Pid, Interval, N, S)};
        _ ->
            {reply, {error, unknown_method}, S}
    end.
|
||||||
|
|
||||||
|
%% gen_server callback for asynchronous requests:
%%   {ping, Pid}    - reset the watch for Pid
%%   {unwatch, Pid} - stop watching Pid
%% Unknown casts are logged at debug level and otherwise ignored.
handle_cast(Msg, S) ->
    case Msg of
        {ping, Pid} ->
            {noreply, reset_watch(Pid, S)};
        {unwatch, Pid} ->
            {noreply, delete_watch(Pid, S)};
        _ ->
            ?LOG_DEBUG("Unknown cast: ~p", [Msg]),
            {noreply, S}
    end.
|
||||||
|
|
||||||
|
%% gen_server callback for raw messages.
%%
%% {timeout, TRef, Pid}: a watch timer fired. The timeout is acted on only
%% if TRef is the timer currently stored for Pid. erlang:cancel_timer/1
%% cannot retract a timeout message that has already been delivered to the
%% mailbox, so a timer "cancelled" by a ping or unwatch may still show up
%% here; without this check such a stale message would be counted as a
%% missed interval and could kill a process that is in fact alive.
%%
%% {'DOWN', ...}: a watched process died; drop its watch entry.
%%
%% Anything else is logged at debug level and ignored.
handle_info({timeout, TRef, Pid}, #st{services = Svcs} = S) ->
    case maps:find(Pid, Svcs) of
        {ok, #svc{tref = TRef}} ->
            ?LOG_INFO("Timeout for pid ~p", [Pid]),
            {noreply, ping_timeout(Pid, S)};
        _ ->
            %% Not watched (any more), or the timer has been replaced.
            ?LOG_DEBUG("Ignoring stale timeout for pid ~p", [Pid]),
            {noreply, S}
    end;
handle_info({'DOWN', _, process, Pid, _}, S) ->
    {noreply, delete_watch(Pid, S)};
handle_info(Msg, S) ->
    ?LOG_DEBUG("Unknown msg: ~p", [Msg]),
    {noreply, S}.
|
||||||
|
|
||||||
|
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
|
||||||
|
|
||||||
|
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
|
||||||
|
|
||||||
|
%% Start watching Pid: monitor it, start a timer of `Interval' ms and store
%% an #svc{} entry allowing N timeouts (both the live counter `n' and the
%% reset value `n0' are set to N).
%%
%% Any existing watch for the same Pid is removed first. Otherwise the old
%% entry would simply be overwritten in the map while its timer and monitor
%% kept running; the orphaned timer would later fire and be counted against
%% the new watch, even though the process has been pinging.
add_watch(Pid, Interval, N, S0) ->
    #st{services = Svcs} = S = delete_watch(Pid, S0),
    MRef = erlang:monitor(process, Pid),
    Svc0 = #svc{ interval = Interval
               , mref = MRef
               , n = N
               , n0 = N },
    Svc = start_timer(Pid, Svc0),
    S#st{services = Svcs#{Pid => Svc}}.
|
||||||
|
|
||||||
|
%% Handle a ping from Pid: restore its timeout counter to the initial value
%% (`n0') and restart its timer. The state is returned unchanged if Pid is
%% not being watched.
reset_watch(Pid, S = #st{services = Svcs}) ->
    case maps:find(Pid, Svcs) of
        error ->
            S;
        {ok, Svc = #svc{n0 = Initial}} ->
            Refreshed = restart_timer(Pid, Svc#svc{n = Initial}),
            S#st{services = maps:update(Pid, Refreshed, Svcs)}
    end.
|
||||||
|
|
||||||
|
%% Stop watching Pid: cancel its timer, drop its monitor and remove its
%% entry from the services map. The state is returned unchanged if Pid is
%% not being watched.
delete_watch(Pid, S = #st{services = Svcs}) ->
    case maps:take(Pid, Svcs) of
        {#svc{tref = Timer, mref = Monitor}, Remaining} ->
            erlang:cancel_timer(Timer),
            erlang:demonitor(Monitor),
            S#st{services = Remaining};
        error ->
            S
    end.
|
||||||
|
|
||||||
|
%% A watch timer for Pid expired without a ping.
%% If this was the last allowed timeout (decrementing `n' would reach zero
%% or less), Pid is killed and its entry removed. Otherwise the counter is
%% decremented and the timer restarted. The state is returned unchanged if
%% Pid is not being watched.
ping_timeout(Pid, S = #st{services = Svcs}) ->
    case maps:find(Pid, Svcs) of
        {ok, #svc{n = N}} when N =< 1 ->
            ?LOG_ERROR("Will exit Pid ~p", [Pid]),
            exit(Pid, kill),
            S#st{services = maps:remove(Pid, Svcs)};
        {ok, Svc = #svc{n = N}} ->
            Updated = restart_timer(Pid, Svc#svc{n = N - 1}),
            S#st{services = Svcs#{Pid := Updated}};
        error ->
            S
    end.
|
||||||
|
|
||||||
|
%% Start a timer of `interval' ms that will deliver `{timeout, TRef, Pid}'
%% to this process, and record the timer reference in the entry.
start_timer(Pid, Svc = #svc{interval = Interval}) ->
    Svc#svc{tref = erlang:start_timer(Interval, self(), Pid)}.
|
||||||
|
|
||||||
|
%% Cancel the entry's current timer and start a fresh one for Pid.
restart_timer(Pid, Svc = #svc{tref = OldTimer}) ->
    erlang:cancel_timer(OldTimer),
    Cleared = Svc#svc{tref = undefined},
    start_timer(Pid, Cleared).
|
||||||
|
|
||||||
|
%% Add or replace the entry for `Id' in the restart info map kept in
%% persistent_term (read-modify-write via get_pt/0 and put_pt/1).
update_pt(Id, Info) ->
    put_pt(maps:put(Id, Info, get_pt())).
|
||||||
|
|
||||||
|
%% Read the restart info map from persistent_term; an empty map is returned
%% when nothing has been stored yet.
get_pt() ->
    Key = pt_key(),
    persistent_term:get(Key, #{}).
|
||||||
|
|
||||||
|
%% Store the restart info map `Pt' in persistent_term.
put_pt(Pt) ->
    Key = pt_key(),
    persistent_term:put(Key, Pt).
|
||||||
|
|
||||||
|
%% The persistent_term key under which the restart info map is kept.
pt_key() ->
    Owner = ?MODULE,
    {Owner, restart_info}.
|
||||||
@ -2,9 +2,9 @@
|
|||||||
{type,app}.
|
{type,app}.
|
||||||
{modules,[]}.
|
{modules,[]}.
|
||||||
{prefix,"gmhc"}.
|
{prefix,"gmhc"}.
|
||||||
{desc,"Gajumaru Hive Client"}.
|
|
||||||
{author,"Ulf Wiger, QPQ AG"}.
|
{author,"Ulf Wiger, QPQ AG"}.
|
||||||
{package_id,{"uwiger","gmhive_client",{0,8,3}}}.
|
{desc,"Gajumaru Hive Client"}.
|
||||||
|
{package_id,{"uwiger","gmhive_client",{0,9,0}}}.
|
||||||
{deps,[{"uwiger","gmhive_protocol",{0,2,0}},
|
{deps,[{"uwiger","gmhive_protocol",{0,2,0}},
|
||||||
{"uwiger","gmhive_worker",{0,5,1}},
|
{"uwiger","gmhive_worker",{0,5,1}},
|
||||||
{"uwiger","gmcuckoo",{1,2,4}},
|
{"uwiger","gmcuckoo",{1,2,4}},
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user