Blah blah blah

This commit is contained in:
Craig Everett 2018-02-28 08:04:21 +09:00
parent 107da5f41d
commit eac31b0e90

View File

@ -3,9 +3,29 @@
%%% %%%
%%% Resident execution daemon and runtime interface to Zomp. %%% Resident execution daemon and runtime interface to Zomp.
%%% %%%
%%% The daemon lives in the background once started and awaits action requests from %%% The daemon lives in the background once started and awaits query requests and
%%% callers running within the system. The daemon is only capable of handling %%% subscriptions from from other processes. The daemon is only capable of handling
%%% unprivileged (user) actions such as querying a Zomp node or fetching packages. %%% unprivileged (user) actions.
%%%
%%%
%%% Discrete state and local abstract data types
%%%
%%% The daemon must keep track of requestors, subscribers, and zx_conn processes by
%%% using monitors, and because the various types of clients are found in different
%%% locations the monitors are maintained in a data type called monitor_index(),
%%% shortened to "mx" throughout the module. This structure is treated as an opaque
%%% data type and is handled by a set of functions defined toward the end of the module
%%% as mx_*/N.
%%%
%%% Node connections (cx_conn processes) must also be tracked for status and realm
%%% availability. This is done using a type called the conn_index(), shortened to
%%% "cx" throughout the modue. conn_index() is treated as an abstract, opaque data
%%% type across the module in the same way as the monitor_index() mentioned above,
%%% and is handled via a set of cx_*/N functions defined at the end of the module
%%% after the mx_*/N section.
%%%
%%% Do NOT directly access data within these structures, use (or write) an accessor
%%% function that does what you want.
%%% %%%
%%% %%%
%%% Connection handling %%% Connection handling
@ -14,63 +34,57 @@
%%% http://zxq9.com/archives/1311 %%% http://zxq9.com/archives/1311
%%% It is in charge of the high-level task of servicing requested actions and returning %%% It is in charge of the high-level task of servicing requested actions and returning
%%% responses to callers as well as mapping successful connections to configured realms %%% responses to callers as well as mapping successful connections to configured realms
%%% and repairing failed connections to various realms by searching for and directing %%% and repairing failed connections to nodes that reduce availability of configured
%%% that connections be made to various realms to satisfy action request requirements. %%% realms.
%%% %%%
%%% When the zx_daemon is started it checks local configuration and cache files to %%% When the zx_daemon is started it checks local configuration and cache files to
%%% determine what realms must be found and what cached Zomp nodes it is aware of. %%% determine what realms must be available and what cached Zomp nodes it is aware of.
%%% It populates an internal record #cx{} (typed as conn_index()) with realm config %%% It populates the CX (conn_index(), mentioned above) with realm config and host
%%% and host cache data and then immediately initiates three connection attempts to %%% cache data, and then immediately initiates three connection attempts to cached
%%% cached nodes for each realm configured. If a host is known to service more than %%% nodes for each realm configured (see init_connections/0).
%%% one configured realm only one attempt will be made at a time to connect to it.
%%% If all cached hosts for a given realm have been tried already, or if none are
%%% known, then the daemon will direct a connection be made to the prime node.
%%% %%%
%%% The conn_index() type is abstract across the module, handled expicitly via use %%% Once connection attempts have been initiated the daemon waits in receive for
%%% of manipulation functions called cx_*. The details of index manipulation can %%% either a connection report (success or failure) or an action request from
%%% easily litter the code with incidental complexity (as far as a reader is concerned) %%% elsewhere in the system.
%%% so hiding that as abstract data leaves more mental cycles to consider the goals
%%% of the program.
%%% %%%
%%% Once the connection attempts have been initiated (via zx_conn:start/1) the daemon %%% Connection status is relayed with report/1 and indicates to the daemon whether
%%% waits in receive for either a connection report from a connection or an action %%% a connection has failed, been disconneocted, redirected, or succeeded. See the
%%% request from elsewhere in the system. %%% zx_conn internals for details. If a connection is successful then the zx_conn
%%% will relay the connected node's realm availability status to the daemon, and
%%% the daemon will match the node's provided realms with the configured realm list.
%%% Any realms that are not yet provided by another connection will be assigned to
%%% the reporting successful zx_conn. If no unserviced realms are provided by the
%%% node the zx_conn will be shut down but the host info will be cached for future
%%% use. Any realms that have an older serial than the serial currently known to
%%% zx will be disregarded (which may result in termination of the connection if it
%%% means there are no useful realms available on a given node).
%%% %%%
%%% A connection request is made with a call to report/1 and indicates to the daemon %%% A failure can occur at any time. In the event a connected and assigned zx_conn
%%% whether a connection has failed, been disconneocted, redirected, or succeeded. %%% has failed the target host will be dropped from the hosts cache, the zx_conn will
%%% Connection handling is as follows: %%% terminate and a new one will be spawned in its place if there is a gap in
%%% - A failure can occur at any time. In the event a connected and assigned zx_conn %%% configured realm coverage.
%%% has failed (whether it was connected and assigned, or never succeeded at all) the
%%% target host will be dropped from the hosts cache and another attempt will be made
%%% in its place.
%%% - If a connection is disconnected then the host will be placed at the back of the
%%% hosts cache.
%%% - If a connection is redirected then the redirecting host will be placed at the
%%% back of the hosts cache and the list of new Zomp nodes provided by the redirect
%%% will be added at the front of the connect queue.
%%% - In the event a connection succeeds then the list of provided realms and their
%%% current serials will be compared to the list of known realms and serials, and
%%% the host will be added to the relevant realm host index if not already present
%%% and the provided serial is newer than the currently known one (but the realm
%%% serial will not be updated at this point, only the host added). After the
%%% host's record is updated across the realm indices the daemon will assign it to
%%% whatever realms it provides that are not yet services by a connection, and in
%%% the case it does not service any required and unassigned realms it will be
%%% instructed to disconnect.
%%% %%%
%%% Because the daemon always initiates connection attempts on startup success or %%% Nodes may be too busy (their client slots full) to accept a new connection. In
%%% failure messages are guaranteed to be received without any need for a timer. For %%% this case the node should give the zx_conn a redirect instruction during protocol
%%% that reason there is no timed re-inspection mechanism present in this module. %%% negotiation. The zx_conn will report the redirect and host list to the daemon,
%%% and the daemon will add the hosts to the host cache and the redirecting host will
%%% be placed at the rear of the host cache unless it is the prime node for the target
%%% realm.
%%% %%%
%%% Action requests are queued within the zx_daemon, so requests to download a package, %%%
%%% for example, look the same as several requests to download several packages. Each %%% Request queues
%%% request that requires dispatching to a zx_conn is held in the active action slot %%%
%%% until complete, paired with the pid of the zx_conn handling the action. If a %%% Requests, reports and subscription updates are all either forwarded to affected
%%% zx_conn dies before completing an action or between the time an action request is %%% processes or entered into a work queue. All such work requests are received as
%%% received by the daemon and dispatch to the zx_conn occurs then the daemon will %%% asynchronous messages and cause the work queue to first be updated, and then,
%%% receive the terminal monitor message and be able to put the pending action back %%% as a separate step, the work queue is re-evaluated in its entirety. Any work that
%%% into the action queue, re-establish a connection to the necessary realm, and then %%% cannot be completed (due to a realm not being available, for example) is recycled
%%% re-dispatch the action to the new zx_conn. %%% to the queue. A connection report also triggers a queue re-evaluation, so there
%%% should not be cases where the work queue stalls on active requests.
%%%
%%% Requestors sending either download or realm query requests are given a reference
%%% to match on for receipt of their result messages or to be used to cancel the
%%% requested work (timeouts are handled by the caller, not by the daemon).
%%% %%%
%%% A bit of state handling is required (queueing requests and storing the current %%% A bit of state handling is required (queueing requests and storing the current
%%% action state), but this permits the system above the daemon to interact with it in %%% action state), but this permits the system above the daemon to interact with it in
@ -113,9 +127,8 @@
-record(cx, -record(cx,
{realms = #{} :: #{zx:realm() := realm_meta()}, {realms = #{} :: #{zx:realm() := realm_meta()},
assigned = [] :: [{zx:realm(), pid()}], attempts = [] :: [{pid(), zx:host(), [zx:realm()]}],
attempts = [] :: [{pid(), zx:host()}], conns = [] :: [{pid(), zx:host(), [zx:realm()]}]}).
conns = [] :: [{pid(), zx:host()}]}).
-record(rmeta, -record(rmeta,
@ -126,7 +139,9 @@
mirrors = queue:new() :: queue:queue(zx:host()), mirrors = queue:new() :: queue:queue(zx:host()),
realm_keys = [] :: [zx:key_meta()], realm_keys = [] :: [zx:key_meta()],
package_keys = [] :: [zx:key_meta()], package_keys = [] :: [zx:key_meta()],
sysops = [] :: [zx:sysop_meta()]}). sysops = [] :: [zx:sysop_meta()],
assigned = none :: none | pid(),
available = [] :: [pid()]}).
%% State Types %% State Types
-type state() :: #s{}. -type state() :: #s{}.
@ -134,17 +149,20 @@
-type conn_index() :: #cx{}. -type conn_index() :: #cx{}.
-type realm_meta() :: #rmeta{}. -type realm_meta() :: #rmeta{}.
-type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()} -type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()}
| conn_attempt | attempt
| conn. | conn.
%% Conn Communication %% Conn Communication
-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]} -type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
| {redirect, Hosts :: [zx:host()]}
| disconnected. | disconnected.
%% Subscriber / Requestor Communication %% Subscriber / Requestor Communication
% Incoming Request messages % Incoming Request messages
-type request() :: {Requestor :: pid(), Action :: action() | subunsub()}. -type action () :: {Requestor :: pid(),
-type action() :: {list, realms | zx:identifier()} Request :: request() | subunsub()}.
-type request() :: {list, realms | zx:identifier()}
| {latest, zx:identifier()} | {latest, zx:identifier()}
| {fetch, zx:package_id()} | {fetch, zx:package_id()}
| {key, zx:key_id()} | {key, zx:key_id()}
@ -156,6 +174,12 @@
| {unsubscribe, zx:package()}. | {unsubscribe, zx:package()}.
% Outgoing Result Messages % Outgoing Result Messages
%
% Results are sent wrapped a triple: {result, Ref, Result}
% where the result itself is a triple: {Type, Identifier, Content}
%
% Subscription messages are a separate type below.
-type result() :: sub_result() -type result() :: sub_result()
| list_result() | list_result()
| latest_result() | latest_result()
@ -165,9 +189,6 @@
| pack_result() | pack_result()
| maint_result() | maint_result()
| sysop_result(). | sysop_result().
-type sub_result() :: {subscription, zx:package(),
Message :: {update, zx:package_id()}
| {error, bad_realm | bad_package}}.
-type list_result() :: {list, realms, -type list_result() :: {list, realms,
Message :: {ok, [zx:realm()]}} Message :: {ok, [zx:realm()]}}
| {list, zx:realm(), | {list, zx:realm(),
@ -177,7 +198,7 @@
Message :: {ok, [zx:version]} Message :: {ok, [zx:version]}
| {error, bad_realm | {error, bad_realm
| bad_package | bad_package
| timeout}} | timeout}}.
-type latest_result() :: {latest, -type latest_result() :: {latest,
Package :: zx:package() Package :: zx:package()
| zx:package_id(), | zx:package_id(),
@ -218,6 +239,12 @@
| timeout}. | timeout}.
% Subscription Results
-type sub_message() :: {subscription, zx:package(),
Message :: {update, zx:package_id()}
| {error, bad_realm | bad_package}}.
%%% Requestor Interface %%% Requestor Interface
-spec pass_meta(Meta, Dir, ArgV) -> ok -spec pass_meta(Meta, Dir, ArgV) -> ok
@ -240,19 +267,12 @@ pass_meta(Meta, Dir, ArgV) ->
gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}).
-spec subscribe(Package) -> Result -spec subscribe(Package) -> ok
when Package :: zx:package(), when Package :: zx:package().
Result :: ok
| {error, Reason},
Reason :: illegal_requestor
| {already_subscribed, zx:package()}.
%% @doc %% @doc
%% Subscribe to update notification for a for a particular package. %% Subscribe to update notification for a for a particular package.
%% The daemon is designed to monitor a single package at a time, so a second call to %% The caller will receive update notifications of type sub_result() as Erlang
%% subscribe/1 will return an `already_subscribed' error, or possibly an %% messages whenever an update occurs.
%% `illegal_requestor' error in the case that a second call is made from a different
%% process than the original requestor.
%% Other functions can be used to query the status of a package at an arbitrary time.
subscribe(Package) -> subscribe(Package) ->
gen_server:cast(?MODULE, {subscribe, self(), Package}). gen_server:cast(?MODULE, {subscribe, self(), Package}).
@ -354,7 +374,7 @@ request(Action) ->
%%% Connection interface %%% Upstream Zomp connection interface
-spec report(Message) -> ok -spec report(Message) -> ok
when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]} when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
@ -402,7 +422,7 @@ start_link() ->
init(none) -> init(none) ->
{ok, MX, CX} = init_connections(), {ok, MX, CX} = init_connections(),
State = #s{mx = MX, cx = NewCX}, State = #s{mx = MX, cx = CX},
{ok, State}. {ok, State}.
@ -413,15 +433,20 @@ init_connections() ->
init_connections(Realms, MX, CX). init_connections(Realms, MX, CX).
init_connections([Realm | Realms], Monitors, CX) -> init_connections([Realm | Realms], MX, CX = #cx{attempts = Attempts}) ->
{ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX), {ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX),
StartConn = StartConn =
fun(Host) -> fun(Host, A) ->
{ok, Pid} = zx_conn:start(Host), case lists:keymember(Host, 2, Attempts) of
{Pid, Host} false ->
{ok, Pid} = zx_conn:start(Host),
[{Pid, Host} | A];
true ->
A
end
end, end,
NewAttempts = lists:map(StartConn, Hosts), NewAttempts = lists:foldl(StartConn, [], Hosts),
AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, conn_attempt, M) end, AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, attempt, M) end,
NewMX = lists:foldl(AddMonitor, MX, NewAttempts), NewMX = lists:foldl(AddMonitor, MX, NewAttempts),
NewCX = lists:foldl(fun cx_add_attempt/2, NextCX, NewAttempts), NewCX = lists:foldl(fun cx_add_attempt/2, NextCX, NewAttempts),
init_connections(Realms, NewMX, NewCX); init_connections(Realms, NewMX, NewCX);
@ -556,16 +581,21 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) ->
{ScrubbedMX, NextCX} {ScrubbedMX, NextCX}
end, end,
State#s{mx = NewMX, cx = NewCX}; State#s{mx = NewMX, cx = NewCX};
do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) ->
NextMX = mx_del_monitor(Conn, attempt, MX),
{Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX),
{NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX)
State#s{mx = NewMX, cx = NewCX};
do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) -> do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) ->
{Unassigned, NextMX, NextCX} = {Unassigned, NextMX, NextCX} =
case mx_lookup_category(Conn, MX) of case mx_lookup_category(Conn, MX) of
conn_attempt -> attempt ->
{ok, [], ScrubbedCX} cx_disconnected(Conn, attempt, CX), ScrubbedMX = mx_del_monitor(Conn, attempt, MX),
ScrubbedMX = mx_del_monitor(Conn, conn_attempt, MX), ScrubbedCX = cx_failed(Conn, CX),
{[], ScrubbedMX, ScrubbedCX}; {[], ScrubbedMX, ScrubbedCX};
conn -> conn ->
{ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX),
ScrubbedMX = mx_del_monitor(Conn, conn, MX), ScrubbedMX = mx_del_monitor(Conn, conn, MX),
{ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX),
{Dropped, ScrubbedMX, ScrubbedCX} {Dropped, ScrubbedMX, ScrubbedCX}
end, end,
{NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX), {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX),
@ -581,11 +611,21 @@ do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) ->
%% @private %% @private
%% Initiates a single connection and updates the relevant structures. %% Initiates a single connection and updates the relevant structures.
init_connection([Realm | Realms], MX, CX) -> init_connection([Realm | Realms], MX, CX = #cx{realms = Metas}) ->
{ok, Host, NextCX} = cx_next_hosts(Realm, CX), {NewMX, NewCX} =
{ok, Pid} = zx_conn:start(Host), case maps:get(Realm) of
NewMX = mx_add_monitor(Pid, conn_attempt, MX), #rmeta{available = []} ->
NewCX = cx_add_attempt({Pid, Host}, CX), {ok, Host, UpdatedCX} = cx_next_hosts(Realm, CX),
{ok, Pid} = zx_conn:start(Host),
NextMX = mx_add_monitor(Pid, attempt, MX),
NextCX = cx_add_attempt({Pid, Host}, UpdatedCX),
{NextMX, NextCX};
Meta = #rmeta{available = [Conn | Conns]} ->
NewMeta = Meta#rmeta{assigned = Conn, available = Conns},
NewMetas = maps:put(Realm, NewMeta, Metas),
NextCX = CX#cx{realms = NewMetas},
{MX, NewCX}
end,
init_connection(Realms, NewMX, NewCX); init_connection(Realms, NewMX, NewCX);
init_connection([], MX, CX) -> init_connection([], MX, CX) ->
{MX, CX}. {MX, CX}.
@ -883,12 +923,12 @@ mx_add_monitor(Pid, requestor, MX) ->
Ref = monitor(process, Pid), Ref = monitor(process, Pid),
[{Ref, Pid, {0, 1}} | MX] [{Ref, Pid, {0, 1}} | MX]
end; end;
mx_add_monitor(Pid, conn_attempt, MX) -> mx_add_monitor(Pid, attempt, MX) ->
false = lists:keymember(Pid, 2, MX), false = lists:keymember(Pid, 2, MX),
Ref = monitor(process, Pid), Ref = monitor(process, Pid),
[{Ref, Pid, conn_attempt} | MX]; [{Ref, Pid, attempt} | MX];
mx_add_monitor(Pid, conn, MX) -> mx_add_monitor(Pid, conn, MX) ->
{value, {Ref, Pid, conn_attempt}, NextMX} = lists:keytake(Pid, 2, MX), {value, {Ref, Pid, attempt}, NextMX} = lists:keytake(Pid, 2, MX),
[{Ref, Pid, conn} | NextMX], [{Ref, Pid, conn} | NextMX],
@ -924,7 +964,7 @@ mx_del_monitor(Pid, Category, MX) ->
-spec mx_lookup_category(pid(), monitor_index()) -> Result -spec mx_lookup_category(pid(), monitor_index()) -> Result
when Result :: conn_attempt when Result :: attempt
| conn | conn
| requestor | requestor
| subscriber | subscriber
@ -935,7 +975,7 @@ mx_del_monitor(Pid, Category, MX) ->
mx_lookup_category(Pid, MX) -> mx_lookup_category(Pid, MX) ->
case lists:keyfind(Pid, 2, MX) of case lists:keyfind(Pid, 2, MX) of
{_, _, conn_attempt} -> conn_attempt; {_, _, attempt} -> attempt;
{_, _, conn} -> conn; {_, _, conn} -> conn;
{_, _, {0, _}} -> requestor; {_, _, {0, _}} -> requestor;
{_, _, {_, 0}} -> subscriber; {_, _, {_, 0}} -> subscriber;
@ -951,11 +991,11 @@ mx_lookup_category(Pid, MX) ->
| {error, not_found}, | {error, not_found},
NewMX :: monitor_index(). NewMX :: monitor_index().
%% @private %% @private
%% Upgrade a conn_attempt to a conn. %% Upgrade a attempt to a conn.
mx_upgrade_conn(Pid, MX) -> mx_upgrade_conn(Pid, MX) ->
case lists:keytake(Pid, 2, MX) of case lists:keytake(Pid, 2, MX) of
{value, {Ref, Pid, conn_attempt}, NextMX} -> {value, {Ref, Pid, attempt}, NextMX} ->
NewMX = [{Ref, Pid, conn} | NextMX], NewMX = [{Ref, Pid, conn} | NextMX],
{ok, NewMX}; {ok, NewMX};
false -> false ->
@ -1002,19 +1042,22 @@ cx_populate() ->
Pattern = filename:join(Home, "*.realm"), Pattern = filename:join(Home, "*.realm"),
case filelib:wildcard(Pattern) of case filelib:wildcard(Pattern) of
[] -> {error, no_realms}; [] -> {error, no_realms};
RealmFiles -> cx_fetch_cache(RealmFiles, #cx{}) RealmFiles -> {ok, cx_populate(RealmFiles, #cx{})}
end. end.
cx_populate([File | Files], CX) -> cx_populate([File | Files], CX) ->
case file:consult(File) of case file:consult(File) of
{ok, Meta} -> {ok, Meta} ->
cx_load_realm_meta(Meta, CX); NewCX = cx_load_realm_meta(Meta, CX),
cx_populate(Files, NewCX);
{error, Reason} -> {error, Reason} ->
Message = "Realm file ~tp could not be read. Failed with: ~tp. Skipping.", Message = "Realm file ~tp could not be read. Failed with: ~tp. Skipping.",
ok = log(warning, Message, [File, Reason]), ok = log(warning, Message, [File, Reason]),
populate(Files, CX) cx_populate(Files, CX)
end. end;
cx_populate([], CX) ->
CX.
cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) -> cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) ->
@ -1167,10 +1210,11 @@ cx_next_host3(Meta = #rmeta{prime = Prime, private = Privae, mirrors = Mirrors})
%% of a given realm, taking private mirrors first, then public mirrors, and ending %% of a given realm, taking private mirrors first, then public mirrors, and ending
%% with the prime node for the realm if no others exist. %% with the prime node for the realm if no others exist.
cx_next_hosts(N, Realm, CX = #cx{assigned = Assigned}) -> cx_next_hosts(N, Realm, CX = #cx{realms = Realms}) ->
case lists:keymember(Realm, 1, Assigned) of case maps:find(Realm, Realms) of
false -> cx_next_hosts2(N, Realm, CX); {ok, #rmeta{assigned = none}} -> cx_next_hosts2(N, Realm, CX);
true -> {error, connected} {ok, _} -> {error, connected};
error -> {error, bad_realm}
end. end.
@ -1195,7 +1239,7 @@ cx_next_hosts3(N, Hosts, Meta) ->
-spec cx_add_attempt(New, CX) -> NewCX -spec cx_add_attempt(New, CX) -> NewCX
when New :: {pid(), reference(), zx:host()}, when New :: {pid(), zx:host()},
CX :: conn_index(), CX :: conn_index(),
NewCX :: conn_index(). NewCX :: conn_index().
@ -1218,19 +1262,20 @@ cx_add_attempt(New, CX = #cx{attempts = Attempts}) ->
%% The return value is a tuple that indicates whether the new connection was assigned %% The return value is a tuple that indicates whether the new connection was assigned
%% or not and the updated CX data value. %% or not and the updated CX data value.
cx_connected(Available, Conn, CX) -> cx_connected(Available, Conn, CX = #cx{attempts = Attempts, conns = Conns}) ->
cx_connected(unassigned, Available, Conn, CX). {value, {Conn, Host}, NewAttempts} = lists:keytake(Conn, 1, Attempts),
Realms = [element(1, A) || A <- Available],
NewConns = [{Conn, Host, Realms} | Conns],
NewCX = CX#cx{attempts = NewAttempts, conns = NewConns},
cx_connected(unassigned, Available, Conn, NewCX).
cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) -> cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) ->
case maps:find(Realm, Realms) of case maps:find(Realm, Realms) of
{ok, Meta = #rmeta{serial = S}} when S < Serial -> {ok, Meta = #rmeta{serial = S, available = Available}} when S =< Serial ->
NewMeta = Meta#rmeta{serial = Serial}, NewMeta = Meta#rmeta{serial = Serial, available = [Conn | Available]},
{NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX), {NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX),
cx_connected(NewA, Rest, Conn, NewCX); cx_connected(NewA, Rest, Conn, NewCX);
{ok, Meta = #rmeta{serial = S}} when S == Serial ->
{NewA, NewCX} = cx_connected(A, Realm, Conn, Meta, CX),
cx_connected(NewA, Rest, Conn, NewCX);
{ok, #rmeta{serial = S}} when S > Serial -> {ok, #rmeta{serial = S}} when S > Serial ->
cx_connected(A, Rest, Conn, CX); cx_connected(A, Rest, Conn, CX);
error -> error ->
@ -1247,11 +1292,11 @@ cx_connected(A,
Realm, Realm,
Conn, Conn,
Meta = #rmeta{prime = Prime, mirrors = Mirrors}, Meta = #rmeta{prime = Prime, mirrors = Mirrors},
CX = #cx{realms = Realms, assigned = Assigned, attempts = Attempts}) -> CX = #cx{realms = Realms, attempts = Attempts}) ->
{NewMirrors, Node} = {NewMirrors, Node} =
case lists:keyfind(Conn, 1, Attempts) of case lists:keyfind(Conn, 1, Attempts) of
{_, _, Prime} -> {Mirrors, Prime}; {_, Prime} -> {Mirrors, Prime};
{_, _, Host} -> {enqueue_unique(Host, Mirrors), Host} {_, Host} -> {enqueue_unique(Host, Mirrors), Host}
end, end,
{NewA, NewAssigned} = {NewA, NewAssigned} =
case lists:keymember(Realm, 1, Assigned) of case lists:keymember(Realm, 1, Assigned) of
@ -1277,6 +1322,72 @@ enqueue_unique(Element, Queue) ->
end. end.
-spec cx_failed(Conn, CX) -> NewCX
when Conn :: pid(),
CX :: conn_index(),
NewCX :: conn_index().
%% @private
%% Remove a failed attempt and all its associations.
cx_failed(Conn, #cx{attempts = Attempts}) ->
NewAttempts = lists:keydelete(Conn, 1, Attempts),
CX#cx{attempts = NewAttempts}.
-spec cx_redirect(Conn, Hosts, CX) -> {Unassigned, NextCX}
when Conn :: pid(),
Hosts :: [{zx:host(), [zx:realm()]}],
CX :: conn_index(),
Unassigned :: [zx:realm()],
NextCX :: conn_index().
cx_redirect(Conn, Hosts, CX) ->
NextCX = cx_failed(Conn, CX),
NewCX = #cx{realms = Realms} = cx_redirect(Hosts, NextCX),
Unassigned = cx_unassigned(NewCX),
{Unassigned, NewCX}.
-spec cx_redirect(Hosts, CX) -> NewCX
when Hosts :: [{zx:host(), [zx:realm()]}],
CX :: conn_index(),
NextCX :: conn_index().
%% @private
%% Add host to any realm mirror queues that are known to provide the realm.
cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) ->
Apply =
fun(R, M) ->
case maps:find(R, M) of
{ok, Meta = #rmeta{mirrors = Mirrors}} ->
NewMirrors =
case queue:member(Host, Mirrors) of
true -> Mirrors;
false -> queue:in(Host, Mirrors)
end,
NewMeta = Meta#rmeta{mirrors = NewMirrors},
maps:put(R, Meta, M);
error ->
M
end
end,
NewRealms = lists:foldl(Apply, Realms, Provided),
cx_redirect(Rest, CX#cx{realms = NewRealms});
cx_redirect([], CX) ->
CX.
cx_unassigned(#cx{realms = Realms}) ->
NotAssigned =
fun(Realm, #rmeta{assigned = Conn}, Unassigned) ->
case Conn == none of
true -> [Realm | Unassigned];
false -> Unassigned
end
end,
maps:fold(NotAssigned, [], Realms).
-spec cx_disconnected(Conn, CX) -> Result -spec cx_disconnected(Conn, CX) -> Result
when Conn :: pid(), when Conn :: pid(),
CX :: conn_index(), CX :: conn_index(),
@ -1291,44 +1402,53 @@ enqueue_unique(Element, Queue) ->
%% realms, and returns the monitor reference of the pid, a list of realms that are now %% realms, and returns the monitor reference of the pid, a list of realms that are now
%% unassigned, and an updated connection index. %% unassigned, and an updated connection index.
cx_disconnected(Conn, attempt, #cx{attempts = Attempts}) -> cx_disconnected(Conn, CX = #cx{realms = Realms, conns = Conns}) ->
case lists:keytake(Conn, 1, Attempts) of
{value, {Conn, Host}, NewAttempts} ->
NewCX = CX#cx{attempts = NewAttempts},
{ok, [], NewCX};
false ->
{error, unknown}
end;
cx_disconnected(Conn, conn, CX = #cx{assigned = Assigned, conns = Conns}) ->
case lists:keytake(Conn, 1, Conns) of case lists:keytake(Conn, 1, Conns) of
{value, {Conn, Host}, NewConns} -> {value, {Conn, Host, Realms}, NewConns} ->
{UnassignedRealms, NewAssigned} = cx_scrub_assigned(Conn, Assigned), {UnassignedRealms, NewRealms} = cx_scrub_assigned(Conn, Host, Realms),
NewCX = CX#cx{assigned = NewAssigned, conns = NewConns}, NewCX = CX#cx{realms = NewRealms, conns = NewConns},
{ok, UnassignedRealms, NewCX}; {ok, UnassignedRealms, NewCX};
false -> false ->
{error, unknown} {error, unknown}
end. end.
-spec cx_scrub_assigned(Pid, Assigned) -> {UnassignedRealms, NewAssigned} -spec cx_scrub_assigned(Pid, Host, Realms) -> {UnassignedRealms, NewRealms}
when Pid :: pid(), when Pid :: pid(),
Assigned :: [{zx:realm(), pid()}], Host :: zx:host(),
Realms :: realm_meta(),
UnassignedRealms :: [zx:realm()], UnassignedRealms :: [zx:realm()],
NewAssigned :: [{zx:realm(), pid()}]. NewRealms :: realm_meta().
%% @private %% @private
%% This could have been performed as a set of two list operations (a partition and a %% This could have been performed as a set of two list operations (a partition and a
%% map), but to make the procedure perfectly clear it is written out explicitly. %% map), but to make the procedure perfectly clear it is written out explicitly.
cx_scrub_assigned(Pid, Assigned) -> cx_scrub_assigned(Pid, Host, Realms) ->
cx_scrub_assigned(Pid, Assigned, [], []). Scrub =
fun
(Realm,
cx_scrub_assigned(Pid, [{Realm, Pid} | Rest], Unassigned, Assigned) -> Meta = #rmeta{mirrors = Mirrors,
cx_scrub_assigned(Pid, Rest, [Realm | Unassigned], Assigned); assigned = Assigned,
cx_scrub_assigned(Pid, [A | Rest], Unassigned, Assigned) -> available = Available},
cx_scrub_assigned(Pid, Rest, Unassigned, [A | Assigned]); {Unassigned, Metas}) ->
cx_scrub_assigned(_, [], Unassigned, Assigned) -> {NewUnassigned, NewAssigned} =
{Unassigned, Assigned}. case Assigned of
Pid -> {[Realm | Unassigned], none};
Other -> {Unassigned, Other}
end,
NewMirrors =
case queue:member(Host, Mirrors) of
false -> queue:in(Host, Mirrors);
true -> Mirrors
end,
NewAvailable = lists:delete(Pid, Available),
NewMeta = Meta#rmeta{mirrors = NewMirrors,
assigned = NewAssigned,
available = NewAvailable},
NewMetas = maps:put(Realm, NewMeta, Metas),
{NewUnassigned, NewMetas}
end,
maps:fold(Scrub, {[], maps:new()}, Realms).
-spec cx_resolve(Realm, CX) -> Result -spec cx_resolve(Realm, CX) -> Result
@ -1341,13 +1461,9 @@ cx_scrub_assigned(_, [], Unassigned, Assigned) ->
%% Check the registry of assigned realms and return the pid of the appropriate %% Check the registry of assigned realms and return the pid of the appropriate
%% connection, or an `unassigned' indication if the realm is not yet connected. %% connection, or an `unassigned' indication if the realm is not yet connected.
cx_resolve(Realm, #cx{realms = Realms, assigned = Assigned}) -> cx_resolve(Realm, #cx{realms = Realms}) ->
case lists:keyfind(Realm, 1, Assigned) of case maps:find(Realm, Realms) of
{Realm, Conn} -> {ok, #rmeta{assigned = none}} -> unassigned;
{ok, Conn}; {ok, #rmeta{assigned = Conn}} -> {ok, Conn};
false -> error -> unconfigured
case maps:is_key(Realm, Realms) ->
true -> unassigned;
false -> unconfigured
end
end. end.