Protect against accidental multiple concurrent connections

This commit is contained in:
Ulf Wiger 2025-10-24 20:30:41 +02:00
parent 0e4b6d7873
commit bfccda2c3f
5 changed files with 82 additions and 8 deletions

View File

@ -1,6 +1,6 @@
{application,gmhive_client, {application,gmhive_client,
[{description,"Gajumaru Hive Client"}, [{description,"Gajumaru Hive Client"},
{vsn,"0.9.1"}, {vsn,"0.9.2"},
{registered,[]}, {registered,[]},
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise, {applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
gmconfig,gmhive_protocol,gmhive_worker]}, gmconfig,gmhive_protocol,gmhive_worker]},

View File

@ -442,12 +442,14 @@ protocol_connect(Opts, #st{econn = EConn} = S) ->
Type = to_atom(opt(type, Opts, [<<"type">>])), Type = to_atom(opt(type, Opts, [<<"type">>])),
RId = erlang:unique_integer(), RId = erlang:unique_integer(),
Vsns = gmhp_msgs:versions(), Vsns = gmhp_msgs:versions(),
%% Client = client_name(),
Protocols = gmhp_msgs:protocols(hd(Vsns)), Protocols = gmhp_msgs:protocols(hd(Vsns)),
ConnectReq = #{ protocols => Protocols ConnectReq = #{ protocols => Protocols
, versions => Vsns , versions => Vsns
, pool_id => PoolId , pool_id => PoolId
, pubkey => Pubkey , pubkey => Pubkey
, extra_pubkeys => Extra , extra_pubkeys => Extra
%% , client => Client
, type => Type , type => Type
, nonces => gmhc_server:total_nonces() , nonces => gmhc_server:total_nonces()
, signature => ""}, , signature => ""},
@ -491,6 +493,38 @@ send_connect(EConn, RId, Msg, #{pubkey := Pubkey,
error(protocol_connect_timeout) error(protocol_connect_timeout)
end. end.
%% client_name() ->
%% MyStr = app_string(gmhive_client),
%% case app_string(gajumine) of
%% <<>> -> MyStr;
%% GMStr -> <<MyStr/binary,",",GMStr/binary>>
%% end.
%% %% {ok, Vsn} = application:get_key(gmhive_client, vsn),
%% %% maybe_add_gajumine(unicode:characters_to_binary(["gmhive_client-", Vsn])).
%% maybe_add_gajumine(Str) ->
%% case setup_zomp:is_zomp_context() of
%% true ->
%% case zx_daemon:meta() of
%% #{package_id := {"qpq","gajumine",_} = PId} ->
%% {ok, PStr} = zx_lib:package_string(PId),
%% GMStr = unicode:characters_to_binary([",", PStr]),
%% <<Str/binary, GMStr/binary>>;
%% _ ->
%% Str
%% end;
%% false ->
%% Str
%% end.
%% app_string(App) ->
%% case application:get_key(App, vsn) of
%% undefined ->
%% <<>>;
%% {ok, Vsn} ->
%% unicode:characters_to_binary([atom_to_binary(App),"-",Vsn])
%% end.
to_bin(A) when is_atom(A) -> to_bin(A) when is_atom(A) ->
atom_to_binary(A, utf8); atom_to_binary(A, utf8);
to_bin(S) -> to_bin(S) ->

View File

@ -9,10 +9,19 @@
, start_connector/1 , start_connector/1
, add_restart_info/2 ]). , add_restart_info/2 ]).
-export([ sort_restart_info/1 ]).
-include_lib("kernel/include/logger.hrl").
start_link() -> start_link() ->
case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of
{ok, _} = Ok -> {ok, _} = Ok ->
RI = gmhc_watchdog:get_restart_info(), RI0 = gmhc_watchdog:get_restart_info(),
?LOG_ERROR("Restart info: ~p", [RI0]),
RI = sort_restart_info(RI0),
RemoveIds = maps:keys(maps:without(maps:keys(RI), RI0)),
gmhc_watchdog:remove_restart_info(RemoveIds),
?LOG_ERROR("Sorted restart info: ~p", [RI]),
maps:foreach(fun restart_connector/2, RI), maps:foreach(fun restart_connector/2, RI),
Ok; Ok;
Other -> Other ->
@ -49,3 +58,13 @@ init([]) ->
, shutdown => 5000 , shutdown => 5000
, modules => [Mod] } ], , modules => [Mod] } ],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
sort_restart_info(RI) ->
L = lists:sort(
[{Id, {T,H,P}, I}
|| {{?MODULE,_} = Id, #{type := T, host := H, port := P} = I} <- maps:to_list(RI)]),
ConnTypes = [CT || {_, CT, _} <- lists:ukeysort(2, L)],
lists:foldl(fun(CT, Acc) ->
{Id, _, I} = lists:keyfind(CT, 2, L),
Acc#{Id => I}
end, #{}, ConnTypes).

View File

@ -6,6 +6,7 @@
, ping/0 ]). , ping/0 ]).
-export([ note_started/2 -export([ note_started/2
, remove_restart_info/1
, get_restart_info/0 ]). , get_restart_info/0 ]).
-export([ start_link/0 -export([ start_link/0
@ -41,6 +42,11 @@ note_started(Id, Info) ->
get_restart_info() -> get_restart_info() ->
gen_server:call(?MODULE, get_restart_info). gen_server:call(?MODULE, get_restart_info).
remove_restart_info([]) ->
ok;
remove_restart_info(IDs) ->
gen_server:call(?MODULE, {remove_restart_info, IDs}).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -50,6 +56,9 @@ init([]) ->
handle_call({note_started, Id, Info}, _From, S) -> handle_call({note_started, Id, Info}, _From, S) ->
update_pt(Id, Info), update_pt(Id, Info),
{reply, ok, S}; {reply, ok, S};
handle_call({remove_restart_info, IDs}, _From, S) ->
remove_restart_info_(IDs),
{reply, ok, S};
handle_call(get_restart_info, _From, S) -> handle_call(get_restart_info, _From, S) ->
{reply, get_pt(), S}; {reply, get_pt(), S};
handle_call({watch, Pid, Interval, N}, _From, S) -> handle_call({watch, Pid, Interval, N}, _From, S) ->
@ -65,9 +74,9 @@ handle_cast(Msg, S) ->
?LOG_DEBUG("Unknown cast: ~p", [Msg]), ?LOG_DEBUG("Unknown cast: ~p", [Msg]),
{noreply, S}. {noreply, S}.
handle_info({timeout, _, Pid}, S) -> handle_info({timeout, TRef, Pid}, S) ->
?LOG_INFO("Timeout for pid ~p", [Pid]), ?LOG_INFO("Timeout for pid ~p", [Pid]),
{noreply, ping_timeout(Pid, S)}; {noreply, ping_timeout(Pid, TRef, S)};
handle_info({'DOWN', _, process, Pid, _}, S) -> handle_info({'DOWN', _, process, Pid, _}, S) ->
{noreply, delete_watch(Pid, S)}; {noreply, delete_watch(Pid, S)};
handle_info(Msg, S) -> handle_info(Msg, S) ->
@ -108,9 +117,9 @@ delete_watch(Pid, #st{services = Svcs} = S) ->
S S
end. end.
ping_timeout(Pid, #st{services = Svcs} = S) -> ping_timeout(Pid, TRef, #st{services = Svcs} = S) ->
case maps:find(Pid, Svcs) of case maps:find(Pid, Svcs) of
{ok, #svc{ n = N } = Svc} -> {ok, #svc{ n = N, tref = TRef} = Svc} ->
N1 = N-1, N1 = N-1,
if N1 =< 0 -> if N1 =< 0 ->
?LOG_ERROR("Will exit Pid ~p", [Pid]), ?LOG_ERROR("Will exit Pid ~p", [Pid]),
@ -120,7 +129,10 @@ ping_timeout(Pid, #st{services = Svcs} = S) ->
Svc1 = restart_timer(Pid, Svc#svc{n = N1}), Svc1 = restart_timer(Pid, Svc#svc{n = N1}),
S#st{services = Svcs#{Pid := Svc1}} S#st{services = Svcs#{Pid := Svc1}}
end; end;
error -> {ok, _} ->
?LOG_DEBUG("Timeout didn't match TRef - ignoring", []),
S;
_ ->
S S
end. end.
@ -136,6 +148,15 @@ update_pt(Id, Info) ->
Pt = get_pt(), Pt = get_pt(),
put_pt(Pt#{Id => Info}). put_pt(Pt#{Id => Info}).
remove_restart_info_(IDs) ->
RI = get_pt(),
case maps:without(IDs, RI) of
RI ->
ok;
NewRI ->
put_pt(NewRI)
end.
get_pt() -> get_pt() ->
persistent_term:get(pt_key(), #{}). persistent_term:get(pt_key(), #{}).

View File

@ -4,7 +4,7 @@
{prefix,"gmhc"}. {prefix,"gmhc"}.
{desc,"Gajumaru Hive Client"}. {desc,"Gajumaru Hive Client"}.
{author,"Ulf Wiger, QPQ AG"}. {author,"Ulf Wiger, QPQ AG"}.
{package_id,{"uwiger","gmhive_client",{0,9,1}}}. {package_id,{"uwiger","gmhive_client",{0,9,2}}}.
{deps,[{"uwiger","gmhive_protocol",{0,2,0}}, {deps,[{"uwiger","gmhive_protocol",{0,2,0}},
{"uwiger","gmhive_worker",{0,5,1}}, {"uwiger","gmhive_worker",{0,5,1}},
{"uwiger","gmcuckoo",{1,2,4}}, {"uwiger","gmcuckoo",{1,2,4}},