zx/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl
2018-02-13 09:59:30 +09:00

302 lines
8.7 KiB
Erlang

%%% @doc
%%% ZX Connector
%%%
%%% This module represents a connection to a Zomp server.
%%% Multiple connections can exist at a given time, but each one of these processes
%%% only represents a single connection at a time.
%%% @end
-module(zx_conn).
-author("Craig Everett <zxq9@zxq9.com>").
-copyright("Craig Everett <zxq9@zxq9.com>").
-license("GPL-3.0").

%% Public interface of the connector.
-export([start_monitor/1, stop/1, subscribe/2]).
%% Startup entry points. start_link/1 is the supervisor's spawn function; init/2 has to be
%% exported because proc_lib:start_link/3 invokes it as ?MODULE:init(Parent, Target).
-export([start_link/1, init/2]).

%% NOTE(review): including a .erl file is unusual -- confirm whether this is meant to be
%% a .hrl header. The log/3 calls in this module are expected to come from this include.
-include("zx_logger.erl").
%%% Startup
-spec start_monitor(Target) -> Result
    when Target :: zx:host(),
         Result :: {ok, PID :: pid(), Mon :: reference()}
                 | {error, Reason},
         Reason :: term().
%% @doc
%% Asks zx_conn_sup to start a connector for the given target Zomp node and, on success,
%% monitors the new process from the caller. The call itself is not expected to fail,
%% but the spawned process may fail to connect or crash right after spawning, which the
%% caller learns about through the monitor. Should only be called by zx_daemon.

start_monitor(Target) ->
    monitor_started(zx_conn_sup:start_conn(Target)).


-spec monitor_started(StartResult :: term()) -> term().
%% @private
%% Adds a process monitor to a successful start result; anything else is handed back
%% to the caller untouched.

monitor_started({ok, Conn}) ->
    {ok, Conn, monitor(process, Conn)};
monitor_started(Failure) ->
    Failure.
-spec stop(Conn :: pid()) -> ok.
%% @doc
%% Tells the connection process to disconnect and retire immediately by sending it the
%% bare `stop' message. Returns without waiting for the process to act on it.

stop(Conn) ->
    erlang:send(Conn, stop),
    ok.
-spec subscribe(Conn, Realm) -> Result
    when Conn   :: pid(),
         Realm  :: zx:realm(),
         Result :: ok.
%% @doc
%% Sends `{subscribe, Realm}' to the connection process Conn and returns `ok' without
%% waiting for any reply.
%% NOTE(review): loop/3 currently has no clause for `{subscribe, Realm}', so the message
%% is logged there as unexpected -- confirm whether handling is still to be written.

subscribe(Conn, Realm) ->
    Conn ! {subscribe, Realm},
    ok.
-spec start_link(Target) -> Result
    when Target :: zx:host(),
         Result :: {ok, pid()}
                 | {error, Reason},
         Reason :: term().
%% @private
%% The supervisor's way of spawning a new connector. The process is started through
%% proc_lib with init/2 as its entry point, passing the caller's pid as the parent, so
%% this call returns once init/2 acknowledges the start via proc_lib:init_ack/2.

start_link(Target) ->
    proc_lib:start_link(?MODULE, init, [self(), Target]).
-spec init(Parent, Target) -> no_return()
    when Parent :: pid(),
         Target :: zx:host().
%% @private
%% Entry point of the connector process, started from start_link/1 through proc_lib.
%% Logs the target, acknowledges the start to Parent with `{ok, self()}' and then moves
%% on to connect/3 with an empty set of sys debug options. The acknowledgement is sent
%% before any connection attempt is made.

init(Parent, Target) ->
    ok = log(info, "Connecting to ~tp", [Target]),
    ok = proc_lib:init_ack(Parent, {ok, self()}),
    connect(Parent, sys:debug_options([]), Target).
%%% Connection Procedure
-spec connect(Parent, Debug, Target) -> no_return()
    when Parent :: pid(),
         Debug  :: [sys:dbg_opt()],
         Target :: zx:host().
%% @private
%% Opens a TCP connection to the target `{Host, Port}' using 4-byte length-prefixed
%% binary packets in active mode, with a 5 second connect timeout.
%% On success the procedure continues in confirm_service/3. On failure the problem is
%% logged together with the target, `failed' is reported to zx_daemon and the process
%% exits.

connect(Parent, Debug, Target = {Host, Port}) ->
    Options = [{packet, 4}, {mode, binary}, {active, true}],
    case gen_tcp:connect(Host, Port, Options, 5000) of
        {ok, Socket} ->
            confirm_service(Parent, Debug, Socket);
        {error, Error} ->
            ok = log(warning, "Connection problem with ~tp: ~tp", [Target, Error]),
            ok = zx_daemon:report(failed),
            terminate()
    end.
-spec confirm_service(Parent, Debug, Socket) -> no_return()
    when Parent :: pid(),
         Debug  :: [sys:dbg_opt()],
         Socket :: gen_tcp:socket().
%% @private
%% Sends the "OTPR USER 1" greeting and waits up to 5 seconds for the node's answer.
%% An `ok' answer continues with query_realms/3. A `{redirect, Hosts}' answer is
%% reported to zx_daemon, after which the socket is disconnected and the process exits.
%% A closed socket or a timeout is passed to the matching terminal handler. Any other
%% answer is not matched and crashes the process.

confirm_service(Parent, Debug, Socket) ->
    Greeting = <<"OTPR USER 1">>,
    ok = gen_tcp:send(Socket, Greeting),
    receive
        {tcp, Socket, Reply} ->
            Answer = binary_to_term(Reply, [safe]),
            case Answer of
                ok ->
                    query_realms(Parent, Debug, Socket);
                {redirect, Alternates} ->
                    ok = zx_daemon:report({redirect, Alternates}),
                    ok = zx_net:disconnect(Socket),
                    terminate()
            end;
        {tcp_closed, Socket} ->
            handle_unexpected_close()
    after 5000 ->
        handle_timeout(Socket)
    end.
-spec query_realms(Parent, Debug, Socket) -> no_return()
    when Parent :: pid(),
         Debug  :: [sys:dbg_opt()],
         Socket :: gen_tcp:socket().
%% @private
%% Sends the `list' request to the connected node and waits up to 5 seconds for an
%% `{ok, Realms}' answer. The realms are reported to zx_daemon as `{connected, Realms}'
%% and the process then enters the service loop. A closed socket or a timeout is passed
%% to the matching terminal handler. Any other answer fails the match and crashes the
%% process.

query_realms(Parent, Debug, Socket) ->
    ok = zx_net:send(Socket, list),
    receive
        {tcp, Socket, Reply} ->
            {ok, Available} = binary_to_term(Reply, [safe]),
            Report = {connected, Available},
            ok = zx_daemon:report(Report),
            loop(Parent, Debug, Socket);
        {tcp_closed, Socket} ->
            handle_unexpected_close()
    after 5000 ->
        handle_timeout(Socket)
    end.
%%% Service Loop
-spec loop(Parent, Debug, Socket) -> no_return()
    when Parent :: pid(),
         Debug  :: [sys:dbg_opt()],
         Socket :: gen_tcp:socket().
%% @private
%% Service loop. Messages incoming from the connected Zomp node and from zx_daemon all
%% arrive here. This is the only catch-all receive loop, so messages that belong to a
%% specific state must not be accidentally received here out of order or else whatever
%% sequenced communication was happening will be corrupted.
%%
%% - `{tcp, Socket, Bin}': the packet is handled and the socket is re-armed with
%%   `{active, once}' before looping.
%% - `{tcp_closed, Socket}': the peer closed the connection; this is reported to
%%   zx_daemon and the process exits, as in the other connection states.
%% - `stop': the socket is disconnected and the process exits.
%% - Anything else is logged as unexpected and dropped.
%%
%% NOTE(review): there is no `{system, From, Request}' clause, so OTP system messages
%% currently fall into the catch-all; Parent and Debug are only carried along.

loop(Parent, Debug, Socket) ->
    receive
        {tcp, Socket, Bin} ->
            ok = handle(Bin, Socket),
            ok = inet:setopts(Socket, [{active, once}]),
            loop(Parent, Debug, Socket);
        {tcp_closed, Socket} ->
            handle_unexpected_close();
        stop ->
            ok = zx_net:disconnect(Socket),
            terminate();
        Unexpected ->
            ok = log(warning, "Unexpected message: ~tp", [Unexpected]),
            loop(Parent, Debug, Socket)
    end.
-spec handle(Bin, Socket) -> ok | no_return()
    when Bin    :: binary(),
         Socket :: gen_tcp:socket().
%% @private
%% The one place where a binary received from the network is decoded into an internal
%% term. Decoding uses the `safe' option of binary_to_term/2. The decoded term is logged
%% and then passed to dispatch/2, which does the actual handling.

handle(Bin, Socket) ->
    Decoded = binary_to_term(Bin, [safe]),
    ok = log(info, "Received network message: ~tp", [Decoded]),
    dispatch(Decoded, Socket).
-spec dispatch(Message, Socket) -> ok | no_return()
    when Message :: term(),
         Socket  :: gen_tcp:socket().
%% @private
%% Dispatch a procedure based on the received message.
%% Transfers and other procedures that involve a sequence of messages occur in discrete
%% states defined in other functions -- this only dispatches based on a valid initial
%% message received in the default waiting-loop state.
%%
%% `ping' is answered with `pong'. Any other message is treated as invalid: it is logged
%% together with the peer's address and port, the socket is disconnected and the process
%% exits. Message is typed as term() because the second clause accepts anything.

dispatch(ping, Socket) ->
    zx_net:send(Socket, pong);
dispatch(Invalid, Socket) ->
    %% inet:peername/1 returns {ok, {Address, Port}} for a connected socket.
    {ok, {Addr, Port}} = inet:peername(Socket),
    Host = inet:ntoa(Addr),
    ok = log(warning, "Invalid message from ~ts:~p: ~tp", [Host, Port, Invalid]),
    ok = zx_net:disconnect(Socket),
    terminate().
-spec fetch(Socket, PackageID) -> Result
    when Socket    :: gen_tcp:socket(),
         PackageID :: package_id(),
         Result    :: ok.
%% @private
%% Download a package to the local cache: request it with request_zrp/2, receive and
%% store it with receive_zrp/2, then log the ID that was actually fetched. Any error
%% returned by either step fails the corresponding match and crashes the process.
%% NOTE(review): package_id() and package_string/1 are not defined in the visible part
%% of this module -- confirm where they are meant to come from.

fetch(Socket, PackageID) ->
    {ok, Resolved} = request_zrp(Socket, PackageID),
    ok = receive_zrp(Socket, Resolved),
    Name = package_string(Resolved),
    log(info, "Fetched ~ts", [Name]).
-spec request_zrp(Socket, PackageID) -> Result
    when Socket    :: gen_tcp:socket(),
         PackageID :: package_id(),
         Result    :: {ok, Latest :: package_id()}
                    | {error, Reason :: timeout | term()}.
%% @private
%% Sends `{fetch, PackageID}' to the node and waits up to 60 seconds for its answer.
%% Returns `{ok, LatestID}' when the node answers `{sending, LatestID}'. An
%% `{error, Reason}' answer is logged and returned as is, and `{error, timeout}' is
%% returned when nothing arrives in time. If the socket closes, the disconnect is
%% reported and the process exits.

request_zrp(Socket, PackageID) ->
    ok = zx_net:send(Socket, {fetch, PackageID}),
    receive
        {tcp, Socket, Bin} ->
            %% Network data is untrusted: decode with `safe', as everywhere else in
            %% this module.
            case binary_to_term(Bin, [safe]) of
                {sending, LatestID} ->
                    {ok, LatestID};
                Error = {error, Reason} ->
                    %% NOTE(review): package_string/1 is not defined in the visible part
                    %% of this module -- confirm where it is meant to come from.
                    PackageString = package_string(PackageID),
                    Message = "Error receiving package ~ts: ~tp",
                    ok = log(info, Message, [PackageString, Reason]),
                    Error
            end;
        {tcp_closed, Socket} ->
            handle_unexpected_close()
    after 60000 ->
        {error, timeout}
    end.
-spec receive_zrp(Socket, PackageID) -> Result
    when Socket    :: gen_tcp:socket(),
         PackageID :: package_id(),
         Result    :: ok | {error, timeout}.
%% @private
%% Waits up to 60 seconds for the package payload. The received binary is written to
%% a file under the relative "zrp" directory, named by zx_lib:namify_zrp/1. Receipt is
%% then confirmed to the node with `ok' and the written path is logged.
%% A timeout is logged and returned as `{error, timeout}'. If the socket closes, the
%% disconnect is reported and the process exits.

receive_zrp(Socket, PackageID) ->
    receive
        {tcp, Socket, Payload} ->
            FileName = zx_lib:namify_zrp(PackageID),
            Destination = filename:join("zrp", FileName),
            ok = file:write_file(Destination, Payload),
            ok = zx_net:send(Socket, ok),
            log(info, "Wrote ~ts", [Destination]);
        {tcp_closed, Socket} ->
            handle_unexpected_close()
    after 60000 ->
        ok = log(error, "Timeout in socket receive for ~tp", [PackageID]),
        {error, timeout}
    end.
%%% Terminal handlers
-spec handle_unexpected_close() -> no_return().
%% @private
%% Terminal handler for a socket closed by the other side: reports `disconnected' to
%% zx_daemon and then ends this process.

handle_unexpected_close() ->
    Event = disconnected,
    ok = zx_daemon:report(Event),
    terminate().
-spec handle_timeout(Socket :: gen_tcp:socket()) -> no_return().
%% @private
%% Terminal handler for a receive timeout: reports `timeout' to zx_daemon, disconnects
%% the socket through zx_net (as the other states of this module do) and then ends this
%% process.

handle_timeout(Socket) ->
    ok = zx_daemon:report(timeout),
    ok = zx_net:disconnect(Socket),
    terminate().
-spec terminate() -> no_return().
%% @private
%% Single exit point for the connector: ends the calling process with reason `normal'.
%% Should a more formal retirement procedure become necessary, consider asking the
%% supervisor to remove this child with supervisor:terminate_child/2 and writing a
%% proper system_terminate/2.

terminate() ->
    erlang:exit(normal).