This commit is contained in:
2018-04-23 07:58:56 +09:00
parent 66b65d3fd4
commit 2e9df29149
5 changed files with 206 additions and 80 deletions
+1
View File
@@ -2492,6 +2492,7 @@ usage() ->
" zx add package PackageName~n"
" zx add packager PackageName~n"
" zx add maintainer PackageName~n"
" zx add sysop UserID~n"
" zx review PackageID~n"
" zx approve PackageID~n"
" zx reject PackageID~n"
+162 -56
View File
@@ -1,4 +1,4 @@
%%% @doc
%% @doc
%%% ZX Connector
%%%
%%% This module represents a connection to a Zomp server.
@@ -11,12 +11,59 @@
-copyright("Craig Everett <zxq9@zxq9.com>").
-license("GPL-3.0").
-export([subscribe/2, unsubscribe/2, fetch/3, query/3]).
-export([start/1, stop/1]).
-export([start_link/1]).
-export([start_link/1, init/2]).
-include("zx_logger.erl").
-include("zx_logger.hrl").
%%% Types
-type incoming() :: ping
| {sub, Channel :: term(), Message :: term()}
| term().
%%% Interface
-spec subscribe(Conn, Package) -> ok
when Conn :: pid(),
Package :: zx:package().
subscribe(Conn, Realm) ->
Conn ! {subscribe, Realm},
ok.
-spec unsubscribe(Conn, Package) -> ok
when Conn :: pid(),
Package :: zx:package().
unsubscribe(Conn, Package) ->
Conn ! {unsubscribe, Package},
ok.
-spec fetch(Conn, ID, Object) -> ok
when Conn :: pid(),
ID :: zx_daemon:id(),
Object :: zx_daemon:object().
fetch(Conn, ID, Object) ->
Conn ! {fetch, ID, Object},
ok.
-spec query(Conn, ID, Action) -> ok
when Conn :: pid(),
ID :: zx_daemon:id(),
Action :: zx_daemon:action().
query(Conn, ID, Action) ->
Conn ! {query, ID, Action},
ok.
%%% Startup
@@ -42,25 +89,7 @@ stop(Conn) ->
ok.
-spec subscribe(Conn, Package) -> ok
when Conn :: pid(),
Package :: zx:package().
subscribe(Conn, Realm) ->
Conn ! {subscribe, Realm},
ok.
-spec unsubscribe(Conn, Package) -> ok
when Conn :: pid(),
Package :: zx:package().
unsubscribe(Conn, Package) ->
Conn ! {unsubscribe, Package},
ok.
-spec start_link(Target) ->
-spec start_link(Target) -> Result
when Target :: zx:host(),
Result :: {ok, pid()}
| {error, Reason},
@@ -94,13 +123,13 @@ init(Parent, Target) ->
Target :: zx:host().
connect(Parent, Debug, {Host, Port}) ->
Options = [{packet, 4}, {mode, binary}, {active, true}],
Options = [{packet, 4}, {mode, binary}, {nodelay, true}, {active, true}],
case gen_tcp:connect(Host, Port, Options, 5000) of
{ok, Socket} ->
confirm_service(Parent, Debug, Socket);
{error, Error} ->
ok = log(warning, "Connection problem with ~tp: ~tp", [Node, Error]),
ok = zx_daemon:report(disconnected)
ok = log(warning, "Connection problem with ~tp: ~tp", [Host, Error]),
ok = zx_daemon:report(failed),
terminate()
end.
@@ -170,12 +199,23 @@ query_realms(Parent, Debug, Socket) ->
loop(Parent, Debug, Socket) ->
receive
{tcp, Socket, Bin} ->
ok = handle(Bin, Socket),
ok = handle_message(Socket, Bin),
ok = inet:setopts(Socket, [{active, once}]),
loop(Parent, Debug, Socket);
{subscribe, Package} ->
ok = zx_net:send(Socket, {subscribe, Package}),
loop(Parent, Debug, Socket);
{unsubscribe, Package} ->
ok = zx_net:send(Socket, {unsubscribe, Package}),
loop(Parent, Debug, Socket);
{fetch, ID, Object} ->
{ok, Outcome} = handle_fetch(Socket, Object),
ok = zx_daemon:result(ID, Outcome),
loop(Parent, Debug, Socket);
{query, ID, Action} ->
{ok, Outcome} = handle_query(Socket, Action),
ok = zx_daemon:result(ID, Outcome),
loop(Parent, Debug, Socket);
stop ->
ok = zx_net:disconnect(Socket),
terminate();
@@ -185,41 +225,106 @@ loop(Parent, Debug, Socket) ->
end.
-spec handle(Bin, Socket) -> ok | no_return()
when Bin :: binary(),
Socket :: gen_tcp:socket().
%%% Idle Incoming Upstream Messages
-spec handle_message(Socket, Bin) -> ok | no_return()
when Socket :: gen_tcp:socket(),
Bin :: binary().
%% @private
%% Single point to convert a binary message to a safe internal message. Actual handling
%% of the converted message occurs in dispatch/2.
handle(Bin, Socket) ->
handle_message(Socket, Bin) ->
Message = binary_to_term(Bin, [safe]),
ok = log(info, "Received network message: ~tp", [Message]),
dispatch(Message, Socket).
case binary_to_term(Bin, [safe]) of
ping ->
zx_net:send(Socket, pong);
{sub, Channel, Message} ->
log("Sub: ~tp - ~tp", [Channel, Message]);
{update, Message} ->
log("Update: ~tp", [Message]);
{redirect, Nodes} ->
log("Redirected to ~tp", [Nodes]);
Invalid ->
{ok, {Addr, Port}} = zomp:peername(Socket),
Host = inet:ntoa(Addr),
ok = log(warning, "Invalid message from ~tp:~p: ", [Invalid]),
ok = zx_net:disconnect(Socket),
terminate()
end.
-spec dispatch(Message, Socket) -> ok | no_return()
when Message :: incoming(),
Socket :: gen_tcp:socket().
%% @private
%% Dispatch a procedure based on the received message.
%% Tranfers and other procedures that involve a sequence of messages occur in discrete
%% states defined in other functions -- this only dispatches based on a valid initial
%% message received in the default waiting-loop state.
dispatch(ping, Socket) ->
zx_net:send(Socket, pong);
dispatch(Invalid, Socket) ->
{ok, {Addr, Port}} = zomp:peername(Socket),
Host = inet:ntoa(Addr),
ok = log(warning, "Invalid message from ~tp:~p: ", [Invalid]),
ok = zx_net:disconnect(Socket),
terminate().
%%% Incoming Request Actions
-spec handle_request(Socket, Action) -> Result
when Socket :: gen_tcp:socket(),
Action :: term(),
Result :: {ok, Outcome :: term()}.
handle_request(Socket, Action) ->
ok = zx_net:send(Socket, Action),
case element(1, Action) of
list ->
do_list(Action, Socket);
latest ->
do_latest(Action, Socket);
fetch ->
do_fetch(Action, Socket);
key ->
do_key(Action, Socket);
pending ->
do_pending(Action, Socket);
packagers ->
do_packagers(Action, Socket);
maintainers ->
do_maintainers(Action, Socket);
sysops ->
do_sysops(Action, Socket)
end,
handle_response(Socket, Response).
handle_response(Socket, Command) ->
receive
{tcp, Socket, Bin} ->
Outcome = binary_to_term(Bin, [safe]),
interpret_response(Socket, Outcome, Command);
{tcp_closed, Socket} ->
handle_unexpected_close()
after 5000 ->
handle_timeout(Socket)
end.
interpret_response(Socket, ping, Command) ->
ok = zx_net:send(Socket, pong),
handle_response(Socket, Command);
interpret_response(Socket, {sub, Channel, Message}, Command) ->
ok = zx_daemon:notify(Channel, Message),
handle_response(Socket, Command);
interpret_response(Socket, {update, Message}, Command) ->
interpret_response(Socket, Response, list) ->
interpret_response(Socket, Response, latest) ->
interpret_response(Socket, Response, fetch) ->
interpret_response(Socket, Response, key) ->
interpret_response(Socket, Response, pending) ->
interpret_response(Socket, Response, packagers) ->
interpret_response(Socket, Response, maintainers) ->
interpret_response(Socket, Response, sysops) ->
case element(1, Action) of
end,
-spec fetch(Socket, PackageID) -> Result
when Socket :: gen_tcp:socket(),
PackageID :: package_id(),
PackageID :: zx:package_id(),
Result :: ok.
%% @private
%% Download a package to the local cache.
@@ -227,13 +332,14 @@ dispatch(Invalid, Socket) ->
fetch(Socket, PackageID) ->
{ok, LatestID} = request_zrp(Socket, PackageID),
ok = receive_zrp(Socket, LatestID),
log(info, "Fetched ~ts", [package_string(LatestID)]).
Latest = zx_lib:package_string(LatestID),
log(info, "Fetched ~ts", [Latest]).
-spec request_zrp(Socket, PackageID) -> Result
when Socket :: gen_tcp:socket(),
PackageID :: package_id(),
Result :: {ok, Latest :: package_id()}
PackageID :: zx:package_id(),
Result :: {ok, Latest :: zx:package_id()}
| {error, Reason :: timeout | term()}.
request_zrp(Socket, PackageID) ->
@@ -244,7 +350,7 @@ request_zrp(Socket, PackageID) ->
{sending, LatestID} ->
{ok, LatestID};
Error = {error, Reason} ->
PackageString = package_string(PackageID),
PackageString = zx_lib:package_string(PackageID),
Message = "Error receiving package ~ts: ~tp",
ok = log(info, Message, [PackageString, Reason]),
Error
@@ -258,7 +364,7 @@ request_zrp(Socket, PackageID) ->
-spec receive_zrp(Socket, PackageID) -> Result
when Socket :: gen_tcp:socket(),
PackageID :: package_id(),
PackageID :: zx:package_id(),
Result :: ok | {error, timeout}.
receive_zrp(Socket, PackageID) ->
@@ -286,11 +392,11 @@ handle_unexpected_close() ->
terminate().
-spec handle_timeout(gen_tcp:socket()) -> no_return()
-spec handle_timeout(gen_tcp:socket()) -> no_return().
handle_timeout(Socket) ->
ok = zx_daemon:report(timeout),
ok = disconnect(Socket),
ok = zx_net:disconnect(Socket),
terminate().
+16 -9
View File
@@ -218,7 +218,9 @@
%% Conn Communication
-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
| {redirect, Hosts :: [zx:host()]}
| disconnected.
| failed
| disconnected
| timeout.
%% Subscriber / Requestor Communication
% Incoming Request messages
@@ -229,8 +231,9 @@
| {list, zx:realm(), zx:name(), zx:version()}
| {latest, zx:realm(), zx:name(), zx:version()}
| {fetch, zx:realm(), zx:name(), zx:version()}
| {key, zx:realm(), zx:key_name()}
| {fetchkey, zx:realm(), zx:key_name()}
| {pending, zx:realm(), zx:name()}
| {resigns, zx:realm()}
| {packagers, zx:realm(), zx:name()}
| {maintainers, zx:realm(), zx:name()}
| {sysops, zx:realm()}.
@@ -447,14 +450,14 @@ latest({Realm, Name, Version}) ->
%% Response messages are of the type `result()' where the third element is of the
%% type `fetch_result()'.
fetch({Realm, Name, Version}) ->
fetch_zsp(PackageID = {Realm, Name, Version}) ->
true = zx_lib:valid_lower0_9(Realm),
true = zx_lib:valid_lower0_9(Name),
true = zx_lib:valid_version(Version),
request({fetch, Realm, Name, Version}).
request({fetch, zsp, PackageID}).
-spec key(KeyID) -> {ok, RequestID}
-spec fetch_key(KeyID) -> {ok, RequestID}
when KeyID :: zx:key_id(),
RequestID :: id().
%% @doc
@@ -464,10 +467,10 @@ fetch({Realm, Name, Version}) ->
%% Response messages are of the type `result()' where the third element is of the
%% type `key_result()'.
key({Realm, KeyName}) ->
fetch_key(KeyID = {Realm, KeyName}) ->
true = zx_lib:valid_lower0_9(Realm),
true = zx_lib:valid_lower0_9(KeyName),
request({key, Realm, KeyName}).
request({fetch, key, KeyID}).
-spec pending(Package) -> {ok, RequestID}
@@ -561,7 +564,7 @@ report(Message) ->
gen_server:cast(?MODULE, {report, self(), Message}).
-spec result(reference(), result()) -> ok.
-spec result(id(), result()) -> ok.
%% @private
%% Return a tagged result back to the daemon to be forwarded to the original requestor.
@@ -839,6 +842,10 @@ do_report(Conn, failed, State = #s{mx = MX}) ->
NewMX = mx_del_monitor(Conn, attempt, MX),
failed(Conn, State#s{mx = NewMX});
do_report(Conn, disconnected, State = #s{mx = MX}) ->
NewMX = mx_del_monitor(Conn, conn, MX),
disconnected(Conn, State#s{mx = NewMX});
do_report(Conn, timeout, State = #s{mx = MX}) ->
ok = log(warning, "Connection ~tp timed out.", [Conn]),
NewMX = mx_del_monitor(Conn, conn, MX),
disconnected(Conn, State#s{mx = NewMX}).
@@ -1109,7 +1116,7 @@ dispatch_request(Action, ID, CX) ->
Realm = element(2, Action),
case cx_pre_send(Realm, ID, CX) of
{ok, Conn, NewCX} ->
ok = zx_conn:make_request(Conn, ID, Action),
ok = zx_conn:request(Conn, ID, Action),
{dispatched, NewCX};
unassigned ->
wait;