From 9aea2d189b4e8c71506a5faebc2f7c38aa3e6b67 Mon Sep 17 00:00:00 2001 From: Craig Everett Date: Wed, 7 Feb 2018 19:37:45 +0900 Subject: [PATCH] Moving connection index into ADT. For the lulz. --- zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl | 131 +++++++++++++++++++++-- 1 file changed, 123 insertions(+), 8 deletions(-) diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl index beeb489..495a0dd 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl @@ -19,13 +19,19 @@ %%% %%% When the zx_daemon is started it checks local configuration and cache files to %%% determine what realms must be found and what cached Zomp nodes it is aware of. -%%% It populates an internal record #hx{} (typed as host_index()) with realm config +%%% It populates an internal record #cx{} (typed as conn_index()) with realm config %%% and host cache data and then immediately initiates three connection attempts to %%% cached nodes for each realm configured. If a host is known to service more than %%% one configured realm only one attempt will be made at a time to connect to it. %%% If all cached hosts for a given realm have been tried already, or if none are %%% known, then the daemon will direct a connection be made to the prime node. %%% +%%% The conn_index() type is abstract across the module, handled expicitly via use +%%% of manipulation functions called cx_*. The details of index manipulation can +%%% easily litter the code with incidental complexity (as far as a reader is concerned) +%%% so hiding that as abstract data leaves more mental cycles to consider the goals +%%% of the program. +%%% %%% Once the connection attempts have been initiated (via zx_conn:start/1) the daemon %%% waits in receive for either a connection report from a connection or an action %%% request from elsewhere in the system. @@ -36,7 +42,7 @@ %%% - A failure can occur at any time. In the event a connected and assigned zx_conn %%% has failed (whether it was connected and assigned, or never succeeded at all) the %%% target host will be dropped from the hosts cache and another attempt will be made -%%% in its place if other hosts are known. +%%% in its place. %%% - If a connection is disconnected then the host will be placed at the back of the %%% hosts cache. %%% - If a connection is redirected then the redirecting host will be placed at the @@ -51,6 +57,24 @@ %%% whatever realms it provides that are not yet services by a connection, and in %%% the case it does not service any required and unassigned realms it will be %%% instructed to disconnect. +%%% +%%% Because the daemon always initiates connection attempts on startup success or +%%% failure messages are guaranteed to be received without any need for a timer. For +%%% that reason there is no timed re-inspection mechanism present in this module. +%%% +%%% Action requests are queued within the zx_daemon, so requests to download a package, +%%% for example, look the same as several requests to download several packages. Each +%%% request that requires dispatching to a zx_conn is held in the active action slot +%%% until complete, paired with the pid of the zx_conn handling the action. If a +%%% zx_conn dies before completing an action or between the time an action request is +%%% received by the daemon and dispatch to the zx_conn occurs then the daemon will +%%% receive the terminal monitor message and be able to put the pending action back +%%% into the action queue, re-establish a connection to the necessary realm, and then +%%% re-dispatch the action to the new zx_conn. +%%% +%%% A bit of state handling is required (queueing requests and storing the current +%%% action state), but this permits the system above the daemon to interact with it in +%%% a blocking way, but zx_daemon and zx_conn to work asynchronously with one another. %%% @end -module(zx_daemon). @@ -81,18 +105,109 @@ argv = none :: none | [string()], reqp = none :: none | pid(), reqm = none :: none | reference(), + action = none :: none | action(), actions = queue:new() :: queue:queue(action()), - realms = #{} :: #{zx:realm() := zx:serial()}, - primes = #{} :: #{zx:realm() := zx:host()}, - hosts = #{} :: #{zx:realm() := [zx:host()]}, - conns = [] :: [{pid(), reference()}]}). - + cx = #cx{} :: conn_index()}). + + +-record(cx, + {realms = #{} :: #{zx:realm() := zx:serial()}, + primes = #{} :: #{zx:realm() := zx:host()}, + hosts = #{} :: #{zx:realm() := queue:queue(zx:host())}, + conns = #{} :: #{zx:realm() := pid()}, + attempts = #{} :: #{pid() := zx:host()}, + zx_conns = sets:new() :: sets:set({pid(), reference()})}). + +-spec cx_connected(Available, Conn, CX) -> Result + when Available :: [{zx:realm(), zx:serial()}], + Conn :: pid(), + CX :: conn_index(), + Result :: {Assignment, NewCX}, + Assignment :: assigned | unassigned, + NewCX :: conn_index(). +%% @private +%% An abstract data handler which is called whenever a new connection is successfully +%% established by a zx_conn. Any unconnected realms with a valid serial will be +%% assigned to the new connection; if none are needed then the connection is closed. +%% The successful host is placed back in the hosts queue for each available realm. +%% The return value is a tuple that indicates whether the new connection was assigned +%% or not and the updated CX data value. + +cx_connected(Available, Conn, CX) -> + cx_connected(unassigned, Available, Conn, CX). + + +cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) -> + case maps:find(Realm, Realms) of + {ok, S} when S < Serial -> + NewRealms = maps:update(Realm, Serial, Realms), + {NewA, NewCX} = cx_assign(A, Conn, Realm, CX#cx{realms = NewRealms}), + cx_connected(NewA, Rest, Conn, NewCX); + {ok, S} when S == Serial -> + {NewA, NewCX} = cx_assign(A, Conn, Realm, CX), + cx_connected(NewA, Rest, Conn, NewCX); + {ok, S} when S > Serial -> + cx_connected(A, Rest, Conn, CX); + error -> + cx_connected(A, Rest, Conn, CX) + end; +cx_connected(A, [], Conn, CX = #cx{attempts = Attempts}) -> + NewCX = CX#cx{attempts = maps:remove(Conn, Attempts)}, + {A, NewCX}. + + +cx_assign(A, Conn, Realm, CX = #cx{hosts = Hosts, conns = Conns, attempts = Attempts}) -> + Host = maps:get(Conn, Attempts), + Enqueue = fun(Q) -> queue:in(Host, Q) end, + NewHosts = maps:update_with(Realm, Enqueue, Hosts), + case maps:is_key(Realm, Conns) of + true -> + {A, CX#cx{hosts = NewHosts}}; + false -> + NewConns = maps:put(Realm, Conn, Conns), + {assigned, CX#cx{hosts = NewHosts, conns = NewConns}} + end. + + +-spec cx_disconnect(Conn, CX) -> NewCX + +cx_disconnected(Conn, CX) -> + + +-spec cx_failed(Conn, CX) -> NewCX + +cx_failed(Conn, CX) -> + + +-spec cx_next(Realm, CX) -> + +cx_next(Realm, CX) -> + + +-spec cx_resolve(Realm, CX) -> Conn + +cx_resolve(Realm, CX) -> + + -type state() :: #s{}. +-type conn_index() :: #cx{}. -type action() :: {subscribe, zx:package()}, | unsubscribe - | {list, + | {list, zx:realm()} + | {list, zx:realm(), zx:name()} + | {list, zx:realm(), zx:name(), zx:version()} + | {latest, zx:realm()} + | {latest, zx:realm(), zx:name()} + | {latest, zx:realm(), zx:name(), zx:version()} + | {fetch, zx:package_id()} + | {key, zx:key_id()} + | {pending, zx:package()} + | {packagers, zx:package()} + | {maintainers, zx:package()} + | sysops + | {subscribe, zx:realm(), zx:name()}. -type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]} | failed | disconnected.