diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl index a684394..f2f4704 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl @@ -3,9 +3,29 @@ %%% %%% Resident execution daemon and runtime interface to Zomp. %%% -%%% The daemon lives in the background once started and awaits action requests from -%%% callers running within the system. The daemon is only capable of handling -%%% unprivileged (user) actions such as querying a Zomp node or fetching packages. +%%% The daemon lives in the background once started and awaits query requests and +%%% subscriptions from from other processes. The daemon is only capable of handling +%%% unprivileged (user) actions. +%%% +%%% +%%% Discrete state and local abstract data types +%%% +%%% The daemon must keep track of requestors, subscribers, and zx_conn processes by +%%% using monitors, and because the various types of clients are found in different +%%% locations the monitors are maintained in a data type called monitor_index(), +%%% shortened to "mx" throughout the module. This structure is treated as an opaque +%%% data type and is handled by a set of functions defined toward the end of the module +%%% as mx_*/N. +%%% +%%% Node connections (cx_conn processes) must also be tracked for status and realm +%%% availability. This is done using a type called the conn_index(), shortened to +%%% "cx" throughout the modue. conn_index() is treated as an abstract, opaque data +%%% type across the module in the same way as the monitor_index() mentioned above, +%%% and is handled via a set of cx_*/N functions defined at the end of the module +%%% after the mx_*/N section. +%%% +%%% Do NOT directly access data within these structures, use (or write) an accessor +%%% function that does what you want. %%% %%% %%% Connection handling @@ -14,63 +34,57 @@ %%% http://zxq9.com/archives/1311 %%% It is in charge of the high-level task of servicing requested actions and returning %%% responses to callers as well as mapping successful connections to configured realms -%%% and repairing failed connections to various realms by searching for and directing -%%% that connections be made to various realms to satisfy action request requirements. +%%% and repairing failed connections to nodes that reduce availability of configured +%%% realms. %%% %%% When the zx_daemon is started it checks local configuration and cache files to -%%% determine what realms must be found and what cached Zomp nodes it is aware of. -%%% It populates an internal record #cx{} (typed as conn_index()) with realm config -%%% and host cache data and then immediately initiates three connection attempts to -%%% cached nodes for each realm configured. If a host is known to service more than -%%% one configured realm only one attempt will be made at a time to connect to it. -%%% If all cached hosts for a given realm have been tried already, or if none are -%%% known, then the daemon will direct a connection be made to the prime node. +%%% determine what realms must be available and what cached Zomp nodes it is aware of. +%%% It populates the CX (conn_index(), mentioned above) with realm config and host +%%% cache data, and then immediately initiates three connection attempts to cached +%%% nodes for each realm configured (see init_connections/0). %%% -%%% The conn_index() type is abstract across the module, handled expicitly via use -%%% of manipulation functions called cx_*. The details of index manipulation can -%%% easily litter the code with incidental complexity (as far as a reader is concerned) -%%% so hiding that as abstract data leaves more mental cycles to consider the goals -%%% of the program. +%%% Once connection attempts have been initiated the daemon waits in receive for +%%% either a connection report (success or failure) or an action request from +%%% elsewhere in the system. %%% -%%% Once the connection attempts have been initiated (via zx_conn:start/1) the daemon -%%% waits in receive for either a connection report from a connection or an action -%%% request from elsewhere in the system. +%%% Connection status is relayed with report/1 and indicates to the daemon whether +%%% a connection has failed, been disconneocted, redirected, or succeeded. See the +%%% zx_conn internals for details. If a connection is successful then the zx_conn +%%% will relay the connected node's realm availability status to the daemon, and +%%% the daemon will match the node's provided realms with the configured realm list. +%%% Any realms that are not yet provided by another connection will be assigned to +%%% the reporting successful zx_conn. If no unserviced realms are provided by the +%%% node the zx_conn will be shut down but the host info will be cached for future +%%% use. Any realms that have an older serial than the serial currently known to +%%% zx will be disregarded (which may result in termination of the connection if it +%%% means there are no useful realms available on a given node). %%% -%%% A connection request is made with a call to report/1 and indicates to the daemon -%%% whether a connection has failed, been disconneocted, redirected, or succeeded. -%%% Connection handling is as follows: -%%% - A failure can occur at any time. In the event a connected and assigned zx_conn -%%% has failed (whether it was connected and assigned, or never succeeded at all) the -%%% target host will be dropped from the hosts cache and another attempt will be made -%%% in its place. -%%% - If a connection is disconnected then the host will be placed at the back of the -%%% hosts cache. -%%% - If a connection is redirected then the redirecting host will be placed at the -%%% back of the hosts cache and the list of new Zomp nodes provided by the redirect -%%% will be added at the front of the connect queue. -%%% - In the event a connection succeeds then the list of provided realms and their -%%% current serials will be compared to the list of known realms and serials, and -%%% the host will be added to the relevant realm host index if not already present -%%% and the provided serial is newer than the currently known one (but the realm -%%% serial will not be updated at this point, only the host added). After the -%%% host's record is updated across the realm indices the daemon will assign it to -%%% whatever realms it provides that are not yet services by a connection, and in -%%% the case it does not service any required and unassigned realms it will be -%%% instructed to disconnect. +%%% A failure can occur at any time. In the event a connected and assigned zx_conn +%%% has failed the target host will be dropped from the hosts cache, the zx_conn will +%%% terminate and a new one will be spawned in its place if there is a gap in +%%% configured realm coverage. %%% -%%% Because the daemon always initiates connection attempts on startup success or -%%% failure messages are guaranteed to be received without any need for a timer. For -%%% that reason there is no timed re-inspection mechanism present in this module. +%%% Nodes may be too busy (their client slots full) to accept a new connection. In +%%% this case the node should give the zx_conn a redirect instruction during protocol +%%% negotiation. The zx_conn will report the redirect and host list to the daemon, +%%% and the daemon will add the hosts to the host cache and the redirecting host will +%%% be placed at the rear of the host cache unless it is the prime node for the target +%%% realm. %%% -%%% Action requests are queued within the zx_daemon, so requests to download a package, -%%% for example, look the same as several requests to download several packages. Each -%%% request that requires dispatching to a zx_conn is held in the active action slot -%%% until complete, paired with the pid of the zx_conn handling the action. If a -%%% zx_conn dies before completing an action or between the time an action request is -%%% received by the daemon and dispatch to the zx_conn occurs then the daemon will -%%% receive the terminal monitor message and be able to put the pending action back -%%% into the action queue, re-establish a connection to the necessary realm, and then -%%% re-dispatch the action to the new zx_conn. +%%% +%%% Request queues +%%% +%%% Requests, reports and subscription updates are all either forwarded to affected +%%% processes or entered into a work queue. All such work requests are received as +%%% asynchronous messages and cause the work queue to first be updated, and then, +%%% as a separate step, the work queue is re-evaluated in its entirety. Any work that +%%% cannot be completed (due to a realm not being available, for example) is recycled +%%% to the queue. A connection report also triggers a queue re-evaluation, so there +%%% should not be cases where the work queue stalls on active requests. +%%% +%%% Requestors sending either download or realm query requests are given a reference +%%% to match on for receipt of their result messages or to be used to cancel the +%%% requested work (timeouts are handled by the caller, not by the daemon). %%% %%% A bit of state handling is required (queueing requests and storing the current %%% action state), but this permits the system above the daemon to interact with it in @@ -113,9 +127,8 @@ -record(cx, {realms = #{} :: #{zx:realm() := realm_meta()}, - assigned = [] :: [{zx:realm(), pid()}], - attempts = [] :: [{pid(), zx:host()}], - conns = [] :: [{pid(), zx:host()}]}). + attempts = [] :: [{pid(), zx:host(), [zx:realm()]}], + conns = [] :: [{pid(), zx:host(), [zx:realm()]}]}). -record(rmeta, @@ -126,7 +139,9 @@ mirrors = queue:new() :: queue:queue(zx:host()), realm_keys = [] :: [zx:key_meta()], package_keys = [] :: [zx:key_meta()], - sysops = [] :: [zx:sysop_meta()]}). + sysops = [] :: [zx:sysop_meta()], + assigned = none :: none | pid(), + available = [] :: [pid()]}). %% State Types -type state() :: #s{}. @@ -134,17 +149,20 @@ -type conn_index() :: #cx{}. -type realm_meta() :: #rmeta{}. -type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()} - | conn_attempt + | attempt | conn. %% Conn Communication -type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]} + | {redirect, Hosts :: [zx:host()]} | disconnected. %% Subscriber / Requestor Communication % Incoming Request messages --type request() :: {Requestor :: pid(), Action :: action() | subunsub()}. --type action() :: {list, realms | zx:identifier()} +-type action () :: {Requestor :: pid(), + Request :: request() | subunsub()}. + +-type request() :: {list, realms | zx:identifier()} | {latest, zx:identifier()} | {fetch, zx:package_id()} | {key, zx:key_id()} @@ -156,6 +174,12 @@ | {unsubscribe, zx:package()}. % Outgoing Result Messages +% +% Results are sent wrapped a triple: {result, Ref, Result} +% where the result itself is a triple: {Type, Identifier, Content} +% +% Subscription messages are a separate type below. + -type result() :: sub_result() | list_result() | latest_result() @@ -165,9 +189,6 @@ | pack_result() | maint_result() | sysop_result(). --type sub_result() :: {subscription, zx:package(), - Message :: {update, zx:package_id()} - | {error, bad_realm | bad_package}}. -type list_result() :: {list, realms, Message :: {ok, [zx:realm()]}} | {list, zx:realm(), @@ -177,7 +198,7 @@ Message :: {ok, [zx:version]} | {error, bad_realm | bad_package - | timeout}} + | timeout}}. -type latest_result() :: {latest, Package :: zx:package() | zx:package_id(), @@ -218,6 +239,12 @@ | timeout}. +% Subscription Results +-type sub_message() :: {subscription, zx:package(), + Message :: {update, zx:package_id()} + | {error, bad_realm | bad_package}}. + + %%% Requestor Interface -spec pass_meta(Meta, Dir, ArgV) -> ok @@ -240,19 +267,12 @@ pass_meta(Meta, Dir, ArgV) -> gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). --spec subscribe(Package) -> Result - when Package :: zx:package(), - Result :: ok - | {error, Reason}, - Reason :: illegal_requestor - | {already_subscribed, zx:package()}. +-spec subscribe(Package) -> ok + when Package :: zx:package(). %% @doc %% Subscribe to update notification for a for a particular package. -%% The daemon is designed to monitor a single package at a time, so a second call to -%% subscribe/1 will return an `already_subscribed' error, or possibly an -%% `illegal_requestor' error in the case that a second call is made from a different -%% process than the original requestor. -%% Other functions can be used to query the status of a package at an arbitrary time. +%% The caller will receive update notifications of type sub_result() as Erlang +%% messages whenever an update occurs. subscribe(Package) -> gen_server:cast(?MODULE, {subscribe, self(), Package}). @@ -354,7 +374,7 @@ request(Action) -> -%%% Connection interface +%%% Upstream Zomp connection interface -spec report(Message) -> ok when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]} @@ -402,7 +422,7 @@ start_link() -> init(none) -> {ok, MX, CX} = init_connections(), - State = #s{mx = MX, cx = NewCX}, + State = #s{mx = MX, cx = CX}, {ok, State}. @@ -413,15 +433,20 @@ init_connections() -> init_connections(Realms, MX, CX). -init_connections([Realm | Realms], Monitors, CX) -> +init_connections([Realm | Realms], MX, CX = #cx{attempts = Attempts}) -> {ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX), StartConn = - fun(Host) -> - {ok, Pid} = zx_conn:start(Host), - {Pid, Host} + fun(Host, A) -> + case lists:keymember(Host, 2, Attempts) of + false -> + {ok, Pid} = zx_conn:start(Host), + [{Pid, Host} | A]; + true -> + A + end end, - NewAttempts = lists:map(StartConn, Hosts), - AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, conn_attempt, M) end, + NewAttempts = lists:foldl(StartConn, [], Hosts), + AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, attempt, M) end, NewMX = lists:foldl(AddMonitor, MX, NewAttempts), NewCX = lists:foldl(fun cx_add_attempt/2, NextCX, NewAttempts), init_connections(Realms, NewMX, NewCX); @@ -556,16 +581,21 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> {ScrubbedMX, NextCX} end, State#s{mx = NewMX, cx = NewCX}; +do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) -> + NextMX = mx_del_monitor(Conn, attempt, MX), + {Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX), + {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX) + State#s{mx = NewMX, cx = NewCX}; do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) -> {Unassigned, NextMX, NextCX} = case mx_lookup_category(Conn, MX) of - conn_attempt -> - {ok, [], ScrubbedCX} cx_disconnected(Conn, attempt, CX), - ScrubbedMX = mx_del_monitor(Conn, conn_attempt, MX), + attempt -> + ScrubbedMX = mx_del_monitor(Conn, attempt, MX), + ScrubbedCX = cx_failed(Conn, CX), {[], ScrubbedMX, ScrubbedCX}; conn -> - {ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX), ScrubbedMX = mx_del_monitor(Conn, conn, MX), + {ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX), {Dropped, ScrubbedMX, ScrubbedCX} end, {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX), @@ -581,11 +611,21 @@ do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) -> %% @private %% Initiates a single connection and updates the relevant structures. -init_connection([Realm | Realms], MX, CX) -> - {ok, Host, NextCX} = cx_next_hosts(Realm, CX), - {ok, Pid} = zx_conn:start(Host), - NewMX = mx_add_monitor(Pid, conn_attempt, MX), - NewCX = cx_add_attempt({Pid, Host}, CX), +init_connection([Realm | Realms], MX, CX = #cx{realms = Metas}) -> + {NewMX, NewCX} = + case maps:get(Realm) of + #rmeta{available = []} -> + {ok, Host, UpdatedCX} = cx_next_hosts(Realm, CX), + {ok, Pid} = zx_conn:start(Host), + NextMX = mx_add_monitor(Pid, attempt, MX), + NextCX = cx_add_attempt({Pid, Host}, UpdatedCX), + {NextMX, NextCX}; + Meta = #rmeta{available = [Conn | Conns]} -> + NewMeta = Meta#rmeta{assigned = Conn, available = Conns}, + NewMetas = maps:put(Realm, NewMeta, Metas), + NextCX = CX#cx{realms = NewMetas}, + {MX, NewCX} + end, init_connection(Realms, NewMX, NewCX); init_connection([], MX, CX) -> {MX, CX}. @@ -883,12 +923,12 @@ mx_add_monitor(Pid, requestor, MX) -> Ref = monitor(process, Pid), [{Ref, Pid, {0, 1}} | MX] end; -mx_add_monitor(Pid, conn_attempt, MX) -> +mx_add_monitor(Pid, attempt, MX) -> false = lists:keymember(Pid, 2, MX), Ref = monitor(process, Pid), - [{Ref, Pid, conn_attempt} | MX]; + [{Ref, Pid, attempt} | MX]; mx_add_monitor(Pid, conn, MX) -> - {value, {Ref, Pid, conn_attempt}, NextMX} = lists:keytake(Pid, 2, MX), + {value, {Ref, Pid, attempt}, NextMX} = lists:keytake(Pid, 2, MX), [{Ref, Pid, conn} | NextMX], @@ -924,7 +964,7 @@ mx_del_monitor(Pid, Category, MX) -> -spec mx_lookup_category(pid(), monitor_index()) -> Result - when Result :: conn_attempt + when Result :: attempt | conn | requestor | subscriber @@ -935,7 +975,7 @@ mx_del_monitor(Pid, Category, MX) -> mx_lookup_category(Pid, MX) -> case lists:keyfind(Pid, 2, MX) of - {_, _, conn_attempt} -> conn_attempt; + {_, _, attempt} -> attempt; {_, _, conn} -> conn; {_, _, {0, _}} -> requestor; {_, _, {_, 0}} -> subscriber; @@ -951,11 +991,11 @@ mx_lookup_category(Pid, MX) -> | {error, not_found}, NewMX :: monitor_index(). %% @private -%% Upgrade a conn_attempt to a conn. +%% Upgrade a attempt to a conn. mx_upgrade_conn(Pid, MX) -> case lists:keytake(Pid, 2, MX) of - {value, {Ref, Pid, conn_attempt}, NextMX} -> + {value, {Ref, Pid, attempt}, NextMX} -> NewMX = [{Ref, Pid, conn} | NextMX], {ok, NewMX}; false -> @@ -1002,19 +1042,22 @@ cx_populate() -> Pattern = filename:join(Home, "*.realm"), case filelib:wildcard(Pattern) of [] -> {error, no_realms}; - RealmFiles -> cx_fetch_cache(RealmFiles, #cx{}) + RealmFiles -> {ok, cx_populate(RealmFiles, #cx{})} end. cx_populate([File | Files], CX) -> case file:consult(File) of {ok, Meta} -> - cx_load_realm_meta(Meta, CX); + NewCX = cx_load_realm_meta(Meta, CX), + cx_populate(Files, NewCX); {error, Reason} -> Message = "Realm file ~tp could not be read. Failed with: ~tp. Skipping.", ok = log(warning, Message, [File, Reason]), - populate(Files, CX) - end. + cx_populate(Files, CX) + end; +cx_populate([], CX) -> + CX. cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) -> @@ -1167,10 +1210,11 @@ cx_next_host3(Meta = #rmeta{prime = Prime, private = Privae, mirrors = Mirrors}) %% of a given realm, taking private mirrors first, then public mirrors, and ending %% with the prime node for the realm if no others exist. -cx_next_hosts(N, Realm, CX = #cx{assigned = Assigned}) -> - case lists:keymember(Realm, 1, Assigned) of - false -> cx_next_hosts2(N, Realm, CX); - true -> {error, connected} +cx_next_hosts(N, Realm, CX = #cx{realms = Realms}) -> + case maps:find(Realm, Realms) of + {ok, #rmeta{assigned = none}} -> cx_next_hosts2(N, Realm, CX); + {ok, _} -> {error, connected}; + error -> {error, bad_realm} end. @@ -1195,7 +1239,7 @@ cx_next_hosts3(N, Hosts, Meta) -> -spec cx_add_attempt(New, CX) -> NewCX - when New :: {pid(), reference(), zx:host()}, + when New :: {pid(), zx:host()}, CX :: conn_index(), NewCX :: conn_index(). @@ -1218,19 +1262,20 @@ cx_add_attempt(New, CX = #cx{attempts = Attempts}) -> %% The return value is a tuple that indicates whether the new connection was assigned %% or not and the updated CX data value. -cx_connected(Available, Conn, CX) -> - cx_connected(unassigned, Available, Conn, CX). +cx_connected(Available, Conn, CX = #cx{attempts = Attempts, conns = Conns}) -> + {value, {Conn, Host}, NewAttempts} = lists:keytake(Conn, 1, Attempts), + Realms = [element(1, A) || A <- Available], + NewConns = [{Conn, Host, Realms} | Conns], + NewCX = CX#cx{attempts = NewAttempts, conns = NewConns}, + cx_connected(unassigned, Available, Conn, NewCX). cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) -> case maps:find(Realm, Realms) of - {ok, Meta = #rmeta{serial = S}} when S < Serial -> - NewMeta = Meta#rmeta{serial = Serial}, + {ok, Meta = #rmeta{serial = S, available = Available}} when S =< Serial -> + NewMeta = Meta#rmeta{serial = Serial, available = [Conn | Available]}, {NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX), cx_connected(NewA, Rest, Conn, NewCX); - {ok, Meta = #rmeta{serial = S}} when S == Serial -> - {NewA, NewCX} = cx_connected(A, Realm, Conn, Meta, CX), - cx_connected(NewA, Rest, Conn, NewCX); {ok, #rmeta{serial = S}} when S > Serial -> cx_connected(A, Rest, Conn, CX); error -> @@ -1247,11 +1292,11 @@ cx_connected(A, Realm, Conn, Meta = #rmeta{prime = Prime, mirrors = Mirrors}, - CX = #cx{realms = Realms, assigned = Assigned, attempts = Attempts}) -> + CX = #cx{realms = Realms, attempts = Attempts}) -> {NewMirrors, Node} = case lists:keyfind(Conn, 1, Attempts) of - {_, _, Prime} -> {Mirrors, Prime}; - {_, _, Host} -> {enqueue_unique(Host, Mirrors), Host} + {_, Prime} -> {Mirrors, Prime}; + {_, Host} -> {enqueue_unique(Host, Mirrors), Host} end, {NewA, NewAssigned} = case lists:keymember(Realm, 1, Assigned) of @@ -1277,6 +1322,72 @@ enqueue_unique(Element, Queue) -> end. +-spec cx_failed(Conn, CX) -> NewCX + when Conn :: pid(), + CX :: conn_index(), + NewCX :: conn_index(). +%% @private +%% Remove a failed attempt and all its associations. + +cx_failed(Conn, #cx{attempts = Attempts}) -> + NewAttempts = lists:keydelete(Conn, 1, Attempts), + CX#cx{attempts = NewAttempts}. + + +-spec cx_redirect(Conn, Hosts, CX) -> {Unassigned, NextCX} + when Conn :: pid(), + Hosts :: [{zx:host(), [zx:realm()]}], + CX :: conn_index(), + Unassigned :: [zx:realm()], + NextCX :: conn_index(). + +cx_redirect(Conn, Hosts, CX) -> + NextCX = cx_failed(Conn, CX), + NewCX = #cx{realms = Realms} = cx_redirect(Hosts, NextCX), + Unassigned = cx_unassigned(NewCX), + {Unassigned, NewCX}. + + +-spec cx_redirect(Hosts, CX) -> NewCX + when Hosts :: [{zx:host(), [zx:realm()]}], + CX :: conn_index(), + NextCX :: conn_index(). +%% @private +%% Add host to any realm mirror queues that are known to provide the realm. + +cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) -> + Apply = + fun(R, M) -> + case maps:find(R, M) of + {ok, Meta = #rmeta{mirrors = Mirrors}} -> + NewMirrors = + case queue:member(Host, Mirrors) of + true -> Mirrors; + false -> queue:in(Host, Mirrors) + end, + NewMeta = Meta#rmeta{mirrors = NewMirrors}, + maps:put(R, Meta, M); + error -> + M + end + end, + NewRealms = lists:foldl(Apply, Realms, Provided), + cx_redirect(Rest, CX#cx{realms = NewRealms}); +cx_redirect([], CX) -> + CX. + + +cx_unassigned(#cx{realms = Realms}) -> + NotAssigned = + fun(Realm, #rmeta{assigned = Conn}, Unassigned) -> + case Conn == none of + true -> [Realm | Unassigned]; + false -> Unassigned + end + end, + maps:fold(NotAssigned, [], Realms). + + -spec cx_disconnected(Conn, CX) -> Result when Conn :: pid(), CX :: conn_index(), @@ -1291,44 +1402,53 @@ enqueue_unique(Element, Queue) -> %% realms, and returns the monitor reference of the pid, a list of realms that are now %% unassigned, and an updated connection index. -cx_disconnected(Conn, attempt, #cx{attempts = Attempts}) -> - case lists:keytake(Conn, 1, Attempts) of - {value, {Conn, Host}, NewAttempts} -> - NewCX = CX#cx{attempts = NewAttempts}, - {ok, [], NewCX}; - false -> - {error, unknown} - end; -cx_disconnected(Conn, conn, CX = #cx{assigned = Assigned, conns = Conns}) -> +cx_disconnected(Conn, CX = #cx{realms = Realms, conns = Conns}) -> case lists:keytake(Conn, 1, Conns) of - {value, {Conn, Host}, NewConns} -> - {UnassignedRealms, NewAssigned} = cx_scrub_assigned(Conn, Assigned), - NewCX = CX#cx{assigned = NewAssigned, conns = NewConns}, + {value, {Conn, Host, Realms}, NewConns} -> + {UnassignedRealms, NewRealms} = cx_scrub_assigned(Conn, Host, Realms), + NewCX = CX#cx{realms = NewRealms, conns = NewConns}, {ok, UnassignedRealms, NewCX}; false -> {error, unknown} end. --spec cx_scrub_assigned(Pid, Assigned) -> {UnassignedRealms, NewAssigned} +-spec cx_scrub_assigned(Pid, Host, Realms) -> {UnassignedRealms, NewRealms} when Pid :: pid(), - Assigned :: [{zx:realm(), pid()}], + Host :: zx:host(), + Realms :: realm_meta(), UnassignedRealms :: [zx:realm()], - NewAssigned :: [{zx:realm(), pid()}]. + NewRealms :: realm_meta(). %% @private %% This could have been performed as a set of two list operations (a partition and a %% map), but to make the procedure perfectly clear it is written out explicitly. -cx_scrub_assigned(Pid, Assigned) -> - cx_scrub_assigned(Pid, Assigned, [], []). - - -cx_scrub_assigned(Pid, [{Realm, Pid} | Rest], Unassigned, Assigned) -> - cx_scrub_assigned(Pid, Rest, [Realm | Unassigned], Assigned); -cx_scrub_assigned(Pid, [A | Rest], Unassigned, Assigned) -> - cx_scrub_assigned(Pid, Rest, Unassigned, [A | Assigned]); -cx_scrub_assigned(_, [], Unassigned, Assigned) -> - {Unassigned, Assigned}. +cx_scrub_assigned(Pid, Host, Realms) -> + Scrub = + fun + (Realm, + Meta = #rmeta{mirrors = Mirrors, + assigned = Assigned, + available = Available}, + {Unassigned, Metas}) -> + {NewUnassigned, NewAssigned} = + case Assigned of + Pid -> {[Realm | Unassigned], none}; + Other -> {Unassigned, Other} + end, + NewMirrors = + case queue:member(Host, Mirrors) of + false -> queue:in(Host, Mirrors); + true -> Mirrors + end, + NewAvailable = lists:delete(Pid, Available), + NewMeta = Meta#rmeta{mirrors = NewMirrors, + assigned = NewAssigned, + available = NewAvailable}, + NewMetas = maps:put(Realm, NewMeta, Metas), + {NewUnassigned, NewMetas} + end, + maps:fold(Scrub, {[], maps:new()}, Realms). -spec cx_resolve(Realm, CX) -> Result @@ -1341,13 +1461,9 @@ cx_scrub_assigned(_, [], Unassigned, Assigned) -> %% Check the registry of assigned realms and return the pid of the appropriate %% connection, or an `unassigned' indication if the realm is not yet connected. -cx_resolve(Realm, #cx{realms = Realms, assigned = Assigned}) -> - case lists:keyfind(Realm, 1, Assigned) of - {Realm, Conn} -> - {ok, Conn}; - false -> - case maps:is_key(Realm, Realms) -> - true -> unassigned; - false -> unconfigured - end +cx_resolve(Realm, #cx{realms = Realms}) -> + case maps:find(Realm, Realms) of + {ok, #rmeta{assigned = none}} -> unassigned; + {ok, #rmeta{assigned = Conn}} -> {ok, Conn}; + error -> unconfigured end.