Playing with connectivity ideas
This commit is contained in:
parent
7f352da2f3
commit
71e31c48b2
@ -11,7 +11,7 @@
|
||||
-copyright("Craig Everett <zxq9@zxq9.com>").
|
||||
-license("GPL-3.0").
|
||||
|
||||
-export([start/1, stop/0]).
|
||||
-export([start/1, stop/1]).
|
||||
-export([start_link/1]).
|
||||
|
||||
-include("zx_logger.erl").
|
||||
@ -25,18 +25,41 @@
|
||||
Result :: {ok, pid()}
|
||||
| {error, Reason},
|
||||
Reason :: term().
|
||||
%% @doc
|
||||
%% Starts a connection to a given target Zomp node. This call itself should never fail,
|
||||
%% but this process may fail to connect or crash immediately after spawning. Should
|
||||
%% only be called by zx_daemon.
|
||||
|
||||
start(Target) ->
|
||||
zx_conn_sup:start_conn(Target).
|
||||
|
||||
|
||||
-spec stop(Conn :: pid()) -> ok.
|
||||
%% @doc
|
||||
%% Signals the connection to disconnect and retire immediately.
|
||||
|
||||
stop(Conn) ->
|
||||
Conn ! stop,
|
||||
ok.
|
||||
|
||||
|
||||
-spec subscribe(Conn, Realm) -> ok
|
||||
when Conn :: pid(),
|
||||
Realm :: zx:realm(),
|
||||
Result :: ok.
|
||||
|
||||
subscribe(Conn, Realm) ->
|
||||
Conn ! {subscribe, Realm},
|
||||
ok.
|
||||
|
||||
|
||||
-spec start_link(Target) ->
|
||||
when Target :: zx:host(),
|
||||
Result :: {ok, pid()}
|
||||
| {error, Reason},
|
||||
Reason :: term().
|
||||
%% @private
|
||||
%% Starts a connector with a target host in its state.
|
||||
%% The supervisor's way of spawning a new connector.
|
||||
|
||||
start_link(Target) ->
|
||||
proc_lib:start_link(?MODULE, init, [self(), Target]).
|
||||
@ -45,6 +68,8 @@ start_link(Target) ->
|
||||
-spec init(Parent, Target) -> no_return()
|
||||
when Parent :: pid(),
|
||||
Target :: zx:host().
|
||||
%% @private
|
||||
%% gen_server callback. For more information refer to the OTP documentation.
|
||||
|
||||
init(Parent, Target) ->
|
||||
ok = log(info, "Connecting to ~tp", [Target]),
|
||||
@ -53,163 +78,133 @@ init(Parent, Target) ->
|
||||
connect(Parent, Debug, Target).
|
||||
|
||||
|
||||
-spec connect(Parent, Debug, Target) -> no_return().
|
||||
|
||||
%%% Connection Procedure
|
||||
|
||||
-spec connect(Parent, Debug, Target) -> no_return()
|
||||
when Parent :: pid(),
|
||||
Debug :: [sys:dbg_opt()],
|
||||
Target :: zx:host().
|
||||
|
||||
connect(Parent, Debug, {Host, Port}) ->
|
||||
Options = [{packet, 4}, {mode, binary}, {active, true}],
|
||||
case gen_tcp:connect(Host, Port, Options, 5000) of
|
||||
{ok, Socket} ->
|
||||
confirm(Parent, Debug, Socket);
|
||||
confirm_service(Parent, Debug, Socket);
|
||||
{error, Error} ->
|
||||
ok = log(warning, "Connection problem with ~tp: ~tp", [Node, Error]),
|
||||
ok = zx_daemon:report(
|
||||
connect_user(Realm, Rest)
|
||||
ok = zx_daemon:report(failed)
|
||||
terminate()
|
||||
end.
|
||||
|
||||
|
||||
confirm(Parent, Debug, Socket) ->
|
||||
|
||||
|
||||
-spec connect_user(realm()) -> gen_tcp:socket() | no_return().
|
||||
%% @private
|
||||
%% Connect to a given realm, whatever method is required.
|
||||
|
||||
connect_user(Realm) ->
|
||||
ok = log(info, "Connecting to realm ~ts...", [Realm]),
|
||||
Hosts =
|
||||
case file:consult(zx_lib:hosts_cache_file(Realm)) of
|
||||
{ok, Cached} -> Cached;
|
||||
{error, enoent} -> []
|
||||
end,
|
||||
connect_user(Realm, Hosts).
|
||||
|
||||
|
||||
-spec connect_user(realm(), [host()]) -> gen_tcp:socket() | no_return().
|
||||
%% @private
|
||||
%% Try to connect to a subordinate host, if there are none then connect to prime.
|
||||
|
||||
connect_user(Realm, []) ->
|
||||
{Host, Port} = zx_lib:get_prime(Realm),
|
||||
HostString =
|
||||
case io_lib:printable_unicode_list(Host) of
|
||||
true -> Host;
|
||||
false -> inet:ntoa(Host)
|
||||
end,
|
||||
ok = log(info, "Trying prime at ~ts:~160tp", [HostString, Port]),
|
||||
case gen_tcp:connect(Host, Port, connect_options(), 5000) of
|
||||
{ok, Socket} ->
|
||||
confirm_user(Realm, Socket, []);
|
||||
{error, Error} ->
|
||||
ok = log(warning, "Connection problem with prime: ~tp", [Error]),
|
||||
halt(0)
|
||||
end;
|
||||
connect_user(Realm, Hosts = [Node = {Host, Port} | Rest]) ->
|
||||
ok = log(info, "Trying node at ~ts:~tp", [inet:ntoa(Host), Port]),
|
||||
case gen_tcp:connect(Host, Port, connect_options(), 5000) of
|
||||
{ok, Socket} ->
|
||||
confirm_user(Realm, Socket, Hosts);
|
||||
{error, Error} ->
|
||||
ok = log(warning, "Connection problem with ~tp: ~tp", [Node, Error]),
|
||||
connect_user(Realm, Rest)
|
||||
end.
|
||||
|
||||
|
||||
-spec confirm_user(Realm, Socket, Hosts) -> Socket | no_return()
|
||||
when Realm :: realm(),
|
||||
Socket :: gen_tcp:socket(),
|
||||
Hosts :: [host()].
|
||||
-spec confirm_service(Parent, Debug, Socket) -> no_return()
|
||||
when Parent :: pid(),
|
||||
Debug :: [sys:dbg_opt()],
|
||||
Socket :: gen_tcp:socket().
|
||||
%% @private
|
||||
%% Confirm the zomp node can handle "OTPR USER 1" and is accepting connections or try
|
||||
%% another node.
|
||||
|
||||
confirm_user(Realm, Socket, Hosts) ->
|
||||
{ok, {Addr, Port}} = inet:peername(Socket),
|
||||
Host = inet:ntoa(Addr),
|
||||
confirm_service(Parent, Debug, Socket) ->
|
||||
ok = gen_tcp:send(Socket, <<"OTPR USER 1">>),
|
||||
receive
|
||||
{tcp, Socket, Bin} ->
|
||||
case binary_to_term(Bin) of
|
||||
case binary_to_term(Bin, [safe]) of
|
||||
ok ->
|
||||
ok = log(info, "Connected to ~ts:~p", [Host, Port]),
|
||||
confirm_serial(Realm, Socket, Hosts);
|
||||
{redirect, Next} ->
|
||||
ok = log(info, "Redirected..."),
|
||||
ok = disconnect(Socket),
|
||||
connect_user(Realm, Next ++ Hosts)
|
||||
query_realms(Parent, Debug, Socket);
|
||||
{redirect, Hosts} ->
|
||||
ok = zx_daemon:report({redirect, Hosts}),
|
||||
ok = zx_net:disconnect(Socket),
|
||||
terminate()
|
||||
end;
|
||||
{tcp_closed, Socket} ->
|
||||
halt_on_unexpected_close()
|
||||
handle_unexpected_close()
|
||||
after 5000 ->
|
||||
ok = log(warning, "Host ~ts:~p timed out.", [Host, Port]),
|
||||
ok = disconnect(Socket),
|
||||
connect_user(Realm, Hosts)
|
||||
handle_timeout(Socket)
|
||||
end.
|
||||
|
||||
|
||||
-spec confirm_serial(Realm, Socket, Hosts) -> Socket | no_return()
|
||||
when Realm :: realm(),
|
||||
Socket :: gen_tcp:socket(),
|
||||
Hosts :: [host()].
|
||||
-spec query_realms(Parent, Debug, Socket) -> no_return()
|
||||
when Parent :: pid(),
|
||||
Debug :: [sys:dbg_opt()],
|
||||
Socket :: gen_tcp:socket().
|
||||
%% @private
|
||||
%% Confirm that the connected host has a valid serial for the realm zx is trying to
|
||||
%% reach, and if not retry on another node.
|
||||
|
||||
confirm_serial(Realm, Socket, Hosts) ->
|
||||
SerialFile = filename:join(zx_lib:zomp_home(), "realm.serials"),
|
||||
Serials =
|
||||
case file:consult(SerialFile) of
|
||||
{ok, Ss} -> Ss;
|
||||
{error, enoent} -> []
|
||||
end,
|
||||
Serial =
|
||||
case lists:keyfind(Realm, 1, Serials) of
|
||||
false -> 0;
|
||||
{Realm, S} -> S
|
||||
end,
|
||||
ok = send(Socket, {latest, Realm}),
|
||||
query_realms(Parent, Debug, Socket) ->
|
||||
ok = zx_net:send(Socket, list),
|
||||
receive
|
||||
{tcp, Socket, Bin} ->
|
||||
case binary_to_term(Bin) of
|
||||
{ok, Serial} ->
|
||||
ok = log(info, "Node's serial same as ours."),
|
||||
Socket;
|
||||
{ok, Current} when Current > Serial ->
|
||||
ok = log(info, "Node's serial newer than ours. Storing."),
|
||||
NewSerials = lists:keystore(Realm, 1, Serials, {Realm, Current}),
|
||||
{ok, Host} = inet:peername(Socket),
|
||||
CacheFile = zx_lib:hosts_cache_file(Realm),
|
||||
ok = zx_lib:write_terms(CacheFile, [Host | Hosts]),
|
||||
ok = zx_lib:write_terms(SerialFile, NewSerials),
|
||||
Socket;
|
||||
{ok, Current} when Current < Serial ->
|
||||
log(info, "Our serial: ~tp, node serial: ~tp.", [Serial, Current]),
|
||||
ok = log(info, "Node's serial older than ours. Trying another."),
|
||||
ok = disconnect(Socket),
|
||||
connect_user(Realm, Hosts);
|
||||
{error, bad_realm} ->
|
||||
ok = log(info, "Node is no longer serving realm. Trying another."),
|
||||
ok = disconnect(Socket),
|
||||
connect_user(Realm, Hosts)
|
||||
end;
|
||||
{ok, Realms} = binary_to_term(Bin, [safe]),
|
||||
ok = zx_daemon:report({connected, Realms}),
|
||||
loop(Parent, Debug, Socket);
|
||||
{tcp_closed, Socket} ->
|
||||
halt_on_unexpected_close()
|
||||
handle_unexpected_close()
|
||||
after 5000 ->
|
||||
ok = log(info, "Host timed out on confirm_serial. Trying another."),
|
||||
ok = disconnect(Socket),
|
||||
connect_user(Realm, Hosts)
|
||||
handle_timeout(Socket)
|
||||
end.
|
||||
|
||||
|
||||
|
||||
-spec halt_on_unexpected_close() -> no_return().
|
||||
%%% Service Loop
|
||||
|
||||
halt_on_unexpected_close() ->
|
||||
ok = log(warning, "Socket closed unexpectedly."),
|
||||
halt(1).
|
||||
-spec loop(Parent, Debug, Socket) -> no_return()
|
||||
when Parent :: pid(),
|
||||
Debug :: [sys:dbg_opt()],
|
||||
Socket :: gen_tcp:socket().
|
||||
%% @private
|
||||
%% Service loop. Messages incoming from the connected Zomp node, the zx_daemon, and
|
||||
%% OTP system messages all come here. This is the only catch-all receive loop, so
|
||||
%% messages that occur in a specific state must not be accidentally received here out
|
||||
%% of order or else whatever sequenced communication was happening will be corrupted.
|
||||
|
||||
loop(Parent, Debug, Socket) ->
|
||||
receive
|
||||
{tcp, Socket, Bin} ->
|
||||
ok = handle(Bin, Socket),
|
||||
ok = inet:setopts(Socket, [{active, once}]),
|
||||
loop(Parent, Debug, Socket);
|
||||
stop ->
|
||||
ok = zx_net:disconnect(Socket),
|
||||
terminat();
|
||||
Unexpected ->
|
||||
ok = log(warning, "Unexpected message: ~tp", [Unexpected]),
|
||||
loop(Parent, Debug, Socket)
|
||||
end.
|
||||
|
||||
|
||||
-spec handle(Bin, Socket) -> ok | no_return()
|
||||
when Bin :: binary(),
|
||||
Socket :: gen_tcp:socket().
|
||||
%% @private
|
||||
%% Single point to convert a binary message to a safe internal message. Actual handling
|
||||
%% of the converted message occurs in dispatch/2.
|
||||
|
||||
handle(Bin, Socket) ->
|
||||
Message = binary_to_term(Bin, [safe]),
|
||||
ok = log(info, "Received network message: ~tp", [Message]),
|
||||
dispatch(Message, Socket).
|
||||
|
||||
|
||||
-spec dispatch(Message, Socket) -> ok | no_return()
|
||||
when Message :: incoming(),
|
||||
Socket :: gen_tcp:socket().
|
||||
%% @private
|
||||
%% Dispatch a procedure based on the received message.
|
||||
%% Tranfers and other procedures that involve a sequence of messages occur in discrete
|
||||
%% states defined in other functions -- this only dispatches based on a valid initial
|
||||
%% message received in the default waiting-loop state.
|
||||
|
||||
dispatch(ping, Socket) ->
|
||||
zx_net:send(Socket, pong);
|
||||
dispatch(Invalid, Socket) ->
|
||||
{ok, {Addr, Port}} = zomp:peername(Socket),
|
||||
Host = inet:ntoa(Addr),
|
||||
ok = log(warning, "Invalid message from ~tp:~p: ", [Invalid]),
|
||||
ok = zx_net:disconnect(Socket),
|
||||
terminate().
|
||||
|
||||
|
||||
-spec fetch(Socket, PackageID) -> Result
|
||||
@ -232,7 +227,7 @@ fetch(Socket, PackageID) ->
|
||||
| {error, Reason :: timeout | term()}.
|
||||
|
||||
request_zrp(Socket, PackageID) ->
|
||||
ok = send(Socket, {fetch, PackageID}),
|
||||
ok = zx_net:send(Socket, {fetch, PackageID}),
|
||||
receive
|
||||
{tcp, Socket, Bin} ->
|
||||
case binary_to_term(Bin) of
|
||||
@ -245,7 +240,7 @@ request_zrp(Socket, PackageID) ->
|
||||
Error
|
||||
end;
|
||||
{tcp_closed, Socket} ->
|
||||
halt_on_unexpected_close()
|
||||
handle_unexpected_close()
|
||||
after 60000 ->
|
||||
{error, timeout}
|
||||
end.
|
||||
@ -261,11 +256,40 @@ receive_zrp(Socket, PackageID) ->
|
||||
{tcp, Socket, Bin} ->
|
||||
ZrpPath = filename:join("zrp", zx_lib:namify_zrp(PackageID)),
|
||||
ok = file:write_file(ZrpPath, Bin),
|
||||
ok = send(Socket, ok),
|
||||
ok = zx_net:send(Socket, ok),
|
||||
log(info, "Wrote ~ts", [ZrpPath]);
|
||||
{tcp_closed, Socket} ->
|
||||
halt_on_unexpected_close()
|
||||
handle_unexpected_close()
|
||||
after 60000 ->
|
||||
ok = log(error, "Timeout in socket receive for ~tp", [PackageID]),
|
||||
{error, timeout}
|
||||
end.
|
||||
|
||||
|
||||
|
||||
%%% Terminal handlers
|
||||
|
||||
-spec handle_unexpected_close() -> no_return().
|
||||
|
||||
handle_unexpected_close() ->
|
||||
ok = zx_daemon:report(disconnected),
|
||||
terminate().
|
||||
|
||||
|
||||
-spec handle_timeout(gen_tcp:socket()) -> no_return()
|
||||
|
||||
handle_timeout(Socket) ->
|
||||
ok = zx_daemon:report(timeout),
|
||||
ok = disconnect(Socket),
|
||||
terminate().
|
||||
|
||||
|
||||
-spec terminate() -> no_return().
|
||||
%% @private
|
||||
%% Convenience wrapper around the suicide call.
|
||||
%% In the case that a more formal retirement procedure is required, consider notifying
|
||||
%% the supervisor with `supervisor:terminate_child(zomp_client_sup, PID)' and writing
|
||||
%% a proper system_terminate/2.
|
||||
|
||||
terminate() ->
|
||||
exit(normal).
|
||||
|
||||
@ -10,7 +10,7 @@
|
||||
-copyright("Craig Everett <zxq9@zxq9.com>").
|
||||
-license("GPL-3.0").
|
||||
|
||||
-export([start_conn/1]).
|
||||
-export([start_conn/2]).
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
@ -18,8 +18,9 @@
|
||||
|
||||
%%% Interface Functions
|
||||
|
||||
-spec start_conn(Host) -> Result
|
||||
-spec start_conn(Host, Serial) -> Result
|
||||
when Host :: zx:host(),
|
||||
Serial :: zx:serial(),
|
||||
Result :: {ok, pid()}
|
||||
| {error, Reason},
|
||||
Reason :: term().
|
||||
@ -27,8 +28,8 @@
|
||||
%% Start an upstream connection handler.
|
||||
%% (Should only be called from zx_conn).
|
||||
|
||||
start_conn(Host) ->
|
||||
supervisor:start_child(?MODULE, [Host]).
|
||||
start_conn(Host, Serial) ->
|
||||
supervisor:start_child(?MODULE, [Host, Serial]).
|
||||
|
||||
|
||||
|
||||
|
||||
@ -3,15 +3,54 @@
|
||||
%%%
|
||||
%%% Resident execution daemon and runtime interface to Zomp.
|
||||
%%%
|
||||
%%% The daemon lives in the system and does background tasks and also acts as the
|
||||
%%% serial interface for any complex functions involving network tasks that may fail
|
||||
%%% and need to be retried or may span realms.
|
||||
%%% The daemon lives in the background once started and awaits action requests from
|
||||
%%% callers running within the system. The daemon is only capable of handling
|
||||
%%% unprivileged (user) actions such as querying a Zomp node or fetching packages.
|
||||
%%%
|
||||
%%% In particular, the functions accessible to programs launched by ZX can interact
|
||||
%%% with Zomp realms via the zx_daemon, and non-administrative tasks that involve
|
||||
%%% maintaining a connection with a Zomp constellation can be abstracted behind the
|
||||
%%% zx_daemon. Administrative tasks, however, essentially stateless request/response
|
||||
%%% pairs.
|
||||
%%%
|
||||
%%% Connection handling
|
||||
%%%
|
||||
%%% The daemon is structured as a service manager in a service -> worker structure.
|
||||
%%% http://zxq9.com/archives/1311
|
||||
%%% It is in charge of the high-level task of servicing requested actions and returning
|
||||
%%% responses to callers as well as mapping successful connections to configured realms
|
||||
%%% and repairing failed connections to various realms by searching for and directing
|
||||
%%% that connections be made to various realms to satisfy action request requirements.
|
||||
%%%
|
||||
%%% When the zx_daemon is started it checks local configuration and cache files to
|
||||
%%% determine what realms must be found and what cached Zomp nodes it is aware of.
|
||||
%%% It populates an internal record #hx{} (typed as host_index()) with realm config
|
||||
%%% and host cache data and then immediately initiates three connection attempts to
|
||||
%%% cached nodes for each realm configured. If a host is known to service more than
|
||||
%%% one configured realm only one attempt will be made at a time to connect to it.
|
||||
%%% If all cached hosts for a given realm have been tried already, or if none are
|
||||
%%% known, then the daemon will direct a connection be made to the prime node.
|
||||
%%%
|
||||
%%% Once the connection attempts have been initiated (via zx_conn:start/1) the daemon
|
||||
%%% waits in receive for either a connection report from a connection or an action
|
||||
%%% request from elsewhere in the system.
|
||||
%%%
|
||||
%%% A connection request is made with a call to report/1 and indicates to the daemon
|
||||
%%% whether a connection has failed, been disconneocted, redirected, or succeeded.
|
||||
%%% Connection handling is as follows:
|
||||
%%% - A failure can occur at any time. In the event a connected and assigned zx_conn
|
||||
%%% has failed (whether it was connected and assigned, or never succeeded at all) the
|
||||
%%% target host will be dropped from the hosts cache and another attempt will be made
|
||||
%%% in its place if other hosts are known.
|
||||
%%% - If a connection is disconnected then the host will be placed at the back of the
|
||||
%%% hosts cache.
|
||||
%%% - If a connection is redirected then the redirecting host will be placed at the
|
||||
%%% back of the hosts cache and the list of new Zomp nodes provided by the redirect
|
||||
%%% will be added at the front of the connect queue.
|
||||
%%% - In the event a connection succeeds then the list of provided realms and their
|
||||
%%% current serials will be compared to the list of known realms and serials, and
|
||||
%%% the host will be added to the relevant realm host index if not already present
|
||||
%%% and the provided serial is newer than the currently known one (but the realm
|
||||
%%% serial will not be updated at this point, only the host added). After the
|
||||
%%% host's record is updated across the realm indices the daemon will assign it to
|
||||
%%% whatever realms it provides that are not yet services by a connection, and in
|
||||
%%% the case it does not service any required and unassigned realms it will be
|
||||
%%% instructed to disconnect.
|
||||
%%% @end
|
||||
|
||||
-module(zx_daemon).
|
||||
@ -38,20 +77,24 @@
|
||||
|
||||
-record(s,
|
||||
{meta = none :: none | zx:package_meta(),
|
||||
dir = none :: none | file:filename(),
|
||||
home = none :: none | file:filename(),
|
||||
argv = none :: none | [string()],
|
||||
reqp = none :: none | pid(),
|
||||
reqm = none :: none | reference(),
|
||||
connp = none :: none | pid(),
|
||||
connm = none :: none | reference(),
|
||||
prime = none :: none | zx:realm(),
|
||||
hosts = [] :: #s{zx:realm() := [zx:host()]}}).
|
||||
actions = queue:new() :: queue:queue(action()),
|
||||
realms = #{} :: #{zx:realm() := zx:serial()},
|
||||
primes = #{} :: #{zx:realm() := zx:host()},
|
||||
hosts = #{} :: #{zx:realm() := [zx:host()]},
|
||||
conns = [] :: [{pid(), reference()}]}).
|
||||
|
||||
|
||||
-type state() :: #s{}.
|
||||
|
||||
-type hosts() :: #{zx:realm() := [zx:host()]}.
|
||||
-type conn_report() :: {connected, Realms :: [zx:realm()]}
|
||||
| conn_fail
|
||||
-type action() :: {subscribe, zx:package()},
|
||||
| unsubscribe
|
||||
| {list,
|
||||
-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
|
||||
| failed
|
||||
| disconnected.
|
||||
|
||||
|
||||
@ -175,12 +218,13 @@ fetch(PackageIDs) ->
|
||||
%%% Connection interface
|
||||
|
||||
-spec report(Message) -> ok
|
||||
when Message :: {connected, Realms :: [zx:realm()]}
|
||||
| conn_fail
|
||||
when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
|
||||
| {redirect, Hosts :: [{zx:host(), [zx:realm()]}]}
|
||||
| failed
|
||||
| disconnected.
|
||||
%% @private
|
||||
%% Should only be called by a zx_conn. This function is how a zx_conn reports its
|
||||
%% current connection status.
|
||||
%% current connection status and job results.
|
||||
|
||||
report(Message) ->
|
||||
gen_server:cast(?MODULE, {report, self(), Message}).
|
||||
@ -199,7 +243,21 @@ start_link() ->
|
||||
-spec init(none) -> {ok, state()}.
|
||||
|
||||
init(none) ->
|
||||
{ok, #s{}}.
|
||||
SerialFile = filename:join(zx_lib:zomp_home(), "realm.serials"),
|
||||
Serials =
|
||||
case file:consult(SerialFile) of
|
||||
{ok, Ss} ->
|
||||
maps:from_list(Ss);
|
||||
{error, enoent} ->
|
||||
ok = log(info, "Initializing zomp/realm.serials..."),
|
||||
maps:new();
|
||||
{error, Reason} ->
|
||||
Message = "Reading zomp/realm.serials failed with ~tp. Recreating..."
|
||||
ok = log(error, Message, [Reason]),
|
||||
maps:new()
|
||||
end,
|
||||
State = #s{serials = Serials},
|
||||
{ok, State}.
|
||||
|
||||
|
||||
|
||||
@ -291,9 +349,15 @@ do_pass_meta(Meta, Dir, ArgV, State) ->
|
||||
|
||||
do_subscribe(Requestor,
|
||||
{Realm, Name},
|
||||
State = #s{name = none, connp = none, reqp = none, hosts = Hosts}) ->
|
||||
State = #s{name = none, connp = none, reqp = none,
|
||||
hosts = Hosts, serials = Serials}) ->
|
||||
Monitor = monitor(process, Requestor),
|
||||
{Host, NewHosts} = select_host(Realm, Hosts),
|
||||
Serial =
|
||||
case lists:keyfind(Realm, 1, Serials) of
|
||||
false -> 0;
|
||||
{Realm, S} -> S
|
||||
end,
|
||||
{ok, ConnP} = zx_conn:start(Host),
|
||||
ConnM = monitor(process, ConnP),
|
||||
NewState = State#s{realm = Realm, name = Name,
|
||||
@ -381,6 +445,7 @@ do_report(From, {connected, Realms}, State = #s{
|
||||
|
||||
|
||||
|
||||
|
||||
-spec do_fetch(PackageIDs) -> Result
|
||||
when PackageIDs :: [zx:package_id()],
|
||||
Result :: ok
|
||||
|
||||
@ -25,7 +25,8 @@
|
||||
package_id/1, package_string/1,
|
||||
package_dir/1, package_dir/2,
|
||||
namify_zrp/1, namify_tgz/1,
|
||||
find_latest_compatible/2, installed/1]).
|
||||
find_latest_compatible/2, installed/1,
|
||||
realm_conf/1, load_realm_conf/1]).
|
||||
|
||||
-include("zx_logger.hrl").
|
||||
|
||||
@ -445,7 +446,7 @@ package_dir(Prefix, {Realm, Name}) ->
|
||||
|
||||
|
||||
-spec namify_zrp(PackageID) -> ZrpFileName
|
||||
when PackageID :: package_id(),
|
||||
when PackageID :: zx:package_id(),
|
||||
ZrpFileName :: file:filename().
|
||||
%% @private
|
||||
%% Map an PackageID to its correct .zrp package file name.
|
||||
@ -454,7 +455,7 @@ namify_zrp(PackageID) -> namify(PackageID, "zrp").
|
||||
|
||||
|
||||
-spec namify_tgz(PackageID) -> TgzFileName
|
||||
when PackageID :: package_id(),
|
||||
when PackageID :: zx:package_id(),
|
||||
TgzFileName :: file:filename().
|
||||
%% @private
|
||||
%% Map an PackageID to its correct gzipped tarball source bundle filename.
|
||||
@ -463,7 +464,7 @@ namify_tgz(PackageID) -> namify(PackageID, "tgz").
|
||||
|
||||
|
||||
-spec namify(PackageID, Suffix) -> FileName
|
||||
when PackageID :: package_id(),
|
||||
when PackageID :: zx:package_id(),
|
||||
Suffix :: string(),
|
||||
FileName :: file:filename().
|
||||
%% @private
|
||||
@ -535,7 +536,7 @@ realm_conf(Realm) ->
|
||||
|
||||
|
||||
-spec load_realm_conf(Realm) -> Result
|
||||
when Realm :: realm(),
|
||||
when Realm :: zx:realm(),
|
||||
Result :: {ok, RealmConf}
|
||||
| {error, Reason},
|
||||
RealmConf :: list(),
|
||||
|
||||
68
zomp/lib/otpr-zx/0.1.0/src/zx_net.erl
Normal file
68
zomp/lib/otpr-zx/0.1.0/src/zx_net.erl
Normal file
@ -0,0 +1,68 @@
|
||||
%%% @doc
|
||||
%%% ZX Network Functions
|
||||
%%%
|
||||
%%% A few common network functions concentrated in one place.
|
||||
%%% @end
|
||||
|
||||
-module(zx_net).
|
||||
-author("Craig Everett <zxq9@zxq9.com>").
|
||||
-copyright("Craig Everett <zxq9@zxq9.com>").
|
||||
-license("GPL-3.0").
|
||||
|
||||
-export([send/2, disconnect/1]).
|
||||
|
||||
-include("zx_logger.hrl").
|
||||
|
||||
|
||||
-spec send(Socket, Message) -> Result
|
||||
when Socket :: gen_tcp:socket(),
|
||||
Message :: term(),
|
||||
Result :: ok
|
||||
| {error, Reason},
|
||||
Reason :: closed | inet:posix().
|
||||
%% @doc
|
||||
%% Packages an Erlang term and sends it to the indicated socket.
|
||||
|
||||
send(Socket, Message) ->
|
||||
BinMessage = term_to_binary(Message),
|
||||
gen_tcp:send(Socket, BinMessage).
|
||||
|
||||
|
||||
-spec disconnect(Socket) -> ok
|
||||
when Socket :: gen_tcp:socket().
|
||||
%% @doc
|
||||
%% Disconnects from a socket, handling the case where the socket is already
|
||||
%% disconnected on the other side.
|
||||
|
||||
disconnect(Socket) ->
|
||||
case zomp:peername(Socket) of
|
||||
{ok, {Addr, Port}} ->
|
||||
Host = inet:ntoa(Addr),
|
||||
disconnect(Socket, Host, Port);
|
||||
{error, Reason} ->
|
||||
log(warning, "Disconnect failed with: ~p", [Reason])
|
||||
end.
|
||||
|
||||
|
||||
-spec disconnect(Socket, Host, Port) -> ok
|
||||
when Socket :: gen_tcp:socket(),
|
||||
Host :: string(),
|
||||
Port :: inet:port_number().
|
||||
|
||||
disconnect(Socket, Host, Port) ->
|
||||
case gen_tcp:shutdown(Socket, read_write) of
|
||||
ok ->
|
||||
log(info, "~ts:~w disconnected", [Host, Port]);
|
||||
{error, enotconn} ->
|
||||
log(info, "~ts:~w disconnected", [Host, Port]),
|
||||
receive
|
||||
{tcp_closed, Socket} -> ok
|
||||
after 0 -> ok
|
||||
end;
|
||||
{error, E} ->
|
||||
log(warning, "~ts:~w disconnect failed with: ~p", [Host, Port, E]),
|
||||
receive
|
||||
{tcp_closed, Socket} -> ok
|
||||
after 0 -> ok
|
||||
end
|
||||
end.
|
||||
Loading…
x
Reference in New Issue
Block a user