gmhive_client/src/gmhc_connector.erl
2025-05-14 08:22:43 +02:00

469 lines
15 KiB
Erlang

-module(gmhc_connector).
-behaviour(gen_server).
-export([ start_link/1
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([
connect/1
, disconnect/1
, status/0
, status/1
, send/2
]).
-export([ whereis_id/1 ]).
%% for outgoing messages
%% -export([ solution/3
%% , no_solution/2
%% , nonces/2
%% ]).
-include_lib("kernel/include/logger.hrl").
-define(MAX_RETRY_INTERVAL, 30000).
%% Options carried by the reconnect timer between attempts; only the absolute
%% deadline (erlang:monotonic_time(millisecond), which may be negative) is kept.
-type retry_opts() :: #{ deadline => integer() }.
%% {TimerRef, AttemptsLeftAtInterval, IntervalMs, RetryOpts}
-type timer_ref() :: {reference(), non_neg_integer(), non_neg_integer(), retry_opts()}.
-type id() :: non_neg_integer().
-type connect_opts() :: #{ host => string()
                         , port => pos_integer()
                         , timeout => pos_integer()   %% overrides deadline
                         , deadline => integer()      %% ms monotonic, must be in the future
                         , nowait => boolean()        %% default: false
                         , retry => boolean()         %% default: true
                         , tcp_opts => list()
                         , enoise_opts => list()      %% if present, MUST contain {'noise', _}
                         , connect_timeout => pos_integer()
                         }.
-export_type([ id/0
             , connect_opts/0 ]).
%% Connector process state.
-record(st, {
          %% Connector id; start_link/1 registers the process under via(Id).
          id :: non_neg_integer()
          %% When true, a TCP close starts the reconnect timer and timer
          %% expiries trigger reconnect attempts; disconnect sets it to false.
        , auto_connect = true :: boolean()
          %% Current enoise connection, or undefined while disconnected.
        , econn
          %% {TRef, AttemptsLeft, IntervalMs, RetryOpts} while a reconnect
          %% timer is armed, otherwise undefined.
        , reconnect_timer :: timer_ref() | 'undefined'
          %% Callers of {connect, Opts} parked as {From, RetryOpts} until a
          %% later attempt answers them.
        , awaiting_connect = [] :: list()
          %% Protocol and version taken from the pool's connect_ack.
        , protocol :: binary() | 'undefined'
        , version :: binary() | 'undefined'
          %% Effective options of the current connection (set on handshake).
        , opts = #{} :: map()
        }).
%% Asks a connector to connect (or reconnect) using Opts.
%%
%% The call timeout is derived from `timeout' / `deadline' (default 10 s, see
%% manual_connect_timeout/1). Connector processes are registered only under
%% their gproc name (start_link/1), so when Opts contains `id' the request is
%% routed to via(Id); the routing-only `id' key is stripped before the request
%% reaches the server.
%% NOTE(review): without `id' the bare ?MODULE name is still used (kept for
%% backward compatibility); it only works if some process is registered
%% locally under that name.
-spec connect(connect_opts() | #{id := id(), atom() => term()}) -> ok | {error, any()}.
connect(Opts) when is_map(Opts) ->
    {Timeout, Opts1} = manual_connect_timeout(Opts),
    {Server, Opts2} = connect_target(Opts1),
    gen_server:call(Server, {connect, Opts2}, Timeout).

%% Chooses the server to call for connect/1 and removes the `id' key.
connect_target(#{id := Id} = Opts) ->
    {via(Id), maps:remove(id, Opts)};
connect_target(Opts) ->
    {?MODULE, Opts}.
%% Turns the caller's timing options into {CallTimeoutMs, Opts}, where Opts
%% always carries an absolute `deadline' in monotonic milliseconds:
%%   * `timeout => T' wins: deadline := now + T and `timeout' is removed;
%%   * `deadline => D' alone: the call timeout is D - now. A deadline that is
%%     not strictly in the future raises error(invalid_deadline);
%%   * neither: a 10 s default is applied.
manual_connect_timeout(#{timeout := T} = Opts) ->
    Deadline = erlang:monotonic_time(millisecond) + T,
    {T, maps:remove(timeout, Opts#{deadline => Deadline})};
manual_connect_timeout(#{deadline := D} = Opts) ->
    Timeout = D - erlang:monotonic_time(millisecond),
    if Timeout =< 0 ->
            %% The deadline must lie in the future; a zero call timeout would
            %% almost certainly expire before the server could answer.
            error(invalid_deadline);
       true ->
            {Timeout, Opts}
    end;
manual_connect_timeout(Opts) ->
    DefaultT = 10000,
    Deadline = erlang:monotonic_time(millisecond) + DefaultT,
    {DefaultT, Opts#{deadline => Deadline}}.
%% Synchronously tells connector Id to drop its connection and stop
%% auto-reconnecting.
disconnect(Id) ->
    Server = via(Id),
    gen_server:call(Server, disconnect).
%% Fire-and-forget: asks connector Via to encode and send Msg to the pool.
send(Via, Msg) ->
    Request = {send, Msg},
    gen_server:cast(via(Via), Request).
%% Status of every registered connector as [{Id, connected | disconnected}].
status() ->
    %% via('$1') yields the gproc key with '$1' in the id slot, which doubles
    %% as the match-spec head variable returned for each hit.
    {via, gproc, KeyPattern} = via('$1'),
    MatchSpec = [{{KeyPattern, '_', '_'}, [], ['$1']}],
    lists:map(fun(Id) -> {Id, status_(via(Id))} end,
              gproc:select(MatchSpec)).
%% Status of connector Id; `disconnected' if no such process is registered.
status(Id) ->
    case whereis_id(Id) of
        Pid when is_pid(Pid) ->
            status_(Pid);
        undefined ->
            disconnected
    end.
%% Asks a connector process (pid or name) for its status. Any failure to get
%% an answer (the process is gone, was never registered, or does not reply in
%% time) is reported as `disconnected'.
status_(Proc) ->
    try gen_server:call(Proc, status)
    catch
        %% gen_server:call/2 signals noproc, timeout, etc. as exits, not
        %% errors; without this clause status/0 crashes on a dead connector.
        exit:_ ->
            disconnected;
        error:_ ->
            disconnected
    end.
%% start_link() ->
%% start_link(#{}).
%% Pid registered for connector Id, or undefined when there is none.
whereis_id(Id) ->
    Key = reg(Id),
    gproc:where(Key).
%% gproc key for connector Id: a unique (n), node-local (l) name.
reg(Id) ->
    Name = {?MODULE, Id},
    {n, l, Name}.
%% {via, gproc, Key} process name for connector Id, usable with
%% gen_server:start_link/call/cast.
via(Id) ->
    Key = reg(Id),
    {via, gproc, Key}.
%% Starts the connector with id `Opts.id', registered under via(Id).
%% See init/1 for the other options.
start_link(#{id := Id} = Opts) ->
    Name = via(Id),
    gen_server:start_link(Name, ?MODULE, Opts, []).
%% gen_server init/1. Opts must contain `id'; start_link/1 has already
%% registered this process under via(Id).
%%
%% If auto-connect is enabled (opt_autoconnect/1), one connection attempt is
%% made here. If it fails, a warning is logged and the reconnect timer is
%% started.
%%
%% `nowait => true' (default false): the starter is acknowledged with
%% {ok, self()} before the connection attempt. The process then enters the
%% gen_server loop itself via gen_server:enter_loop/4, which does not return,
%% so start_link/1 does not wait for the attempt. Without nowait, start_link/1
%% returns only after the attempt has finished.
%%
%% NOTE(review): the reconnect timer started here has no options
%% (start_reconnect_timer/1), so retries do not reuse Opts such as tcp_opts.
%% Confirm that this is intended.
init(#{id := Id} = Opts) when is_map(Opts) ->
    AutoConnect = opt_autoconnect(Opts),
    S0 = #st{id = Id, auto_connect = AutoConnect},
    Nowait = maps:get(nowait, Opts, false),
    %% Unblock the starter early; in this mode init/1 never returns normally.
    if Nowait ->
            proc_lib:init_ack({ok, self()});
       true ->
            ok
    end,
    S1 =
        case AutoConnect of
            true ->
                case try_connect(Opts, S0) of
                    {ok, S} ->
                        ?LOG_DEBUG("Initial connect succeeded", []),
                        S;
                    {error, _} = Error ->
                        ?LOG_WARNING("Could not connect to core server: ~p", [Error]),
                        start_reconnect_timer(S0#st{econn = undefined})
                end;
            false ->
                S0
        end,
    if Nowait ->
            %% Must use the same name that start_link/1 registered.
            gen_server:enter_loop(?MODULE, [], S1, via(Id));
       true ->
            {ok, S1}
    end.
%% Synchronous requests.
%%
%% status -> `connected' while an enoise connection is held, otherwise
%%   `disconnected'.
%%
%% {connect, Opts} -> attempt a connection now (try_connect/2).
%%   * `nowait => true' (default false): the caller gets `ok' immediately,
%%     before the attempt, and is never parked as a waiter.
%%   * On failure with `retry' (default true): auto_connect is switched on,
%%     the caller (unless nowait) is parked in awaiting_connect together with
%%     its retry options, and a reconnect timer is started. The reply is sent
%%     later; notify_connected/1 replies `ok' when a reconnect succeeds.
%%   * On failure with `retry => false': the error is returned directly.
%%   NOTE(review): if a connection already exists, try_connect/2 overwrites
%%   econn without closing the previous connection. Confirm that this is
%%   intended.
%%
%% disconnect -> close the current connection (if any), switch auto_connect
%%   off and cancel any pending reconnect timer.
%%   NOTE(review): callers parked in awaiting_connect are not answered here.
handle_call(status, _From, #st{econn = EConn} = S) ->
    Status = case EConn of
                 undefined -> disconnected;
                 _ -> connected
             end,
    {reply, Status, S};
handle_call({connect, Opts}, From, #st{awaiting_connect = Waiters} = S) ->
    Nowait = maps:get(nowait, Opts, false),
    %% Release a nowait caller before doing any (possibly slow) work.
    case Nowait of
        true -> gen_server:reply(From, ok);
        false -> ok
    end,
    case try_connect(Opts, S) of
        {ok, S1} ->
            %% A nowait caller has already been answered above.
            if Nowait -> {noreply, S1};
               true -> {reply, ok, S1}
            end;
        {error, _} = Error ->
            case maps:get(retry, Opts, true) of
                true ->
                    %% Park the caller; it is answered from a later
                    %% timer-driven attempt rather than here.
                    Waiters1 = if Nowait -> Waiters;
                                  true -> [{From, retry_opts(Opts)}|Waiters]
                               end,
                    S1 = start_reconnect_timer(
                           S#st{ auto_connect = true
                               , awaiting_connect = Waiters1 }, Opts),
                    {noreply, S1};
                false ->
                    if Nowait -> {noreply, S};
                       true -> {reply, Error, S}
                    end
            end
    end;
handle_call(disconnect, _From, #st{econn = EConn} = S) ->
    case EConn of
        undefined ->
            ok;
        _ ->
            enoise:close(EConn)
    end,
    S1 = cancel_reconnect_timer(S#st{ auto_connect = false
                                    , econn = undefined }),
    {reply, ok, S1};
handle_call(_Req, _From, S) ->
    {reply, {error, unknown_call}, S}.
%% Only `{send, Msg}' is acted on, and only while connected. The `via' key is
%% stripped, the message is encoded with the negotiated protocol/version and
%% written to the enoise connection. Encoding or sending errors are logged,
%% not propagated. Every other cast, including sends while disconnected, is
%% dropped.
handle_cast({send, Msg0}, #st{econn = EConn, protocol = P, version = V} = S)
  when EConn =/= undefined ->
    try
        enoise:send(EConn, gmhp_msgs:encode(maps:remove(via, Msg0), P, V))
    catch
        error:Reason:Stack ->
            ?LOG_ERROR("CAUGHT error:~p / ~p", [Reason, Stack])
    end,
    {noreply, S};
handle_cast(_Ignored, S) ->
    {noreply, S}.
%% Raw messages:
%%   * noise frames on the current connection are decoded and handed to
%%     gmhc_handler, tagged with this connector's id;
%%   * expiry of the current reconnect timer (while auto_connect is on)
%%     triggers a reconnect attempt;
%%   * tcp_closed marks the connector disconnected and, with auto_connect,
%%     starts the reconnect timer.
%% Anything else is logged and dropped.
handle_info({noise, EConn, Data}, #st{econn = EConn} = S) ->
    #st{id = Id, protocol = P, version = V} = S,
    try gmhp_msgs:decode(Data, P, V) of
        Decoded ->
            gmhc_handler:from_pool(Decoded#{via => Id})
    catch
        error:Reason ->
            ?LOG_WARNING("Unknown message (~p): ~p", [Reason, Data])
    end,
    {noreply, S};
handle_info({timeout, TRef, {reconnect, RetryOpts}},
            #st{auto_connect = true, reconnect_timer = {TRef, _, _, _}} = S) ->
    case try_connect(RetryOpts, S) of
        {error, _} = Failure ->
            ?LOG_DEBUG("Reconnect attempt failed: ~p", [Failure]),
            {noreply, restart_reconnect_timer(S)};
        {ok, Connected} ->
            ?LOG_DEBUG("protocol connected", []),
            {noreply, Connected}
    end;
handle_info({tcp_closed, _Port}, #st{id = Id, auto_connect = Auto} = S) ->
    ?LOG_DEBUG("got tcp_closed", []),
    disconnected(Id),
    Closed = S#st{econn = undefined},
    NewS = case Auto of
               true -> start_reconnect_timer(Closed);
               false -> Closed
           end,
    {noreply, NewS};
handle_info(Msg, #st{auto_connect = Auto} = S) ->
    ?LOG_DEBUG("Discarding msg (auto_connect=~p): ~p", [Auto, Msg]),
    {noreply, S}.
%% No cleanup is needed on shutdown.
terminate(_Why, _State) ->
    ok.
%% Hot upgrade: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% try_connect() ->
%% try_connect(#{}, #st{}).
%% try_connect(#st{} = S) ->
%% try_connect(#{}, S).
%% One connection attempt (try_connect_/2). Any error-class exception it
%% raises is logged and converted to {error, Reason}; other exception classes
%% propagate.
try_connect(Opts, S) ->
    try
        try_connect_(Opts, S)
    catch
        error:Reason:Stack ->
            ?LOG_ERROR("Unexpected error connecting: ~p / ~p", [Reason, Stack]),
            {error, Reason}
    end.
%% One connection attempt:
%%   1. look up the pool address via eureka;
%%   2. open the noise-encrypted TCP connection;
%%   3. run the protocol handshake.
%% Returns {ok, NewState} or {error, Reason}. Exceptions raised by the
%% handshake still propagate (try_connect/2 converts error-class ones), but the
%% freshly opened enoise connection is closed first so it is not leaked.
try_connect_(Opts0, S) ->
    case eureka_get_host_port() of
        {error, _} = Error ->
            Error;
        PoolOpts when is_map(PoolOpts) ->
            case try_noise_connect(maps:merge(Opts0, PoolOpts)) of
                {ok, EConn, Opts1} ->
                    {ok, handshake_or_close(EConn, Opts1, S)};
                {error, _} = Error ->
                    Error
            end
    end.

%% Runs protocol_connect/2 on a new connection. On any exception the
%% connection is closed and the exception is re-raised unchanged.
handshake_or_close(EConn, Opts, S) ->
    try
        protocol_connect(Opts, S#st{ econn = EConn
                                   , reconnect_timer = undefined })
    catch
        Class:Reason:Stack ->
            %% Best effort: a failing close must not mask the original error.
            try enoise:close(EConn) catch _:_ -> ok end,
            erlang:raise(Class, Reason, Stack)
    end.
%% Asks eureka for the pool address and returns it as connect options
%% (#{host, port, pool_id}, with host and pool_id as strings), or eureka's
%% {error, _}.
eureka_get_host_port() ->
    case gmhc_eureka:get_pool_address() of
        {error, _} = Err ->
            Err;
        #{<<"address">> := Address, <<"port">> := PortNo, <<"pool_id">> := Pool} ->
            #{host => binary_to_list(Address),
              port => PortNo,
              pool_id => binary_to_list(Pool)}
    end.
%% Ensures `host' and `port' are set, then connects (noise_connect/1).
%% Values already in Opts (e.g. the eureka-provided address) take priority.
%% The pool config is consulted only for keys that are actually missing, so
%% an absent or non-binary configured host no longer breaks a connect that
%% does not need it.
try_noise_connect(Opts) ->
    Host = case maps:find(host, Opts) of
               {ok, H} ->
                   H;
               error ->
                   binary_to_list(gmhc_config:get_config([<<"pool">>, <<"host">>]))
           end,
    Port = opt(port, Opts, [<<"pool">>, <<"port">>]),
    noise_connect(Opts#{host => Host, port => Port}).
%% Opens a TCP connection to Host:Port and runs the enoise handshake on it.
%%
%% Options read from Opts:
%%   * `tcp_opts' (default default_tcp_opts());
%%   * `connect_timeout' in ms for the TCP connect (default 5000);
%%   * `enoise_opts' (default enoise_opts()).
%% Returns {ok, EConn, Opts1}, where Opts1 records the effective tcp_opts and
%% enoise_opts and stores the connect timeout under `timeout', or returns
%% {error, Reason}.
noise_connect(#{host := Host, port := Port} = Opts) ->
    TcpOpts = maps:get(tcp_opts, Opts, default_tcp_opts()),
    Timeout = maps:get(connect_timeout, Opts, 5000),
    ?LOG_DEBUG("TCP connect: Host=~p, Port=~p, Timeout=~p, Opts=~p",
               [Host,Port,Timeout,TcpOpts]),
    case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of
        {ok, TcpSock} ->
            ?LOG_DEBUG("Connected, TcpSock = ~p", [TcpSock]),
            EnoiseOpts = maps:get(enoise_opts, Opts, enoise_opts()),
            case enoise:connect(TcpSock, EnoiseOpts) of
                {ok, EConn, _FinalSt} ->
                    {ok, EConn, Opts#{ tcp_opts => TcpOpts
                                     , timeout => Timeout
                                     , enoise_opts => EnoiseOpts }};
                {error, _} = Err ->
                    %% The handshake failed and nothing will use this socket,
                    %% so close it instead of leaking the port.
                    %% gen_tcp:close/1 also returns ok if it is already closed.
                    gen_tcp:close(TcpSock),
                    Err
            end;
        {error, _} = TcpErr ->
            ?LOG_DEBUG("TCP connection failed: ~p", [TcpErr]),
            TcpErr
    end.
%% Socket options used when the caller gives no `tcp_opts': binary,
%% active-mode socket with SO_REUSEADDR.
default_tcp_opts() ->
    [{active, true}, {reuseaddr, true}, {mode, binary}].
%% Default enoise options: the Noise protocol name to negotiate.
enoise_opts() ->
    Protocol = <<"Noise_NN_25519_ChaChaPoly_BLAKE2b">>,
    [{noise, Protocol}].
%% Starts a reconnect cycle without any deadline.
start_reconnect_timer(S = #st{}) ->
    NoOpts = #{},
    start_reconnect_timer(S, NoOpts).
%% Starts a fresh reconnect cycle: a 1 s interval with 10 attempts before
%% restart_reconnect_timer/1 starts backing off.
%%
%% If Opts carries a deadline that has already passed, no timer is started;
%% notify_deadline/2 is called instead to answer waiters.
%% Any previously armed timer is cancelled first, so no stale expiry message
%% is left in the mailbox. Only the retry options (the deadline) are stored in
%% the timer tuple, as timer_ref() declares.
start_reconnect_timer(#st{} = S0, Opts) ->
    S = cancel_reconnect_timer(S0),
    case deadline_reached(Opts) of
        {true, D} ->
            ?LOG_DEBUG("timer deadline reached, not restarting timer", []),
            notify_deadline(D, S);
        false ->
            ?LOG_DEBUG("starting reconnect timer ...", []),
            RetryOpts = retry_opts(Opts),
            TRef = start_timer(1000, RetryOpts),
            S#st{reconnect_timer = {TRef, 10, 1000, RetryOpts}}
    end.
%% Re-arms the reconnect timer after a failed attempt.
%%
%% The timer tuple is {TRef, AttemptsLeft, IntervalMs, RetryOpts}. Each re-arm
%% at the current interval decrements AttemptsLeft. Once it reaches 0, the
%% interval is doubled (capped at ?MAX_RETRY_INTERVAL) and AttemptsLeft is
%% reset to 10.
%% NOTE(review): the deadline in RetryOpts is only checked when a cycle starts
%% (start_reconnect_timer/2), not on re-arm.
restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) ->
    %% min/2 bounds the exponential backoff. With max/2 the interval jumped
    %% straight to the cap and then kept doubling without limit.
    NewT = min(T * 2, ?MAX_RETRY_INTERVAL),
    TRef = start_timer(NewT, Opts),
    S#st{reconnect_timer = {TRef, 10, NewT, Opts}};
restart_reconnect_timer(#st{reconnect_timer = {_, N, T, Opts}} = S) ->
    TRef = start_timer(T, Opts),
    S#st{reconnect_timer = {TRef, N-1, T, Opts}}.
%% {true, Deadline} if Opts holds a deadline that lies strictly in the past
%% (monotonic ms), otherwise false. Opts without a deadline never expire.
deadline_reached(#{deadline := Deadline}) ->
    Now = erlang:monotonic_time(millisecond),
    if Now > Deadline -> {true, Deadline};
       true -> false
    end;
deadline_reached(_Opts) ->
    false.
%% Replies {error, timeout} to every caller parked in awaiting_connect whose
%% deadline is at or before D (D has just been found to be in the past), and
%% removes those callers. The remaining waiters keep their order.
%%
%% Waiters are stored as {From, RetryOpts}, where RetryOpts is the map built by
%% retry_opts/1, so the deadline must be read out of the map. Comparing the
%% map itself with the integer D never matches.
notify_deadline(D, #st{awaiting_connect = Waiters} = S) ->
    {Expired, Remaining} =
        lists:partition(fun({_From, RetryOpts}) -> waiter_expired(RetryOpts, D);
                           (_Other) -> false
                        end, Waiters),
    lists:foreach(fun({From, _}) -> gen_server:reply(From, {error, timeout}) end,
                  Expired),
    S#st{awaiting_connect = Remaining}.

%% True when a waiter's deadline (retry-opts map, or a bare integer) is not
%% later than D.
waiter_expired(#{deadline := D1}, D) -> D1 =< D;
waiter_expired(D1, D) when is_integer(D1) -> D1 =< D;
waiter_expired(_, _) -> false.
%% Announces a successful connection:
%%   * publishes a `connected' event;
%%   * replies `ok' to every parked connect caller;
%%   * hands the connection options to gmhc_handler;
%%   * clears the waiter list.
notify_connected(#st{id = Id, awaiting_connect = Waiters, opts = ConnOpts} = S) ->
    gmhc_events:publish(connected, #{id => Id}),
    _ = [gen_server:reply(Caller, ok) || {Caller, _RetryOpts} <- Waiters],
    gmhc_handler:pool_connected(Id, ConnOpts),
    S#st{awaiting_connect = []}.
%% Cancels the pending reconnect timer, if any, and clears it from the state.
cancel_reconnect_timer(#st{reconnect_timer = undefined} = S) ->
    S;
cancel_reconnect_timer(#st{reconnect_timer = {TRef, _, _, _}} = S) ->
    erlang:cancel_timer(TRef),
    S#st{reconnect_timer = undefined}.
%% Arms a one-shot timer that delivers {timeout, TRef, {reconnect, RetryOpts}}
%% to this process after T ms; only the retry options travel with it.
start_timer(T, Opts0) ->
    erlang:start_timer(T, self(), {reconnect, retry_opts(Opts0)}).
%% The subset of Opts carried across reconnect attempts: just `deadline'.
retry_opts(Opts) ->
    case maps:find(deadline, Opts) of
        {ok, Deadline} -> #{deadline => Deadline};
        error -> #{}
    end.
%% Application-level handshake on a freshly opened enoise connection.
%%
%% Sends a connect request (pubkey, extra pubkeys, pool id, type, supported
%% versions and protocols, and the total nonce count from gmhc_server), then
%% waits up to 10 s for the matching connect_ack. Values missing from Opts are
%% read from gmhc_config via opt/3.
%%
%% On success, connected/2 is called, the negotiated protocol/version and the
%% effective options are stored, waiting callers are released
%% (notify_connected/1), and the updated state is returned.
%%
%% Raises:
%%   * error(protocol_connect) if the pool answers with an error;
%%   * error(protocol_connect_timeout) if no answer arrives in time.
protocol_connect(Opts, #st{econn = EConn} = S) ->
    Pubkey = to_bin(opt(pubkey, Opts, [<<"pubkey">>])),
    Extra = [to_bin(X) || X <- opt(extra_pubkeys, Opts, [<<"extra_pubkeys">>])],
    PoolId = to_bin(opt(pool_id, Opts, [<<"pool">>, <<"id">>])),
    Type = to_atom(opt(type, Opts, [<<"type">>])),
    RId = erlang:unique_integer(),
    Vsns = gmhp_msgs:versions(),
    Protocols = gmhp_msgs:protocols(hd(Vsns)),
    ConnectReq = #{ protocols => Protocols
                  , versions => Vsns
                  , pool_id => PoolId
                  , pubkey => Pubkey
                  , extra_pubkeys => Extra
                  , type => Type
                  , nonces => gmhc_server:total_nonces()
                  , signature => ""},
    ?LOG_DEBUG("ConnectReq = ~p", [ConnectReq]),
    ReqMsg = gmhp_msgs:encode_connect(ConnectReq, RId),
    enoise:send(EConn, ReqMsg),
    receive
        {noise, EConn, Data} ->
            case gmhp_msgs:decode_connect_ack(Data) of
                #{reply := #{ id := RId
                            , result := #{connect_ack := #{ protocol := P
                                                          , version := V }}
                            }} ->
                    connected(S#st.id, Type),
                    Opts1 = Opts#{ pubkey => Pubkey
                                 , extra => Extra
                                 , pool_id => PoolId
                                 , type => Type },
                    notify_connected(S#st{protocol = P, version = V, opts = Opts1});
                %% ErrMsg must be a fresh variable: the encoded request was
                %% previously bound to the same name, so this clause could
                %% never match and pool errors surfaced as case_clause.
                #{error := #{message := ErrMsg}} ->
                    ?LOG_ERROR("Connect error: ~s", [ErrMsg]),
                    error(protocol_connect)
            end
    after 10000 ->
            error(protocol_connect_timeout)
    end.
%% Normalises an atom (UTF-8 encoded) or iodata (string, binary, iolist) to a
%% binary.
to_bin(Value) ->
    case is_atom(Value) of
        true -> atom_to_binary(Value, utf8);
        false -> iolist_to_binary(Value)
    end.
%% Atoms pass through. Iodata is converted only to an already existing atom
%% (badarg otherwise), so config or user input cannot grow the atom table.
to_atom(Value) ->
    if is_atom(Value) -> Value;
       true -> binary_to_existing_atom(iolist_to_binary(Value), utf8)
    end.
%% Tells gmhc_server that connector Id is up in the given role. Only `worker'
%% and `monitor' are accepted (function_clause otherwise).
connected(Id, Type) when Type =:= worker orelse Type =:= monitor ->
    gmhc_server:connected(Id, Type).
%% Publishes a `disconnected' event for connector Id and informs gmhc_server.
disconnected(Id) ->
    Event = #{id => Id},
    gmhc_events:publish(disconnected, Event),
    gmhc_server:disconnected(Id).
%% Whether the connector connects on start-up and reconnects on its own:
%%   * an explicit boolean `auto_connect' option wins;
%%   * otherwise connector 1 follows the gmhive_client `auto_connect'
%%     application env (default true);
%%   * every other connector auto-connects.
opt_autoconnect(#{auto_connect := Flag}) when is_boolean(Flag) ->
    Flag;
opt_autoconnect(#{id := 1}) ->
    application:get_env(gmhive_client, auto_connect, true);
opt_autoconnect(#{id := _}) ->
    true.
%% Value of OptsK in Opts when the key is present (even if the value is
%% undefined); otherwise the configured value at SchemaPath.
opt(OptsK, Opts, SchemaPath) ->
    case maps:is_key(OptsK, Opts) of
        true -> maps:get(OptsK, Opts);
        false -> gmhc_config:get_config(SchemaPath)
    end.