555 lines
18 KiB
Erlang
555 lines
18 KiB
Erlang
-module(gmhc_connector).
|
|
-vsn("0.8.3").
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-export([ start_link/1
|
|
, init/1
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
, code_change/3
|
|
]).
|
|
|
|
-export([
|
|
connect/1
|
|
, disconnect/1
|
|
, status/0
|
|
, status/1
|
|
, send/2
|
|
]).
|
|
|
|
-export([ whereis_id/1 ]).
|
|
|
|
%% for outgoing messages
|
|
%% -export([ solution/3
|
|
%% , no_solution/2
|
|
%% , nonces/2
|
|
%% ]).
|
|
|
|
-include_lib("kernel/include/logger.hrl").
|
|
|
|
-define(MAX_RETRY_INTERVAL, 30000).
|
|
|
|
%% Options carried along with a reconnect timer. `deadline' is an
%% erlang:monotonic_time(millisecond) value, which may be negative, so it
%% is typed as integer() rather than pos_integer().
-type retry_opts() :: #{ deadline => integer() }.
|
|
|
|
-type timer_ref() :: {reference(), non_neg_integer(), non_neg_integer(), retry_opts()}.
|
|
|
|
-type id() :: non_neg_integer().
|
|
|
|
%% Options accepted by connect/1.
%%
%% `deadline' is compared against erlang:monotonic_time(millisecond), which
%% may be negative, hence integer(). `retry' is read by the connect request
%% handler (default: true). The duplicated `tcp_opts' entry was removed.
-type connect_opts() :: #{ host            => string()
                         , port            => pos_integer()
                         , timeout         => pos_integer() %% overrides deadline
                         , deadline        => integer()     %% ms monotonic must be in the future
                         , nowait          => boolean()     %% default: false
                         , retry           => boolean()     %% default: true
                         , tcp_opts        => list()
                         , enoise_opts     => list()        %% if present, MUST contain {'noise', _}
                         , connect_timeout => pos_integer()
                         }.
|
|
|
|
-export_type([ id/0
|
|
, connect_opts/0 ]).
|
|
|
|
%% gen_server state of one connector process.
-record(st, {
          %% Connector id; also part of the gproc registration key.
          id :: non_neg_integer()
          %% When false, reconnect timer expiries and tcp_closed do not
          %% trigger new connection attempts.
        , auto_connect = true :: boolean()
          %% Connection handle returned by enoise:connect/2, or undefined.
        , econn
          %% Set by connected/3, cleared by disconnected/3.
        , connected = false :: boolean()
        , status = disconnected :: disconnected | connecting | connected
          %% When false, start_reconnect_timer/1 is a no-op.
        , reconnect = true :: boolean()
          %% {TimerRef, RetriesLeftAtThisInterval, IntervalMs, RetryOpts}
        , reconnect_timer :: timer_ref() | 'undefined'
        , recache_timer :: reference() | 'undefined'
          %% [{From, RetryOpts}] callers of connect/1 still awaiting a reply.
        , awaiting_connect = [] :: list()
          %% Protocol and version taken from the pool's connect_ack.
        , protocol :: binary() | 'undefined'
        , version :: binary() | 'undefined'
          %% Options of the most recent successful connect.
        , opts = #{} :: map()
        }).
|
|
|
|
%% Ask the connector to connect to the pool.
%%
%% The call timeout is derived from the `timeout' or `deadline' option
%% (10 seconds when neither is given); see manual_connect_timeout/1.
%%
%% NOTE(review): the request is addressed to the name ?MODULE, whereas
%% start_link/1 registers the process via gproc under {?MODULE, Id} --
%% confirm that a process is registered under the plain module name.
-spec connect(connect_opts()) -> ok | {error, any()}.
connect(Opts) when is_map(Opts) ->
    {CallTimeout, ConnectOpts} = manual_connect_timeout(Opts),
    Request = {connect, ConnectOpts},
    gen_server:call(?MODULE, Request, CallTimeout).
|
|
|
|
%% Derive {CallTimeoutMs, Opts1} for a manual connect request.
%%
%%  * `timeout' given:  it is the call timeout; it is replaced in the
%%    options by an absolute monotonic `deadline'.
%%  * `deadline' given: the call timeout is the time remaining; raises
%%    error(invalid_deadline) if the deadline has already passed.
%%  * neither:          a 10000 ms timeout and matching deadline are used.
manual_connect_timeout(#{timeout := T} = Opts) ->
    Now = erlang:monotonic_time(millisecond),
    WithDeadline = Opts#{deadline => Now + T},
    {T, maps:remove(timeout, WithDeadline)};
manual_connect_timeout(#{deadline := D} = Opts) ->
    case D - erlang:monotonic_time(millisecond) of
        Remaining when Remaining < 0 ->
            error(invalid_deadline);
        Remaining ->
            {Remaining, Opts}
    end;
manual_connect_timeout(Opts) ->
    DefaultT = 10000,
    Now = erlang:monotonic_time(millisecond),
    {DefaultT, Opts#{deadline => Now + DefaultT}}.
|
|
|
|
%% Synchronously ask the connector registered under Id to disconnect.
disconnect(Id) ->
    Server = via(Id),
    gen_server:call(Server, disconnect).
|
|
|
|
%% Asynchronously hand Msg to the connector registered under id Via.
send(Via, Msg) ->
    Server = via(Via),
    gen_server:cast(Server, {send, Msg}).
|
|
|
|
%% Return [{Id, Status}] for every connector currently registered in gproc.
status() ->
    %% Build the registration key with '$1' in the Id position so that the
    %% match specification returns the Id of each registered connector.
    {via, gproc, KeyPattern} = via('$1'),
    MatchSpec = [{{KeyPattern, '_', '_'}, [], ['$1']}],
    Ids = gproc:select(MatchSpec),
    [{Id, status_(via(Id))} || Id <- Ids].
|
|
|
|
%% Status of the connector registered under Id; `disconnected' when no
%% such process is registered.
status(Id) ->
    {via, gproc, Key} = via(Id),
    case gproc:where(Key) of
        Pid when is_pid(Pid) ->
            status_(Pid);
        undefined ->
            disconnected
    end.
|
|
|
|
%% Query a connector process for its status.
%%
%% A failed gen_server:call/2 (no such process, timeout, server crash) is
%% signalled as an `exit', not an `error', so both classes are mapped to
%% `disconnected'. Previously only `error' was caught, which let the exit
%% propagate to the caller instead of producing the intended fallback.
status_(Proc) ->
    try gen_server:call(Proc, status)
    catch
        exit:_ ->
            disconnected;
        error:_ ->
            disconnected
    end.
|
|
|
|
%% start_link() ->
|
|
%% start_link(#{}).
|
|
|
|
%% Look up the connector registered under Id via gproc:where/1.
whereis_id(Id) ->
    Key = reg(Id),
    gproc:where(Key).
|
|
|
|
%% gproc key (unique name, local scope) under which connector Id registers.
reg(Id) ->
    Name = {?MODULE, Id},
    {n, l, Name}.
|
|
|
|
%% `{via, gproc, Key}' server reference for connector Id.
via(Id) ->
    Key = reg(Id),
    {via, gproc, Key}.
|
|
|
|
%% Start a connector process, registered via gproc under the `id' option.
%% The full option map is passed on to init/1.
start_link(#{id := Id} = Opts) ->
    ServerName = via(Id),
    gen_server:start_link(ServerName, ?MODULE, Opts, []).
|
|
|
|
%% gen_server callback. Builds the initial state and, when auto-connect is
%% enabled, makes the first connection attempt before serving requests.
%%
%% With `nowait => true' the parent is acknowledged through
%% proc_lib:init_ack/1 *before* the connection attempt, and the process
%% then enters the server loop itself with gen_server:enter_loop/4.
init(#{id := Id} = Opts) when is_map(Opts) ->
    AutoConnect = opt_autoconnect(Opts),
    InitSt = #st{id = Id, auto_connect = AutoConnect},
    Nowait = maps:get(nowait, Opts, false),
    case Nowait of
        true -> proc_lib:init_ack({ok, self()});
        _ -> ok
    end,
    St = case AutoConnect of
             true -> connect_on_init(Opts, InitSt);
             false -> InitSt
         end,
    case Nowait of
        true -> gen_server:enter_loop(?MODULE, [], St, via(Id));
        _ -> {ok, St}
    end.

%% First connection attempt made from init/1. A rejection disables further
%% reconnects; any other failure starts the reconnect timer.
connect_on_init(Opts, S0) ->
    case try_connect(Opts, S0) of
        {ok, S} ->
            ?LOG_DEBUG("Initial connect succeeded", []),
            S;
        {error, rejected} ->
            ?LOG_WARNING("Connection rejected; will not retry", []),
            S0#st{econn = undefined, reconnect = false};
        {error, _} = Error ->
            ?LOG_WARNING("Could not connect to core server: ~p", [Error]),
            start_reconnect_timer(S0#st{econn = undefined})
    end.
|
|
|
|
%% gen_server callback for synchronous requests.
%%
%%  * status          -> `connected' when a connection handle is held,
%%                       otherwise `disconnected'.
%%  * {connect, Opts} -> attempt to connect. With `nowait' the caller is
%%                       answered `ok' immediately. On failure, unless
%%                       `retry => false', the caller is queued in
%%                       awaiting_connect (if waiting) and the reconnect
%%                       timer is started.
%%  * disconnect      -> close any connection, disable auto-connect and
%%                       cancel the reconnect timer.
%%  * anything else   -> {error, unknown_call}.
handle_call(status, _From, #st{econn = undefined} = S) ->
    {reply, disconnected, S};
handle_call(status, _From, #st{} = S) ->
    {reply, connected, S};
handle_call({connect, Opts}, From, #st{awaiting_connect = Waiters} = S) ->
    Nowait = maps:get(nowait, Opts, false),
    case Nowait of
        true -> gen_server:reply(From, ok);
        false -> ok
    end,
    case try_connect(Opts, S) of
        {ok, ConnectedSt} ->
            connect_result(Nowait, ok, ConnectedSt);
        {error, _} = Error ->
            case maps:get(retry, Opts, true) of
                true ->
                    %% Only callers that are still waiting for a reply
                    %% are remembered.
                    Waiters1 = case Nowait of
                                   true -> Waiters;
                                   false -> [{From, retry_opts(Opts)} | Waiters]
                               end,
                    RetrySt = S#st{ auto_connect = true
                                  , reconnect = true
                                  , awaiting_connect = Waiters1 },
                    {noreply, start_reconnect_timer(RetrySt, Opts)};
                false ->
                    connect_result(Nowait, Error, S)
            end
    end;
handle_call(disconnect, _From, #st{econn = EConn} = S) ->
    _ = (EConn =:= undefined) orelse enoise:close(EConn),
    Cleared = S#st{ auto_connect = false
                  , econn = undefined },
    {reply, ok, cancel_reconnect_timer(Cleared)};
handle_call(_Req, _From, S) ->
    {reply, {error, unknown_call}, S}.

%% A `nowait' caller has already been answered, so only a waiting caller
%% gets the reply.
connect_result(true, _Reply, S) ->
    {noreply, S};
connect_result(false, Reply, S) ->
    {reply, Reply, S}.
|
|
|
|
%% gen_server callback for casts.
%%
%% {send, Msg}: while a connection is held, strip the `via' key, encode
%% the message with the negotiated protocol/version and send it. Errors
%% raised while encoding or sending are logged, not propagated. All other
%% casts (including sends while disconnected) are ignored.
handle_cast({send, Msg0}, #st{ econn = EConn
                             , protocol = Protocol
                             , version = Vsn } = S) when EConn =/= undefined ->
    try
        Encoded = gmhp_msgs:encode(maps:remove(via, Msg0), Protocol, Vsn),
        enoise:send(EConn, Encoded)
    catch
        error:Reason:Stack ->
            ?LOG_ERROR("CAUGHT error:~p / ~p", [Reason, Stack])
    end,
    {noreply, S};
handle_cast(_Msg, S) ->
    {noreply, S}.
|
|
|
|
%% gen_server callback for plain messages.
%%
%%  * {noise, EConn, Data} from the current connection: decode and forward
%%    to gmhc_handler, tagged with this connector's id. Decode errors are
%%    logged.
%%  * reconnect timer expiry (matching the current timer, auto_connect on):
%%    try to connect again; on failure re-arm the timer.
%%  * {tcp_closed, _}: mark the connector disconnected, drop the
%%    connection handle and, if auto_connect is on, start reconnecting.
%%  * recache timer expiry: while connected, refresh the cached pool
%%    address and re-arm the recache timer.
%%  * anything else is logged and discarded.
handle_info({noise, EConn, Data}, #st{ id = Id
                                     , econn = EConn
                                     , protocol = Protocol
                                     , version = Vsn } = S) ->
    try gmhp_msgs:decode(Data, Protocol, Vsn) of
        Decoded ->
            gmhc_handler:from_pool(Decoded#{via => Id})
    catch
        error:Reason ->
            ?LOG_WARNING("Unknown message (~p): ~p", [Reason, Data])
    end,
    {noreply, S};
handle_info({timeout, TRef, {reconnect, Opts}},
            #st{ reconnect_timer = {TRef, _, _, _}
               , auto_connect = true } = S) ->
    case try_connect(Opts, S) of
        {ok, ConnectedSt} ->
            ?LOG_DEBUG("protocol connected", []),
            {noreply, ConnectedSt};
        {error, _} = Error ->
            ?LOG_DEBUG("Reconnect attempt failed: ~p", [Error]),
            {noreply, restart_reconnect_timer(S)}
    end;
handle_info({tcp_closed, _Port}, #st{} = S) ->
    ?LOG_DEBUG("got tcp_closed", []),
    Disconnected = disconnected(S#st.id, S),
    Cleared = Disconnected#st{econn = undefined},
    case Disconnected#st.auto_connect of
        true -> {noreply, start_reconnect_timer(Cleared)};
        false -> {noreply, Cleared}
    end;
handle_info({timeout, _, recache}, #st{opts = Opts, econn = EConn} = S0) ->
    S = S0#st{recache_timer = undefined},
    case (EConn =/= undefined) andalso S#st.connected of
        true ->
            cache_eureka_info(Opts),
            {noreply, ensure_recache_timer(S)};
        false ->
            {noreply, S}
    end;
handle_info(Msg, S) ->
    ?LOG_DEBUG("Discarding msg (auto_connect=~p): ~p", [S#st.auto_connect, Msg]),
    {noreply, S}.
|
|
|
|
%% gen_server callback; no cleanup is performed here.
terminate(_Reason, _State) ->
    ok.
|
|
|
|
%% gen_server callback; the state is carried over unchanged on upgrade.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
|
|
|
|
%% try_connect() ->
|
|
%% try_connect(#{}, #st{}).
|
|
|
|
%% try_connect(#st{} = S) ->
|
|
%% try_connect(#{}, S).
|
|
|
|
%% Attempt a full connect (address lookup, noise connect, protocol
%% handshake), converting `error' exceptions into {error, Reason}.
%% A rejection by the pool is returned as {error, rejected} without
%% logging; any other error is logged together with its stacktrace.
try_connect(Opts, S) ->
    try try_connect_(Opts, S) of
        Result ->
            Result
    catch
        error:rejected ->
            {error, rejected};
        error:Reason:Stack ->
            ?LOG_ERROR("Unexpected error connecting: ~p / ~p", [Reason, Stack]),
            {error, Reason}
    end.
|
|
|
|
%% Look up the pool address, open the noise connection and run the
%% protocol handshake. Returns {ok, State} or {error, Reason}; the
%% protocol handshake may also raise (e.g. error(rejected)), which the
%% caller, try_connect/2, converts to an error tuple.
%%
%% If the protocol handshake raises, the connection that was just opened
%% is closed before the exception is re-raised. Previously it was left
%% open, so every failed handshake leaked a connection.
try_connect_(Opts0, S) ->
    case eureka_get_host_port() of
        {error, _} = Error ->
            Error;
        PoolOpts when is_map(PoolOpts) ->
            case try_noise_connect(maps:merge(Opts0, PoolOpts)) of
                {ok, EConn, Opts1} ->
                    S0 = S#st{ econn = EConn
                             , status = connecting
                             , reconnect_timer = undefined },
                    try protocol_connect(Opts1, S0) of
                        S1 ->
                            {ok, S1#st{status = connected}}
                    catch
                        Class:Reason:Stack ->
                            close_failed_econn(EConn),
                            erlang:raise(Class, Reason, Stack)
                    end;
                {error, _} = Error ->
                    Error
            end
    end.

%% Best-effort close of a connection whose protocol handshake failed.
%% The connection may already be gone (e.g. closed by the peer), so a
%% failure to close is deliberately ignored; the original handshake
%% exception is what gets reported.
close_failed_econn(EConn) ->
    try enoise:close(EConn) of
        _ -> ok
    catch
        _:_ -> ok
    end.
|
|
|
|
%% Fetch the pool address from gmhc_eureka and return it as a map with
%% `host' and `pool_id' converted to character lists, or pass through an
%% {error, _} result unchanged.
eureka_get_host_port() ->
    case gmhc_eureka:get_pool_address() of
        {error, _} = Error ->
            Error;
        {ok, #{host := Host, port := Port, pool_id := PoolId}} ->
            #{ host => unicode:characters_to_list(Host)
             , port => Port
             , pool_id => unicode:characters_to_list(PoolId) }
    end.
|
|
|
|
|
|
%% Connect using Opts, with the configured pool host and port as defaults
%% for any `host'/`port' not present in Opts.
try_noise_connect(Opts) ->
    CfgHost = binary_to_list(gmhc_config:get_config([<<"pool">>, <<"host">>])),
    CfgPort = gmhc_config:get_config([<<"pool">>, <<"port">>]),
    Defaults = #{host => CfgHost, port => CfgPort},
    noise_connect(maps:merge(Defaults, Opts)).
|
|
|
|
%% Open a TCP connection to Host:Port and run the noise handshake on it.
%%
%% Options read from Opts: `tcp_opts' (default: default_tcp_opts/0),
%% `connect_timeout' in ms (default: 5000) and `enoise_opts'
%% (default: enoise_opts/0).
%%
%% Returns {ok, EConn, Opts1}, where Opts1 records the options actually
%% used, or {error, Reason}. When the noise handshake fails, the TCP
%% socket is closed so that it is not leaked (previously it was left open).
noise_connect(#{host := Host, port := Port} = Opts) ->
    TcpOpts = maps:get(tcp_opts, Opts, default_tcp_opts()),
    Timeout = maps:get(connect_timeout, Opts, 5000),
    ?LOG_DEBUG("TCP connect: Host=~p, Port=~p, Timeout=~p, Opts=~p",
               [Host,Port,Timeout,TcpOpts]),
    case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of
        {ok, TcpSock} ->
            ?LOG_DEBUG("Connected, TcpSock = ~p", [TcpSock]),
            EnoiseOpts = maps:get(enoise_opts, Opts, enoise_opts()),
            case enoise:connect(TcpSock, EnoiseOpts) of
                {ok, EConn, _FinalSt} ->
                    {ok, EConn, Opts#{ tcp_opts => TcpOpts
                                     , timeout => Timeout
                                     , enoise_opts => EnoiseOpts }};
                {error, _} = Err ->
                    %% gen_tcp:close/1 is harmless on an already closed
                    %% socket, so this is safe whether or not the
                    %% handshake code closed it.
                    _ = gen_tcp:close(TcpSock),
                    Err
            end;
        {error, _} = TcpErr ->
            ?LOG_DEBUG("TCP connection failed: ~p", [TcpErr]),
            TcpErr
    end.
|
|
|
|
%% TCP options used when the caller supplies no `tcp_opts'.
default_tcp_opts() ->
    Active = {active, true},
    ReuseAddr = {reuseaddr, true},
    Mode = {mode, binary},
    [Active, ReuseAddr, Mode].
|
|
|
|
%% enoise options used when the caller supplies no `enoise_opts'.
enoise_opts() ->
    Protocol = <<"Noise_NN_25519_ChaChaPoly_BLAKE2b">>,
    [{noise, Protocol}].
|
|
|
|
%% Start the reconnect timer with no retry options, unless reconnecting
%% has been disabled (`reconnect = false'), in which case the state is
%% returned untouched.
start_reconnect_timer(#st{reconnect = Reconnect} = S) ->
    case Reconnect of
        false -> S;
        _ -> start_reconnect_timer(S, #{})
    end.
|
|
|
|
%% Arm the first reconnect timer (1 second), unless the deadline in Opts
%% has already passed, in which case callers waiting on that deadline are
%% notified and no timer is started.
%%
%% The stored timer tuple is {TRef, RetriesLeft, IntervalMs, Opts}: 10
%% retries at an interval of 8000 ms plus a gmhc_lib:rand(3000) component.
start_reconnect_timer(#st{} = S, Opts) ->
    case deadline_reached(Opts) of
        false ->
            ?LOG_DEBUG("starting reconnect timer ...", []),
            TRef = start_timer(1000, Opts),
            Interval = 8000 + gmhc_lib:rand(3000),
            S#st{reconnect_timer = {TRef, 10, Interval, Opts}};
        {true, Deadline} ->
            ?LOG_DEBUG("timer deadline reached, not restarting timer", []),
            notify_deadline(Deadline, S#st{reconnect_timer = undefined})
    end.
|
|
|
|
%% Re-arm the reconnect timer after a failed reconnect attempt.
%%
%%  * reconnect disabled: cancel any running timer.
%%  * retries at the current interval exhausted (counter 0): back off by
%%    5000 ms plus a gmhc_lib:rand(1000) component, capped at
%%    ?MAX_RETRY_INTERVAL, and grant 10 retries at the new interval. The
%%    eureka cache is invalidated when the cap is first reached.
%%  * otherwise: retry at the same interval and decrement the counter.
%%
%% The cap previously used max/2 instead of min/2, which made the interval
%% jump straight to at least ?MAX_RETRY_INTERVAL and then keep growing.
restart_reconnect_timer(#st{reconnect = false} = S) ->
    cancel_reconnect_timer(S);
restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) ->
    NewT = min(T + 5000 + gmhc_lib:rand(1000), ?MAX_RETRY_INTERVAL),
    if (NewT > T andalso NewT =:= ?MAX_RETRY_INTERVAL) ->
            gmhc_eureka:invalidate_cache();
       true ->
            ok
    end,
    TRef = start_timer(NewT, Opts),
    S#st{reconnect_timer = {TRef, 10, NewT, Opts}};
restart_reconnect_timer(#st{reconnect_timer = {_, N, T, Opts}} = S) ->
    TRef = start_timer(T, Opts),
    S#st{reconnect_timer = {TRef, N-1, T, Opts}}.
|
|
|
|
%% Returns {true, Deadline} when Opts carries a `deadline' (monotonic ms)
%% that lies strictly in the past, otherwise false. Options without a
%% deadline never expire.
deadline_reached(#{deadline := Deadline}) ->
    Now = erlang:monotonic_time(millisecond),
    if
        Now > Deadline -> {true, Deadline};
        true -> false
    end;
deadline_reached(_) ->
    false.
|
|
|
|
%% Reply {error, timeout} to every waiting connect/1 caller whose deadline
%% equals D, and remove those callers from the waiting list.
%%
%% Waiters are stored as {From, RetryOpts} where RetryOpts is a map that
%% may contain `deadline'. The previous code compared the whole map with
%% the integer D, which never matched, so no waiter was ever notified.
notify_deadline(D, #st{awaiting_connect = Waiters} = S) ->
    Waiters1 =
        lists:foldr(fun({From, #{deadline := D1}}, Acc) when D1 == D ->
                            gen_server:reply(From, {error, timeout}),
                            Acc;
                       (Other, Acc) ->
                            [Other | Acc]
                    end, [], Waiters),
    S#st{awaiting_connect = Waiters1}.
|
|
|
|
%% Hand the `host', `port' and `pool_id' entries of Opts (those that are
%% present) to gmhc_eureka:cache_good_address/1.
cache_eureka_info(Opts) ->
    Address = maps:with([host, port, pool_id], Opts),
    gmhc_eureka:cache_good_address(Address).
|
|
|
|
%% Announce a successful connect: publish the `connected' event, answer
%% `ok' to all waiting connect/1 callers, inform the handler, cache the
%% pool address, record restart info with the supervisor, and make sure
%% the recache timer is running. Returns the state with the waiting list
%% emptied.
notify_connected(#st{id = Id, awaiting_connect = Waiters, opts = Opts} = S) ->
    gmhc_events:publish(connected, #{id => Id}),
    lists:foreach(fun({From, _}) -> gen_server:reply(From, ok);
                     (_) -> ok
                  end, Waiters),
    gmhc_handler:pool_connected(Id, Opts),
    cache_eureka_info(Opts),
    gmhc_connectors_sup:add_restart_info(Id, Opts),
    ensure_recache_timer(S#st{awaiting_connect = []}).
|
|
|
|
%% Start the one-minute recache timer unless one is already running.
ensure_recache_timer(#st{recache_timer = T} = S) ->
    case T of
        _ when is_reference(T) ->
            S;
        undefined ->
            OneMinute = 1*60*1000,
            S#st{recache_timer = erlang:start_timer(OneMinute, self(), recache)}
    end.
|
|
|
|
%% Cancel the reconnect timer, if any, and clear it from the state.
cancel_reconnect_timer(#st{reconnect_timer = Timer} = S) ->
    case Timer of
        {TRef, _, _, _} ->
            erlang:cancel_timer(TRef),
            S#st{reconnect_timer = undefined};
        undefined ->
            S
    end.
|
|
|
|
%% Start a timer that delivers {timeout, TRef, {reconnect, RetryOpts}} to
%% this process after T ms. Only the retry-relevant part of Opts0 is
%% carried in the timer message.
start_timer(T, Opts0) ->
    TimerMsg = {reconnect, retry_opts(Opts0)},
    erlang:start_timer(T, self(), TimerMsg).
|
|
|
|
%% Reduce a connect option map to the keys relevant for retrying
%% (currently only `deadline').
retry_opts(Opts) ->
    RetryKeys = [deadline],
    maps:with(RetryKeys, Opts).
|
|
|
|
%% Build the protocol-level connect request and send it over EConn.
%%
%% Request fields come from Opts, falling back to configuration (see
%% opt/3). If encoding the request raises an error, the connector is
%% marked disconnected with an `invalid_input' error description and the
%% resulting state is returned; otherwise the result of send_connect/6 is
%% returned (errors raised by send_connect/6 are not caught here).
protocol_connect(Opts, #st{econn = EConn} = S) ->
    Pubkey = to_bin(opt(pubkey, Opts, [<<"pubkey">>])),
    ExtraKeys = [to_bin(K) || K <- opt(extra_pubkeys, Opts, [<<"extra_pubkeys">>])],
    PoolId = to_bin(opt(pool_id, Opts, [<<"pool">>, <<"id">>])),
    Type = to_atom(opt(type, Opts, [<<"type">>])),
    ReqId = erlang:unique_integer(),
    Versions = gmhp_msgs:versions(),
    Client = client_name(),
    Protocols = gmhp_msgs:protocols(hd(Versions)),
    Nonces = gmhc_server:total_nonces(),
    ConnectReq = #{ protocols => Protocols
                  , versions => Versions
                  , pool_id => PoolId
                  , pubkey => Pubkey
                  , extra_pubkeys => ExtraKeys
                  , client => Client
                  , type => Type
                  , nonces => Nonces
                  , signature => ""},
    ?LOG_DEBUG("ConnectReq = ~p", [ConnectReq]),
    try gmhp_msgs:encode_connect(ConnectReq, ReqId) of
        Encoded ->
            send_connect(EConn, ReqId, Encoded, ConnectReq, Opts, S)
    catch
        error:Reason ->
            ErrMsg = unicode:characters_to_binary(io_lib:fwrite("~p", [Reason])),
            ErrInfo = #{code => gmhp_msgs:error_code(invalid_input),
                        message => ErrMsg},
            disconnected(S#st.id, #{error => ErrInfo}, S)
    end.
|
|
|
|
%% Send the encoded connect request and wait up to 10 seconds for the
%% pool's answer on the same connection.
%%
%%  * connect_ack whose reply id equals RId: mark the connector connected,
%%    record protocol/version and the effective options, notify waiters.
%%  * error reply: log it, publish a disconnected event with reconnect
%%    turned off, invalidate the eureka cache and raise error(rejected).
%%  * no answer in time: invalidate the eureka cache and raise
%%    error(protocol_connect_timeout).
send_connect(EConn, RId, Msg, #{pubkey := Pubkey,
                                extra_pubkeys := Extra,
                                pool_id := PoolId,
                                type := Type}, Opts, S) ->
    enoise:send(EConn, Msg),
    receive
        {noise, EConn, Data} ->
            case gmhp_msgs:decode_connect_ack(Data) of
                #{reply := #{ id := RId
                            , result := #{connect_ack := #{ protocol := Protocol
                                                          , version := Vsn }}
                            }} ->
                    ConnectedSt = connected(S#st.id, Type, S),
                    EffectiveOpts = maps:merge(Opts, #{ pubkey => Pubkey
                                                      , extra => Extra
                                                      , pool_id => PoolId
                                                      , type => Type }),
                    notify_connected(ConnectedSt#st{ protocol = Protocol
                                                   , version = Vsn
                                                   , opts = EffectiveOpts });
                #{error := #{code := _, message := ErrMsg}} = ErrReply ->
                    ?LOG_ERROR("Connect error: ~s", [ErrMsg]),
                    %% TODO: fix the flow so that we send one disconnected event,
                    %% and set the reconnect in the right place. For now, stuff
                    %% the `reconnect = false`.
                    disconnected(S#st.id, ErrReply, S#st{reconnect = false}),
                    gmhc_eureka:invalidate_cache(),
                    error(rejected)
            end
    after 10000 ->
            gmhc_eureka:invalidate_cache(),
            error(protocol_connect_timeout)
    end.
|
|
|
|
%% Client identification string: "gmhive_client-Vsn", followed by
%% ",gajumine-Vsn" when the gajumine application's version is available.
client_name() ->
    Own = app_string(gmhive_client),
    case app_string(gajumine) of
        <<>> ->
            Own;
        Gajumine ->
            <<Own/binary, ",", Gajumine/binary>>
    end.
|
|
|
|
%% <<"App-Vsn">> for an application whose `vsn' key is available, or an
%% empty binary when application:get_key/2 returns `undefined'.
app_string(App) ->
    case application:get_key(App, vsn) of
        {ok, Vsn} ->
            unicode:characters_to_binary([atom_to_binary(App), "-", Vsn]);
        undefined ->
            <<>>
    end.
|
|
|
|
%% Convert an atom (as UTF-8) or iodata to a binary.
to_bin(X) ->
    if
        is_atom(X) -> atom_to_binary(X, utf8);
        true -> iolist_to_binary(X)
    end.
|
|
|
|
%% Return an atom unchanged; convert iodata to an atom that must already
%% exist (binary_to_existing_atom/2 raises badarg otherwise).
to_atom(X) ->
    if
        is_atom(X) -> X;
        true -> binary_to_existing_atom(iolist_to_binary(X), utf8)
    end.
|
|
|
|
%% Report the connector as connected to gmhc_server and flag it in the
%% state. Only the types `worker' and `monitor' are accepted.
connected(Id, Type, S) when Type =:= worker orelse Type =:= monitor ->
    gmhc_server:connected(Id, Type),
    S#st{connected = true}.
|
|
|
|
%% Mark the connector disconnected with no extra event information.
disconnected(Id, S) ->
    NoInfo = #{},
    disconnected(Id, NoInfo, S).
|
|
|
|
%% Mark the connector disconnected. Unless the state already says
%% `disconnected', publish a `disconnected' event (Msg extended with the
%% id and whether reconnecting is enabled) and inform gmhc_server.
disconnected(Id, Msg, #st{status = Status, reconnect = Reconnecting} = S) ->
    case Status of
        disconnected ->
            S;
        _ ->
            Event = Msg#{id => Id, reconnecting => Reconnecting},
            gmhc_events:publish(disconnected, Event),
            gmhc_server:disconnected(Id),
            S#st{connected = false, status = disconnected}
    end.
|
|
|
|
%% Decide whether the connector should connect automatically.
%% An explicit boolean `auto_connect' option wins. Otherwise connector 1
%% follows the gmhive_client application environment (default true) and
%% every other connector auto-connects.
opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) ->
    Bool;
opt_autoconnect(#{id := 1}) ->
    application:get_env(gmhive_client, auto_connect, true);
opt_autoconnect(#{id := _}) ->
    true.
|
|
|
|
%% Value of OptsK in Opts when present, otherwise the configured value at
%% SchemaPath.
opt(OptsK, Opts, SchemaPath) ->
    case maps:find(OptsK, Opts) of
        {ok, Value} ->
            Value;
        error ->
            gmhc_config:get_config(SchemaPath)
    end.
|