diff --git a/ebin/gmhive_client.app b/ebin/gmhive_client.app index e65ce89..3314520 100644 --- a/ebin/gmhive_client.app +++ b/ebin/gmhive_client.app @@ -1,6 +1,6 @@ {application,gmhive_client, [{description,"Gajumaru Hive Client"}, - {vsn,"0.9.1"}, + {vsn,"0.10.0"}, {registered,[]}, {applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise, gmconfig,gmhive_protocol,gmhive_worker]}, diff --git a/src/gmhc_app.erl b/src/gmhc_app.erl index 51fe8f9..ba914d7 100644 --- a/src/gmhc_app.erl +++ b/src/gmhc_app.erl @@ -17,11 +17,11 @@ -spec start([{atom(), any()}]) -> {ok, [atom()]} | {error, any()}. start(Opts) -> application:load(gmhive_client), - {error,_} = application:stop(gmhive_client), + _ = application:stop(gmhive_client), _ = lists:foreach(fun({K, V}) -> application:set_env(gmhive_client, K, V) end, Opts), - application:ensure_all_started(gmhive_client). + application:ensure_all_started(gmhive_client, permanent). start(_StartType, _StartArgs) -> set_things_up(), diff --git a/src/gmhc_connector.erl b/src/gmhc_connector.erl index 24ca6a0..2b2d90f 100644 --- a/src/gmhc_connector.erl +++ b/src/gmhc_connector.erl @@ -57,6 +57,7 @@ , auto_connect = true :: boolean() , econn , connected = false :: boolean() + , status = disconnected :: disconnected | connecting | connected , reconnect = true :: boolean() , reconnect_timer :: timer_ref() | 'undefined' , recache_timer :: reference() | 'undefined' @@ -295,8 +296,9 @@ try_connect_(Opts0, S) -> case try_noise_connect(maps:merge(Opts0, PoolOpts)) of {ok, EConn, Opts1} -> S1 = protocol_connect(Opts1, S#st{ econn = EConn + , status = connecting , reconnect_timer = undefined }), - {ok, S1}; + {ok, S1#st{status = connected}}; {error, _} = Error -> Error end @@ -442,12 +444,14 @@ protocol_connect(Opts, #st{econn = EConn} = S) -> Type = to_atom(opt(type, Opts, [<<"type">>])), RId = erlang:unique_integer(), Vsns = gmhp_msgs:versions(), + Client = client_name(), Protocols = gmhp_msgs:protocols(hd(Vsns)), ConnectReq = #{ protocols => Protocols , versions => Vsns , pool_id => PoolId , pubkey => Pubkey , extra_pubkeys => Extra + , client => Client , type => Type , nonces => gmhc_server:total_nonces() , signature => ""}, @@ -482,7 +486,10 @@ send_connect(EConn, RId, Msg, #{pubkey := Pubkey, notify_connected(S1#st{protocol = P, version = V, opts = Opts1}); #{error := #{code := _, message := ErrMsg}} = ErrReply -> ?LOG_ERROR("Connect error: ~s", [ErrMsg]), - disconnected(S#st.id, ErrReply, S), + %% TODO: fix the flow so that we send one disconnected event, + %% and set the reconnect in the right place. For now, stuff + %% the `reconnect = false`. + disconnected(S#st.id, ErrReply, S#st{reconnect = false}), gmhc_eureka:invalidate_cache(), error(rejected) end @@ -491,6 +498,21 @@ send_connect(EConn, RId, Msg, #{pubkey := Pubkey, error(protocol_connect_timeout) end. +client_name() -> + MyStr = app_string(gmhive_client), + case app_string(gajumine) of + <<>> -> MyStr; + GMStr -> <> + end. + +app_string(App) -> + case application:get_key(App, vsn) of + undefined -> + <<>>; + {ok, Vsn} -> + unicode:characters_to_binary([atom_to_binary(App),"-",Vsn]) + end. + to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8); to_bin(S) -> @@ -507,10 +529,11 @@ connected(Id, Type, S) when Type==worker; Type==monitor -> disconnected(Id, S) -> disconnected(Id, #{}, S). -disconnected(Id, Msg, S) -> - gmhc_events:publish(disconnected, Msg#{id => Id}), +disconnected(_, _, #st{status = disconnected} = S) -> S; +disconnected(Id, Msg, #st{reconnect = Bool} = S) -> + gmhc_events:publish(disconnected, Msg#{id => Id, reconnecting => Bool}), gmhc_server:disconnected(Id), - S#st{connected = false}. + S#st{connected = false, status = disconnected}. opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) -> Bool; diff --git a/src/gmhc_connectors_sup.erl b/src/gmhc_connectors_sup.erl index 67da405..e30f33b 100644 --- a/src/gmhc_connectors_sup.erl +++ b/src/gmhc_connectors_sup.erl @@ -9,10 +9,19 @@ , start_connector/1 , add_restart_info/2 ]). +-export([ sort_restart_info/1 ]). + +-include_lib("kernel/include/logger.hrl"). + start_link() -> case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of {ok, _} = Ok -> - RI = gmhc_watchdog:get_restart_info(), + RI0 = gmhc_watchdog:get_restart_info(), + ?LOG_ERROR("Restart info: ~p", [RI0]), + RI = sort_restart_info(RI0), + RemoveIds = maps:keys(maps:without(maps:keys(RI), RI0)), + gmhc_watchdog:remove_restart_info(RemoveIds), + ?LOG_ERROR("Sorted restart info: ~p", [RI]), maps:foreach(fun restart_connector/2, RI), Ok; Other -> @@ -40,8 +49,8 @@ add_restart_info(Id, Opts) -> init([]) -> Mod = gmhc_connector, SupFlags = #{ strategy => simple_one_for_one - , intensity => 3 - , period => 10 }, + , intensity => 5 + , period => 60 }, ChildSpecs = [ #{ id => Mod , start => {Mod, start_link, []} , type => worker @@ -49,3 +58,13 @@ init([]) -> , shutdown => 5000 , modules => [Mod] } ], {ok, {SupFlags, ChildSpecs}}. + +sort_restart_info(RI) -> + L = lists:sort( + [{Id, {T,H,P}, I} + || {{?MODULE,_} = Id, #{type := T, host := H, port := P} = I} <- maps:to_list(RI)]), + ConnTypes = [CT || {_, CT, _} <- lists:ukeysort(2, L)], + lists:foldl(fun(CT, Acc) -> + {Id, _, I} = lists:keyfind(CT, 2, L), + Acc#{Id => I} + end, #{}, ConnTypes). diff --git a/src/gmhc_handler.erl b/src/gmhc_handler.erl index f32bcd9..79ed001 100644 --- a/src/gmhc_handler.erl +++ b/src/gmhc_handler.erl @@ -28,7 +28,7 @@ -record(st, {pools = [], opts = #{}}). --define(CALL_TIMEOUT, 5000). +-define(CALL_TIMEOUT, 10_000). -include_lib("kernel/include/logger.hrl"). diff --git a/src/gmhc_sup.erl b/src/gmhc_sup.erl index d1e27a4..e419c5a 100644 --- a/src/gmhc_sup.erl +++ b/src/gmhc_sup.erl @@ -20,8 +20,8 @@ init([]) -> , worker(gmhc_handler) , supervisor(gmhc_connectors_sup) ], SupFlags = #{ strategy => rest_for_one - , intensity => 1 - , period => 5 + , intensity => 5 %% We really want the hive client to sort itself out + , period => 5*60 %% Timemout issues can happen infrequently , auto_shutdown => never }, {ok, {SupFlags, ChildSpecs}}. diff --git a/src/gmhc_watchdog.erl b/src/gmhc_watchdog.erl index b02b7b3..36d8213 100644 --- a/src/gmhc_watchdog.erl +++ b/src/gmhc_watchdog.erl @@ -6,6 +6,7 @@ , ping/0 ]). -export([ note_started/2 + , remove_restart_info/1 , get_restart_info/0 ]). -export([ start_link/0 @@ -41,6 +42,11 @@ note_started(Id, Info) -> get_restart_info() -> gen_server:call(?MODULE, get_restart_info). +remove_restart_info([]) -> + ok; +remove_restart_info(IDs) -> + gen_server:call(?MODULE, {remove_restart_info, IDs}). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -50,6 +56,9 @@ init([]) -> handle_call({note_started, Id, Info}, _From, S) -> update_pt(Id, Info), {reply, ok, S}; +handle_call({remove_restart_info, IDs}, _From, S) -> + remove_restart_info_(IDs), + {reply, ok, S}; handle_call(get_restart_info, _From, S) -> {reply, get_pt(), S}; handle_call({watch, Pid, Interval, N}, _From, S) -> @@ -65,9 +74,9 @@ handle_cast(Msg, S) -> ?LOG_DEBUG("Unknown cast: ~p", [Msg]), {noreply, S}. -handle_info({timeout, _, Pid}, S) -> +handle_info({timeout, TRef, Pid}, S) -> ?LOG_INFO("Timeout for pid ~p", [Pid]), - {noreply, ping_timeout(Pid, S)}; + {noreply, ping_timeout(Pid, TRef, S)}; handle_info({'DOWN', _, process, Pid, _}, S) -> {noreply, delete_watch(Pid, S)}; handle_info(Msg, S) -> @@ -108,9 +117,9 @@ delete_watch(Pid, #st{services = Svcs} = S) -> S end. -ping_timeout(Pid, #st{services = Svcs} = S) -> +ping_timeout(Pid, TRef, #st{services = Svcs} = S) -> case maps:find(Pid, Svcs) of - {ok, #svc{ n = N } = Svc} -> + {ok, #svc{ n = N, tref = TRef} = Svc} -> N1 = N-1, if N1 =< 0 -> ?LOG_ERROR("Will exit Pid ~p", [Pid]), @@ -120,7 +129,10 @@ ping_timeout(Pid, #st{services = Svcs} = S) -> Svc1 = restart_timer(Pid, Svc#svc{n = N1}), S#st{services = Svcs#{Pid := Svc1}} end; - error -> + {ok, _} -> + ?LOG_DEBUG("Timeout didn't match TRef - ignoring", []), + S; + _ -> S end. @@ -136,6 +148,15 @@ update_pt(Id, Info) -> Pt = get_pt(), put_pt(Pt#{Id => Info}). +remove_restart_info_(IDs) -> + RI = get_pt(), + case maps:without(IDs, RI) of + RI -> + ok; + NewRI -> + put_pt(NewRI) + end. + get_pt() -> persistent_term:get(pt_key(), #{}). diff --git a/zomp.meta b/zomp.meta index a9569d9..b3392e8 100644 --- a/zomp.meta +++ b/zomp.meta @@ -2,10 +2,10 @@ {type,app}. {modules,[]}. {prefix,"gmhc"}. -{desc,"Gajumaru Hive Client"}. {author,"Ulf Wiger, QPQ AG"}. -{package_id,{"uwiger","gmhive_client",{0,9,1}}}. -{deps,[{"uwiger","gmhive_protocol",{0,2,0}}, +{desc,"Gajumaru Hive Client"}. +{package_id,{"uwiger","gmhive_client",{0,10,0}}}. +{deps,[{"uwiger","gmhive_protocol",{0,3,1}}, {"uwiger","gmhive_worker",{0,5,1}}, {"uwiger","gmcuckoo",{1,2,4}}, {"otpr","eblake2",{1,0,1}},