%%% @doc %%% ZX Daemon %%% %%% Resident execution daemon and runtime interface to Zomp. %%% %%% The daemon resides in the background once started and awaits query requests and %%% subscriptions from from other processes. The daemon is only capable of handling %%% unprivileged (user) actions. %%% %%% %%% Discrete state and local abstract data types %%% %%% The daemon must keep track of requestors, subscribers, and zx_conn processes by %%% using monitors, and because the various types of clients are found in different %%% locations the monitors are maintained in a data type called monitor_index(), %%% shortened to "mx" throughout the module. This structure is treated as an opaque %%% data type and is handled by a set of functions defined toward the end of the module %%% as mx_*/N. %%% %%% Node connections (cx_conn processes) must also be tracked for status and realm %%% availability. This is done using a type called conn_index(), shortened to "cx" %%% throughout the module. conn_index() is treated as an abstract, opaque datatype %%% throughout the module, and is handled via a set of cx_*/N functions (after the %%% mx_*/N section). %%% %%% Do NOT directly access data within these structures, use (or write) an accessor %%% function that does what you want. %%% %%% %%% Connection handling %%% %%% The daemon is structured as a service manager in a service -> worker structure. %%% http://zxq9.com/archives/1311 %%% It is in charge of the high-level task of servicing requested actions and returning %%% responses to callers as well as mapping successful connections to configured realms %%% and repairing failed connections to nodes that reduce availability of configured %%% realms. %%% %%% When the zx_daemon is started it checks local configuration and cache files to %%% determine what realms must be available and what cached Zomp nodes it is aware of. %%% It populates the CX (conn_index(), mentioned above) with realm config and host %%% cache data, and then immediately initiates three connection attempts to cached %%% nodes for each realm configured (if possible; see init_connections/0). %%% %%% Once connection attempts have been initiated the daemon waits in receive for %%% either a connection report (success or failure) or an action request from %%% elsewhere in the system. %%% %%% Connection status is relayed with report/1 and indicates to the daemon whether %%% a connection has failed, been disconneocted, redirected, or succeeded. See the %%% zx_conn internals for details. If a connection is successful then the zx_conn %%% will relay the connected node's realm availability status to the daemon, and %%% the daemon will match the node's provided realms with the configured realm list. %%% Any realms that are not yet provided by another connection will be assigned to %%% the reporting successful zx_conn. If no unserviced realms are provided by the %%% node the zx_conn will be shut down but the host info will be cached for future %%% use. Any realms that have an older serial than the serial currently known to %%% zx will be disregarded (which may result in termination of the connection if it %%% means there are no useful realms available on a given node). %%% %%% A failure can occur at any time. In the event a connected and assigned zx_conn %%% has failed the target host will be dropped from the hosts cache, the zx_conn will %%% terminate and a new one will be spawned in its place if there is a gap in %%% configured realm coverage. %%% %%% Nodes may be too busy (their client slots full) to accept a new connection. In %%% this case the node should give the zx_conn a redirect instruction during protocol %%% negotiation. The zx_conn will report the redirect and host list to the daemon, %%% and the daemon will add the hosts to the host cache and the redirecting host will %%% be placed at the rear of the host cache unless it is the prime node for the target %%% realm. %%% %%% %%% Request queues %%% %%% Requests, reports and subscription updates are all either forwarded to affected %%% processes or entered into a work queue. All such work requests are received as %%% asynchronous messages and cause the work queue to first be updated, and then, %%% as a separate step, the work queue is re-evaluated in its entirety. Any work that %%% cannot be completed (due to a realm not being available, for example) is recycled %%% to the queue. A connection report also triggers a queue re-evaluation, so there %%% should not be cases where the work queue stalls on active requests. %%% %%% Requestors sending either download or realm query requests are given a reference %%% to match on for receipt of their result messages or to be used to cancel the %%% requested work (timeouts are handled by the caller, not by the daemon). %%% %%% A bit of state handling is required (queueing requests and storing the current %%% action state), but this permits the system above the daemon to interact with it in %%% a blocking way, establishing its own receive timeouts or implementing either an %%% asynchronous or synchronous interface library atop zx_daemon interface function, %%% but leaving zx_daemon and zx_conn alone to work asynchronously with one another. %%% @end -module(zx_daemon). -behavior(gen_server). -author("Craig Everett "). -copyright("Craig Everett "). -license("GPL-3.0"). -export([pass_meta/3, subscribe/1, unsubscribe/1, list/0, list/1, list/2, list/3, latest/1, fetch/1, key/1, pending/1, packagers/1, maintainers/1, sysops/1]). -export([report/1, result/2, notify/2]). -export([start_link/0, stop/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -export_type([id/0, result/0, realm_list/0, package_list/0, version_list/0, latest_result/0, fetch_result/0, key_result/0, pending_result/0, pack_result/0, maint_result/0, sysop_result/0, sub_message/0]). -include("zx_logger.hrl"). %%% Type Definitions -record(s, {meta = none :: none | zx:package_meta(), home = none :: none | file:filename(), argv = none :: none | [string()], id = 0 :: id(), actions = [] :: [request()], requests = maps:new() :: requests(), dropped = maps:new() :: requests(), mx = mx_new() :: monitor_index(), cx = cx_load() :: conn_index()}). -record(cx, {realms = #{} :: #{zx:realm() := realm_meta()}, attempts = [] :: [{pid(), zx:host(), [zx:realm()]}], conns = [] :: [connection()]}). -record(rmeta, {revision = 0 :: non_neg_integer(), serial = 0 :: non_neg_integer(), prime = {"zomp.tsuriai.jp", 11311} :: zx:host(), private = [] :: [zx:host()], mirrors = queue:new() :: queue:queue(zx:host()), realm_keys = [] :: [zx:key_meta()], package_keys = [] :: [zx:key_meta()], sysops = [] :: [zx:sysop_meta()], assigned = none :: none | pid(), available = [] :: [pid()]}). -record(conn, {pid :: pid(), host :: zx:host(), realms :: [zx:realm()], requests :: [id()], subs :: [{pid(), zx:package()}]}). %% State Types -type state() :: #s{}. -opaque id() :: non_neg_integer(). -type request() :: {subscribe, pid(), zx:package()} | {unsubscribe, pid(), zx:package()} | {request, pid(), id(), action()}. -type requests() :: #{id() := {pid(), action()}}. -type monitor_index() :: #{pid() := {reference(), category()}}. -type conn_index() :: #cx{}. -type realm_meta() :: #rmeta{}. -type connection() :: #conn{}. -type category() :: {Reqs :: [id()], Subs :: [zx:package()]} | attempt | conn. %% Conn Communication -type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]} | {redirect, Hosts :: [zx:host()]} | disconnected. %% Subscriber / Requestor Communication % Incoming Request messages % This form allows a bit of cheating with blind calls to `element(2, Request)'. -type action() :: list | {list, zx:realm()} | {list, zx:realm(), zx:name()} | {list, zx:realm(), zx:name(), zx:version()} | {latest, zx:realm(), zx:name(), zx:version()} | {fetch, zx:realm(), zx:name(), zx:version()} | {key, zx:realm(), zx:key_name()} | {pending, zx:realm(), zx:name()} | {packagers, zx:realm(), zx:name()} | {maintainers, zx:realm(), zx:name()} | {sysops, zx:realm()}. % Outgoing Result Messages % % Results are sent wrapped a triple: {result, Ref, Result} % where the result itself is a triple: {Type, Identifier, Content} % % Subscription messages are a separate type below. -type result() :: {z_result, RequestID :: id(), Message :: realm_list() | package_list() | version_list() | latest_result() | fetch_result() | key_result() | pending_result() | pack_result() | maint_result() | sysop_result()}. -type realm_list() :: [zx:realm()]. -type package_list() :: {ok, [zx:name()]} | {error, bad_realm | timeout}. -type version_list() :: {ok, [zx:version()]} | {error, bad_realm | bad_package | timeout}. -type latest_result() :: {ok, zx:version()} | {error, bad_realm | bad_package | bad_version | timeout}. -type fetch_result() :: {hops, non_neg_integer()} | done | {error, bad_realm | bad_package | bad_version | timeout}. -type key_result() :: done | {error, bad_realm | bad_key | timeout}. -type pending_result() :: {ok, [zx:version()]} | {error, bad_realm | bad_package | timeout}. -type pack_result() :: {ok, [zx:user()]} | {error, bad_realm | bad_package | timeout}. -type maint_result() :: {ok, [zx:user()]} | {error, bad_realm | bad_package | timeout}. -type sysop_result() :: {ok, [zx:user()]} | {error, bad_host | timeout}. % Subscription Results -type sub_message() :: {z_sub, zx:package(), Message :: {update, zx:package_id()} | {error, bad_realm | bad_package}}. %%% Requestor Interface -spec pass_meta(Meta, Dir, ArgV) -> ok when Meta :: zx:package_meta(), Dir :: file:filename(), ArgV :: [string()]. %% @private %% Load the daemon with the primary running application's meta data and location within %% the filesystem. This step allows running development code from any location in %% the filesystem against installed dependencies without requiring any magical %% references. %% %% This call blocks specifically so that we can be certain that the target application %% cannot be started before the impact of this call has taken full effect. It cannot %% be known whether the very first thing the target application will do is send this %% process an async message. That implies that this should only ever be called once, %% by the launching process (which normally terminates shortly thereafter). pass_meta(Meta, Dir, ArgV) -> gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). -spec subscribe(Package) -> ok when Package :: zx:package(). %% @doc %% Subscribe to update notifications for a for a particular package. %% The caller will receive update notifications of type `sub_message()' as Erlang %% messages whenever an update occurs. %% Crashes the caller if the Realm or Name of the Package argument are illegal %% `zx:lower0_9' strings. subscribe(Package = {Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), gen_server:cast(?MODULE, {subscribe, self(), Package}). -spec unsubscribe(Package) -> ok when Package :: zx:package(). %% @doc %% Instructs the daemon to unsubscribe if subscribed. Has no effect if not subscribed. %% Crashes the caller if the Realm or Name of the Package argument are illegal %% `lower0_9' strings. unsubscribe(Package = {Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), gen_server:cast(?MODULE, {unsubscribe, self(), Package}). -spec list() -> realm_list(). %% @doc %% Request a list of currently configured realms. Because this call is entirely local %% it is the only one that does not involve a round-trip list() -> gen_server:call(?MODULE, {request, list}). -spec list(Realm) -> {ok, RequestID} when Realm :: zx:realm(), RequestID :: term(). %% @doc %% Requests a list of packages provided by the given realm. %% Returns a request ID which will be returned in a message with the result from an %% upstream zomp node. Crashes the caller if Realm is an illegal string. %% %% Response messages are of the type `result()' where the third element is of the %% type `package_list()'. list(Realm) -> true = zx_lib:valid_lower0_9(Realm), request({list, Realm}). -spec list(Realm, Name) -> {ok, RequestID} when Realm :: zx:realm(), Name :: zx:name(), RequestID :: term(). %% @doc %% Requests a list of package versions. %% Returns a request ID which will be returned in a message with the result from an %% upstream zomp node. Crashes the if Realm or Name are illegal strings. %% %% Response messages are of the type `result()' where the third element is of the %% type `version_list()'. list(Realm, Name) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), request({list, Realm, Name}). -spec list(Realm, Name, Version) -> {ok, RequestID} when Realm :: zx:realm(), Name :: zx:name(), Version :: zx:version(), RequestID :: term(). %% @doc %% Request a list of package versions constrained by a partial version. %% Returns a request ID which will be returned in a message with the result from an %% upstream zomp node. Can be used to check for a specific version by testing for a %% response of `{error, bad_version}' when a full version number is provided. %% Crashes the caller on an illegal realm name, package name, or version tuple. %% %% Response messages are of the type `result()' where the third element is of the %% type `list_result()'. list(Realm, Name, Version) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), true = zx_lib:valid_version(Version), request({list, Realm, Name, Version}). -spec latest(Identifier) -> {ok, RequestID} when Identifier :: zx:package() | zx:package_id(), RequestID :: integer(). %% @doc %% Request the lastest version of a package within the provided version constraint. %% If no version is provided then the latest version overall will be returned. %% Returns a request ID which will be returned in a message with the result from an %% upstream zomp node. Crashes the caller on an illegal realm name, package name or %% version tuple. %% %% Response messages are of the type `result()' where the third element is of the %% type `latest_result()'. latest({Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), request({latest, Realm, Name, {z, z, z}}); latest({Realm, Name, Version}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), true = zx_lib:valid_version(Version), request({latest, Realm, Name, Version}). -spec fetch(PackageID) -> {ok, RequestID} when PackageID :: zx:package_id(), RequestID :: integer(). %% @doc %% Ensure a package is available locally, or queue it for download otherwise. %% Returns a request ID which will be returned in a message with the result from an %% upstream zomp node. Crashes the caller on an illegal realm name, package name or %% version tuple. %% %% Response messages are of the type `result()' where the third element is of the %% type `fetch_result()'. fetch({Realm, Name, Version}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), true = zx_lib:valid_version(Version), request({fetch, Realm, Name, Version}). -spec key(KeyID) -> {ok, RequestID} when KeyID :: zx:key_id(), RequestID :: id(). %% @doc %% Request a public key be fetched from its relevant realm. %% Crashes the caller if either component of the KeyID is illegal. %% %% Response messages are of the type `result()' where the third element is of the %% type `key_result()'. key({Realm, KeyName}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(KeyName), request({key, Realm, KeyName}). -spec pending(Package) -> {ok, RequestID} when Package :: zx:package(), RequestID :: id(). %% @doc %% Request the list of versions of a given package that have been submitted but not %% signed and included in their relevant realm. %% Crashes the caller if either component of the Package is illegal. %% %% Response messages are of the type `result()' where the third element is of the %% type `pending_result()'. pending({Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), request({pending, Realm, Name}). -spec packagers(Package) -> {ok, RequestID} when Package :: zx:package(), RequestID :: id(). %% @doc %% Request a list of packagers assigned to work on a given package. %% Crashes the caller if either component of the Package is illegal. %% %% Response messages are of the type `result()' where the third element is of the %% type `pack_result()'. packagers({Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), request({packagers, Realm, Name}). -spec maintainers(Package) -> {ok, RequestID} when Package :: zx:package(), RequestID :: id(). %% @doc %% Request a list of maintainers assigned to work on a given package. %% Crashes the caller if either component of the Package is illegal. %% %% Response messages are of the type `result()' where the third element is of the %% type `maint_result()'. maintainers({Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), true = zx_lib:valid_lower0_9(Name), request({maintainers, Realm, Name}). -spec sysops(Realm) -> {ok, RequestID} when Realm :: zx:realm(), RequestID :: id(). %% @doc %% Request a list of sysops in charge of maintaining a given realm. What this %% effectively does is request the sysops of the prime host of the given realm. %% Crashes the caller if the Realm string is illegal. %% %% Response messages are of the type `result()' where the third element is of the %% type `sysops_result()'. sysops(Realm) -> true = zx_lib:valid_lower0_9(Realm), request({sysops, Realm}). %% Request Caster -spec request(action()) -> {ok, RequestID} when RequestID :: integer(). %% @private %% Private function to wrap the necessary bits up. request(Action) -> gen_server:call(?MODULE, {request, self(), Action}). %%% Upstream Zomp connection interface -spec report(Message) -> ok when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]} | {redirect, Hosts :: [{zx:host(), [zx:realm()]}]} | failed | disconnected. %% @private %% Should only be called by a zx_conn. This function is how a zx_conn reports its %% current connection status and job results. report(Message) -> gen_server:cast(?MODULE, {report, self(), Message}). -spec result(reference(), result()) -> ok. %% @private %% Return a tagged result back to the daemon to be forwarded to the original requestor. result(Reference, Result) -> gen_server:cast(?MODULE, {result, Reference, Result}). -spec notify(Package, Message) -> ok when Package :: zx:package(), Message :: term(). %% @private %% Function called by a connection when a subscribed update arrives. notify(Package, Message) -> gen_server:cast(?MODULE, {notify, self(), Package, Message}). %%% Startup -spec start_link() -> {ok, pid()} | {error, term()}. %% @private %% Startup function -- intended to be called by supervisor. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, none, []). -spec init(none) -> {ok, state()}. init(none) -> Blank = blank_state(), {ok, MX, CX} = init_connections(), State = Blank#s{mx = MX, cx = CX}, {ok, State}. -spec blank_state() -> state(). %% @private %% Used to generate a correct, but exactly empty state. %% Useful mostly for testing and validation, though also actually used in the program. blank_state() -> #s{}. -spec init_connections() -> {ok, MX, CX} when MX :: monitor_index(), CX :: conn_index(). %% @private %% Starting from a stateless condition, recruit and resolve all realm relevant data, %% populate host caches, and initiate connections to required realms. On completion %% return a populated MX and CX to the caller. Should only ever be called by init/1. %% Returns an `ok' tuple to disambiguate it from pure functions *and* to leave an %% obvious place to populate error returns in the future if desired. init_connections() -> CX = cx_load(), MX = mx_new(), Realms = cx_realms(CX), init_connections(Realms, MX, CX). -spec init_connections(Realms, MX, CX) -> {ok, NewMX, NewCX} when Realms :: [zx:realm()], MX :: monitor_index(), CX :: conn_index(), NewMX :: monitor_index(), NewCX :: conn_index(). init_connections([Realm | Realms], MX, CX) -> {ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX), MaybeAttempt = fun(Host, {M, C}) -> case cx_maybe_add_attempt(Host, Realm, C) of not_connected -> {ok, Pid} = zx_conn:start(Host), NewM = mx_add_monitor(Pid, attempt, M), NewC = cx_add_attempt(Pid, Host, Realm, C), {NewM, NewC}; {ok, NewC} -> {M, NewC} end end, {NewMX, NewCX} = lists:foldl(MaybeAttempt, {MX, NextCX}, Hosts), init_connections(Realms, NewMX, NewCX); init_connections([], MX, CX) -> {ok, MX, CX}. %%% Shutdown -spec stop() -> ok. %% @doc %% A polite way to shut down the daemon without doing a bunch of vile things. stop() -> gen_server:cast(?MODULE, stop). %%% gen_server %% @private %% gen_server callback for OTP calls handle_call({request, list}, _, State = #s{cx = CX}) -> Realms = cx_realms(CX), {reply, {ok, Realms}, State}; handle_call({request, Requestor, Action}, From, State = #s{id = ID}) -> NewID = ID + 1, _ = gen_server:reply(From, {ok, NewID}), NextState = do_request(Requestor, Action, State#s{id = NewID}), NewState = eval_queue(NextState), {noreply, NewState}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call ~tp: ~tp", [From, Unexpected]), {noreply, State}. %% @private %% gen_server callback for OTP casts handle_cast({pass_meta, Meta, Dir, ArgV}, State) -> NewState = do_pass_meta(Meta, Dir, ArgV, State), {noreply, NewState}; handle_cast({subscribe, Pid, Package}, State) -> NextState = do_subscribe(Pid, Package, State), NewState = eval_queue(NextState), {noreply, NewState}; handle_cast({unsubscribe, Pid, Package}, State) -> NextState = do_unsubscribe(Pid, Package, State), NewState = eval_queue(NextState), {noreply, NewState}; handle_cast({report, Conn, Message}, State) -> NextState = do_report(Conn, Message, State), NewState = eval_queue(NextState), {noreply, NewState}; handle_cast({result, Ref, Result}, State) -> NextState = do_result(Ref, Result, State), NewState = eval_queue(NextState), {noreply, NewState}; handle_cast({notify, Conn, Package, Update}, State) -> ok = do_notify(Conn, Package, Update, State), {noreply, State}; handle_cast(stop, State) -> {stop, normal, State}; handle_cast(Unexpected, State) -> ok = log(warning, "Unexpected cast: ~tp", [Unexpected]), {noreply, State}. %% @private %% gen_sever callback for general Erlang message handling handle_info({'DOWN', Ref, process, Pid, Reason}, State) -> NewState = clear_monitor(Pid, Ref, Reason, State), {noreply, NewState}; handle_info(Unexpected, State) -> ok = log(warning, "Unexpected info: ~tp", [Unexpected]), {noreply, State}. %% @private %% gen_server callback to handle state transformations necessary for hot %% code updates. This template performs no transformation. code_change(_, State, _) -> {ok, State}. %% @private %% gen_server callback to handle shutdown/cleanup tasks on receipt of a clean %% termination request. terminate(normal, #s{cx = CX}) -> ok = log(info, "zx_daemon shutting down..."), case cx_store_cache(CX) of ok -> log(info, "Cache written."); {error, Reason} -> Message = "Cache write failed with ~tp", log(error, Message, [Reason]) end. %%% Doer Functions -spec do_pass_meta(Meta, Home, ArgV, State) -> NewState when Meta :: zx:package_meta(), Home :: file:filename(), ArgV :: [string()], State :: state(), NewState :: state(). do_pass_meta(Meta, Home, ArgV, State) -> PackageID = maps:get(package_id, Meta), {ok, PackageString} = zx_lib:package_string(PackageID), ok = log(info, "Received meta for ~tp.", [PackageString]), State#s{meta = Meta, home = Home, argv = ArgV}. -spec do_subscribe(Pid, Package, State) -> NextState when Pid :: pid(), Package :: zx:package(), State :: state(), NextState :: state(). %% @private %% Enqueue a subscription request. do_subscribe(Pid, Package, State = #s{actions = Actions}) -> NewActions = [{subscribe, Pid, Package} | Actions], State#s{actions = NewActions}. -spec do_unsubscribe(Pid, Package, State) -> NextState when Pid :: pid(), Package :: zx:package(), State :: state(), NextState :: state(). %% @private %% Clear or dequeue a subscription request. do_unsubscribe(Pid, Package, State = #s{actions = Actions}) -> NewActions = [{unsubscribe, Pid, Package} | Actions], State#s{actions = NewActions}. -spec do_request(Requestor, Action, State) -> NextState when Requestor :: pid(), Action :: action(), State :: state(), NextState :: state(). %% @private %% Enqueue requests and update relevant index. do_request(Requestor, Action, State = #s{id = ID, actions = Actions}) -> NewActions = [{request, Requestor, ID, Action} | Actions], State#s{actions = NewActions}. -spec do_report(Conn, Message, State) -> NewState when Conn :: pid(), Message :: conn_report(), State :: state(), NewState :: state(). %% @private %% Receive a report from a connection process, and update the connection index and %% possibly retry connections. do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> NextMX = mx_upgrade_conn(Conn, MX), {NewMX, NewCX} = case cx_connected(Realms, Conn, CX) of {assigned, NextCX} -> {NextMX, NextCX}; {unassigned, NextCX} -> {ok, ScrubbedMX} = mx_del_monitor(Conn, conn, NextMX), ok = zx_conn:stop(Conn), {ScrubbedMX, NextCX} end, State#s{mx = NewMX, cx = NewCX}; do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) -> NextMX = mx_del_monitor(Conn, attempt, MX), {Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX), {NewMX, NewCX} = ensure_connection(Unassigned, NextMX, NextCX), State#s{mx = NewMX, cx = NewCX}; do_report(Conn, failed, State = #s{mx = MX, cx = CX}) -> NewMX = mx_del_monitor(Conn, attempt, MX), failed(Conn, State#s{mx = NewMX}); do_report(Conn, disconnected, State = #s{mx = MX}) -> NewMX = mx_del_monitor(Conn, conn, MX), disconnected(Conn, State#s{mx = NewMX}). failed(Conn, State#s{mx = MX, cx = CX}) -> {Realms, NextCX} = cx_failed(Conn, CX), {NewMX, NewCX} = ensure_connection(Realms, MX, NextCX), State#s{mx = NewMX, cx = NewCX}. disconnected(Conn, State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> {Pending, LostSubs, ScrubbedCX} = cx_disconnected(Conn, CX), Unassigned = cx_unassigned(ScrubbedCX), ReSubs = [{S, {subscribe, P}} || {S, P} <- LostSubs], {Dequeued, NewRequests} = maps:fold(dequeue(Pending), {#{}, #{}}, Requests), ReReqs = maps:to_list(Dequeued), NewActions = ReReqs ++ ReSubs ++ Actions, {NewMX, NewCX} = ensure_connection(Unassigned, ScrubbedMX, ScrubbedCX), State#s{actions = NewActions, requests = NewRequests, mx = NewMX, cx = NewCX}. -spec dequeue(Pending) -> fun((K, V, {D, R}) -> {NewD, NewR}) when Pending :: [id()], K :: id(), V :: request(), D :: #{id() := request()}, R :: #{id() := request()}, NewD :: #{id() := request()}, NewR :: #{id() := request()}. %% @private %% Return a function that partitions the current Request map into two maps, one that %% matches the closed `Pending' list of references and ones that don't. dequeue(Pending) -> fun(K, V, {D, R}) -> case lists:member(K, Pending) of true -> {maps:put(K, V, D), R}; false -> {D, maps:put(K, V, R)} end end. -spec ensure_connection(Realms, MX, CX) -> {NewMX, NewCX} when Realms :: [zx:realm()], MX :: monitor_index(), CX :: conn_index(), NewMX :: monitor_index(), NewCX :: conn_index(). %% @private %% Initiates a connection to a single realm at a time, taking into account whether %% connected but unassigned nodes are alterantive providers of the needed realm. %% Returns updated monitor and connection indices. ensure_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) -> {NewMX, NewCX} = case maps:get(Realm, RMetas) of #rmeta{available = []} -> {ok, NextMX, NextCX} = init_connections([Realm], MX, CX), {NextMX, NextCX}; Meta = #rmeta{available = [Conn | Conns]} -> NewMeta = Meta#rmeta{assigned = Conn, available = Conns}, NewRMetas = maps:put(Realm, NewMeta, RMetas), NextCX = CX#cx{realms = NewRMetas}, {MX, NextCX} end, ensure_connection(Realms, NewMX, NewCX); ensure_connection([], MX, CX) -> {MX, CX}. -spec do_result(ID, Result, State) -> NewState when ID :: id(), Result :: result(), State :: state(), NewState :: state(). %% @private %% Receive the result of a sent request and route it back to the original requestor. do_result(ID, Result, State = #s{requests = Requests, dropped = Dropped, mx = MX}) -> {NewDropped, NewRequests, NewMX} = case maps:take(ID, Requests) of {Request, NextRequests} -> Requestor = element(1, Request), Requestor ! {z_result, ID, Result}, NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX), {Dropped, NextRequests, NextMX}; error -> NextDropped = handle_orphan_result(ID, Result, Dropped), {NextDropped, Requests, MX} end, State#s{requests = NewRequests, dropped = NewDropped, mx = NewMX}. -spec handle_orphan_result(ID, Result, Dropped) -> NewDropped when ID :: id(), Result :: result(), Dropped :: requests(), NewDropped :: requests(). %% @private %% Log request results if they have been orphaned by their original requestor. %% Log a warning if the result is totally unknown. handle_orphan_result(ID, Result, Dropped) -> case maps:take(ID, Dropped) of {Request, NewDropped} -> Message = "Received orphan result for ~tp, ~tp: ~tp", ok = log(info, Message, [ID, Request, Result]), NewDropped; error -> Message = "Received unknown request result ~tp: ~tp", ok = log(warning, Message, [ID, Result]), Dropped end. -spec do_notify(Conn, Channel, Message, State) -> ok when Conn :: pid(), Channel :: term(), Message :: term(), State :: state(). %% @private %% Broadcast a subscription message to all subscribers of a channel. %% At the moment the only possible sub channels are packages, but this will almost %% certainly change in the future to include general realm update messages (new keys, %% packages, user announcements, etc.) and whatever else becomes relevant as the %% system evolves. The types here are deliberately a bit abstract to prevent future %% type tracing with Dialyzer, since we know the functions calling this routine and %% are already tightly typed. do_notify(Conn, Channel, Message, #s{cx = CX}) -> Subscribers = cx_get_subscribers(Conn, Channel, CX), Notify = fun(P) -> P ! {z_sub, Channel, Message} end, lists:foreach(Notify, Subscribers). -spec eval_queue(State) -> NewState when State :: state(), NewState :: state(). %% @private %% This is one of the two engines that drives everything, the other being do_report/3. %% This function must iterate as far as it can into the request queue, adding response %% entries to the pending response structure as it goes. eval_queue(State = #s{actions = Actions}) -> InOrder = lists:reverse(Actions), eval_queue(InOrder, State#s{actions = []}). eval_queue([], State) -> State; eval_queue([Action = {request, Pid, ID, Message} | Rest], State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> {NewActions, NewRequests, NewMX, NewCX} = case dispatch_request(Message, ID, CX) of {dispatched, NextCX} -> NextRequests = maps:put(ID, {Pid, Message}, Requests), NextMX = mx_add_monitor(Pid, requestor, MX), {Actions, NextRequests, NextMX, NextCX}; {result, Response} -> Pid ! Response, {Actions, Requests, CX}; wait -> NextActions = [Action | Actions], NextMX = mx_add_monitor(Pid, requestor, MX), {NextActions, Requests, NextMX, CX} end, NewState = State#s{actions = NewActions, requests = NewRequests, mx = NewMX, cx = NewCX}, eval_queue(Rest, NewState); eval_queue([Action = {subscribe, Pid, Package} | Rest], State = #s{actions = Actions, mx = MX, cx = CX}) -> {NewActions, NewMX, NewCX} = case cx_add_sub(Pid, Package, CX) of {need_sub, Conn, NextCX} -> ok = zx_conn:subscribe(Conn, Package), NextMX = mx_add_monitor(Pid, subscriber, MX), {Actions, NextMX, NextCX}; {have_sub, NextCX} NextMX = mx_add_monitor(Pid, subscriber, MX), {Actions, NextMX, NextCX}; unassigned -> NextMX = mx_add_monitor(Pid, subscriber, MX), {[Action | Actions], NextMX, CX}; unconfigured -> Pid ! {z_sub, Package, {error, bad_realm}}, {Actions, MX, CX} end, eval_queue(Rest, State#s{actions = NewActions, mx = NewMX, cx = NewCX}); eval_queue([{unsubscribe, Pid, Package} | Rest], State = #s{mx = MX, cx = CX}) -> {ok, NewMX} = mx_del_monitor(Pid, {subscription, Package}, MX), NewCX = case cx_del_sub(Pid, Package, CX) of {drop_sub, NextCX} -> ok = zx_conn:unsubscribe(Conn, Package), NextCX; {keep_sub, NextCX} -> NextCX; unassigned -> CX; unconfigured -> Message = "Received 'unsubscribe' request for unconfigured realm: ~tp", ok = log(warning, Message, [Package]), CX end, eval_queue(Rest, State#s{mx = NewMX, cx = NewCX}). -spec dispatch_request(Action, ID, CX) -> Result when Action :: action(), ID :: id(), CX :: conn_index(), Result :: {dispatched, NewCX} | {result, Response} | wait, NewCX :: conn_index(), Response :: result(). dispatch_request(list, ID, CX) -> Realms = cx_realms(CX), {result, ID, Realms}; dispatch_request(Action, ID, CX) -> Realm = element(2, Action), case cx_pre_send(Realm, ID, CX) of {ok, Conn, NewCX} -> ok = zx_conn:make_request(Conn, ID, Action), {dispatched, NewCX}; unassigned -> wait; unconfigured -> Error = {error, bad_realm}, {result, ID, Error} end. -spec clear_monitor(Pid, Ref, Reason, State) -> NewState when Pid :: pid(), Ref :: reference(), Reason :: term(), State :: state(), NewState :: state(). %% @private %% Deal with a crashed requestor, subscriber or connector. clear_monitor(Pid, Ref, Reason, State = #s{actions = Actions, requests = Requests, dropped = Dropped, mx = MX, cx = CX}) -> case mx_crashed_monitor(Pid, MX) of {attempt, NextMX} -> failed( {conn, NextMX} -> disconnected(Pid, State#s{mx = NextMX}); {{Reqs, Subs}, NextMX} -> case mx_lookup_category(Pid, MX) of attempt -> do_report(Pid, failed, State); conn -> do_report(Pid, disconnected, State); {Reqs, Subs} -> NewActions = drop_actions(Pid, Actions), {NewDropped, NewRequests} = drop_requests(Pid, Dropped, Requests), NewCX = cx_clear_client(Pid, Reqs, Subs, CX), NewMX = mx_del_monitor(Pid, crashed, MX), State#s{actions = NewActions, requests = NewRequests, dropped = NewDropped, mx = NewMX, cx = NewCX}; error -> Unexpected = {'DOWN', Ref, process, Pid, Reason}, ok = log(warning, "Unexpected info: ~tp", [Unexpected]), State end. -spec drop_actions(Requestor, Actions) -> NewActions when Requestor :: pid(), Actions :: [request()], NewActions :: [request()]. drop_actions(Pid, Actions) -> Clear = fun ({request, P, _}) -> P /= Pid; ({subscribe, P, _}) -> P /= Pid; ({unsubscribe, _, _}) -> false end, lists:filter(Clear, Actions). -spec drop_requests(ReqIDs, Dropped, Requests) -> {NewDropped, NewRequests} when ReqIDs :: [id()], Dropped :: requests(), Requests :: requests(), NewDropped :: requests(), NewRequests :: requests(). drop_requests(ReqIDs, Dropped, Requests) -> Partition = fun(K, {Drop, Keep}) -> {V, NewKeep} = maps:take(K, Keep), NewDrop = maps:put(K, V, Drop), {NewDrop, NewKeep} end, lists:fold(Partition, {Dropped, Requests}, ReqIDs). %-spec do_query_latest(Object, State) -> {Result, NewState} % when Object :: zx:package() | zx:package_id(), % State :: state(), % Result :: {ok, zx:version()} % | {error, Reason}, % Reason :: bad_realm % | bad_package % | bad_version, % NewState :: state(). %% @private %% Queries a zomp realm for the latest version of a package or package %% version (complete or incomplete version number). % %do_query_latest(Socket, {Realm, Name}) -> % ok = zx_net:send(Socket, {latest, Realm, Name}), % receive % {tcp, Socket, Bin} -> binary_to_term(Bin) % after 5000 -> {error, timeout} % end; %do_query_latest(Socket, {Realm, Name, Version}) -> % ok = zx_net:send(Socket, {latest, Realm, Name, Version}), % receive % {tcp, Socket, Bin} -> binary_to_term(Bin) % after 5000 -> {error, timeout} % end. %-spec do_fetch(PackageIDs, State) -> NewState % when PackageIDs :: [zx:package_id()], % State :: state(), % NewState :: state(), % Result :: ok % | {error, Reason}, % Reason :: bad_realm % | bad_package % | bad_version % | network. %% @private %% % %do_fetch(PackageIDs, State) -> % FIXME: Need to create a job queue divided by realm and dispatched to connectors, % and cleared from the master pending queue kept here by the daemon as the % workers succeed. Basic task queue management stuff... which never existed % in ZX before... grrr... % case scrub(PackageIDs) of % [] -> % ok; % Needed -> % Partitioned = partition_by_realm(Needed), % EnsureDeps = % fun({Realm, Packages}) -> % ok = zx_conn:queue_package(Pid, Realm, Packages), % log(info, "Disconnecting from realm: ~ts", [Realm]) % end, % lists:foreach(EnsureDeps, Partitioned) % end. % % %partition_by_realm(PackageIDs) -> % PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs), % maps:to_list(PartitionMap). % % %partition_by_realm({R, P, V}, M) -> % maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M). % % %ensure_deps(_, _, []) -> % ok; %ensure_deps(Socket, Realm, [{Name, Version} | Rest]) -> % ok = ensure_dep(Socket, {Realm, Name, Version}), % ensure_deps(Socket, Realm, Rest). % % %-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return(). %% @private %% Given an PackageID as an argument, check whether its package file exists in the %% system cache, and if not download it. Should return `ok' whenever the file is %% sourced, but exit with an error if it cannot locate or acquire the package. % %ensure_dep(Socket, PackageID) -> % ZrpFile = filename:join("zrp", namify_zrp(PackageID)), % ok = % case filelib:is_regular(ZrpFile) of % true -> ok; % false -> fetch(Socket, PackageID) % end, % ok = install(PackageID), % build(PackageID). % % %-spec scrub(Deps) -> Scrubbed % when Deps :: [package_id()], % Scrubbed :: [package_id()]. %% @private %% Take a list of dependencies and return a list of dependencies that are not yet %% installed on the system. % %scrub([]) -> % []; %scrub(Deps) -> % lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps). %%% Monitor Index ADT Interface Functions %%% %%% Very simple structure, but explicit handling of it becomes bothersome in other %%% code, so it is all just packed down here. -spec mx_new() -> monitor_index(). %% @private %% Returns a new, empty monitor index. mx_new() -> maps:new(). -spec mx_add_monitor(Pid, Category, MX) -> NewMX when Pid :: pid(), Category :: subscriber | requestor | attempt, MX :: monitor_index(), NewMX :: monitor_index(). %% @private %% Begin monitoring the given Pid, keeping track of its category. mx_add_monitor(Pid, subscriber, MX) -> case maps:take(Pid, MX) of {{Ref, {Subs, Reqs}}, NextMX} -> maps:put(Pid, {Ref, {Subs + 1, Reqs}}, NextMX); error -> Ref = monitor(process, Pid), maps:put(Pid, {Ref, {1, 0}}, MX) end; mx_add_monitor(Pid, requestor, MX) -> case maps:take(Pid, MX) of {{Ref, {Subs, Reqs}}, NextMX} -> maps:put(Pid, {Ref, {Subs, Reqs + 1}}, NextMX); error -> Ref = monitor(process, Pid), maps:put(Pid, {Ref, {0, 1}}, MX) end; mx_add_monitor(Pid, attempt, MX) -> false = maps:is_key(Pid, MX), Ref = monitor(process, Pid), maps:put(Pid, {Ref, attempt}, MX). -spec mx_upgrade_conn(Pid, MX) -> NewMX when Pid :: pid(), MX :: monitor_index(), NewMX :: monitor_index(). %% @private %% Upgrade an `attempt' monitor to a `conn' monitor. mx_upgrade_conn(Pid, MX) -> {{Ref, attempt}, NextMX} = maps:take(Pid, MX), maps:put(Pid, {Ref, conn}, NextMX). -spec mx_del_monitor(Conn, Category, MX) -> NewMX when Conn :: pid(), Category :: attempt | conn | {requestor, id()}, | {subscriber, Sub :: tuple()}, MX :: monitor_index(), NewMX :: monitor_index(). %% @private %% Drop a monitor category, removing the entire monitor in the case only one category %% exists. Returns a tuple including the remaining request references in the case of %% a conn type. mx_del_monitor(Pid, attempt, MX) -> {{Ref, attempt}, NewMX} = maps:take(Pid, MX), true = demonitor(Ref, [flush]), NewMX; mx_del_monitor(Pid, conn, MX) -> {{Ref, conn}, NewMX} = maps:take(Pid, MX), true = demonitor(Ref, [flush]), NewMX; mx_del_monitor(Pid, {requestor, ID}, MX) -> case maps:take(Pid, MX) of {{Ref, {[ID], []}}, NextMX} -> true = demonitor(Ref, [flush]), NextMX; {{Ref, {Reqs, Subs}}, NextMX} when Reqs > 0 -> NewReqs = lists:subtract(ID, Reqs), maps:put(Pid, {NewReqs, Subs}, NextMX) end, mx_del_monitor(Pid, {subscriber, Sub}, MX) -> case maps:take(Pid, MX) of {{Ref, {[], [Package]}}, NextMX} -> true = demonitor(Ref, [flush]), NextMX; {{Ref, {Reqs, Subs}}, NextMX} when Subs > 0 -> NewSubs = lists:delete(Sub, Subs), maps:put(Pid, {Ref, {Reqs, NewSubs}}, NextMX) end. -spec mx_crashed_monitor(Pid, MX) -> Result when Pid :: pid(), MX :: monitor_index(), Result :: {Type, NewMX} | error, Type :: attempt | conn | {Reqs :: [id()], Subs :: [tuple()]}, NewMX :: monitor_index(). mx_crashed_monitor(Pid, MX) -> case maps:take(Pid, MX) of {{Ref, Type}, NewMX} -> true = demonitor(Mon, [flush]), {Type, NewMX}; error -> error end. -spec mx_lookup_category(Pid, MX) -> Result when Pid :: pid(), MX :: monitor_index(), Result :: attempt | conn | {Reqs :: [reference()], Subs :: [zx:package()]} | error. %% @private %% Lookup a monitor's categories. mx_lookup_category(Pid, MX) -> case maps:find(Pid, MX) of {ok, Mon} -> element(2, Mon); error -> error end. %%% Connection Index ADT Interface Functions %%% %%% Functions to manipulate the conn_index() data type are in this section. This %%% data should be treated as abstract by functions outside of this section, as it is %%% a deep structure and future requirements are likely to wind up causing it to %%% change a little. For the same reason, these functions are all pure, independently %%% testable, and have no side effects. %%% %%% Return values often carry some status information with them. -spec cx_load() -> conn_index(). %% @private %% Used to load a connection index populated with necessary realm configuration data %% and cached mirror data, if such things can be found in the system, otherwise return %% a blank connection index structure. cx_load() -> case cx_populate() of {ok, Realms} -> #cx{realms = maps:from_list(Realms)}; {error, Reason} -> Message = "Realm data and host cache load failed with : ~tp", ok = log(error, Message, [Reason]), ok = log(warning, "No realms configured."), #cx{} end. -spec cx_populate() -> Result when Result :: {ok, conn_index()} | {error, Reason}, Reason :: no_realms | file:posix(). %% @private %% This procedure, relying zx_lib:zomp_home() allows the system to load zomp data %% from any arbitrary home for zomp. This has been included mostly to make testing of %% Zomp and ZX easier, but incidentally opens the possibility that an arbitrary Zomp %% home could be selected by an installer (especially on consumer systems like Windows %% where any number of wild things might be going on in the user's filesystem). cx_populate() -> Home = zx_lib:zomp_home(), Pattern = filename:join(Home, "*.realm"), case filelib:wildcard(Pattern) of [] -> {error, no_realms}; RealmFiles -> {ok, cx_populate(RealmFiles, [])} end. -spec cx_populate(RealmFiles, Realms) -> NewRealms when RealmFiles :: file:filename(), Realms :: [{zx:realm(), realm_meta()}], NewRealms :: [{zx:realm(), realm_meta()}]. %% @private %% Pack an initially empty conn_index() with realm meta and host cache data. %% Should not halt on a corrupted, missing, malformed, etc. realm file but will log %% any loading errors. cx_populate([File | Files], Realms) -> NewRealms = case file:consult(File) of {ok, Meta} -> Realm = cx_load_realm_meta(Meta), [Realm | Realms]; {error, Reason} -> Message = "Loading realm file ~tp failed with: ~tp. Skipping...", ok = log(warning, Message, [File, Reason]), Realms end, cx_populate(Files, NewRealms); cx_populate([], Realms) -> Realms. -spec cx_load_realm_meta(Meta) -> Result when Meta :: [{Key :: atom(), Value :: term()}], Result :: {zx:realm(), realm_meta()}. %% @private %% This function MUST adhere to the realmfile definition found at. cx_load_realm_meta(Meta) -> {realm, Realm} = lists:keyfind(realm, 1, Meta), {revision, Revision} = lists:keyfind(revision, 1, Meta), {prime, Prime} = lists:keyfind(prime, 1, Meta), {realm_keys, RealmKeys} = lists:keyfind(realm_keys, 1, Meta), {package_keys, PackageKeys} = lists:keyfind(packge_keys, 1, Meta), {sysops, Sysops} = lists:keyfind(sysops, 1, Meta), Basic = #rmeta{revision = Revision, prime = Prime, realm_keys = RealmKeys, package_keys = PackageKeys, sysops = Sysops}, Complete = cx_load_cache(Realm, Basic), {Realm, Complete}. -spec cx_load_cache(Realm, Basic) -> Complete when Realm :: zx:realm(), Basic :: realm_meta(), Complete :: realm_meta(). %% @private %% Receive a realm_meta() that lacks any cache data and load the realm's cache file %% if it exists, and return it fully populated. %% FIXME: If several instances of zomp or zx are running at once it may be possible %% to run into file access contention or receive an incomplete read on some %% systems. At the moment this is only a remote possibility and for now should %% be handled by simply re-running whatever command caused the failure. %% Better file contention and parallel-executing system handling should %% eventually be implemented, especially if zx becomes common for end-user %% GUI programs. %% NOTE: This "fixme" will only apply until the zx universal lock is implemented. cx_load_cache(Realm, Basic) -> CacheFile = cx_cache_file(Realm), case file:consult(CacheFile) of {ok, Cache} -> {serial, Serial} = lists:keyfind(serial, 1, Cache), {private, Private} = lists:keyfind(private, 1, Cache), {mirrors, Mirrors} = lists:keyfind(mirrors, 1, Cache), PQueue = queue:from_list(Private), Enqueue = fun(H, Q) -> queue:in(H, Q) end, MQueue = lists:foldl(Enqueue, Mirrors, PQueue), Basic#rmeta{serial = Serial, private = Private, mirrors = MQueue}; {error, enoent} -> Basic end. -spec cx_store_cache(CX) -> Result when CX :: conn_index(), Result :: ok | {error, file:posix()}. cx_store_cache(#cx{realms = Realms}) -> lists:foreach(fun cx_write_cache/1, maps:to_list(Realms)). -spec cx_write_cache({zx:realm(), realm_meta()}) -> ok. %% @private %% FIXME: The same concerns as noted in the cx_load_cache/2 FIXME comment apply here. %% NOTE: This "fixme" will only apply until the zx universal lock is implemented. cx_write_cache({Realm, #rmeta{serial = Serial, private = Private, mirrors = Mirrors}}) -> CacheFile = cx_cache_file(Realm), MList = queue:to_list(Mirrors), ActualMirrors = lists:subtract(MList, Private), CacheMeta = [{serial, Serial}, {mirrors, ActualMirrors}], ok = zx_lib:write_terms(CacheFile, CacheMeta), log(info, "Wrote cache for realm ~ts", [Realm]). -spec cx_cache_file(zx:realm()) -> file:filename(). cx_cache_file(Realm) -> filename:join(zx_lib:zomp_home(), Realm ++ ".cache"). -spec cx_realms(conn_index()) -> [zx:realms()]. cx_realms(#cx{realms = Realms}) -> maps:keys(Realms). -spec cx_next_host(Realm, CX) -> Result when Realm :: zx:realm(), CX :: conn_index(), Result :: {ok, Next, NewCX} | {prime, Prime, NewCX} | {error, Reason, NewCX}, Next :: zx:host(), Prime :: zx:host(), NewCX :: conn_index(), Reason :: bad_realm | connected. %% @private %% Given a realm, retun the next cached host location to which to connect. Returns %% error if the realm is already assigned, if it is available but should have been %% assigned, or if the realm is not configured. %% If all cached mirrors are exhausted it will return the realm's prime host and %% reload the mirrors queue with private mirrors. cx_next_host(Realm, CX = #cx{realms = Realms}) -> case maps:find(Realm, Realms) of {ok, Meta = #rmeta{assigned = none, available = [Pid | Pids]}} -> ok = log(warning, "Call to cx_next_host/2 when connection available."), NewMeta = Meta#rmeta{assigned = Pid, available = Pids}, NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {error, connected, NewCX}; {ok, Meta = #rmeta{assigned = none, available = []}} -> {Outcome, Host, NewMeta} = cx_next_host(Meta), NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {Outcome, Host, NewCX}; {ok, #rmeta{assigned = Conn}} when is_pid(Conn) -> ok = log(warning, "Call to cx_next_host/2 when connection assigned."), {error, connected, CX}; error -> {error, bad_realm, CX} end. -spec cx_next_host(Meta) -> Result when Meta :: realm_meta(), Result :: {ok, Next, NewMeta} | {prime, Prime, NewMeta}, Next :: zx:host(), Prime :: zx:host(), NewMeta :: realm_meta(). cx_next_host(Meta = #rmeta{prime = Prime, private = Private, mirrors = Mirrors}) -> case queue:out(Mirrors) of {{value, Next}, NewMirrors} -> {ok, Next, Meta#rmeta{mirrors = NewMirrors}}; {empty, Mirrors} -> Enqueue = fun(H, Q) -> queue:in(H, Q) end, NewMirrors = lists:foldl(Enqueue, Private, Mirrors), {prime, Prime, Meta#rmeta{mirrors = NewMirrors}} end. -spec cx_next_hosts(N, Realm, CX) -> Result when N :: non_neg_integer(), Realm :: zx:realm(), CX :: conn_index(), Result :: {ok, Hosts, NewCX} | {error, Reason}, Hosts :: [zx:host()], NewCX :: conn_index(), Reason :: {connected, Conn :: pid()} | bad_realm. %% @private %% This function allows recruiting an arbitrary number of hosts from the host cache %% of a given realm, taking private mirrors first, then public mirrors, and ending %% with the prime node for the realm if no others exist. cx_next_hosts(N, Realm, CX) -> cx_next_hosts(N, Realm, [], CX). cx_next_hosts(N, Realm, Hosts, CX) when N > 0 -> case cx_next_host(Realm, CX) of {ok, Host, NewCX} -> cx_next_hosts(N - 1, Realm, [Host | Hosts], NewCX); {prime, Host, NewCX} -> {ok, [Host | Hosts], NewCX}; Error -> Error end; cx_next_hosts(0, _, Hosts, CX) -> {ok, Hosts, CX}. -spec cx_maybe_add_attempt(Host, Realm, CX) -> Result when Host :: zx:host(), Realm :: zx:realm(), CX :: conn_index(), Result :: not_connected | {ok, NewCX}, NewCX :: conn_index(). cx_maybe_add_attempt(Host, Realm, CX = #cx{attempts = Attempts}) -> case lists:keytake(Host, 2, Attempts) of false -> not_connected; {value, {Pid, Host, Realms}, NextAttempts} -> NewAttempts = [{Pid, Host, [Realm | Realms]} | NextAttempts], NewCX = CX#cx{attempts = NewAttempts}, {ok, NewCX} end. -spec cx_add_attempt(Pid, Host, Realm, CX) -> NewCX when Pid :: pid(), Host :: zx:host(), Realm :: zx:realm(), CX :: conn_index(), NewCX :: conn_index(). cx_add_attempt(Pid, Host, Realm, CX = #cx{attempts = Attempts}) -> CX#cx{attempts = [{Pid, Host, [Realm]} | Attempts]}. -spec cx_connected(Available, Pid, CX) -> Result when Available :: [{zx:realm(), zx:serial()}], Pid :: pid(), CX :: conn_index(), Result :: {Assignment, NewCX}, Assignment :: assigned | unassigned, NewCX :: conn_index(). %% @private %% An abstract data handler which is called whenever a new connection is successfully %% established by a zx_conn. Any unconnected realms with a valid serial will be %% assigned to the new connection; if none are needed then the connection is closed. %% The successful host is placed back in the hosts queue for each available realm. %% The return value is a tuple that indicates whether the new connection was assigned %% or not and the updated CX data value. cx_connected(Available, Pid, CX = #cx{attempts = Attempts}) -> {value, Attempt, NewAttempts} = lists:keytake(Pid, 1, Attempts), Host = element(2, Attempt), Realms = [element(1, R) || R <- Available], NewCX = CX#cx{attempts = NewAttempts}, Conn = #conn{pid = Pid, host = Host, realms = Realms}, cx_connected(unassigned, Available, Conn, NewCX). -spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX} when A :: unassigned | assigned, Available :: [{zx:realm(), zx:serial()}], Conn :: {pid(), zx:host(), [zx:realm()]}, CX :: conn_index(), NewA :: unassigned | assigned, NewCX :: conn_index(). %% @private %% Only accept a new realm as available if the reported serial is equal or newer to the %% highest known serial. cx_connected(A, [{Realm, Serial} | Rest], Conn = #conn{pid = Pid}, CX = #cx{realms = Realms}) -> case maps:find(Realm, Realms) of {ok, Meta = #rmeta{serial = S, available = Available}} when S =< Serial -> NewMeta = Meta#rmeta{serial = Serial, available = [Pid | Available]}, {NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX), cx_connected(NewA, Rest, Conn, NewCX); {ok, #rmeta{serial = S}} when S > Serial -> cx_connected(A, Rest, Conn, CX); error -> cx_connected(A, Rest, Conn, CX) end; cx_connected(A, [], Conn, CX = #cx{conns = Conns}) -> {A, CX#cx{conns = [Conn | Conns]}}. -spec cx_connected(A, Realm, Conn, Meta, CX) -> {NewA, NewCX} when A :: unassigned | assigned, Realm :: zx:host(), Conn :: {pid(), zx:host(), [zx:realm()]}, Meta :: realm_meta(), CX :: conn_index(), NewA :: unassigned | assigned, NewCX :: conn_index(). %% @private %% This function matches on two elements: %% - Whether or not the realm is already assigned (and if so, set `A' to `assigned' %% - Whether the host node is the prime node (and if not, ensure it is a member of %% of the mirror queue). cx_connected(_, Realm, #conn{pid = Pid, host = Prime}, Meta = #rmeta{prime = Prime, assigned = none}, CX = #cx{realms = Realms}) -> NewMeta = Meta#rmeta{assigned = Pid}, NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {assigned, NewCX}; cx_connected(_, Realm, #conn{pid = Pid, host = Host}, Meta = #rmeta{mirrors = Mirrors, assigned = none}, CX = #cx{realms = Realms}) -> NewMirrors = cx_enqueue_unique(Host, Mirrors), NewMeta = Meta#rmeta{mirrors = NewMirrors, assigned = Pid}, NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {assigned, NewCX}; cx_connected(A, _, #conn{host = Prime}, #rmeta{prime = Prime}, CX) -> {A, CX}; cx_connected(A, Realm, #conn{host = Host}, Meta = #rmeta{mirrors = Mirrors}, CX = #cx{realms = Realms}) -> NewMirrors = cx_enqueue_unique(Host, Mirrors), NewMeta = Meta#rmeta{mirrors = NewMirrors}, NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {A, NewCX}. -spec cx_enqueue_unique(term(), queue:queue()) -> queue:queue(). %% @private %% Simple function to ensure that only unique elements are added to a queue. Obviously %% this operation is extremely general and O(n) in complexity due to the use of %% queue:member/2. cx_enqueue_unique(Element, Queue) -> case queue:member(Element, Queue) of true -> Queue; false -> queue:in(Element, Queue) end. -spec cx_failed(Conn, CX) -> {Realms, NewCX} when Conn :: pid(), CX :: conn_index(), Realms :: [zx:realm()], NewCX :: conn_index(). %% @private %% Remove a failed attempt and all its associations. cx_failed(Conn, CX = #cx{attempts = Attempts}) -> {value, Attempt, NewAttempts} = lists:keytake(Conn, 1, Attempts), Realms = element(3, Attempt), {Realms, CX#cx{attempts = NewAttempts}}. -spec cx_redirect(Conn, Hosts, CX) -> {Unassigned, NewCX} when Conn :: pid(), Hosts :: [{zx:host(), [zx:realm()]}], CX :: conn_index(), Unassigned :: [zx:realm()], NewCX :: conn_index(). cx_redirect(Conn, Hosts, CX = #cx{attempts = Attempts}) -> NewAttempts = lists:keydelete(Conn, 1, Attempts), NextCX = CX#cx{attempts = NewAttempts}, NewCX = cx_redirect(Hosts, NextCX), Unassigned = cx_unassigned(NewCX), {Unassigned, NewCX}. -spec cx_redirect(Hosts, CX) -> NewCX when Hosts :: [{zx:host(), [zx:realm()]}], CX :: conn_index(), NewCX :: conn_index(). %% @private %% Add the host to any realm mirror queues that it provides. cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) -> Apply = fun(R, Rs) -> case maps:find(R, Rs) of {ok, Meta = #rmeta{mirrors = Mirrors}} -> NewMirrors = cx_enqueue_unique(Host, Mirrors), NewMeta = Meta#rmeta{mirrors = NewMirrors}, maps:put(R, NewMeta, Rs); error -> R end end, NewRealms = lists:foldl(Apply, Realms, Provided), cx_redirect(Rest, CX#cx{realms = NewRealms}); cx_redirect([], CX) -> CX. -spec cx_unassigned(CX) -> Unassigned when CX :: conn_index(), Unassigned :: [zx:realm()]. %% @private %% Scan the CX record for unassigned realms;return a list of all unassigned %% realm names. cx_unassigned(#cx{realms = Realms}) -> NotAssigned = fun(Realm, #rmeta{assigned = Conn}, Unassigned) -> case Conn == none of true -> [Realm | Unassigned]; false -> Unassigned end end, maps:fold(NotAssigned, [], Realms). -spec cx_disconnected(Conn, CX) -> {Requests, Subs, NewCX} when Conn :: pid(), CX :: conn_index(), Requests :: [reference()], Subs :: [zx:package()], NewCX :: conn_index(). %% @private %% An abstract data handler which is called whenever a connection terminates. %% This function removes all data related to the disconnected pid and its assigned %% realms, and returns the monitor reference of the pid, a list of realms that are now %% unassigned, and an updated connection index. cx_disconnected(Pid, CX = #cx{realms = Realms, conns = Conns}) -> {value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns), #conn{host = Host, requests = Requests, subs = Subs} = Conn, NewRealms = cx_scrub_assigned(Pid, Host, Realms), NewCX = CX#cx{realms = NewRealms, conns = NewConns}, {Requests, Subs, NewCX}. -spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms when Pid :: pid(), Host :: zx:host(), Realms :: [realm_meta()], NewRealms :: [realm_meta()]. %% @private %% This could have been performed as a set of two list operations (a partition and a %% map), but to make the procedure perfectly clear it is written out explicitly. cx_scrub_assigned(Pid, Host, Realms) -> Scrub = fun (_, V = #rmeta{mirrors = M, available = A, assigned = C}) when C == Pid -> V#rmeta{mirrors = cx_enqueue_unique(Host, M), available = lists:delete(Pid, A), assigned = none}; (_, V = #rmeta{mirrors = M, available = A}) -> V#rmeta{mirrors = cx_enqueue_unique(Host, M), available = lists:delete(Pid, A)} end, maps:map(Scrub, Realms). -spec cx_resolve(Realm, CX) -> Result when Realm :: zx:realm(), CX :: conn_index(), Result :: {ok, Conn :: pid()} | unassigned | unconfigured. %% @private %% Check the registry of assigned realms and return the pid of the appropriate %% connection, or an `unassigned' indication if the realm is not yet connected. cx_resolve(Realm, #cx{realms = Realms}) -> case maps:find(Realm, Realms) of {ok, #rmeta{assigned = none}} -> unassigned; {ok, #rmeta{assigned = Conn}} -> {ok, Conn}; error -> unconfigured end. -spec cx_pre_send(Realm, ID, CX) -> Result when Realm :: zx:realm(), ID :: id(), CX :: conn_index(), Result :: {ok, pid(), NewCX :: conn_index()} | unassigned | unconfigured. %% @private %% Prepare a request to be sent by queueing it in the connection active request %% reference list and returning the Pid of the connection handling the required realm %% if it is available, otherwise return an atom indicating the status of the realm.. cx_pre_send(Realm, ID, CX = #cx{conns = Conns}) -> case cx_resolve(Realm, CX) of {ok, Pid} -> {value, Conn, NextConns} = lists:keytake(Pid, #conn.pid, Conns), #conn{requests = Requests} = Conn, NewRequests = [ID | Requests], NewConn = Conn#conn{requests = NewRequests}, NewCX = CX#cx{conns = [NewConn | NextConns]}, {ok, Pid, NewCX}; NoGo -> NoGo end. -spec cx_add_sub(Subscriber, Channel, CX) -> Result when Subscriber :: pid(), Channel :: tuple(), CX :: conn_index(), Result :: {need_sub, Conn, NewCX} | {have_sub, NewCX} | unassigned | unconfigured, Conn :: pid(), NewCX :: conn_index(). %% @private %% Adds a subscription to the current list of subs, and returns a value indicating %% whether the connection needs to be told to subscribe or not based on whether it %% is already subscribed to that particular channel. cx_add_sub(Subscriber, Channel, CX = #cx{conns = Conns}) -> Realm = element(1, Channel), case cx_resolve(Realm, CX) of {ok, Pid} -> {value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns), cx_maybe_new_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns}); Other -> Other end. -spec cx_maybe_new_sub(Conn, Sub, CX) -> Result when Conn :: connection(), Sub :: {pid(), tuple()}, CX :: conn_index(), Result :: {need_sub, ConnPid, NewCX} | {have_sub, NewCX}, ConnPid :: pid(), NewCX :: conn_index(). cx_maybe_new_sub(Conn = #conn{pid = ConnPid, subs = Subs}, Sub = {Subscriber, Channel}, CX = #cx{conns = Conns}) -> NewSubs = [{Subscriber, Channel} | Subs], NewConn = Conn#conn{subs = NewSubs}, NewConns = [NewConn | NextConns], NewCX = CX#cx{conns = NewConns}, case lists:keymember(Channel, 2, Subs) of false -> {need_sub, ConnPid, NewCX}; true -> {have_sub, NewCX} end. -spec cx_del_sub(Subscriber, Channel, CX) -> Result when Subscriber :: pid(), Channel :: tuple(), CX :: conn_index(), Result :: {drop_sub, NewCX} | {keep_sub, NewCX} | unassigned | unconfigured, NewCX :: conn_index(). %% @private %% Remove a subscription from the list of subs, and return a value indicating whether %% the connection needs to be told to unsubscribe entirely. cx_del_sub(Subscriber, Channel, CX = #cx{conns = Conns}) -> Realm = element(1, Channel), case cx_resolve(Realm, CX) of {ok, Pid} -> {value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns), cx_maybe_last_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns}; Other -> Other end. cx_maybe_last_sub(Conn = #conn{pid = ConnPid, subs = Subs}, Sub = {Subscriber, Channel}, CX = #cx{conns = Conns}) -> NewSubs = lists:delete(Sub, Subs), NewConn = Conn#conn{subs = NewSubs}, NewConns = [NewConn | NextConns], NewCX = CX#cx{conns = NewConns}, MaybeDrop = case lists:keymember(Channel, 2, NewSubs) of false -> drop_sub; true -> keep_sub end, {MaybeDrop, NewCX}. -spec cx_get_subscribers(Conn, Channel, CX) -> Subscribers when Conn :: pid(), Channel :: term(), CX :: conn_index(), Subscribers :: [pid()]. cx_get_subscribers(Conn, Channel, #cx{conns = Conns}) -> #conn{subs = Subs} = lists:keyfind(Conn, #conn.pid, Conns), lists:fold(registered_to(Channel), [], Subs). registered_to(Channel) -> fun({P, C}, A) -> case C == Channel of true -> [P | A]; false -> A end end. cx_clear_client(Pid, DeadReqs, DeadSubs, CX = #cx{conns = Conns}) -> DropSubs = [{S, Pid} || S <- DeadSubs], Clear = fun(C = #conn{requests = Requests, subs = Subs}) -> NewSubs = lists:subtract(Subs, DropSubs), NewRequests = lists:subtract(Requests, DeadReqs), C#conn{requests = NewRequests, subs = NewSubs} end, NewConns = lists:map(Clear, Conns), CX#cx{conns = NewConns}.