diff --git a/ebin/gmhive_client.app b/ebin/gmhive_client.app index 7fb10f6..88ab795 100644 --- a/ebin/gmhive_client.app +++ b/ebin/gmhive_client.app @@ -1,6 +1,6 @@ {application,gmhive_client, [{description,"Gajumaru Hive Client"}, - {vsn,"0.8.3"}, + {vsn,"0.9.0"}, {registered,[]}, {applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise, gmconfig,gmhive_protocol,gmhive_worker]}, diff --git a/src/gmhc_connector.erl b/src/gmhc_connector.erl index 294d6c9..24ca6a0 100644 --- a/src/gmhc_connector.erl +++ b/src/gmhc_connector.erl @@ -258,7 +258,6 @@ handle_info({timeout, _, recache}, #st{opts = Opts, econn = EConn} = S0) -> false -> {noreply, S}; true -> - ?LOG_NOTICE("Recaching eureka info", []), cache_eureka_info(Opts), {noreply, ensure_recache_timer(S)} end; @@ -409,6 +408,7 @@ notify_connected(#st{id = Id, awaiting_connect = Waiters, opts = Opts} = S) -> [gen_server:reply(From, ok) || {From, _} <- Waiters], gmhc_handler:pool_connected(S#st.id, S#st.opts), cache_eureka_info(Opts), + gmhc_connectors_sup:add_restart_info(Id, Opts), ensure_recache_timer(S#st{awaiting_connect = []}). ensure_recache_timer(#st{recache_timer = T} = S) -> diff --git a/src/gmhc_connectors_sup.erl b/src/gmhc_connectors_sup.erl index 3996d71..67da405 100644 --- a/src/gmhc_connectors_sup.erl +++ b/src/gmhc_connectors_sup.erl @@ -6,10 +6,23 @@ , init/1 ]). -export([ start_first_connector/0 - , start_connector/1]). + , start_connector/1 + , add_restart_info/2 ]). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). + case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of + {ok, _} = Ok -> + RI = gmhc_watchdog:get_restart_info(), + maps:foreach(fun restart_connector/2, RI), + Ok; + Other -> + Other + end. + +restart_connector({?MODULE, _}, Opts) -> + start_connector(Opts); +restart_connector(_, _) -> + ok. start_first_connector() -> start_connector(#{}). @@ -21,6 +34,9 @@ start_connector(Opts0) -> end, supervisor:start_child(?MODULE, [Opts]). +add_restart_info(Id, Opts) -> + gmhc_watchdog:note_started({?MODULE, Id}, Opts). + init([]) -> Mod = gmhc_connector, SupFlags = #{ strategy => simple_one_for_one diff --git a/src/gmhc_eureka.erl b/src/gmhc_eureka.erl index 9059ed7..ec0657b 100644 --- a/src/gmhc_eureka.erl +++ b/src/gmhc_eureka.erl @@ -62,7 +62,6 @@ cache_good_address(#{host := _, ok -> case file:write_file(CacheF, term_to_binary(ToCache)) of ok -> - ?LOG_DEBUG("Cached eureka info in: ~p", [CacheF]), {ok, ToCache}; {error, _} = Err -> ?LOG_DEBUG("Couldn't cache eureka in ~p: ~p", [CacheF, Err]), diff --git a/src/gmhc_server.erl b/src/gmhc_server.erl index ae51c57..8a6444c 100644 --- a/src/gmhc_server.erl +++ b/src/gmhc_server.erl @@ -92,7 +92,9 @@ handle_call({connected, Id, Type}, {Pid,_}, #st{connected = Conn} = S) -> monitor -> stop_workers(S1#st.workers), % shouldn't be any running S1#st{workers = [], working = false}; - worker -> S1#st{working = true} + worker -> + gmhc_watchdog:watch(5*60_000, 1), %% 5 minutes, one-shot + S1#st{working = true} end, {reply, ok, S2}; handle_call(_Req, _From, S) -> @@ -126,6 +128,7 @@ handle_cast({from_pool, #{via := Connector, handle_cast({disconnected, Id}, #st{connected = Conn} = S) -> ?LOG_DEBUG("disconnected: ~p", [Id]), Conn1 = maps:remove(Id, Conn), + gmhc_watchdog:unwatch(), S1 = if map_size(Conn1) == 0 -> Ws = stop_workers(S#st.workers), S#st{connected = Conn1, workers = Ws}; @@ -224,6 +227,7 @@ maybe_request_nonces(S) -> nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) -> case Seq == Seq0 of true -> + wd_ping(), #st{candidate = Cand} = S, S#st{candidate = Cand#{nonces => Nonces}}; false -> @@ -247,6 +251,7 @@ retry_timeout(Floor, Range) -> handle_worker_result({worker_result, Result}, W, S) -> %% ?LOG_DEBUG("worker result: ~p", [Result]), + wd_ping(), case Result of {solutions, Solutions} -> {Cont, S1} = report_solutions_(Solutions, W, S), @@ -438,3 +443,6 @@ worker_result(Pid, Result) -> decode_candidate_hash(#{candidate := C} = Cand) -> {ok, Hash} = gmser_api_encoder:safe_decode(bytearray, C), Cand#{candidate := Hash}. + +wd_ping() -> + gmhc_watchdog:ping(). diff --git a/src/gmhc_sup.erl b/src/gmhc_sup.erl index a2723bf..d1e27a4 100644 --- a/src/gmhc_sup.erl +++ b/src/gmhc_sup.erl @@ -15,10 +15,11 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ChildSpecs = [ worker(gmhc_server) + ChildSpecs = [ worker(gmhc_watchdog) + , worker(gmhc_server) , worker(gmhc_handler) , supervisor(gmhc_connectors_sup) ], - SupFlags = #{ strategy => one_for_one + SupFlags = #{ strategy => rest_for_one , intensity => 1 , period => 5 , auto_shutdown => never }, diff --git a/src/gmhc_watchdog.erl b/src/gmhc_watchdog.erl new file mode 100644 index 0000000..302b867 --- /dev/null +++ b/src/gmhc_watchdog.erl @@ -0,0 +1,146 @@ +-module(gmhc_watchdog). +-behavior(gen_server). + +-export([ watch/2 + , unwatch/0 + , ping/0 ]). + +-export([ note_started/2 + , get_restart_info/0 ]). + +-export([ start_link/0 + , init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 ]). + +-record(st, {services = #{}}). +-record(svc, { n = 5 :: pos_integer() + , n0 = 5 :: pos_integer() + , interval = 5000 :: pos_integer() + , tref :: reference() + , mref :: reference() + }). + +-include_lib("kernel/include/logger.hrl"). + +watch(Interval, N) -> + gen_server:call(?MODULE, {watch, self(), Interval, N}). + +unwatch() -> + gen_server:cast({unwatch, self()}). + +ping() -> + gen_server:cast(?MODULE, {ping, self()}). + +note_started(Id, Info) -> + gen_server:call(?MODULE, {note_started, Id, Info}). + +get_restart_info() -> + gen_server:call(?MODULE, get_restart_info). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #st{}}. + +handle_call({note_started, Id, Info}, _From, S) -> + update_pt(Id, Info), + {reply, ok, S}; +handle_call(get_restart_info, _From, S) -> + {reply, get_pt(), S}; +handle_call({watch, Pid, Interval, N}, _From, S) -> + {reply, ok, add_watch(Pid, Interval, N, S)}; +handle_call(_Req, _From, S) -> + {reply, {error, unknown_method}, S}. + +handle_cast({ping, Pid}, S) -> + {noreply, reset_watch(Pid, S)}; +handle_cast({unwatch, Pid}, S) -> + {noreply, delete_watch(Pid, S)}; +handle_cast(Msg, S) -> + ?LOG_DEBUG("Unknown cast: ~p", [Msg]), + {noreply, S}. + +handle_info({timeout, _, Pid}, S) -> + ?LOG_INFO("Timeout for pid ~p", [Pid]), + {noreply, ping_timeout(Pid, S)}; +handle_info({'DOWN', _, process, Pid, _}, S) -> + {noreply, delete_watch(Pid, S)}; +handle_info(Msg, S) -> + ?LOG_DEBUG("Unknown msg: ~p", [Msg]), + {noreply, S}. + +terminate(_, _) -> + ok. + +code_change(_FromVsn, S, _Extra) -> + {ok, S}. + +add_watch(Pid, Interval, N, #st{services = Svcs} = S) -> + MRef = erlang:monitor(process, Pid), + Svc0 = #svc{ interval = Interval + , mref = MRef + , n = N + , n0 = N}, + Svc = start_timer(Pid, Svc0), + S#st{services = Svcs#{Pid => Svc}}. + +reset_watch(Pid, #st{services = Svcs} = S) -> + case maps:find(Pid, Svcs) of + {ok, #svc{ n0 = N0 } = Svc} -> + Svc1 = restart_timer(Pid, Svc#svc{n = N0}), + S#st{services = Svcs#{Pid := Svc1}}; + error -> + S + end. + +delete_watch(Pid, #st{services = Svcs} = S) -> + case maps:find(Pid, Svcs) of + {ok, #svc{tref = TRef, mref = MRef}} -> + erlang:cancel_timer(TRef), + erlang:demonitor(MRef), + S#st{services = maps:remove(Pid, Svcs)}; + error -> + S + end. + +ping_timeout(Pid, #st{services = Svcs} = S) -> + case maps:find(Pid, Svcs) of + {ok, #svc{ n = N } = Svc} -> + N1 = N-1, + if N1 =< 0 -> + ?LOG_ERROR("Will exit Pid ~p", [Pid]), + exit(Pid, kill), + S#st{services = maps:remove(Pid, Svcs)}; + true -> + Svc1 = restart_timer(Pid, Svc#svc{n = N1}), + S#st{services = Svcs#{Pid := Svc1}} + end; + error -> + S + end. + +start_timer(Pid, #svc{interval = T} = Svc) -> + TRef = erlang:start_timer(T, self(), Pid), + Svc#svc{tref = TRef}. + +restart_timer(Pid, #svc{tref = TRef} = Svc) -> + erlang:cancel_timer(TRef), + start_timer(Pid, Svc#svc{tref = undefined}). + +update_pt(Id, Info) -> + Pt = get_pt(), + put_pt(Pt#{Id => Info}). + +get_pt() -> + persistent_term:get(pt_key(), #{}). + +put_pt(Pt) -> + persistent_term:put(pt_key(), Pt). + +pt_key() -> + {?MODULE, restart_info}. diff --git a/zomp.meta b/zomp.meta index 2f3f445..c03430c 100644 --- a/zomp.meta +++ b/zomp.meta @@ -2,9 +2,9 @@ {type,app}. {modules,[]}. {prefix,"gmhc"}. -{desc,"Gajumaru Hive Client"}. {author,"Ulf Wiger, QPQ AG"}. -{package_id,{"uwiger","gmhive_client",{0,8,3}}}. +{desc,"Gajumaru Hive Client"}. +{package_id,{"uwiger","gmhive_client",{0,9,0}}}. {deps,[{"uwiger","gmhive_protocol",{0,2,0}}, {"uwiger","gmhive_worker",{0,5,1}}, {"uwiger","gmcuckoo",{1,2,4}},