Compare commits
4 Commits
0e4b6d7873
...
e85df7c27c
| Author | SHA1 | Date | |
|---|---|---|---|
| e85df7c27c | |||
|
|
2078ae7115 | ||
|
|
8e795da00d | ||
|
|
bfccda2c3f |
@ -1,6 +1,6 @@
|
|||||||
{application,gmhive_client,
|
{application,gmhive_client,
|
||||||
[{description,"Gajumaru Hive Client"},
|
[{description,"Gajumaru Hive Client"},
|
||||||
{vsn,"0.9.1"},
|
{vsn,"0.10.0"},
|
||||||
{registered,[]},
|
{registered,[]},
|
||||||
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
|
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
|
||||||
gmconfig,gmhive_protocol,gmhive_worker]},
|
gmconfig,gmhive_protocol,gmhive_worker]},
|
||||||
|
|||||||
@ -17,11 +17,11 @@
|
|||||||
-spec start([{atom(), any()}]) -> {ok, [atom()]} | {error, any()}.
|
-spec start([{atom(), any()}]) -> {ok, [atom()]} | {error, any()}.
|
||||||
start(Opts) ->
|
start(Opts) ->
|
||||||
application:load(gmhive_client),
|
application:load(gmhive_client),
|
||||||
{error,_} = application:stop(gmhive_client),
|
_ = application:stop(gmhive_client),
|
||||||
_ = lists:foreach(fun({K, V}) ->
|
_ = lists:foreach(fun({K, V}) ->
|
||||||
application:set_env(gmhive_client, K, V)
|
application:set_env(gmhive_client, K, V)
|
||||||
end, Opts),
|
end, Opts),
|
||||||
application:ensure_all_started(gmhive_client).
|
application:ensure_all_started(gmhive_client, permanent).
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
set_things_up(),
|
set_things_up(),
|
||||||
|
|||||||
@ -57,6 +57,7 @@
|
|||||||
, auto_connect = true :: boolean()
|
, auto_connect = true :: boolean()
|
||||||
, econn
|
, econn
|
||||||
, connected = false :: boolean()
|
, connected = false :: boolean()
|
||||||
|
, status = disconnected :: disconnected | connecting | connected
|
||||||
, reconnect = true :: boolean()
|
, reconnect = true :: boolean()
|
||||||
, reconnect_timer :: timer_ref() | 'undefined'
|
, reconnect_timer :: timer_ref() | 'undefined'
|
||||||
, recache_timer :: reference() | 'undefined'
|
, recache_timer :: reference() | 'undefined'
|
||||||
@ -295,8 +296,9 @@ try_connect_(Opts0, S) ->
|
|||||||
case try_noise_connect(maps:merge(Opts0, PoolOpts)) of
|
case try_noise_connect(maps:merge(Opts0, PoolOpts)) of
|
||||||
{ok, EConn, Opts1} ->
|
{ok, EConn, Opts1} ->
|
||||||
S1 = protocol_connect(Opts1, S#st{ econn = EConn
|
S1 = protocol_connect(Opts1, S#st{ econn = EConn
|
||||||
|
, status = connecting
|
||||||
, reconnect_timer = undefined }),
|
, reconnect_timer = undefined }),
|
||||||
{ok, S1};
|
{ok, S1#st{status = connected}};
|
||||||
{error, _} = Error ->
|
{error, _} = Error ->
|
||||||
Error
|
Error
|
||||||
end
|
end
|
||||||
@ -442,12 +444,14 @@ protocol_connect(Opts, #st{econn = EConn} = S) ->
|
|||||||
Type = to_atom(opt(type, Opts, [<<"type">>])),
|
Type = to_atom(opt(type, Opts, [<<"type">>])),
|
||||||
RId = erlang:unique_integer(),
|
RId = erlang:unique_integer(),
|
||||||
Vsns = gmhp_msgs:versions(),
|
Vsns = gmhp_msgs:versions(),
|
||||||
|
Client = client_name(),
|
||||||
Protocols = gmhp_msgs:protocols(hd(Vsns)),
|
Protocols = gmhp_msgs:protocols(hd(Vsns)),
|
||||||
ConnectReq = #{ protocols => Protocols
|
ConnectReq = #{ protocols => Protocols
|
||||||
, versions => Vsns
|
, versions => Vsns
|
||||||
, pool_id => PoolId
|
, pool_id => PoolId
|
||||||
, pubkey => Pubkey
|
, pubkey => Pubkey
|
||||||
, extra_pubkeys => Extra
|
, extra_pubkeys => Extra
|
||||||
|
, client => Client
|
||||||
, type => Type
|
, type => Type
|
||||||
, nonces => gmhc_server:total_nonces()
|
, nonces => gmhc_server:total_nonces()
|
||||||
, signature => ""},
|
, signature => ""},
|
||||||
@ -482,7 +486,10 @@ send_connect(EConn, RId, Msg, #{pubkey := Pubkey,
|
|||||||
notify_connected(S1#st{protocol = P, version = V, opts = Opts1});
|
notify_connected(S1#st{protocol = P, version = V, opts = Opts1});
|
||||||
#{error := #{code := _, message := ErrMsg}} = ErrReply ->
|
#{error := #{code := _, message := ErrMsg}} = ErrReply ->
|
||||||
?LOG_ERROR("Connect error: ~s", [ErrMsg]),
|
?LOG_ERROR("Connect error: ~s", [ErrMsg]),
|
||||||
disconnected(S#st.id, ErrReply, S),
|
%% TODO: fix the flow so that we send one disconnected event,
|
||||||
|
%% and set the reconnect in the right place. For now, stuff
|
||||||
|
%% the `reconnect = false`.
|
||||||
|
disconnected(S#st.id, ErrReply, S#st{reconnect = false}),
|
||||||
gmhc_eureka:invalidate_cache(),
|
gmhc_eureka:invalidate_cache(),
|
||||||
error(rejected)
|
error(rejected)
|
||||||
end
|
end
|
||||||
@ -491,6 +498,21 @@ send_connect(EConn, RId, Msg, #{pubkey := Pubkey,
|
|||||||
error(protocol_connect_timeout)
|
error(protocol_connect_timeout)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
client_name() ->
|
||||||
|
MyStr = app_string(gmhive_client),
|
||||||
|
case app_string(gajumine) of
|
||||||
|
<<>> -> MyStr;
|
||||||
|
GMStr -> <<MyStr/binary,",",GMStr/binary>>
|
||||||
|
end.
|
||||||
|
|
||||||
|
app_string(App) ->
|
||||||
|
case application:get_key(App, vsn) of
|
||||||
|
undefined ->
|
||||||
|
<<>>;
|
||||||
|
{ok, Vsn} ->
|
||||||
|
unicode:characters_to_binary([atom_to_binary(App),"-",Vsn])
|
||||||
|
end.
|
||||||
|
|
||||||
to_bin(A) when is_atom(A) ->
|
to_bin(A) when is_atom(A) ->
|
||||||
atom_to_binary(A, utf8);
|
atom_to_binary(A, utf8);
|
||||||
to_bin(S) ->
|
to_bin(S) ->
|
||||||
@ -507,10 +529,11 @@ connected(Id, Type, S) when Type==worker; Type==monitor ->
|
|||||||
disconnected(Id, S) ->
|
disconnected(Id, S) ->
|
||||||
disconnected(Id, #{}, S).
|
disconnected(Id, #{}, S).
|
||||||
|
|
||||||
disconnected(Id, Msg, S) ->
|
disconnected(_, _, #st{status = disconnected} = S) -> S;
|
||||||
gmhc_events:publish(disconnected, Msg#{id => Id}),
|
disconnected(Id, Msg, #st{reconnect = Bool} = S) ->
|
||||||
|
gmhc_events:publish(disconnected, Msg#{id => Id, reconnecting => Bool}),
|
||||||
gmhc_server:disconnected(Id),
|
gmhc_server:disconnected(Id),
|
||||||
S#st{connected = false}.
|
S#st{connected = false, status = disconnected}.
|
||||||
|
|
||||||
opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) ->
|
opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) ->
|
||||||
Bool;
|
Bool;
|
||||||
|
|||||||
@ -9,10 +9,19 @@
|
|||||||
, start_connector/1
|
, start_connector/1
|
||||||
, add_restart_info/2 ]).
|
, add_restart_info/2 ]).
|
||||||
|
|
||||||
|
-export([ sort_restart_info/1 ]).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of
|
case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of
|
||||||
{ok, _} = Ok ->
|
{ok, _} = Ok ->
|
||||||
RI = gmhc_watchdog:get_restart_info(),
|
RI0 = gmhc_watchdog:get_restart_info(),
|
||||||
|
?LOG_ERROR("Restart info: ~p", [RI0]),
|
||||||
|
RI = sort_restart_info(RI0),
|
||||||
|
RemoveIds = maps:keys(maps:without(maps:keys(RI), RI0)),
|
||||||
|
gmhc_watchdog:remove_restart_info(RemoveIds),
|
||||||
|
?LOG_ERROR("Sorted restart info: ~p", [RI]),
|
||||||
maps:foreach(fun restart_connector/2, RI),
|
maps:foreach(fun restart_connector/2, RI),
|
||||||
Ok;
|
Ok;
|
||||||
Other ->
|
Other ->
|
||||||
@ -40,8 +49,8 @@ add_restart_info(Id, Opts) ->
|
|||||||
init([]) ->
|
init([]) ->
|
||||||
Mod = gmhc_connector,
|
Mod = gmhc_connector,
|
||||||
SupFlags = #{ strategy => simple_one_for_one
|
SupFlags = #{ strategy => simple_one_for_one
|
||||||
, intensity => 3
|
, intensity => 5
|
||||||
, period => 10 },
|
, period => 60 },
|
||||||
ChildSpecs = [ #{ id => Mod
|
ChildSpecs = [ #{ id => Mod
|
||||||
, start => {Mod, start_link, []}
|
, start => {Mod, start_link, []}
|
||||||
, type => worker
|
, type => worker
|
||||||
@ -49,3 +58,13 @@ init([]) ->
|
|||||||
, shutdown => 5000
|
, shutdown => 5000
|
||||||
, modules => [Mod] } ],
|
, modules => [Mod] } ],
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
|
sort_restart_info(RI) ->
|
||||||
|
L = lists:sort(
|
||||||
|
[{Id, {T,H,P}, I}
|
||||||
|
|| {{?MODULE,_} = Id, #{type := T, host := H, port := P} = I} <- maps:to_list(RI)]),
|
||||||
|
ConnTypes = [CT || {_, CT, _} <- lists:ukeysort(2, L)],
|
||||||
|
lists:foldl(fun(CT, Acc) ->
|
||||||
|
{Id, _, I} = lists:keyfind(CT, 2, L),
|
||||||
|
Acc#{Id => I}
|
||||||
|
end, #{}, ConnTypes).
|
||||||
|
|||||||
@ -28,7 +28,7 @@
|
|||||||
|
|
||||||
-record(st, {pools = [], opts = #{}}).
|
-record(st, {pools = [], opts = #{}}).
|
||||||
|
|
||||||
-define(CALL_TIMEOUT, 5000).
|
-define(CALL_TIMEOUT, 10_000).
|
||||||
|
|
||||||
-include_lib("kernel/include/logger.hrl").
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
|
||||||
|
|||||||
@ -20,8 +20,8 @@ init([]) ->
|
|||||||
, worker(gmhc_handler)
|
, worker(gmhc_handler)
|
||||||
, supervisor(gmhc_connectors_sup) ],
|
, supervisor(gmhc_connectors_sup) ],
|
||||||
SupFlags = #{ strategy => rest_for_one
|
SupFlags = #{ strategy => rest_for_one
|
||||||
, intensity => 1
|
, intensity => 5 %% We really want the hive client to sort itself out
|
||||||
, period => 5
|
, period => 5*60 %% Timemout issues can happen infrequently
|
||||||
, auto_shutdown => never },
|
, auto_shutdown => never },
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
|
|||||||
@ -6,6 +6,7 @@
|
|||||||
, ping/0 ]).
|
, ping/0 ]).
|
||||||
|
|
||||||
-export([ note_started/2
|
-export([ note_started/2
|
||||||
|
, remove_restart_info/1
|
||||||
, get_restart_info/0 ]).
|
, get_restart_info/0 ]).
|
||||||
|
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
@ -41,6 +42,11 @@ note_started(Id, Info) ->
|
|||||||
get_restart_info() ->
|
get_restart_info() ->
|
||||||
gen_server:call(?MODULE, get_restart_info).
|
gen_server:call(?MODULE, get_restart_info).
|
||||||
|
|
||||||
|
remove_restart_info([]) ->
|
||||||
|
ok;
|
||||||
|
remove_restart_info(IDs) ->
|
||||||
|
gen_server:call(?MODULE, {remove_restart_info, IDs}).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
@ -50,6 +56,9 @@ init([]) ->
|
|||||||
handle_call({note_started, Id, Info}, _From, S) ->
|
handle_call({note_started, Id, Info}, _From, S) ->
|
||||||
update_pt(Id, Info),
|
update_pt(Id, Info),
|
||||||
{reply, ok, S};
|
{reply, ok, S};
|
||||||
|
handle_call({remove_restart_info, IDs}, _From, S) ->
|
||||||
|
remove_restart_info_(IDs),
|
||||||
|
{reply, ok, S};
|
||||||
handle_call(get_restart_info, _From, S) ->
|
handle_call(get_restart_info, _From, S) ->
|
||||||
{reply, get_pt(), S};
|
{reply, get_pt(), S};
|
||||||
handle_call({watch, Pid, Interval, N}, _From, S) ->
|
handle_call({watch, Pid, Interval, N}, _From, S) ->
|
||||||
@ -65,9 +74,9 @@ handle_cast(Msg, S) ->
|
|||||||
?LOG_DEBUG("Unknown cast: ~p", [Msg]),
|
?LOG_DEBUG("Unknown cast: ~p", [Msg]),
|
||||||
{noreply, S}.
|
{noreply, S}.
|
||||||
|
|
||||||
handle_info({timeout, _, Pid}, S) ->
|
handle_info({timeout, TRef, Pid}, S) ->
|
||||||
?LOG_INFO("Timeout for pid ~p", [Pid]),
|
?LOG_INFO("Timeout for pid ~p", [Pid]),
|
||||||
{noreply, ping_timeout(Pid, S)};
|
{noreply, ping_timeout(Pid, TRef, S)};
|
||||||
handle_info({'DOWN', _, process, Pid, _}, S) ->
|
handle_info({'DOWN', _, process, Pid, _}, S) ->
|
||||||
{noreply, delete_watch(Pid, S)};
|
{noreply, delete_watch(Pid, S)};
|
||||||
handle_info(Msg, S) ->
|
handle_info(Msg, S) ->
|
||||||
@ -108,9 +117,9 @@ delete_watch(Pid, #st{services = Svcs} = S) ->
|
|||||||
S
|
S
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ping_timeout(Pid, #st{services = Svcs} = S) ->
|
ping_timeout(Pid, TRef, #st{services = Svcs} = S) ->
|
||||||
case maps:find(Pid, Svcs) of
|
case maps:find(Pid, Svcs) of
|
||||||
{ok, #svc{ n = N } = Svc} ->
|
{ok, #svc{ n = N, tref = TRef} = Svc} ->
|
||||||
N1 = N-1,
|
N1 = N-1,
|
||||||
if N1 =< 0 ->
|
if N1 =< 0 ->
|
||||||
?LOG_ERROR("Will exit Pid ~p", [Pid]),
|
?LOG_ERROR("Will exit Pid ~p", [Pid]),
|
||||||
@ -120,7 +129,10 @@ ping_timeout(Pid, #st{services = Svcs} = S) ->
|
|||||||
Svc1 = restart_timer(Pid, Svc#svc{n = N1}),
|
Svc1 = restart_timer(Pid, Svc#svc{n = N1}),
|
||||||
S#st{services = Svcs#{Pid := Svc1}}
|
S#st{services = Svcs#{Pid := Svc1}}
|
||||||
end;
|
end;
|
||||||
error ->
|
{ok, _} ->
|
||||||
|
?LOG_DEBUG("Timeout didn't match TRef - ignoring", []),
|
||||||
|
S;
|
||||||
|
_ ->
|
||||||
S
|
S
|
||||||
end.
|
end.
|
||||||
|
|
||||||
@ -136,6 +148,15 @@ update_pt(Id, Info) ->
|
|||||||
Pt = get_pt(),
|
Pt = get_pt(),
|
||||||
put_pt(Pt#{Id => Info}).
|
put_pt(Pt#{Id => Info}).
|
||||||
|
|
||||||
|
remove_restart_info_(IDs) ->
|
||||||
|
RI = get_pt(),
|
||||||
|
case maps:without(IDs, RI) of
|
||||||
|
RI ->
|
||||||
|
ok;
|
||||||
|
NewRI ->
|
||||||
|
put_pt(NewRI)
|
||||||
|
end.
|
||||||
|
|
||||||
get_pt() ->
|
get_pt() ->
|
||||||
persistent_term:get(pt_key(), #{}).
|
persistent_term:get(pt_key(), #{}).
|
||||||
|
|
||||||
|
|||||||
@ -2,10 +2,10 @@
|
|||||||
{type,app}.
|
{type,app}.
|
||||||
{modules,[]}.
|
{modules,[]}.
|
||||||
{prefix,"gmhc"}.
|
{prefix,"gmhc"}.
|
||||||
{desc,"Gajumaru Hive Client"}.
|
|
||||||
{author,"Ulf Wiger, QPQ AG"}.
|
{author,"Ulf Wiger, QPQ AG"}.
|
||||||
{package_id,{"uwiger","gmhive_client",{0,9,1}}}.
|
{desc,"Gajumaru Hive Client"}.
|
||||||
{deps,[{"uwiger","gmhive_protocol",{0,2,0}},
|
{package_id,{"uwiger","gmhive_client",{0,10,0}}}.
|
||||||
|
{deps,[{"uwiger","gmhive_protocol",{0,3,1}},
|
||||||
{"uwiger","gmhive_worker",{0,5,1}},
|
{"uwiger","gmhive_worker",{0,5,1}},
|
||||||
{"uwiger","gmcuckoo",{1,2,4}},
|
{"uwiger","gmcuckoo",{1,2,4}},
|
||||||
{"otpr","eblake2",{1,0,1}},
|
{"otpr","eblake2",{1,0,1}},
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user