diff --git a/TODO b/TODO index 3ee9152..fed003f 100644 --- a/TODO +++ b/TODO @@ -2,11 +2,23 @@ On redirect a list of hosts of the form [{zx:host(), [zx:realm()]}] must be provided to the downstream client. This is the only way that downstream clients can determine which redirect hosts are useful to it. - - Connect attempts and established connections should have different report statuses. - An established connection does not necessarily have other attempts being made concurrently, so a termination should initiate 3 new connect attempts as init. - An attempt is just an attempt. It can fall flat and be replaced only 1::1. + - Change zx_daemon request references to counters. + Count everything. + This will be the only way to sort of track stats other than log analysis. + Log analysis sucks. + + - Double-indexing must happen everywhere for anything to be discoverable without traversing the entire state of zx_daemon whenever a client or conn crashes. + + - zx_daemon request() types have been changes to flat, wide tuples as in the main zx module. + This change is not yet refected in the using code. + + - Write a logging process. + Pick a log rotation scheme. + Eventually make it so that it can shed log loads in the event they get out of hand. + - Create zx_daemon primes. + See below New Feature: ZX Universal lock Cross-instance communication diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx.erl b/zomp/lib/otpr-zx/0.1.0/src/zx.erl index 9c5b057..306bb91 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx.erl @@ -50,7 +50,7 @@ -type option() :: {string(), term()}. -type host() :: {string() | inet:ip_address(), inet:port_number()}. -type key_id() :: {realm(), key_name()}. --type key_name() :: label(). +-type key_name() :: lower0_9(). -type user() :: {realm(), username()}. -type username() :: label(). -type lower0_9() :: [$a..$z | $0..$9 | $_]. diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl index ba8d1c5..4cefbad 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl @@ -11,7 +11,7 @@ -copyright("Craig Everett "). -license("GPL-3.0"). --export([start_monitor/1, stop/1]). +-export([start/1, stop/1]). -export([start_link/1]). -include("zx_logger.erl"). @@ -20,24 +20,17 @@ %%% Startup --spec start_monitor(Target) -> Result +-spec start(Target) -> Result when Target :: zx:host(), - Result :: {ok, PID :: pid(), Mon :: reference()} - | {error, Reason}, - Reason :: term(). + Result :: {ok, pid()} + | {error, Reason :: term()}. %% @doc %% Starts a connection to a given target Zomp node. This call itself should never fail, %% but this process may fail to connect or crash immediately after spawning. Should %% only be called by zx_daemon. -start_monitor(Target) -> - case zx_conn_sup:start_conn(Target) of - {ok, Pid} -> - Mon = monitor(process, Pid), - {ok, Pid, Mon}; - Error -> - Error - end. +start(Target) -> + zx_conn_sup:start_conn(Target). -spec stop(Conn :: pid()) -> ok. @@ -49,16 +42,24 @@ stop(Conn) -> ok. --spec subscribe(Conn, Realm) -> ok - when Conn :: pid(), - Realm :: zx:realm(), - Result :: ok. +-spec subscribe(Conn, Package) -> ok + when Conn :: pid(), + Package :: zx:package(). subscribe(Conn, Realm) -> Conn ! {subscribe, Realm}, ok. +-spec unsubscribe(Conn, Package) -> ok + when Conn :: pid(), + Package :: zx:package(). + +unsubscribe(Conn, Package) -> + Conn ! {unsubscribe, Package}, + ok. + + -spec start_link(Target) -> when Target :: zx:host(), Result :: {ok, pid()} @@ -172,6 +173,9 @@ loop(Parent, Debug, Socket) -> ok = handle(Bin, Socket), ok = inet:setopts(Socket, [{active, once}]), loop(Parent, Debug, Socket); + {subscribe, Package} -> + {unsubscribe, Package} -> + stop -> ok = zx_net:disconnect(Socket), terminate(); diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl index c1f6bef..caf10fe 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl @@ -98,10 +98,9 @@ -copyright("Craig Everett "). -license("GPL-3.0"). - -export([pass_meta/3, subscribe/1, unsubscribe/1, - list/1, latest/1, + list/0, list/1, list/2, list/3, latest/1, fetch/1, key/1, pending/1, packagers/1, maintainers/1, sysops/1]). -export([report/1, result/2, notify/2]). @@ -109,6 +108,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). + +-export_type([id/0, result/0, + realm_list/0, package_list/0, version_list/0, latest_result/0, + fetch_result/0, key_result/0, + pending_result/0, pack_result/0, maint_result/0, sysop_result/0, + sub_message/0]). + + -include("zx_logger.hrl"). @@ -119,9 +126,10 @@ {meta = none :: none | zx:package_meta(), home = none :: none | file:filename(), argv = none :: none | [string()], - actions = [] :: [{pid(), subunsub()} | {reference(), request()}], - requests = maps:new() :: #{reference() := request()}, - subs = [] :: [{pid(), zx:package()}], + id = 0 :: id(), + actions = [] :: [request()], + requests = maps:new() :: requests(), + dropped = maps:new() :: requests(), mx = mx_new() :: monitor_index(), cx = cx_load() :: conn_index()}). @@ -149,18 +157,22 @@ {pid :: pid(), host :: zx:host(), realms :: [zx:realm()], - requests :: [reference()], - subs :: [zx:package()]}). + requests :: [id()], + subs :: [{pid(), zx:package()}]}). %% State Types -type state() :: #s{}. -%-type monitor_index() :: [{reference(), pid(), category()}]. +-opaque id() :: non_neg_integer(). +-type request() :: {subscribe, pid(), zx:package()} + | {unsubscribe, pid(), zx:package()} + | {request, pid(), id(), action()}. +-type requests() :: #{id() := {pid(), action()}}. -type monitor_index() :: #{pid() := {reference(), category()}}. -type conn_index() :: #cx{}. -type realm_meta() :: #rmeta{}. -type connection() :: #conn{}. --type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()} +-type category() :: {Reqs :: [id()], Subs :: [zx:package()]} | attempt | conn. @@ -171,19 +183,18 @@ %% Subscriber / Requestor Communication % Incoming Request messages --type action () :: {Requestor :: pid(), - Request :: request() | subunsub()}. - --type request() :: {list, realms | zx:identifier()} - | {latest, zx:identifier()} - | {fetch, zx:package_id()} - | {key, zx:key_id()} - | {pending, zx:package()} - | {packagers, zx:package()} - | {maintainers, zx:package()} +% This form allows a bit of cheating with blind calls to `element(2, Request)'. +-type action() :: list + | {list, zx:realm()} + | {list, zx:realm(), zx:name()} + | {list, zx:realm(), zx:name(), zx:version()} + | {latest, zx:realm(), zx:name(), zx:version()} + | {fetch, zx:realm(), zx:name(), zx:version()} + | {key, zx:realm(), zx:key_name()} + | {pending, zx:realm(), zx:name()} + | {packagers, zx:realm(), zx:name()} + | {maintainers, zx:realm(), zx:name()} | {sysops, zx:realm()}. --type subunsub() :: {subscribe, zx:package()} - | {unsubscribe, zx:package()}. % Outgoing Result Messages % @@ -192,67 +203,62 @@ % % Subscription messages are a separate type below. --type result() :: sub_message() - | list_result() - | latest_result() - | fetch_result() - | key_result() - | pending_result() - | pack_result() - | maint_result() - | sysop_result(). --type list_result() :: {list, realms, - Message :: {ok, [zx:realm()]}} - | {list, zx:realm(), - Message :: {ok, [zx:name()]} - | {error, bad_realm | timeout}} - | {list, zx:package(), - Message :: {ok, [zx:version()]} - | {error, bad_realm - | bad_package - | timeout}}. --type latest_result() :: {latest, - Package :: zx:package() - | zx:package_id(), - Message :: {ok, zx:package_id()} - | {error, bad_realm - | bad_package - | bad_version - | timeout}}. --type fetch_result() :: {fetch, zx:package_id(), - Message :: {hops, non_neg_integer()} - | done - | {error, bad_realm - | bad_package - | bad_version - | timeout}}. --type key_result() :: {key, zx:key_id(), - Message :: {ok, binary()} - | {error, bad_key - | timeout}}. --type pending_result() :: {pending, zx:package(), - Message :: {ok, [zx:version()]} - | {error, bad_realm - | bad_package - | timeout}}. --type pack_result() :: {packagers, zx:package(), - Message :: {ok, [zx:user()]} - | {error, bad_realm - | bad_package - | timeout}}. --type maint_result() :: {maintainers, zx:package(), - Message :: {ok, [zx:user()]} - | {error, bad_realm - | bad_package - | timeout}}. --type sysop_result() :: {sysops, zx:realm(), - Message :: {ok, [zx:user()]} - | {error, bad_host - | timeout}}. +-type result() :: {z_result, + RequestID :: id(), + Message :: realm_list() + | package_list() + | version_list() + | latest_result() + | fetch_result() + | key_result() + | pending_result() + | pack_result() + | maint_result() + | sysop_result()}. + +-type realm_list() :: [zx:realm()]. +-type package_list() :: {ok, [zx:name()]} + | {error, bad_realm + | timeout}. +-type version_list() :: {ok, [zx:version()]} + | {error, bad_realm + | bad_package + | timeout}. +-type latest_result() :: {ok, zx:version()} + | {error, bad_realm + | bad_package + | bad_version + | timeout}. +-type fetch_result() :: {hops, non_neg_integer()} + | done + | {error, bad_realm + | bad_package + | bad_version + | timeout}. +-type key_result() :: done + | {error, bad_realm + | bad_key + | timeout}. +-type pending_result() :: {ok, [zx:version()]} + | {error, bad_realm + | bad_package + | timeout}. +-type pack_result() :: {ok, [zx:user()]} + | {error, bad_realm + | bad_package + | timeout}. +-type maint_result() :: {ok, [zx:user()]} + | {error, bad_realm + | bad_package + | timeout}. +-type sysop_result() :: {ok, [zx:user()]} + | {error, bad_host + | timeout}. % Subscription Results --type sub_message() :: {subscription, zx:package(), +-type sub_message() :: {z_sub, + zx:package(), Message :: {update, zx:package_id()} | {error, bad_realm | bad_package}}. @@ -274,123 +280,236 @@ %% be known whether the very first thing the target application will do is send this %% process an async message. That implies that this should only ever be called once, %% by the launching process (which normally terminates shortly thereafter). -%% -%% FIXME: I don't like something about this idea. Either it is wrong to be passing in -%% the primary project's meta, or it is wrong to be doing it this way, or it is -%% wrong to have only *one* primary app in the EVM instance. Or something. -%% Something smells off about this. -%% We WILL need to maintain and know meta. -%% We DON'T know when or why it will be important to running programs. -%% Once we understand this better we need to come back and fix or replace it. -%% (2018-03-01 -CRE) pass_meta(Meta, Dir, ArgV) -> gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). --spec subscribe(zx:package()) -> ok. +-spec subscribe(Package) -> ok + when Package :: zx:package(). %% @doc %% Subscribe to update notifications for a for a particular package. -%% The caller will receive update notifications of type sub_result() as Erlang +%% The caller will receive update notifications of type `sub_message()' as Erlang %% messages whenever an update occurs. +%% Crashes the caller if the Realm or Name of the Package argument are illegal +%% `zx:lower0_9' strings. -subscribe(Package) -> +subscribe(Package = {Realm, Name}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), gen_server:cast(?MODULE, {subscribe, self(), Package}). --spec unsubscribe(zx:package()) -> ok. +-spec unsubscribe(Package) -> ok + when Package :: zx:package(). %% @doc %% Instructs the daemon to unsubscribe if subscribed. Has no effect if not subscribed. +%% Crashes the caller if the Realm or Name of the Package argument are illegal +%% `lower0_9' strings. -unsubscribe(Package) -> +unsubscribe(Package = {Realm, Name}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), gen_server:cast(?MODULE, {unsubscribe, self(), Package}). --spec list(Identifier) -> ok - when Identifier :: realms - | zx:identifier(). +-spec list() -> realm_list(). %% @doc -%% Requests a list of realms, packages, or package versions depending on the -%% identifier provided. +%% Request a list of currently configured realms. Because this call is entirely local +%% it is the only one that does not involve a round-trip -list(Identifier) -> - request({list, Identifier}). +list() -> + gen_server:call(?MODULE, {request, list}). --spec latest(Identifier) -> ok - when Identifier :: zx:package() | zx:package_id(). +-spec list(Realm) -> {ok, RequestID} + when Realm :: zx:realm(), + RequestID :: term(). %% @doc -%% Request the lastest version of a package within the scope of the identifier. -%% If a package with no version is provided, the absolute latest version will be -%% sent once queried. If a partial version is provided then only the latest version -%% within the provided scope will be returned. +%% Requests a list of packages provided by the given realm. +%% Returns a request ID which will be returned in a message with the result from an +%% upstream zomp node. Crashes the caller if Realm is an illegal string. %% -%% If package {"foo", "bar"} has [{1,2,3}, {1,2,5}, {1,3,1}, {2,4,5}] available, -%% querying {"foo", "bar"} or {"foo", "bar", {z,z,z}} will result in {2,4,5}, but -%% querying {"foo", "bar", {1,z,z}} will result in {1,3,1}, granted that the realm -%% "foo" is reachable. +%% Response messages are of the type `result()' where the third element is of the +%% type `package_list()'. -latest(Identifier) -> - request({latest, Identifier}). +list(Realm) -> + true = zx_lib:valid_lower0_9(Realm), + request({list, Realm}). --spec fetch(zx:package_id()) -> ok. +-spec list(Realm, Name) -> {ok, RequestID} + when Realm :: zx:realm(), + Name :: zx:name(), + RequestID :: term(). +%% @doc +%% Requests a list of package versions. +%% Returns a request ID which will be returned in a message with the result from an +%% upstream zomp node. Crashes the if Realm or Name are illegal strings. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `version_list()'. + +list(Realm, Name) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + request({list, Realm, Name}). + + +-spec list(Realm, Name, Version) -> {ok, RequestID} + when Realm :: zx:realm(), + Name :: zx:name(), + Version :: zx:version(), + RequestID :: term(). +%% @doc +%% Request a list of package versions constrained by a partial version. +%% Returns a request ID which will be returned in a message with the result from an +%% upstream zomp node. Can be used to check for a specific version by testing for a +%% response of `{error, bad_version}' when a full version number is provided. +%% Crashes the caller on an illegal realm name, package name, or version tuple. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `list_result()'. + +list(Realm, Name, Version) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + true = zx_lib:valid_version(Version), + request({list, Realm, Name, Version}). + + +-spec latest(Identifier) -> {ok, RequestID} + when Identifier :: zx:package() | zx:package_id(), + RequestID :: integer(). +%% @doc +%% Request the lastest version of a package within the provided version constraint. +%% If no version is provided then the latest version overall will be returned. +%% Returns a request ID which will be returned in a message with the result from an +%% upstream zomp node. Crashes the caller on an illegal realm name, package name or +%% version tuple. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `latest_result()'. + +latest({Realm, Name}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + request({latest, Realm, Name, {z, z, z}}); +latest({Realm, Name, Version}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + true = zx_lib:valid_version(Version), + request({latest, Realm, Name, Version}). + + +-spec fetch(PackageID) -> {ok, RequestID} + when PackageID :: zx:package_id(), + RequestID :: integer(). %% @doc %% Ensure a package is available locally, or queue it for download otherwise. +%% Returns a request ID which will be returned in a message with the result from an +%% upstream zomp node. Crashes the caller on an illegal realm name, package name or +%% version tuple. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `fetch_result()'. -fetch(PackageID) -> - request({fetch, PackageID}). +fetch({Realm, Name, Version}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + true = zx_lib:valid_version(Version), + request({fetch, Realm, Name, Version}). --spec key(zx:key_id()) -> ok. +-spec key(KeyID) -> {ok, RequestID} + when KeyID :: zx:key_id(), + RequestID :: id(). %% @doc %% Request a public key be fetched from its relevant realm. +%% Crashes the caller if either component of the KeyID is illegal. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `key_result()'. -key(KeyID) -> - request({key, KeyID}). +key({Realm, KeyName}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(KeyName), + request({key, Realm, KeyName}). --spec pending(zx:package()) -> ok. +-spec pending(Package) -> {ok, RequestID} + when Package :: zx:package(), + RequestID :: id(). %% @doc %% Request the list of versions of a given package that have been submitted but not %% signed and included in their relevant realm. +%% Crashes the caller if either component of the Package is illegal. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `pending_result()'. -pending(Package) -> - request({pending, Package}). +pending({Realm, Name}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + request({pending, Realm, Name}). --spec packagers(zx:package()) -> ok. +-spec packagers(Package) -> {ok, RequestID} + when Package :: zx:package(), + RequestID :: id(). %% @doc %% Request a list of packagers assigned to work on a given package. +%% Crashes the caller if either component of the Package is illegal. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `pack_result()'. -packagers(Package) -> - request({packagers, Package}). +packagers({Realm, Name}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + request({packagers, Realm, Name}). --spec maintainers(zx:package()) -> ok. +-spec maintainers(Package) -> {ok, RequestID} + when Package :: zx:package(), + RequestID :: id(). %% @doc %% Request a list of maintainers assigned to work on a given package. +%% Crashes the caller if either component of the Package is illegal. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `maint_result()'. -maintainers(Package) -> - request({maintainers, Package}). +maintainers({Realm, Name}) -> + true = zx_lib:valid_lower0_9(Realm), + true = zx_lib:valid_lower0_9(Name), + request({maintainers, Realm, Name}). --spec sysops(zx:realm()) -> ok. +-spec sysops(Realm) -> {ok, RequestID} + when Realm :: zx:realm(), + RequestID :: id(). %% @doc %% Request a list of sysops in charge of maintaining a given realm. What this %% effectively does is request the sysops of the prime host of the given realm. +%% Crashes the caller if the Realm string is illegal. +%% +%% Response messages are of the type `result()' where the third element is of the +%% type `sysops_result()'. sysops(Realm) -> + true = zx_lib:valid_lower0_9(Realm), request({sysops, Realm}). -%% Public Caster --spec request(action()) -> ok. +%% Request Caster +-spec request(action()) -> {ok, RequestID} + when RequestID :: integer(). %% @private %% Private function to wrap the necessary bits up. request(Action) -> - gen_server:cast(?MODULE, {request, make_ref(), self(), Action}). + gen_server:call(?MODULE, {request, self(), Action}). @@ -399,7 +518,7 @@ request(Action) -> -spec report(Message) -> ok when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]} | {redirect, Hosts :: [{zx:host(), [zx:realm()]}]} - | aborted + | failed | disconnected. %% @private %% Should only be called by a zx_conn. This function is how a zx_conn reports its @@ -424,7 +543,7 @@ result(Reference, Result) -> %% Function called by a connection when a subscribed update arrives. notify(Package, Message) -> - gen_server:cast(?MODULE, {notice, Package, Message}). + gen_server:cast(?MODULE, {notify, self(), Package, Message}). @@ -441,11 +560,21 @@ start_link() -> -spec init(none) -> {ok, state()}. init(none) -> + Blank = blank_state(), {ok, MX, CX} = init_connections(), - State = #s{mx = MX, cx = CX}, + State = Blank#s{mx = MX, cx = CX}, {ok, State}. +-spec blank_state() -> state(). +%% @private +%% Used to generate a correct, but exactly empty state. +%% Useful mostly for testing and validation, though also actually used in the program. + +blank_state() -> + #s{}. + + -spec init_connections() -> {ok, MX, CX} when MX :: monitor_index(), CX :: conn_index(). @@ -507,6 +636,15 @@ stop() -> %% @private %% gen_server callback for OTP calls +handle_call({request, list}, _, State = #s{cx = CX}) -> + Realms = cx_realms(CX), + {reply, {ok, Realms}, State}; +handle_call({request, Requestor, Action}, From, State = #s{id = ID}) -> + NewID = ID + 1, + _ = gen_server:reply(From, {ok, NewID}), + NextState = do_request(Requestor, Action, State#s{id = NewID}), + NewState = eval_queue(NextState), + {noreply, NewState}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call ~tp: ~tp", [From, Unexpected]), {noreply, State}. @@ -526,10 +664,6 @@ handle_cast({unsubscribe, Pid, Package}, State) -> NextState = do_unsubscribe(Pid, Package, State), NewState = eval_queue(NextState), {noreply, NewState}; -handle_cast({request, Ref, Requestor, Action}, State) -> - NextState = do_request(Ref, Requestor, Action, State), - NewState = eval_queue(NextState), - {noreply, NewState}; handle_cast({report, Conn, Message}, State) -> NextState = do_report(Conn, Message, State), NewState = eval_queue(NextState), @@ -538,8 +672,8 @@ handle_cast({result, Ref, Result}, State) -> NextState = do_result(Ref, Result, State), NewState = eval_queue(NextState), {noreply, NewState}; -handle_cast({notice, Package, Update}, State) -> - ok = do_notice(Package, Update, State), +handle_cast({notify, Conn, Package, Update}, State) -> + ok = do_notify(Conn, Package, Update, State), {noreply, State}; handle_cast(stop, State) -> {stop, normal, State}; @@ -551,6 +685,9 @@ handle_cast(Unexpected, State) -> %% @private %% gen_sever callback for general Erlang message handling +handle_info({'DOWN', Ref, process, Pid, Reason}, State) -> + NewState = clear_monitor(Pid, Ref, Reason, State), + {noreply, NewState}; handle_info(Unexpected, State) -> ok = log(warning, "Unexpected info: ~tp", [Unexpected]), {noreply, State}. @@ -605,7 +742,7 @@ do_pass_meta(Meta, Home, ArgV, State) -> %% Enqueue a subscription request. do_subscribe(Pid, Package, State = #s{actions = Actions}) -> - NewActions = [{Pid, {subscribe, Package}} | Actions], + NewActions = [{subscribe, Pid, Package} | Actions], State#s{actions = NewActions}. @@ -618,21 +755,20 @@ do_subscribe(Pid, Package, State = #s{actions = Actions}) -> %% Clear or dequeue a subscription request. do_unsubscribe(Pid, Package, State = #s{actions = Actions}) -> - NewActions = [{Pid, {unsubscribe, Package}} | Actions], + NewActions = [{unsubscribe, Pid, Package} | Actions], State#s{actions = NewActions}. --spec do_request(Ref, Requestor, Action, State) -> NextState - when Ref :: reference(), - Requestor :: pid(), +-spec do_request(Requestor, Action, State) -> NextState + when Requestor :: pid(), Action :: action(), State :: state(), NextState :: state(). %% @private %% Enqueue requests and update relevant index. -do_request(Ref, Requestor, Action, State = #s{actions = Actions}) -> - NewActions = [{Ref, Requestor, Action} | Actions], +do_request(Requestor, Action, State = #s{id = ID, actions = Actions}) -> + NewActions = [{request, Requestor, ID, Action} | Actions], State#s{actions = NewActions}. @@ -652,47 +788,53 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> {assigned, NextCX} -> {NextMX, NextCX}; {unassigned, NextCX} -> - {ok, ScrubbedMX, []} = mx_del_monitor(Conn, conn, NextMX), + {ok, ScrubbedMX} = mx_del_monitor(Conn, conn, NextMX), ok = zx_conn:stop(Conn), {ScrubbedMX, NextCX} end, State#s{mx = NewMX, cx = NewCX}; do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) -> - {ok, NextMX} = mx_del_monitor(Conn, attempt, MX), + NextMX = mx_del_monitor(Conn, attempt, MX), {Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX), - {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX), + {NewMX, NewCX} = ensure_connection(Unassigned, NextMX, NextCX), State#s{mx = NewMX, cx = NewCX}; -do_report(Conn, aborted, State = #s{mx = MX, cx = CX}) -> - {ok, NewMX} = mx_del_monitor(Conn, attempt, MX), +do_report(Conn, failed, State = #s{mx = MX, cx = CX}) -> + NewMX = mx_del_monitor(Conn, attempt, MX), + failed(Conn, State#s{mx = NewMX}); +do_report(Conn, disconnected, State = #s{mx = MX}) -> + NewMX = mx_del_monitor(Conn, conn, MX), + disconnected(Conn, State#s{mx = NewMX}). + + +failed(Conn, State#s{mx = MX, cx = CX}) -> {Realms, NextCX} = cx_failed(Conn, CX), - {NewMX, NewCX} = init_connection(Realms, NextCX), - State#s{mx = NewMX, cx = NewCX}; -do_report(Conn, disconnected, State) -> - ScrubbedMX = mx_del_monitor(Conn, conn, MX), + {NewMX, NewCX} = ensure_connection(Realms, MX, NextCX), + State#s{mx = NewMX, cx = NewCX}. + + +disconnected(Conn, + State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> {Pending, LostSubs, ScrubbedCX} = cx_disconnected(Conn, CX), Unassigned = cx_unassigned(ScrubbedCX), - UnSub = fun(S) -> lists:member(element(2, S), LostSubs) end, - {UnSubbed, NewSubs} = lists:partition(UnSub, Subs), - ReSubs = [{S, {subscribe, P}} || {S, P} <- UnSubbed], + ReSubs = [{S, {subscribe, P}} || {S, P} <- LostSubs], {Dequeued, NewRequests} = maps:fold(dequeue(Pending), {#{}, #{}}, Requests), ReReqs = maps:to_list(Dequeued), NewActions = ReReqs ++ ReSubs ++ Actions, - {NewMX, NewCX} = init_connection(Unassigned, ScrubbedMX, ScrubbedCX), + {NewMX, NewCX} = ensure_connection(Unassigned, ScrubbedMX, ScrubbedCX), State#s{actions = NewActions, requests = NewRequests, - subs = NewSubs, mx = NewMX, cx = NewCX}. --spec dequeue(Pending) -> fun(K, V, {D, R}) -> {NewD, NewR} - when Pending :: [reference()], - K :: reference(), +-spec dequeue(Pending) -> fun((K, V, {D, R}) -> {NewD, NewR}) + when Pending :: [id()], + K :: id(), V :: request(), - D :: #{reference(), request()}, - R :: #{reference(), request()}, - NewD :: #{reference(), request()}, - NewR :: #{reference(), request()}. + D :: #{id() := request()}, + R :: #{id() := request()}, + NewD :: #{id() := request()}, + NewR :: #{id() := request()}. %% @private %% Return a function that partitions the current Request map into two maps, one that %% matches the closed `Pending' list of references and ones that don't. @@ -706,7 +848,7 @@ dequeue(Pending) -> end. --spec init_connection(Realms, MX, CX) -> {NewMX, NewCX} +-spec ensure_connection(Realms, MX, CX) -> {NewMX, NewCX} when Realms :: [zx:realm()], MX :: monitor_index(), CX :: conn_index(), @@ -717,7 +859,7 @@ dequeue(Pending) -> %% connected but unassigned nodes are alterantive providers of the needed realm. %% Returns updated monitor and connection indices. -init_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) -> +ensure_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) -> {NewMX, NewCX} = case maps:get(Realm, RMetas) of #rmeta{available = []} -> @@ -729,49 +871,74 @@ init_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) -> NextCX = CX#cx{realms = NewRMetas}, {MX, NextCX} end, - init_connection(Realms, NewMX, NewCX); -init_connection([], MX, CX) -> + ensure_connection(Realms, NewMX, NewCX); +ensure_connection([], MX, CX) -> {MX, CX}. --spec do_result(Reference, Result, State) -> NewState - when Reference :: reference(), - Result :: result(), - State :: state(), - NewState :: state(). +-spec do_result(ID, Result, State) -> NewState + when ID :: id(), + Result :: result(), + State :: state(), + NewState :: state(). %% @private %% Receive the result of a sent request and route it back to the original requestor. -do_result(Reference, Result, State = #s{requests = Requests}) -> - NewRequests = - case maps:take(Reference, Requests) of - {{Requestor, {Type, Object}}, NextRequests} -> - Requestor ! {result, {Type, Object, Result}}, - NextRequests; +do_result(ID, Result, State = #s{requests = Requests, dropped = Dropped, mx = MX}) -> + {NewDropped, NewRequests, NewMX} = + case maps:take(ID, Requests) of + {Request, NextRequests} -> + Requestor = element(1, Request), + Requestor ! {z_result, ID, Result}, + NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX), + {Dropped, NextRequests, NextMX}; error -> - Message = "Received unqueued result ~tp: ~tp", - ok = log(warning, Message, [Reference, Result]), - Requests + NextDropped = handle_orphan_result(ID, Result, Dropped), + {NextDropped, Requests, MX} end, - State#s{requests = NewRequests}. + State#s{requests = NewRequests, dropped = NewDropped, mx = NewMX}. --spec do_notice(Package, Update, State) -> ok - when Package :: zx:package(), - Update :: term(), +-spec handle_orphan_result(ID, Result, Dropped) -> NewDropped + when ID :: id(), + Result :: result(), + Dropped :: requests(), + NewDropped :: requests(). +%% @private +%% Log request results if they have been orphaned by their original requestor. +%% Log a warning if the result is totally unknown. + +handle_orphan_result(ID, Result, Dropped) -> + case maps:take(ID, Dropped) of + {Request, NewDropped} -> + Message = "Received orphan result for ~tp, ~tp: ~tp", + ok = log(info, Message, [ID, Request, Result]), + NewDropped; + error -> + Message = "Received unknown request result ~tp: ~tp", + ok = log(warning, Message, [ID, Result]), + Dropped + end. + + +-spec do_notify(Conn, Channel, Message, State) -> ok + when Conn :: pid(), + Channel :: term(), + Message :: term(), State :: state(). %% @private -%% Forward an update message to the subscriber. +%% Broadcast a subscription message to all subscribers of a channel. +%% At the moment the only possible sub channels are packages, but this will almost +%% certainly change in the future to include general realm update messages (new keys, +%% packages, user announcements, etc.) and whatever else becomes relevant as the +%% system evolves. The types here are deliberately a bit abstract to prevent future +%% type tracing with Dialyzer, since we know the functions calling this routine and +%% are already tightly typed. -do_notice(Package, Update, #s{subs = Subs}) -> - case maps:find(Package, Subs) of - {ok, Subscribers} -> - Notify = fun(P) -> P ! {Package, Update} end, - lists:foreach(Notify, Subscribers); - error -> - Message = "Received package update for 0 subscribers: {~tp, ~tp}", - log(warning, Message, [Package, Update]) - end. +do_notify(Conn, Channel, Message, #s{cx = CX}) -> + Subscribers = cx_get_subscribers(Conn, Channel, CX), + Notify = fun(P) -> P ! {z_sub, Channel, Message} end, + lists:foreach(Notify, Subscribers). -spec eval_queue(State) -> NewState @@ -789,92 +956,169 @@ eval_queue(State = #s{actions = Actions}) -> eval_queue([], State) -> State; -eval_queue([Action = {Pid, {subscribe, Package}} | Rest], - State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) -> - Realm = element(1, Package), - {NewActions, NewSubs, NewMX} = - case cx_resolve(Realm, CX) of - {ok, Conn} -> - ok = zx_conn:subscribe(Conn, Package), - NextSubs = [{Pid, Package} | Subs], - NextMX = mx_add_monitor(Pid, subscriber, MX), - {Actions, NextSubs, NextMX}; - unassigned -> - NextActions = [Action | Actions], - NextMX = mx_add_monitor(Pid, subscriber, MX), - {NextActions, Subs, NextMX}; - unconfigured -> - Pid ! {subscription, Realm, {error, bad_realm}}, - {Actions, Subs, MX} - end, - eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX}); -eval_queue([Action = {Pid, {unsubscribe, Package}} | Rest], - State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) -> - NewActions = lists:delete(Action, Actions), - NewSubs = lists:delete({Pid, Package}, Subs), - {ok, NewMX} = mx_del_monitor(Pid, subscriber, MX), - Realm = element(1, Package), - ok = - case cx_resolve(Realm, CX) of - {ok, Conn} -> cx_conn:unsubscribe(Conn, Package); - unassigned -> ok - end, - eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX}); -eval_queue([{Ref, Pid, {list, realms}} | Rest], State = #s{cx = CX}) -> - Realms = cx_realms(CX), - ok = send_result(Pid, Ref, {list, realms, {ok, Realms}}), - eval_queue(Rest, State); -eval_queue([Action = {Ref, Pid, {list, Realm}} | Rest], - State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) - when is_list(Realm) -> - {NewActions, NewRequests, NewMX} = - case cx_resolve(Realm, CX) of - {ok, Conn} -> - ok = cx_conn:list_packages(Conn, Ref), - Request = {Pid, list, Realm}, - NextRequests = maps:put(Ref, Request, Requests), - NextMX = mx_add_monitor(Pid, requestor, MX), - {Actions, NextRequests, NextMX}; - unassigned -> - NextMX = mx_add_monitor(Pid, requestor, MX), - NextActions = [Action | Actions], - {NextActions, Requests, NextMX}; - unconfigured -> - Pid ! {Ref, {list, Realm, {error, bad_realm}}}, - {Actions, Requests, MX} - end, - NewState = State#s{actions = NewActions, requests = NewRequests, mx = NewMX}, - eval_queue(Rest, NewState); -%% FIXME: Universalize the cx_resolve result, leave only message form explicit. -eval_queue([Action = {Ref, Pid, Message} | Rest], +eval_queue([Action = {request, Pid, ID, Message} | Rest], State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> - Realm = element(1, element(2, Message)), - {NewActions, NewRequests, NewMX} = - case cx_resolve(Realm, CX) of - {ok, Conn} -> - ok = cx_conn:list_packages(Conn, Ref), - NextRequests = maps:put(Ref, {Pid, Message}, Requests), + {NewActions, NewRequests, NewMX, NewCX} = + case dispatch_request(Message, ID, CX) of + {dispatched, NextCX} -> + NextRequests = maps:put(ID, {Pid, Message}, Requests), NextMX = mx_add_monitor(Pid, requestor, MX), - {Actions, NextRequests, NextMX}; - unassigned -> + {Actions, NextRequests, NextMX, NextCX}; + {result, Response} -> + Pid ! Response, + {Actions, Requests, CX}; + wait -> NextActions = [Action | Actions], - {NextActions, Requests, MX}; - unconfigured -> - Pid ! {Ref, {list, Realm, {error, bad_realm}}}, - {Actions, Requests, MX} + NextMX = mx_add_monitor(Pid, requestor, MX), + {NextActions, Requests, NextMX, CX} end, - NewState = State#s{actions = NewActions, requests = NewRequests, mx = NewMX}, - eval_queue(Rest, NewState). + NewState = + State#s{actions = NewActions, + requests = NewRequests, + mx = NewMX, + cx = NewCX}, + eval_queue(Rest, NewState); +eval_queue([Action = {subscribe, Pid, Package} | Rest], + State = #s{actions = Actions, mx = MX, cx = CX}) -> + {NewActions, NewMX, NewCX} = + case cx_add_sub(Pid, Package, CX) of + {need_sub, Conn, NextCX} -> + ok = zx_conn:subscribe(Conn, Package), + NextMX = mx_add_monitor(Pid, subscriber, MX), + {Actions, NextMX, NextCX}; + {have_sub, NextCX} + NextMX = mx_add_monitor(Pid, subscriber, MX), + {Actions, NextMX, NextCX}; + unassigned -> + NextMX = mx_add_monitor(Pid, subscriber, MX), + {[Action | Actions], NextMX, CX}; + unconfigured -> + Pid ! {z_sub, Package, {error, bad_realm}}, + {Actions, MX, CX} + end, + eval_queue(Rest, State#s{actions = NewActions, mx = NewMX, cx = NewCX}); +eval_queue([{unsubscribe, Pid, Package} | Rest], + State = #s{mx = MX, cx = CX}) -> + {ok, NewMX} = mx_del_monitor(Pid, {subscription, Package}, MX), + NewCX = + case cx_del_sub(Pid, Package, CX) of + {drop_sub, NextCX} -> + ok = zx_conn:unsubscribe(Conn, Package), + NextCX; + {keep_sub, NextCX} -> + NextCX; + unassigned -> + CX; + unconfigured -> + Message = "Received 'unsubscribe' request for unconfigured realm: ~tp", + ok = log(warning, Message, [Package]), + CX + end, + eval_queue(Rest, State#s{mx = NewMX, cx = NewCX}). --spec send_result(pid(), reference(), term()) -> ok. +-spec dispatch_request(Action, ID, CX) -> Result + when Action :: action(), + ID :: id(), + CX :: conn_index(), + Result :: {dispatched, NewCX} + | {result, Response} + | wait, + NewCX :: conn_index(), + Response :: result(). + +dispatch_request(list, ID, CX) -> + Realms = cx_realms(CX), + {result, ID, Realms}; +dispatch_request(Action, ID, CX) -> + Realm = element(2, Action), + case cx_pre_send(Realm, ID, CX) of + {ok, Conn, NewCX} -> + ok = zx_conn:make_request(Conn, ID, Action), + {dispatched, NewCX}; + unassigned -> + wait; + unconfigured -> + Error = {error, bad_realm}, + {result, ID, Error} + end. + + +-spec clear_monitor(Pid, Ref, Reason, State) -> NewState + when Pid :: pid(), + Ref :: reference(), + Reason :: term(), + State :: state(), + NewState :: state(). %% @private -%% Send a message by reference to a process. -%% Probably don't need this function. +%% Deal with a crashed requestor, subscriber or connector. -send_result(Pid, Ref, Message) -> - Pid ! {Ref, Message}, - ok. +clear_monitor(Pid, + Ref, + Reason, + State = #s{actions = Actions, + requests = Requests, + dropped = Dropped, + mx = MX, + cx = CX}) -> + case mx_crashed_monitor(Pid, MX) of + {attempt, NextMX} -> + failed( + {conn, NextMX} -> + disconnected(Pid, State#s{mx = NextMX}); + {{Reqs, Subs}, NextMX} -> + case mx_lookup_category(Pid, MX) of + attempt -> + do_report(Pid, failed, State); + conn -> + do_report(Pid, disconnected, State); + {Reqs, Subs} -> + NewActions = drop_actions(Pid, Actions), + {NewDropped, NewRequests} = drop_requests(Pid, Dropped, Requests), + NewCX = cx_clear_client(Pid, Reqs, Subs, CX), + NewMX = mx_del_monitor(Pid, crashed, MX), + State#s{actions = NewActions, + requests = NewRequests, + dropped = NewDropped, + mx = NewMX, + cx = NewCX}; + error -> + Unexpected = {'DOWN', Ref, process, Pid, Reason}, + ok = log(warning, "Unexpected info: ~tp", [Unexpected]), + State + end. + + +-spec drop_actions(Requestor, Actions) -> NewActions + when Requestor :: pid(), + Actions :: [request()], + NewActions :: [request()]. + +drop_actions(Pid, Actions) -> + Clear = + fun + ({request, P, _}) -> P /= Pid; + ({subscribe, P, _}) -> P /= Pid; + ({unsubscribe, _, _}) -> false + end, + lists:filter(Clear, Actions). + + +-spec drop_requests(ReqIDs, Dropped, Requests) -> {NewDropped, NewRequests} + when ReqIDs :: [id()], + Dropped :: requests(), + Requests :: requests(), + NewDropped :: requests(), + NewRequests :: requests(). + +drop_requests(ReqIDs, Dropped, Requests) -> + Partition = + fun(K, {Drop, Keep}) -> + {V, NewKeep} = maps:take(K, Keep), + NewDrop = maps:put(K, V, Drop), + {NewDrop, NewKeep} + end, + lists:fold(Partition, {Dropped, Requests}, ReqIDs). %-spec do_query_latest(Object, State) -> {Result, NewState} @@ -1042,10 +1286,10 @@ mx_upgrade_conn(Pid, MX) -> -spec mx_del_monitor(Conn, Category, MX) -> NewMX when Conn :: pid(), - Category :: subscriber - | requestor - | attempt - | conn, + Category :: attempt + | conn + | {requestor, id()}, + | {subscriber, Sub :: tuple()}, MX :: monitor_index(), NewMX :: monitor_index(). %% @private @@ -1053,26 +1297,52 @@ mx_upgrade_conn(Pid, MX) -> %% exists. Returns a tuple including the remaining request references in the case of %% a conn type. -mx_del_monitor(Pid, subscriber, MX) -> - case maps:take(Pid, MX) of - {{Ref, {1, 0}}, NewMX} -> - true = demonitor(Ref, [flush]), - NewMX; - {{Ref, {Subs, Reqs}}, NextMX} when Subs > 0 -> - maps:put(Pid, {Ref, {Subs - 1, Reqs}}, NextMX) - end; -mx_del_monitor(Pid, requestor, MX) -> - case maps:take(Pid, MX) of - {{Ref, {0, 1}}, NewMX} -> - true = demonitor(Ref, [flush]), - NewMX; - {{Ref, {Subs, Reqs}}, NextMX} when Reqs > 0 -> - maps:put(Pid, {Ref, {Subs, Reqs - 1}}, NextMX) - end; -mx_del_monitor(Pid, Category, MX) -> - {{Ref, Category}, NewMX} = maps;take(Pid, MX), +mx_del_monitor(Pid, attempt, MX) -> + {{Ref, attempt}, NewMX} = maps:take(Pid, MX), true = demonitor(Ref, [flush]), - NewMX. + NewMX; +mx_del_monitor(Pid, conn, MX) -> + {{Ref, conn}, NewMX} = maps:take(Pid, MX), + true = demonitor(Ref, [flush]), + NewMX; +mx_del_monitor(Pid, {requestor, ID}, MX) -> + case maps:take(Pid, MX) of + {{Ref, {[ID], []}}, NextMX} -> + true = demonitor(Ref, [flush]), + NextMX; + {{Ref, {Reqs, Subs}}, NextMX} when Reqs > 0 -> + NewReqs = lists:subtract(ID, Reqs), + maps:put(Pid, {NewReqs, Subs}, NextMX) + end, +mx_del_monitor(Pid, {subscriber, Sub}, MX) -> + case maps:take(Pid, MX) of + {{Ref, {[], [Package]}}, NextMX} -> + true = demonitor(Ref, [flush]), + NextMX; + {{Ref, {Reqs, Subs}}, NextMX} when Subs > 0 -> + NewSubs = lists:delete(Sub, Subs), + maps:put(Pid, {Ref, {Reqs, NewSubs}}, NextMX) + end. + + +-spec mx_crashed_monitor(Pid, MX) -> Result + when Pid :: pid(), + MX :: monitor_index(), + Result :: {Type, NewMX} + | error, + Type :: attempt + | conn + | {Reqs :: [id()], Subs :: [tuple()]}, + NewMX :: monitor_index(). + +mx_crashed_monitor(Pid, MX) -> + case maps:take(Pid, MX) of + {{Ref, Type}, NewMX} -> + true = demonitor(Mon, [flush]), + {Type, NewMX}; + error -> + error + end. -spec mx_lookup_category(Pid, MX) -> Result @@ -1080,20 +1350,15 @@ mx_del_monitor(Pid, Category, MX) -> MX :: monitor_index(), Result :: attempt | conn - | requestor - | subscriber - | sub_req. + | {Reqs :: [reference()], Subs :: [zx:package()]} + | error. %% @private %% Lookup a monitor's categories. mx_lookup_category(Pid, MX) -> - Monitor = maps:get(Pid, MX), - case element(2, Monitor) of - attempt -> attempt; - conn -> conn; - {0, _} -> requestor; - {_, 0} -> subscriber; - {_, _} -> sub_req + case maps:find(Pid, MX) of + {ok, Mon} -> element(2, Mon); + error -> error end. @@ -1294,7 +1559,7 @@ cx_next_host(Realm, CX = #cx{realms = Realms}) -> NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {Outcome, Host, NewCX}; - {ok, Meta = #rmeta{assigned = Conn}} when is_pid(Conn) -> + {ok, #rmeta{assigned = Conn}} when is_pid(Conn) -> ok = log(warning, "Call to cx_next_host/2 when connection assigned."), {error, connected, CX}; error -> @@ -1336,27 +1601,18 @@ cx_next_host(Meta = #rmeta{prime = Prime, private = Private, mirrors = Mirrors}) %% of a given realm, taking private mirrors first, then public mirrors, and ending %% with the prime node for the realm if no others exist. -cx_next_hosts(N, Realm, CX = #cx{realms = Realms}) -> - case maps:find(Realm, Realms) of - {ok, Meta = #rmeta{assigned = none}} -> cx_next_hosts2(N, Realm, Meta, CX); - {ok, #rmeta{assigned = Conn}} -> {error, {connected, Conn}}; - error -> {error, bad_realm} - end. +cx_next_hosts(N, Realm, CX) -> + cx_next_hosts(N, Realm, [], CX). -cx_next_hosts2(N, Realm, Meta, CX = #cx{realms = Realms}) -> - {ok, Hosts, NewMeta} = cx_next_hosts3(N, [], Meta), - NewRealms = maps:put(Realm, NewMeta, Realms), - {ok, Hosts, CX#cx{realms = NewRealms}}. - - -cx_next_hosts3(N, Hosts, Meta) when N < 1 -> - {ok, Hosts, Meta}; -cx_next_hosts3(N, Hosts, Meta) -> - case cx_next_host(Meta) of - {ok, Host, NewMeta} -> cx_next_hosts3(N - 1, [Host | Hosts], NewMeta); - {prime, Prime, NewMeta} -> {ok, [Prime | Hosts], NewMeta} - end. +cx_next_hosts(N, Realm, Hosts, CX) when N > 0 -> + case cx_next_host(Realm, CX) of + {ok, Host, NewCX} -> cx_next_hosts(N - 1, Realm, [Host | Hosts], NewCX); + {prime, Host, NewCX} -> {ok, [Host | Hosts], NewCX}; + Error -> Error + end; +cx_next_hosts(0, _, Hosts, CX) -> + {ok, Hosts, CX}. -spec cx_maybe_add_attempt(Host, Realm, CX) -> Result @@ -1372,7 +1628,7 @@ cx_maybe_add_attempt(Host, Realm, CX = #cx{attempts = Attempts}) -> false -> not_connected; {value, {Pid, Host, Realms}, NextAttempts} -> - NewAttempts = [{Pid, Host, [Realm | Realms]} | Attempts], + NewAttempts = [{Pid, Host, [Realm | Realms]} | NextAttempts], NewCX = CX#cx{attempts = NewAttempts}, {ok, NewCX} end. @@ -1459,7 +1715,7 @@ cx_connected(A, %% - Whether the host node is the prime node (and if not, ensure it is a member of %% of the mirror queue). -cx_connected(A, +cx_connected(_, Realm, #conn{pid = Pid, host = Prime}, Meta = #rmeta{prime = Prime, assigned = none}, @@ -1468,11 +1724,11 @@ cx_connected(A, NewRealms = maps:put(Realm, NewMeta, Realms), NewCX = CX#cx{realms = NewRealms}, {assigned, NewCX}; -cx_connected(A, +cx_connected(_, Realm, #conn{pid = Pid, host = Host}, Meta = #rmeta{mirrors = Mirrors, assigned = none}, - CX = #cx{realms = Realms, attempts = Attempts}) -> + CX = #cx{realms = Realms}) -> NewMirrors = cx_enqueue_unique(Host, Mirrors), NewMeta = Meta#rmeta{mirrors = NewMirrors, assigned = Pid}, NewRealms = maps:put(Realm, NewMeta, Realms), @@ -1518,7 +1774,7 @@ cx_enqueue_unique(Element, Queue) -> %% Remove a failed attempt and all its associations. cx_failed(Conn, CX = #cx{attempts = Attempts}) -> - {value, Attempt, NewAttempts = lists:keytake(Conn, 1, Attempts), + {value, Attempt, NewAttempts} = lists:keytake(Conn, 1, Attempts), Realms = element(3, Attempt), {Realms, CX#cx{attempts = NewAttempts}}. @@ -1640,3 +1896,148 @@ cx_resolve(Realm, #cx{realms = Realms}) -> {ok, #rmeta{assigned = Conn}} -> {ok, Conn}; error -> unconfigured end. + + +-spec cx_pre_send(Realm, ID, CX) -> Result + when Realm :: zx:realm(), + ID :: id(), + CX :: conn_index(), + Result :: {ok, pid(), NewCX :: conn_index()} + | unassigned + | unconfigured. +%% @private +%% Prepare a request to be sent by queueing it in the connection active request +%% reference list and returning the Pid of the connection handling the required realm +%% if it is available, otherwise return an atom indicating the status of the realm.. + +cx_pre_send(Realm, ID, CX = #cx{conns = Conns}) -> + case cx_resolve(Realm, CX) of + {ok, Pid} -> + {value, Conn, NextConns} = lists:keytake(Pid, #conn.pid, Conns), + #conn{requests = Requests} = Conn, + NewRequests = [ID | Requests], + NewConn = Conn#conn{requests = NewRequests}, + NewCX = CX#cx{conns = [NewConn | NextConns]}, + {ok, Pid, NewCX}; + NoGo -> + NoGo + end. + + +-spec cx_add_sub(Subscriber, Channel, CX) -> Result + when Subscriber :: pid(), + Channel :: tuple(), + CX :: conn_index(), + Result :: {need_sub, Conn, NewCX} + | {have_sub, NewCX} + | unassigned + | unconfigured, + Conn :: pid(), + NewCX :: conn_index(). +%% @private +%% Adds a subscription to the current list of subs, and returns a value indicating +%% whether the connection needs to be told to subscribe or not based on whether it +%% is already subscribed to that particular channel. + +cx_add_sub(Subscriber, Channel, CX = #cx{conns = Conns}) -> + Realm = element(1, Channel), + case cx_resolve(Realm, CX) of + {ok, Pid} -> + {value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns), + cx_maybe_new_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns}); + Other -> + Other + end. + + +-spec cx_maybe_new_sub(Conn, Sub, CX) -> Result + when Conn :: connection(), + Sub :: {pid(), tuple()}, + CX :: conn_index(), + Result :: {need_sub, ConnPid, NewCX} + | {have_sub, NewCX}, + ConnPid :: pid(), + NewCX :: conn_index(). + +cx_maybe_new_sub(Conn = #conn{pid = ConnPid, subs = Subs}, + Sub = {Subscriber, Channel}, + CX = #cx{conns = Conns}) -> + NewSubs = [{Subscriber, Channel} | Subs], + NewConn = Conn#conn{subs = NewSubs}, + NewConns = [NewConn | NextConns], + NewCX = CX#cx{conns = NewConns}, + case lists:keymember(Channel, 2, Subs) of + false -> {need_sub, ConnPid, NewCX}; + true -> {have_sub, NewCX} + end. + + +-spec cx_del_sub(Subscriber, Channel, CX) -> Result + when Subscriber :: pid(), + Channel :: tuple(), + CX :: conn_index(), + Result :: {drop_sub, NewCX} + | {keep_sub, NewCX} + | unassigned + | unconfigured, + NewCX :: conn_index(). +%% @private +%% Remove a subscription from the list of subs, and return a value indicating whether +%% the connection needs to be told to unsubscribe entirely. + +cx_del_sub(Subscriber, Channel, CX = #cx{conns = Conns}) -> + Realm = element(1, Channel), + case cx_resolve(Realm, CX) of + {ok, Pid} -> + {value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns), + cx_maybe_last_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns}; + Other -> + Other + end. + + +cx_maybe_last_sub(Conn = #conn{pid = ConnPid, subs = Subs}, + Sub = {Subscriber, Channel}, + CX = #cx{conns = Conns}) -> + NewSubs = lists:delete(Sub, Subs), + NewConn = Conn#conn{subs = NewSubs}, + NewConns = [NewConn | NextConns], + NewCX = CX#cx{conns = NewConns}, + MaybeDrop = + case lists:keymember(Channel, 2, NewSubs) of + false -> drop_sub; + true -> keep_sub + end, + {MaybeDrop, NewCX}. + + +-spec cx_get_subscribers(Conn, Channel, CX) -> Subscribers + when Conn :: pid(), + Channel :: term(), + CX :: conn_index(), + Subscribers :: [pid()]. + +cx_get_subscribers(Conn, Channel, #cx{conns = Conns}) -> + #conn{subs = Subs} = lists:keyfind(Conn, #conn.pid, Conns), + lists:fold(registered_to(Channel), [], Subs). + + +registered_to(Channel) -> + fun({P, C}, A) -> + case C == Channel of + true -> [P | A]; + false -> A + end + end. + + +cx_clear_client(Pid, DeadReqs, DeadSubs, CX = #cx{conns = Conns}) -> + DropSubs = [{S, Pid} || S <- DeadSubs], + Clear = + fun(C = #conn{requests = Requests, subs = Subs}) -> + NewSubs = lists:subtract(Subs, DropSubs), + NewRequests = lists:subtract(Requests, DeadReqs), + C#conn{requests = NewRequests, subs = NewSubs} + end, + NewConns = lists:map(Clear, Conns), + CX#cx{conns = NewConns}. diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_lib.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_lib.erl index 41e621e..935821c 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_lib.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_lib.erl @@ -14,13 +14,12 @@ -copyright("Craig Everett "). -license("GPL-3.0"). - -export([zomp_home/0, find_zomp_home/0, hosts_cache_file/1, get_prime/1, realm_meta/1, read_project_meta/0, read_project_meta/1, read_package_meta/1, write_project_meta/1, write_project_meta/2, write_terms/2, - valid_lower0_9/1, valid_label/1, + valid_lower0_9/1, valid_label/1, valid_version/1, string_to_version/1, version_to_string/1, package_id/1, package_string/1, package_dir/1, package_dir/2, @@ -139,7 +138,7 @@ read_project_meta(Dir) -> | {error, file:posix()}. read_package_meta({Realm, Name, Version}) -> - VersionString = Version, + {ok, VersionString} = version_to_string(Version), Path = filename:join([zomp_home(), "lib", Realm, Name, VersionString]), read_project_meta(Path). @@ -250,6 +249,26 @@ valid_label(_, _) -> false. +-spec valid_version(zx:version()) -> boolean(). + +valid_version({z, z, z}) -> + true; +valid_version({X, z, z}) + when is_integer(X), X >= 0 -> + true; +valid_version({X, Y, z}) + when is_integer(X), X >= 0, + is_integer(Y), Y >= 0 -> + true; +valid_version({X, Y, Z}) + when is_integer(X), X >= 0, + is_integer(Y), Y >= 0, + is_integer(Z), Z >= 0 -> + true; +valid_version(_) -> + false. + + -spec string_to_version(VersionString) -> Result when VersionString :: string(), Result :: {ok, zx:version()}