Moving connection index into ADT. For the lulz.

This commit is contained in:
Craig Everett 2018-02-07 19:37:45 +09:00
parent 71e31c48b2
commit 9aea2d189b

View File

@ -19,13 +19,19 @@
%%%
%%% When the zx_daemon is started it checks local configuration and cache files to
%%% determine what realms must be found and what cached Zomp nodes it is aware of.
%%% It populates an internal record #hx{} (typed as host_index()) with realm config
%%% It populates an internal record #cx{} (typed as conn_index()) with realm config
%%% and host cache data and then immediately initiates three connection attempts to
%%% cached nodes for each realm configured. If a host is known to service more than
%%% one configured realm only one attempt will be made at a time to connect to it.
%%% If all cached hosts for a given realm have been tried already, or if none are
%%% known, then the daemon will direct a connection be made to the prime node.
%%%
%%% The conn_index() type is abstract across the module, handled expicitly via use
%%% of manipulation functions called cx_*. The details of index manipulation can
%%% easily litter the code with incidental complexity (as far as a reader is concerned)
%%% so hiding that as abstract data leaves more mental cycles to consider the goals
%%% of the program.
%%%
%%% Once the connection attempts have been initiated (via zx_conn:start/1) the daemon
%%% waits in receive for either a connection report from a connection or an action
%%% request from elsewhere in the system.
@ -36,7 +42,7 @@
%%% - A failure can occur at any time. In the event a connected and assigned zx_conn
%%% has failed (whether it was connected and assigned, or never succeeded at all) the
%%% target host will be dropped from the hosts cache and another attempt will be made
%%% in its place if other hosts are known.
%%% in its place.
%%% - If a connection is disconnected then the host will be placed at the back of the
%%% hosts cache.
%%% - If a connection is redirected then the redirecting host will be placed at the
@ -51,6 +57,24 @@
%%% whatever realms it provides that are not yet services by a connection, and in
%%% the case it does not service any required and unassigned realms it will be
%%% instructed to disconnect.
%%%
%%% Because the daemon always initiates connection attempts on startup success or
%%% failure messages are guaranteed to be received without any need for a timer. For
%%% that reason there is no timed re-inspection mechanism present in this module.
%%%
%%% Action requests are queued within the zx_daemon, so requests to download a package,
%%% for example, look the same as several requests to download several packages. Each
%%% request that requires dispatching to a zx_conn is held in the active action slot
%%% until complete, paired with the pid of the zx_conn handling the action. If a
%%% zx_conn dies before completing an action or between the time an action request is
%%% received by the daemon and dispatch to the zx_conn occurs then the daemon will
%%% receive the terminal monitor message and be able to put the pending action back
%%% into the action queue, re-establish a connection to the necessary realm, and then
%%% re-dispatch the action to the new zx_conn.
%%%
%%% A bit of state handling is required (queueing requests and storing the current
%%% action state), but this permits the system above the daemon to interact with it in
%%% a blocking way, but zx_daemon and zx_conn to work asynchronously with one another.
%%% @end
-module(zx_daemon).
@ -81,18 +105,109 @@
argv = none :: none | [string()],
reqp = none :: none | pid(),
reqm = none :: none | reference(),
action = none :: none | action(),
actions = queue:new() :: queue:queue(action()),
realms = #{} :: #{zx:realm() := zx:serial()},
cx = #cx{} :: conn_index()}).
-record(cx,
{realms = #{} :: #{zx:realm() := zx:serial()},
primes = #{} :: #{zx:realm() := zx:host()},
hosts = #{} :: #{zx:realm() := [zx:host()]},
conns = [] :: [{pid(), reference()}]}).
hosts = #{} :: #{zx:realm() := queue:queue(zx:host())},
conns = #{} :: #{zx:realm() := pid()},
attempts = #{} :: #{pid() := zx:host()},
zx_conns = sets:new() :: sets:set({pid(), reference()})}).
-spec cx_connected(Available, Conn, CX) -> Result
when Available :: [{zx:realm(), zx:serial()}],
Conn :: pid(),
CX :: conn_index(),
Result :: {Assignment, NewCX},
Assignment :: assigned | unassigned,
NewCX :: conn_index().
%% @private
%% An abstract data handler which is called whenever a new connection is successfully
%% established by a zx_conn. Any unconnected realms with a valid serial will be
%% assigned to the new connection; if none are needed then the connection is closed.
%% The successful host is placed back in the hosts queue for each available realm.
%% The return value is a tuple that indicates whether the new connection was assigned
%% or not and the updated CX data value.
cx_connected(Available, Conn, CX) ->
cx_connected(unassigned, Available, Conn, CX).
cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) ->
case maps:find(Realm, Realms) of
{ok, S} when S < Serial ->
NewRealms = maps:update(Realm, Serial, Realms),
{NewA, NewCX} = cx_assign(A, Conn, Realm, CX#cx{realms = NewRealms}),
cx_connected(NewA, Rest, Conn, NewCX);
{ok, S} when S == Serial ->
{NewA, NewCX} = cx_assign(A, Conn, Realm, CX),
cx_connected(NewA, Rest, Conn, NewCX);
{ok, S} when S > Serial ->
cx_connected(A, Rest, Conn, CX);
error ->
cx_connected(A, Rest, Conn, CX)
end;
cx_connected(A, [], Conn, CX = #cx{attempts = Attempts}) ->
NewCX = CX#cx{attempts = maps:remove(Conn, Attempts)},
{A, NewCX}.
cx_assign(A, Conn, Realm, CX = #cx{hosts = Hosts, conns = Conns, attempts = Attempts}) ->
Host = maps:get(Conn, Attempts),
Enqueue = fun(Q) -> queue:in(Host, Q) end,
NewHosts = maps:update_with(Realm, Enqueue, Hosts),
case maps:is_key(Realm, Conns) of
true ->
{A, CX#cx{hosts = NewHosts}};
false ->
NewConns = maps:put(Realm, Conn, Conns),
{assigned, CX#cx{hosts = NewHosts, conns = NewConns}}
end.
-spec cx_disconnect(Conn, CX) -> NewCX
cx_disconnected(Conn, CX) ->
-spec cx_failed(Conn, CX) -> NewCX
cx_failed(Conn, CX) ->
-spec cx_next(Realm, CX) ->
cx_next(Realm, CX) ->
-spec cx_resolve(Realm, CX) -> Conn
cx_resolve(Realm, CX) ->
-type state() :: #s{}.
-type conn_index() :: #cx{}.
-type action() :: {subscribe, zx:package()},
| unsubscribe
| {list,
| {list, zx:realm()}
| {list, zx:realm(), zx:name()}
| {list, zx:realm(), zx:name(), zx:version()}
| {latest, zx:realm()}
| {latest, zx:realm(), zx:name()}
| {latest, zx:realm(), zx:name(), zx:version()}
| {fetch, zx:package_id()}
| {key, zx:key_id()}
| {pending, zx:package()}
| {packagers, zx:package()}
| {maintainers, zx:package()}
| sysops
| {subscribe, zx:realm(), zx:name()}.
-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
| failed
| disconnected.