From 62385cd088cf7ffeb31021f95327c5ab095678a0 Mon Sep 17 00:00:00 2001 From: Craig Everett Date: Mon, 5 Mar 2018 00:24:09 +0900 Subject: [PATCH] blah --- TODO | 17 + zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl | 734 ++++++++++++++--------- 2 files changed, 469 insertions(+), 282 deletions(-) create mode 100644 TODO diff --git a/TODO b/TODO new file mode 100644 index 0000000..52ccf5f --- /dev/null +++ b/TODO @@ -0,0 +1,17 @@ + - zomp nodes must report the realms they provide to upstream nodes to which they connect. + On redirect a list of hosts of the form [{zx:host(), [zx:realm()]}] must be provided to the downstream client. + This is the only way that downstream clients can determine which redirect hosts are useful to it. + + - The ZX daemon should be able to retry requests that were submitted but did not receive a response before the relevant + connection was terminated for whatever reason. + The most obvious way to do this would be to keep a set or queue of references in each connection monitor's section, + clearing them when they receive responses, and pushing them back into the action queue (from the responses reference map) + when they fail. + + - The same issue as the one above, but with subscriptions. Currently there is no obvious way to track what subscriptions flow + through which connections, and on termination or change of a connection there is no way to ensure that the subscription request + finds its way back into the action queue to resubmission once a realm becomes available again. + + - The request tracking ref list is currently passing through the MX record. That is exactly the wrong place to put it. + It should DEFINITELY be in the CX record. + MOVE IT. diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl index f2f4704..352839b 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl @@ -3,7 +3,7 @@ %%% %%% Resident execution daemon and runtime interface to Zomp. %%% -%%% The daemon lives in the background once started and awaits query requests and +%%% The daemon resides in the background once started and awaits query requests and %%% subscriptions from from other processes. The daemon is only capable of handling %%% unprivileged (user) actions. %%% @@ -18,11 +18,10 @@ %%% as mx_*/N. %%% %%% Node connections (cx_conn processes) must also be tracked for status and realm -%%% availability. This is done using a type called the conn_index(), shortened to -%%% "cx" throughout the modue. conn_index() is treated as an abstract, opaque data -%%% type across the module in the same way as the monitor_index() mentioned above, -%%% and is handled via a set of cx_*/N functions defined at the end of the module -%%% after the mx_*/N section. +%%% availability. This is done using a type called conn_index(), shortened to "cx" +%%% throughout the module. conn_index() is treated as an abstract, opaque datatype +%%% throughout the module, and is handled via a set of cx_*/N functions (after the +%%% mx_*/N section). %%% %%% Do NOT directly access data within these structures, use (or write) an accessor %%% function that does what you want. @@ -41,7 +40,7 @@ %%% determine what realms must be available and what cached Zomp nodes it is aware of. %%% It populates the CX (conn_index(), mentioned above) with realm config and host %%% cache data, and then immediately initiates three connection attempts to cached -%%% nodes for each realm configured (see init_connections/0). +%%% nodes for each realm configured (if possible; see init_connections/0). %%% %%% Once connection attempts have been initiated the daemon waits in receive for %%% either a connection report (success or failure) or an action request from @@ -88,7 +87,9 @@ %%% %%% A bit of state handling is required (queueing requests and storing the current %%% action state), but this permits the system above the daemon to interact with it in -%%% a blocking way, but zx_daemon and zx_conn to work asynchronously with one another. +%%% a blocking way, establishing its own receive timeouts or implementing either an +%%% asynchronous or synchronous interface library atop zx_daemon interface function, +%%% but leaving zx_daemon and zx_conn alone to work asynchronously with one another. %%% @end -module(zx_daemon). @@ -103,7 +104,7 @@ list/1, latest/1, fetch/1, key/1, pending/1, packagers/1, maintainers/1, sysops/1]). --export([report/1, result/2, notice/2]). +-export([report/1, result/2, notify/2]). -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). @@ -115,20 +116,20 @@ %%% Type Definitions -record(s, - {meta = none :: none | zx:package_meta(), - home = none :: none | file:filename(), - argv = none :: none | [string()], - actions = [] :: [request() | {reference(), request()}], - responses = maps:new() :: #{reference() := request()}, - subs = [] :: [{pid(), zx:package()}], - mx = mx_new() :: monitor_index(), - cx = cx_load() :: conn_index()}). + {meta = none :: none | zx:package_meta(), + home = none :: none | file:filename(), + argv = none :: none | [string()], + actions = [] :: [request() | {reference(), request()}], + requests = maps:new() :: #{reference() := request()}, + subs = [] :: [{pid(), zx:package()}], + mx = mx_new() :: monitor_index(), + cx = cx_load() :: conn_index()}). -record(cx, {realms = #{} :: #{zx:realm() := realm_meta()}, attempts = [] :: [{pid(), zx:host(), [zx:realm()]}], - conns = [] :: [{pid(), zx:host(), [zx:realm()]}]}). + conns = [] :: [connection()]}). -record(rmeta, @@ -143,11 +144,21 @@ assigned = none :: none | pid(), available = [] :: [pid()]}). + +-record(conn, + {pid :: pid(), + host :: zx:host(), + realms :: [zx:realm()], + requests :: [reference()], + subs :: [zx:package()]}). + + %% State Types -type state() :: #s{}. --type monitor_index() :: [{reference(), pid(), category()}], +-type monitor_index() :: [{reference(), pid(), category()}]. -type conn_index() :: #cx{}. -type realm_meta() :: #rmeta{}. +-type connection() :: #conn{}. -type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()} | attempt | conn. @@ -180,7 +191,7 @@ % % Subscription messages are a separate type below. --type result() :: sub_result() +-type result() :: sub_message() | list_result() | latest_result() | fetch_result() @@ -195,7 +206,7 @@ Message :: {ok, [zx:name()]} | {error, bad_realm | timeout}} | {list, zx:package(), - Message :: {ok, [zx:version]} + Message :: {ok, [zx:version()]} | {error, bad_realm | bad_package | timeout}}. @@ -236,7 +247,7 @@ -type sysop_result() :: {sysops, zx:realm(), Message :: {ok, [zx:user()]} | {error, bad_host - | timeout}. + | timeout}}. % Subscription Results @@ -262,15 +273,23 @@ %% be known whether the very first thing the target application will do is send this %% process an async message. That implies that this should only ever be called once, %% by the launching process (which normally terminates shortly thereafter). +%% +%% FIXME: I don't like something about this idea. Either it is wrong to be passing in +%% the primary project's meta, or it is wrong to be doing it this way, or it is +%% wrong to have only *one* primary app in the EVM instance. Or something. +%% Something smells off about this. +%% We WILL need to maintain and know meta. +%% We DON'T know when or why it will be important to running programs. +%% Once we understand this better we need to come back and fix or replace it. +%% (2018-03-01 -CRE) pass_meta(Meta, Dir, ArgV) -> gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). --spec subscribe(Package) -> ok - when Package :: zx:package(). +-spec subscribe(zx:package()) -> ok. %% @doc -%% Subscribe to update notification for a for a particular package. +%% Subscribe to update notifications for a for a particular package. %% The caller will receive update notifications of type sub_result() as Erlang %% messages whenever an update occurs. @@ -297,7 +316,7 @@ list(Identifier) -> request({list, Identifier}). --spec latest(Identifier) -> ok. +-spec latest(Identifier) -> ok when Identifier :: zx:package() | zx:package_id(). %% @doc %% Request the lastest version of a package within the scope of the identifier. @@ -370,7 +389,7 @@ sysops(Realm) -> %% Private function to wrap the necessary bits up. request(Action) -> - gen_server:cast(?MODULE, {request, make_ref(), self(), Action}), + gen_server:cast(?MODULE, {request, make_ref(), self(), Action}). @@ -394,10 +413,10 @@ report(Message) -> %% Return a tagged result back to the daemon to be forwarded to the original requestor. result(Reference, Result) -> - gen_server:cast(?MODULE, {result, Reference, Result}). + gen_server:cast(?MODULE, {result, self(), Reference, Result}). --spec notice(Package, Message) -> ok +-spec notify(Package, Message) -> ok when Package :: zx:package(), Message :: term(). %% @private @@ -426,6 +445,16 @@ init(none) -> {ok, State}. +-spec init_connections() -> {ok, MX, CX} + when MX :: monitor_index(), + CX :: conn_index(). +%% @private +%% Starting from a stateless condition, recruit and resolve all realm relevant data, +%% populate host caches, and initiate connections to required realms. On completion +%% return a populated MX and CX to the caller. Should only ever be called by init/1. +%% Returns an `ok' tuple to disambiguate it from pure functions *and* to leave an +%% obvious place to populate error returns in the future if desired. + init_connections() -> CX = cx_load(), MX = mx_new(), @@ -433,22 +462,28 @@ init_connections() -> init_connections(Realms, MX, CX). -init_connections([Realm | Realms], MX, CX = #cx{attempts = Attempts}) -> +-spec init_connections(Realms, MX, CX) -> {ok, NewMX, NewCX} + when Realms :: [zx:realm()], + MX :: monitor_index(), + CX :: conn_index(), + NewMX :: monitor_index(), + NewCX :: conn_index(). + +init_connections([Realm | Realms], MX, CX) -> {ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX), - StartConn = - fun(Host, A) -> - case lists:keymember(Host, 2, Attempts) of - false -> + MaybeAttempt = + fun(Host, {M, C}) -> + case cx_maybe_add_attempt(Host, Realm, C) of + not_connected -> {ok, Pid} = zx_conn:start(Host), - [{Pid, Host} | A]; - true -> - A + NewM = mx_add_monitor(Pid, attempt, M), + NewC = cx_add_attempt(Pid, Host, Realm, C), + {NewM, NewC}; + {ok, NewC} -> + {M, NewC} end end, - NewAttempts = lists:foldl(StartConn, [], Hosts), - AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, attempt, M) end, - NewMX = lists:foldl(AddMonitor, MX, NewAttempts), - NewCX = lists:foldl(fun cx_add_attempt/2, NextCX, NewAttempts), + {NewMX, NewCX} = lists:foldl(MaybeAttempt, {MX, NextCX}, Hosts), init_connections(Realms, NewMX, NewCX); init_connections([], MX, CX) -> {ok, MX, CX}. @@ -468,32 +503,32 @@ handle_call(Unexpected, From, State) -> %% @private %% gen_server callback for OTP casts +handle_cast({pass_meta, Meta, Dir, ArgV}, State) -> + NewState = do_pass_meta(Meta, Dir, ArgV, State), + {noreply, NewState}; +handle_cast({subscribe, Pid, Package}, State) -> + NextState = do_subscribe(Pid, Package, State), + NewState = eval_queue(NextState), + {noreply, NewState}; +handle_cast({unsubscribe, Pid, Package}, State) -> + NextState = do_unsubscribe(Pid, Package, State), + NewState = eval_queue(NextState), + {noreply, NewState}; +handle_cast({request, Ref, Requestor, Action}, State) -> + NextState = do_request(Ref, Requestor, Action, State), + NewState = eval_queue(NextState), + {noreply, NewState}; handle_cast({report, Conn, Message}, State) -> NextState = do_report(Conn, Message, State), NewState = eval_queue(NextState), {noreply, NewState}; -handle_cast({request, Ref, Requestor, Action}, State) -> - NextState = do_request(Ref, Requestpr, Action, State), - NewState = eval_queue(NextState), - {noreply, NewState}; -handle_cast({result, Ref, Result}, State) -> - NextState = do_result(Ref, Result, State), +handle_cast({result, Conn, Ref, Result}, State) -> + NextState = do_result(Conn, Ref, Result, State), NewState = eval_queue(NextState), {noreply, NewState}; handle_cast({notice, Package, Update}, State) -> ok = do_notice(Package, Update, State), {noreply, State}; -handle_cast({subscribe, Pid, Package}, State) -> - NextState = do_request(subscribe, Pid, Package, State), - NewState = eval_queue(NextState), - {noreply, NewState}; -handle_cast({unsubscribe, Pid, Package}, State) -> - NextState = do_request(unsubscribe, Pid, Package, State), - NewState = eval_queue(NextState), - {noreply, NewState}; -handle_cast({pass_meta, Meta, Dir, ArgV}, State) -> - NewState = do_pass_meta(Meta, Dir, ArgV, State), - {noreply, NewState}; handle_cast(Unexpected, State) -> ok = log(warning, "Unexpected cast: ~tp", [Unexpected]), {noreply, State}. @@ -540,23 +575,43 @@ do_pass_meta(Meta, Home, ArgV, State) -> State#s{meta = Meta, home = Home, argv = ArgV}. --spec do_request(Ref, Req, Action, State) -> NextState +-spec do_subscribe(Pid, Package, State) -> NextState + when Pid :: pid(), + Package :: zx:package(), + State :: state(), + NextState :: state(). +%% @private +%% Enqueue a subscription request. + +do_subscribe(Pid, Package, State = #s{actions = Actions}) -> + NewActions = [{Pid, {subscribe, Package}} | Actions], + State#s{actions = NewActions}. + + +-spec do_unsubscribe(Pid, Package, State) -> NextState + when Pid :: pid(), + Package :: zx:package(), + State :: state(), + NextState :: state(). +%% @private +%% Clear or dequeue a subscription request. + +do_unsubscribe(Pid, Package, State = #s{actions = Actions}) -> + NewActions = [{Pid, {unsubscribe, Package}} | Actions], + State#s{actions = NewActions}. + + +-spec do_request(Ref, Requestor, Action, State) -> NextState when Ref :: reference(), - Req :: pid(), + Requestor :: pid(), Action :: action(), State :: state(), NextState :: state(). %% @private %% Enqueue requests and update relevant index. -do_request(subscribe, Req, Package, State = #s{actions = Actions}) -> - NewActions = [{Req, {subscribe, Package}} | Actions], - State#s{actions = NewActions}; -do_request(unsubscribe, Req, Package, State = #s{actions = Actions}) -> - NewActions = [{Req, {unsubscribe, Package}} | Actions], - State#s{actions = NewActions}; -do_request(Ref, Req, Action, State = #s{actions = Actions}) -> - NewActions = [{Ref, Req, Action} | Actions], +do_request(Ref, Requestor, Action, State = #s{actions = Actions}) -> + NewActions = [{Ref, Requestor, Action} | Actions], State#s{actions = NewActions}. @@ -566,90 +621,98 @@ do_request(Ref, Req, Action, State = #s{actions = Actions}) -> State :: state(), NewState :: state(). %% @private -%% Receive a report from a connection process and update the connection index and +%% Receive a report from a connection process, and update the connection index and %% possibly retry connections. do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> - {ok, NextMX} = mx_swap_categories(Conn, MX), + NextMX = mx_upgrade_conn(Conn, MX), {NewMX, NewCX} = case cx_connected(Realms, Conn, CX) of {assigned, NextCX} -> {NextMX, NextCX}; {unassigned, NextCX} -> - ScrubbedMX = mx_del_monitor(Conn, conn, NextMX), + {ok, ScrubbedMX, []} = mx_del_monitor(Conn, conn, NextMX), ok = zx_conn:stop(Conn), {ScrubbedMX, NextCX} end, State#s{mx = NewMX, cx = NewCX}; do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) -> - NextMX = mx_del_monitor(Conn, attempt, MX), + {ok, NextMX} = mx_del_monitor(Conn, attempt, MX), {Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX), - {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX) - State#s{mx = NewMX, cx = NewCX}; -do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) -> - {Unassigned, NextMX, NextCX} = - case mx_lookup_category(Conn, MX) of - attempt -> - ScrubbedMX = mx_del_monitor(Conn, attempt, MX), - ScrubbedCX = cx_failed(Conn, CX), - {[], ScrubbedMX, ScrubbedCX}; - conn -> - ScrubbedMX = mx_del_monitor(Conn, conn, MX), - {ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX), - {Dropped, ScrubbedMX, ScrubbedCX} - end, {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX), - State#s{mx = NewMX, cx = NewCX}. + State#s{mx = NewMX, cx = NewCX}; +do_report(Conn, + disconnected, + State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> + case mx_lookup_category(Conn, MX) of + attempt -> + {ok, NewMX} = mx_del_monitor(Conn, attempt, MX), + NewCX = cx_failed(Conn, CX), + State#s{mx = NewMX, cx = NewCX}; + conn -> + {ok, ScrubbedMX, ReqRefs} = mx_del_monitor(Conn, conn, MX), + ScrubbedCX = cx_disconnected(Conn, CX), + Unassigned = cx_unassigned(ScrubbedCX), + NewActions = maps:to_list(maps:with(ReqRefs, Requests)) ++ Actions, + NewRequests = maps:without(ReqRefs, Requests), + {NewMX, NewCX} = init_connection(Unassigned, ScrubbedMX, ScrubbedCX), + State#s{actions = NewActions, + requests = NewRequests, + mx = NewMX, + cx = NewCX} + end + -spec init_connection(Realms, MX, CX) -> {NewMX, NewCX} when Realms :: [zx:realm()], - MX :: monitor_index() + MX :: monitor_index(), CX :: conn_index(), NewMX :: monitor_index(), NewCX :: conn_index(). %% @private -%% Initiates a single connection and updates the relevant structures. +%% Initiates a connection to a single realm at a time, taking into account whether +%% connected but unassigned nodes are alterantive providers of the needed realm. +%% Returns updated monitor and connection indices. -init_connection([Realm | Realms], MX, CX = #cx{realms = Metas}) -> +init_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) -> {NewMX, NewCX} = - case maps:get(Realm) of + case maps:get(Realm, RMetas) of #rmeta{available = []} -> - {ok, Host, UpdatedCX} = cx_next_hosts(Realm, CX), - {ok, Pid} = zx_conn:start(Host), - NextMX = mx_add_monitor(Pid, attempt, MX), - NextCX = cx_add_attempt({Pid, Host}, UpdatedCX), + {ok, NextMX, NextCX} = init_connections([Realm], MX, CX), {NextMX, NextCX}; Meta = #rmeta{available = [Conn | Conns]} -> NewMeta = Meta#rmeta{assigned = Conn, available = Conns}, - NewMetas = maps:put(Realm, NewMeta, Metas), - NextCX = CX#cx{realms = NewMetas}, - {MX, NewCX} + NewRMetas = maps:put(Realm, NewMeta, RMetas), + NextCX = CX#cx{realms = NewRMetas}, + {MX, NextCX} end, init_connection(Realms, NewMX, NewCX); init_connection([], MX, CX) -> {MX, CX}. --spec do_result(Ref, Result, State) -> NewState - when Ref :: reference(), +-spec do_result(Conn, Ref, Result, State) -> NewState + when Conn :: pid(), + Ref :: reference(), Result :: result(), State :: state(), NewState :: state(). %% @private %% Receive the result of a sent request and route it back to the original requestor. -do_result(Ref, Result, State = #s{responses = Responses}) -> - NewResponses = - case maps:take(Ref, Responses) of - {{Req, {Type, Object}}, NextResponses} -> +do_result(Conn, Ref, Result, State = #s{requests = Requests, mx = MX}) -> + NewRequests = + case maps:take(Ref, Requests) of + {{Req, {Type, Object}}, NextRequests} -> Req ! {result, {Type, Object, Result}}, - NextResponses; + NextRequests; error -> ok = log(warning, "Received unqueued result ~tp:~tp", [Ref, Result]), - Responses + Requests end, - State#s{responses = NewResponses}. + NewMX = mx_clear_request(Conn, Ref, MX), + State#s{requests = NewRequests, mx = NewMX}. -spec do_notice(Package, Update, State) -> ok @@ -708,7 +771,7 @@ eval_queue([Action = {Pid, {unsubscribe, Package}} | Rest], State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) -> NewActions = lists:delete(Action, Actions), NewSubs = lists:delete({Pid, Package}, Subs), - NewMX = mx_del_monitor(Pid, subscriber, MX), + {ok, NewMX} = mx_del_monitor(Pid, subscriber, MX), Realm = element(1, Package), ok = case cx_resolve(Realm, CX) of @@ -721,9 +784,9 @@ eval_queue([{Ref, Pid, {list, realms}} | Rest], State = #s{cx = CX}) -> ok = send_result(Pid, Ref, {list, realms, {ok, Realms}}), eval_queue(Rest, State); eval_queue([Action = {Ref, Pid, {list, Realm}} | Rest], - State = #s{actions = Actions, responses = Responses, mx = MX, cx = CX}) + State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) when is_list(Realm) -> - {NewActions, NewResponses, NewMX} = + {NewActions, NewRequests, NewMX} = case cx_resolve(Realm, CX) of {ok, Conn} -> ok = cx_conn:list_packages(Conn, Ref), @@ -734,32 +797,32 @@ eval_queue([Action = {Ref, Pid, {list, Realm}} | Rest], unassigned -> NextMX = mx_add_monitor(Pid, requestor, MX), NextActions = [Action | Actions], - {NextActions, Responses, NextMX}; + {NextActions, Requests, NextMX}; unconfigured -> Pid ! {Ref, {list, Realm, {error, bad_realm}}}, - {Actions, Responses, MX} + {Actions, Requests, MX} end, - NewState = State#s{actions = NewActions, responses = NewResponses, mx = NewMX}, + NewState = State#s{actions = NewActions, requests = NewRequests, mx = NewMX}, eval_queue(Rest, NewState); %% FIXME: Universalize the cx_resolve result, leave only message form explicit. -eval_queue([Action = {Ref, Pid, Message} | Actions], - State = #s{actions = Actions, responses = Responses, mx = MX, cx = CX}) +eval_queue([Action = {Ref, Pid, Message} | Rest], + State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> Realm = element(1, element(2, Message)), - {NewActions, NewResponses, NewMX} = + {NewActions, NewRequests, NewMX} = case cx_resolve(Realm, CX) of {ok, Conn} -> ok = cx_conn:list_packages(Conn, Ref), - NextResponses = maps:put(Ref, {Pid, Message}, Responses), + NextRequests = maps:put(Ref, {Pid, Message}, Requests), NextMX = mx_add_monitor(Pid, requestor, MX), - {Actions, NextResponses, NextMX}; + {Actions, NextRequests, NextMX}; unassigned -> NextActions = [Action | Actions], - {NextActions, Responses, MX}; + {NextActions, Requests, MX}; unconfigured -> - Pid ! {Ref, {list, Realm, {error, bad_realm}}} - {Actions, Responses, MX} + Pid ! {Ref, {list, Realm, {error, bad_realm}}}, + {Actions, Requests, MX} end, - NewState = State#s{actions = NewActions, responses = NewResponses, mx = NewMX}, + NewState = State#s{actions = NewActions, requests = NewRequests, mx = NewMX}, eval_queue(Rest, NewState). @@ -903,7 +966,7 @@ scrub(Deps) -> mx_new() -> []. --spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index(). +-spec mx_add_monitor(pid(), category(), monitor_index()) -> monitor_index(). %% @private %% Begin monitoring the given Pid, keeping track of its category. @@ -926,41 +989,95 @@ mx_add_monitor(Pid, requestor, MX) -> mx_add_monitor(Pid, attempt, MX) -> false = lists:keymember(Pid, 2, MX), Ref = monitor(process, Pid), - [{Ref, Pid, attempt} | MX]; -mx_add_monitor(Pid, conn, MX) -> + [{Ref, Pid, attempt} | MX]. + + +-spec mx_upgrade_conn(Pid, MX) -> NewMX + when Pid :: pid(), + MX :: monitor_index(), + NewMX :: monitor_index(). +%% @private +%% Upgrade an `attempt' monitor to a `conn' monitor. + +mx_upgrade_conn(Pid, MX) -> {value, {Ref, Pid, attempt}, NextMX} = lists:keytake(Pid, 2, MX), - [{Ref, Pid, conn} | NextMX], + [{Ref, Pid, {conn, []}} | NextMX]. +-spec mx_del_monitor(Conn, Category, MX) -> Result + when Conn :: pid(), + Category :: subscriber + | requestor + | attempt + | conn, + MX :: monitor_index(), + Result :: {ok, NewMX} + | {ok, NewMX, RequestRefs}, + NewMX :: monitor_index(), + RequestRefs :: [reference()]. + -spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index(). %% @private %% Drop a monitor category, removing the entire monitor in the case only one category -%% exists. +%% exists. Returns a tuple including the remaining request references in the case of +%% a conn type. mx_del_monitor(Pid, subscriber, MX) -> - case lists:keytake(Pid, 2, MX) of - {value, {Ref, Pid, {1, 0}}, NextMX} -> - true = demonitor(Ref, [flush]), - NextMX; - {value, {Ref, Pid, {Subs, Reqs}}, NextMX} -> - [{Ref, Pid, {Subs - 1, Reqs}} | NextMX]; - false -> - MX - end; + NewMX = + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, {1, 0}}, NextMX} -> + true = demonitor(Ref, [flush]), + NextMX; + {value, {Ref, Pid, {Subs, Reqs}}, NextMX} when Subs > 0 -> + [{Ref, Pid, {Subs - 1, Reqs}} | NextMX] + end, + {ok, NewMX}; mx_del_monitor(Pid, requestor, MX) -> - case lists:keytake(Pid, 2, MX) of - {value, {Ref, Pid, {0, 1}}, NextMX} -> - true = demonitor(Ref, [flush]), - NextMX; - {value, {Ref, Pid, {Subs, Reqs}}, NextMX} -> - [{Ref, Pid, {Subs, Reqs - 1}} | NextMX]; - false -> - MX - end; -mx_del_monitor(Pid, Category, MX) -> - {value, {Ref, Pid, Category}, NewMX} = lists:keytake(Pid, 2, MX), + NewMX = + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, {0, 1}}, NextMX} -> + true = demonitor(Ref, [flush]), + NextMX; + {value, {Ref, Pid, {Subs, Reqs}}, NextMX} when Reqs > 0 -> + [{Ref, Pid, {Subs, Reqs - 1}} | NextMX] + end, + {ok, NewMX}; +mx_del_monitor(Pid, attempt, MX) -> + {value, {Ref, Pid, attempt}, NewMX} = lists:keytake(Pid, 2, MX), true = demonitor(Ref, [flush]), - NewMX. + {ok, NewMX}; +mx_del_monitor(Pid, conn, MX) -> + {value, {Ref, Pid, {conn, RequestRefs}}, NewMX} = lists:keytake(Pid, 2, MX), + true = demonitor(Ref, [flush]), + {ok, NewMX, RequestRefs}. + + +-spec mx_stash_request(Conn, RequestRef, MX) -> NewMX + when Conn :: pid(), + RequestRef :: reference(), + MX :: monitor_index(), + NewMX :: monitor_index(). +%% @private +%% Add a pending request reference to the connection monitor's record. + +mx_stash_request(Conn, RequestRef, MX) -> + {value, {Ref, Conn, {conn, Pending}}, NextMX} = lists:keytake(Conn, 2, MX), + NewPending = [RequestRef | Pending], + [{Ref, Conn, {conn, NewPending}} | NextMX]. + + +-spec mx_clear_request(Conn, RequestRef, MX) -> NewMX + when Conn :: pid(), + RequestRef :: reference(), + MX :: monitor_index(), + NewMX :: monitor_index(). +%% @private +%% Remove a pending request reference from the connection monitor's record. + +mx_clear_request(Conn, RequestRef, MX) -> + {value, {Ref, Conn, {conn, Pending}}, NextMX} = lists:keytake(Conn, 2, MX), + NewPending = lists:delete(RequestRef, Pending), + [{Ref, Conn, {conn, NewPending}} | NextMX]. -spec mx_lookup_category(pid(), monitor_index()) -> Result @@ -968,38 +1085,18 @@ mx_del_monitor(Pid, Category, MX) -> | conn | requestor | subscriber - | sub_req - | error. + | sub_req. %% @private %% Lookup a monitor's categories. mx_lookup_category(Pid, MX) -> - case lists:keyfind(Pid, 2, MX) of - {_, _, attempt} -> attempt; - {_, _, conn} -> conn; - {_, _, {0, _}} -> requestor; - {_, _, {_, 0}} -> subscriber; - {_, _, _} -> sub_req; - false -> error - end. - - --spec mx_upgrade_conn(Pid, MX) -> Result - when Pid :: pid(), - MX :: monitor_index(), - Result :: {ok, NewMX} - | {error, not_found}, - NewMX :: monitor_index(). -%% @private -%% Upgrade a attempt to a conn. - -mx_upgrade_conn(Pid, MX) -> - case lists:keytake(Pid, 2, MX) of - {value, {Ref, Pid, attempt}, NextMX} -> - NewMX = [{Ref, Pid, conn} | NextMX], - {ok, NewMX}; - false -> - {error, not_found} + Monitor = lists:keyfind(Pid, 2, MX), + case element(3, Monitor) of + attempt -> attempt; + {conn, _} -> conn; + {0, _} -> requestor; + {_, 0} -> subscriber; + {_, _} -> sub_req end. @@ -1009,10 +1106,10 @@ mx_upgrade_conn(Pid, MX) -> %%% Functions to manipulate the conn_index() data type are in this section. This %%% data should be treated as abstract by functions outside of this section, as it is %%% a deep structure and future requirements are likely to wind up causing it to -%%% change a little. For the same reason, these functions are all pure functions, -%%% testable independently of the rest of the module and having no side effects. -%%% Return values usually carry some status information with them, and it is up to the -%%% caller to react to those responses in an appropriate way. +%%% change a little. For the same reason, these functions are all pure, independently +%%% testable, and have no side effects. +%%% +%%% Return values often carry some status information with them. -spec cx_load() -> conn_index(). %% @private @@ -1025,7 +1122,8 @@ cx_load() -> {ok, CX} -> CX; {error, Reason} -> - ok = log(error, "Cache load failed with: ~tp", [Reason]), + Message = "Realm data and host cache load failed with : ~tp", + ok = log(error, Message, [Reason]), ok = log(warning, "No realms configured."), #cx{} end. @@ -1036,6 +1134,12 @@ cx_load() -> | {error, Reason}, Reason :: no_realms | file:posix(). +%% @private +%% This procedure, relying zx_lib:zomp_home() allows the system to load zomp data +%% from any arbitrary home for zomp. This has been included mostly to make testing of +%% Zomp and ZX easier, but incidentally opens the possibility that an arbitrary Zomp +%% home could be selected by an installer (especially on consumer systems like Windows +%% where any number of wild things might be going on in the user's filesystem). cx_populate() -> Home = zx_lib:zomp_home(), @@ -1046,6 +1150,15 @@ cx_populate() -> end. +-spec cx_populate(RealmFiles, CX) -> NewCX + when RealmFiles :: file:filename(), + CX :: conn_index(), + NewCX :: conn_index(). +%% @private +%% Pack an initially empty conn_index() with realm meta and host cache data. +%% Should not halt on a corrupted, missing, malformed, etc. realm file but will log +%% any loading errors. + cx_populate([File | Files], CX) -> case file:consult(File) of {ok, Meta} -> @@ -1060,6 +1173,13 @@ cx_populate([], CX) -> CX. +-spec cx_load_realm_meta(Meta, CX) -> NewCX + when Meta :: [{Key :: atom(), Value :: term()}], + CX :: conn_index(), + NewCX :: conn_index(). +%% @private +%% This function MUST adhere to the realmfile definition found at. + cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) -> {realm, Realm} = lists:keyfind(realm, 1, Meta), {revision, Revision} = lists:keyfind(revision, 1, Meta), @@ -1074,12 +1194,27 @@ cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) -> realm_keys = RealmKeys, package_keys = PackageKeys, sysops = Sysops}, - Complete = cx_fetch_cache(Realm, Basic), + Complete = cx_load_cache(Realm, Basic), NewRealms = maps:put(Realm, Complete, Realms), CX#cx{realms = NewRealms}. -cx_fetch_cache(Realm, Basic) -> +-spec cx_load_cache(Realm, Basic) -> Complete + when Realm :: zx:realm(), + Basic :: realm_meta(), + Complete :: realm_meta(). +%% @private +%% Receive a realm_meta() that lacks any cache data and load the realm's cache file +%% if it exists, and return it fully populated. +%% FIXME: If several instances of zomp or zx are running at once it may be possible +%% to run into file access contention or receive an incomplete read on some +%% systems. At the moment this is only a remote possibility and for now should +%% be handled by simply re-running whatever command caused the failure. +%% Better file contention and parallel-executing system handling should +%% eventually be implemented, especially if zx becomes common for end-user +%% GUI programs. + +cx_load_cache(Realm, Basic) -> CacheFile = cx_cache_file(Realm), case file:consult(CacheFile) of {ok, Cache} -> @@ -1105,6 +1240,8 @@ cx_store_cache(#cx{realms = Realms}) -> -spec cx_write_cache({zx:realm(), realm_meta()}) -> ok. +%% @private +%% FIXME: The same concerns as noted in the cx_load_cache/2 FIXME comment apply here. cx_write_cache({Realm, #rmeta{serial = Serial, private = Private, mirrors = Mirrors}}) -> @@ -1183,7 +1320,7 @@ cx_next_host2(Realm, CX = #cx{realm = Realms}) -> %% indicate to the caller that any iterative host scanning or connection procedures %% should treat this as the last value of interest (and stop spawning connections). -cx_next_host3(Meta = #rmeta{prime = Prime, private = Privae, mirrors = Mirrors}) -> +cx_next_host3(Meta = #rmeta{prime = Prime, private = Private, mirrors = Mirrors}) -> case queue:is_empty(Mirrors) of false -> {{value, Next}, NewMirrors} = queue:out(Mirrors), @@ -1203,8 +1340,8 @@ cx_next_host3(Meta = #rmeta{prime = Prime, private = Privae, mirrors = Mirrors}) | {error, Reason}, Hosts :: [zx:host()], NewCX :: conn_index(), - Reason :: bad_realm - | connected. + Reason :: {connected, Conn :: pid()} + | bad_realm. %% @private %% This function allows recruiting an arbitrary number of hosts from the host cache %% of a given realm, taking private mirrors first, then public mirrors, and ending @@ -1212,21 +1349,16 @@ cx_next_host3(Meta = #rmeta{prime = Prime, private = Privae, mirrors = Mirrors}) cx_next_hosts(N, Realm, CX = #cx{realms = Realms}) -> case maps:find(Realm, Realms) of - {ok, #rmeta{assigned = none}} -> cx_next_hosts2(N, Realm, CX); - {ok, _} -> {error, connected}; - error -> {error, bad_realm} + {ok, Meta = #rmeta{assigned = none}} -> cx_next_hosts2(N, Realm, Meta, CX); + {ok, #rmeta{assigned = Conn}} -> {error, {connected, Conn}}; + error -> {error, bad_realm} end. -cx_next_hosts2(N, Realm, CX = #cx{realms = Realms}) -> - case maps:find(Realm, Realms) of - {ok, Meta} -> - {ok, Hosts, NewMeta} = cx_next_hosts3(N, [], Meta), - NewRealms = maps:put(Realm, NewMeta, Realms), - {ok, Hosts, CX#cx{realms = NewRealms}}; - error -> - {error, bad_realm} - end. +cx_next_hosts2(N, Realm, Meta, CX = #cx{realms = Realms}) -> + {ok, Hosts, NewMeta} = cx_next_hosts3(N, [], Meta), + NewRealms = maps:put(Realm, NewMeta, Realms), + {ok, Hosts, CX#cx{realms = NewRealms}}. cx_next_hosts3(N, Hosts, Meta) when N < 1 -> @@ -1238,13 +1370,34 @@ cx_next_hosts3(N, Hosts, Meta) -> end. --spec cx_add_attempt(New, CX) -> NewCX - when New :: {pid(), zx:host()}, +-spec cx_maybe_add_attempt(Host, Realm, CX) -> Result + when Host :: zx:host(), + Realm :: zx:realm(), + CX :: conn_index(), + Result :: not_connected + | {ok, NewCX}, + NewCX :: conn_index(). + +cx_maybe_add_attempt(Host, Realm, CX = #cx{attempts = Attempts}) -> + case lists:keytake(Host, 2, Attempts) of + false -> + not_connected; + {value, {Pid, Host, Realms}, NextAttempts} -> + NewAttempts = [{Pid, Host, [Realm | Realms]} | Attempts], + NewCX = CX#cx{attempts = NewAttempts}, + {ok, NewCX} + end. + + +-spec cx_add_attempt(Pid, Host, Realm, CX) -> NewCX + when Pid :: pid(), + Host :: zx:host(), + Realm :: zx:realm(), CX :: conn_index(), NewCX :: conn_index(). -cx_add_attempt(New, CX = #cx{attempts = Attempts}) -> - CX#cx{attempts = [New | Attempts]}. +cx_add_attempt(Pid, Host, Realm, CX = #cx{attempts = Attempts}) -> + CX#cx{attempts = [{Pid, Host, [Realm]} | Attempts]}. -spec cx_connected(Available, Conn, CX) -> Result @@ -1262,18 +1415,29 @@ cx_add_attempt(New, CX = #cx{attempts = Attempts}) -> %% The return value is a tuple that indicates whether the new connection was assigned %% or not and the updated CX data value. -cx_connected(Available, Conn, CX = #cx{attempts = Attempts, conns = Conns}) -> - {value, {Conn, Host}, NewAttempts} = lists:keytake(Conn, 1, Attempts), - Realms = [element(1, A) || A <- Available], - NewConns = [{Conn, Host, Realms} | Conns], - NewCX = CX#cx{attempts = NewAttempts, conns = NewConns}, - cx_connected(unassigned, Available, Conn, NewCX). +cx_connected(Available, Conn, CX = #cx{attempts = Attempts}) -> + {value, {Pid, Host, _}, NewAttempts} = lists:keytake(Conn, 1, Attempts), + Realms = [R || {R, _} <- Available], + NewCX = CX#cx{attempts = NewAttempts}, + cx_connected(unassigned, Available, {Pid, Host, Realms}, NewCX). +-spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX} + when A :: unassigned | assigned, + Available :: [{zx:realm(), zx:serial()}], + Conn :: {pid(), zx:host(), [zx:realm()]}, + CX :: conn_index(), + NewA :: unassigned | assigned, + NewCX :: conn_index(). +%% @private +%% Only accept a new realm as available if the reported serial is equal or newer to the +%% highest known serial. + cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) -> case maps:find(Realm, Realms) of {ok, Meta = #rmeta{serial = S, available = Available}} when S =< Serial -> - NewMeta = Meta#rmeta{serial = Serial, available = [Conn | Available]}, + Pid = element(1, Conn), + NewMeta = Meta#rmeta{serial = Serial, available = [Pid | Available]}, {NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX), cx_connected(NewA, Rest, Conn, NewCX); {ok, #rmeta{serial = S}} when S > Serial -> @@ -1281,32 +1445,59 @@ cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) -> error -> cx_connected(A, Rest, Conn, CX) end; -cx_connected(A, [], Conn, CX = #cx{attempts = Attempts, conns = Conns}) -> - {value, ConnRec, NewAttempts} = lists:keytake(Conn, 1, Attempts), - NewConns = [ConnRec | Conns], - NewCX = CX#cx{attempts = NewAttempts, conns = NewConns}, - {A, NewCX}. +cx_connected(A, [], Conn, CX = #cx{conns = Conns}) -> + {A, CX#cx{conns = [Conn | Conns]}}. +-spec cx_connected(A, Realm, Conn, Meta, CX) -> {NewA, NewCX} + when A :: unassigned | assigned, + Realm :: zx:host(), + Conn :: {pid(), zx:host(), [zx:realm()]}, + Meta :: realm_meta(), + CX :: conn_index(), + NewA :: unassigned | assigned, + NewCX :: conn_index(). +%% @private +%% This function matches on two elements: +%% - Whether or not the realm is already assigned (and if so, set `A' to `assigned' +%% - Whether the host node is the prime node (and if not, ensure it is a member of +%% of the mirror queue). + cx_connected(A, Realm, - Conn, - Meta = #rmeta{prime = Prime, mirrors = Mirrors}, - CX = #cx{realms = Realms, attempts = Attempts}) -> - {NewMirrors, Node} = - case lists:keyfind(Conn, 1, Attempts) of - {_, Prime} -> {Mirrors, Prime}; - {_, Host} -> {enqueue_unique(Host, Mirrors), Host} - end, - {NewA, NewAssigned} = - case lists:keymember(Realm, 1, Assigned) of - true -> {A, Assigned}; - false -> {assigned, [{Realm, Pid} | Assigned]} - end, - NewMeta = Meta#rmeta{mirrors = NewMirrors}, + {Pid, Prime, _}, + Meta = #rmeta{prime = Prime, assigned = none}, + CX = #cx{realms = Realms}) -> + NewMeta = Meta#rmeta{assigned = Pid}, NewRealms = maps:put(Realm, NewMeta, Realms), - NewCX = CX#cx{realms = NewRealms, assigned = NewAssigned}, - {NewA, NewCX}. + NewCX = CX#cx{realms = NewRealms}, + {assigned, NewCX}; +cx_connected(A, + Realm, + {Pid, Host, _}, + Meta = #rmeta{mirrors = Mirrors, assigned = none}, + CX = #cx{realms = Realms, attempts = Attempts}) -> + NewMirrors = enqueue_unique(Host, Mirrors), + NewMeta = Meta#rmeta{mirrors = NewMirrors, assigned = Pid}, + NewRealms = maps:put(Realm, NewMeta, Realms), + NewCX = CX#cx{realms = NewRealms}, + {assigned, NewCX}; +cx_connected(A, + _, + {_, Prime, _}, + #rmeta{prime = Prime}, + CX) -> + {A, CX}; +cx_connected(A, + Realm, + {_, Host, _}, + Meta = #rmeta{mirrors = Mirrors}, + CX = #cx{realms = Realms}) -> + NewMirrors = enqueue_unique(Host, Mirrors), + NetMeta = Meta#rmeta{mirrors = NewMirrors}, + NewRealms = maps:put(Realm, NewMeta, Realms), + NewCX = CX#cx{realms = NewRealms}, + {A, NewCX}. -spec enqueue_unique(term(), queue:queue()) -> queue:queue(). @@ -1353,22 +1544,18 @@ cx_redirect(Conn, Hosts, CX) -> CX :: conn_index(), NextCX :: conn_index(). %% @private -%% Add host to any realm mirror queues that are known to provide the realm. +%% Add the host to any realm mirror queues that it provides. cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) -> Apply = - fun(R, M) -> - case maps:find(R, M) of + fun(R, Rs) -> + case maps:find(R, Rs) of {ok, Meta = #rmeta{mirrors = Mirrors}} -> - NewMirrors = - case queue:member(Host, Mirrors) of - true -> Mirrors; - false -> queue:in(Host, Mirrors) - end, + NewMirrors = enqueue_unique(Host, Mirrors), NewMeta = Meta#rmeta{mirrors = NewMirrors}, - maps:put(R, Meta, M); + maps:put(R, NewMeta, Rs); error -> - M + R end end, NewRealms = lists:foldl(Apply, Realms, Provided), @@ -1377,6 +1564,13 @@ cx_redirect([], CX) -> CX. +-spec cx_unassigned(CX) -> Unassigned + when CX :: conn_index(), + Unassigned :: [zx:realm()]. +%% @private +%% Scan the CX record for unassigned realms;return a list of all unassigned +%% realm names. + cx_unassigned(#cx{realms = Realms}) -> NotAssigned = fun(Realm, #rmeta{assigned = Conn}, Unassigned) -> @@ -1388,14 +1582,10 @@ cx_unassigned(#cx{realms = Realms}) -> maps:fold(NotAssigned, [], Realms). --spec cx_disconnected(Conn, CX) -> Result - when Conn :: pid(), - CX :: conn_index(), - Result :: {ok, UnassignedRealms, NewCX} - | {error, unknown}, - Mon :: reference(), - UnassignedRealms :: [zx:realm()], - NewCX :: conn_index(). +-spec cx_disconnected(Conn, CX) -> NewCX + when Conn :: pid(), + CX :: conn_index(), + NewCX :: conn_index(). %% @private %% An abstract data handler which is called whenever a connection terminates. %% This function removes all data related to the disconnected pid and its assigned @@ -1403,22 +1593,16 @@ cx_unassigned(#cx{realms = Realms}) -> %% unassigned, and an updated connection index. cx_disconnected(Conn, CX = #cx{realms = Realms, conns = Conns}) -> - case lists:keytake(Conn, 1, Conns) of - {value, {Conn, Host, Realms}, NewConns} -> - {UnassignedRealms, NewRealms} = cx_scrub_assigned(Conn, Host, Realms), - NewCX = CX#cx{realms = NewRealms, conns = NewConns}, - {ok, UnassignedRealms, NewCX}; - false -> - {error, unknown} - end. + {value, {Pid, Host, _}, NewConns} = lists:keytake(Conn, 1, Conns), + NewRealms = cx_scrub_assigned(Pid, Host, Realms), + CX#cx{realms = NewRealms, conns = NewConns}. --spec cx_scrub_assigned(Pid, Host, Realms) -> {UnassignedRealms, NewRealms} +-spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms when Pid :: pid(), Host :: zx:host(), - Realms :: realm_meta(), - UnassignedRealms :: [zx:realm()], - NewRealms :: realm_meta(). + Realms :: [realm_meta()], + NewRealms :: [realm_meta()]. %% @private %% This could have been performed as a set of two list operations (a partition and a %% map), but to make the procedure perfectly clear it is written out explicitly. @@ -1426,29 +1610,15 @@ cx_disconnected(Conn, CX = #cx{realms = Realms, conns = Conns}) -> cx_scrub_assigned(Pid, Host, Realms) -> Scrub = fun - (Realm, - Meta = #rmeta{mirrors = Mirrors, - assigned = Assigned, - available = Available}, - {Unassigned, Metas}) -> - {NewUnassigned, NewAssigned} = - case Assigned of - Pid -> {[Realm | Unassigned], none}; - Other -> {Unassigned, Other} - end, - NewMirrors = - case queue:member(Host, Mirrors) of - false -> queue:in(Host, Mirrors); - true -> Mirrors - end, - NewAvailable = lists:delete(Pid, Available), - NewMeta = Meta#rmeta{mirrors = NewMirrors, - assigned = NewAssigned, - available = NewAvailable}, - NewMetas = maps:put(Realm, NewMeta, Metas), - {NewUnassigned, NewMetas} + (_, V = #rmeta{mirrors = M, available = A, assigned = C}) when C == Pid -> + V#rmeta{mirrors = enqueue_unique(Host, M), + available = lists:delete(Pid, A), + assigned = none}; + (_, V = #rmeta{mirrors = M, available = A}) -> + V#rmeta{mirrors = enqueue_unique(Host, M), + available = lists:delete(Pid, A)} end, - maps:fold(Scrub, {[], maps:new()}, Realms). + maps:map(Scrub, Realms). -spec cx_resolve(Realm, CX) -> Result