From 107da5f41d79811e714cca99a0923bf2341b9878 Mon Sep 17 00:00:00 2001 From: Craig Everett Date: Thu, 22 Feb 2018 08:19:44 +0900 Subject: [PATCH] Indexing madness --- zomp/lib/otpr-zx/0.1.0/src/zx.erl | 4 +- zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl | 4 +- zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl | 814 +++++++++++++++++------ 3 files changed, 619 insertions(+), 203 deletions(-) diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx.erl b/zomp/lib/otpr-zx/0.1.0/src/zx.erl index 4d10cf2..9c5b057 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx.erl @@ -25,6 +25,7 @@ -export([start/2, stop/1]). -export_type([serial/0, package_id/0, package/0, realm/0, name/0, version/0, + identifier/0, option/0, host/0, key_id/0, key_name/0, @@ -45,6 +46,7 @@ -type version() :: {Major :: non_neg_integer() | z, Minor :: non_neg_integer() | z, Patch :: non_neg_integer() | z}. +-type identifier() :: realm() | package() | package_id(). -type option() :: {string(), term()}. -type host() :: {string() | inet:ip_address(), inet:port_number()}. -type key_id() :: {realm(), key_name()}. @@ -55,7 +57,7 @@ -type label() :: [$a..$z | $0..$9 | $_ | $- | $.]. -type package_meta() :: #{package_id := package_id(), deps := [package_id()], - type := app | lib,}. + type := app | lib}. diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl index efad4ba..ba8d1c5 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl @@ -99,7 +99,7 @@ connect(Parent, Debug, {Host, Port}) -> confirm_service(Parent, Debug, Socket); {error, Error} -> ok = log(warning, "Connection problem with ~tp: ~tp", [Node, Error]), - ok = zx_daemon:report(failed) + ok = zx_daemon:report(disconnected) terminate() end. @@ -174,7 +174,7 @@ loop(Parent, Debug, Socket) -> loop(Parent, Debug, Socket); stop -> ok = zx_net:disconnect(Socket), - terminat(); + terminate(); Unexpected -> ok = log(warning, "Unexpected message: ~tp", [Unexpected]), loop(Parent, Debug, Socket) diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl index 2b0dc65..a684394 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl @@ -85,10 +85,11 @@ -export([pass_meta/3, - subscribe/1, unsubscribe/0, - list_packages/1, list_versions/1, query_latest/1, - fetch/1]). --export([report/1]). + subscribe/1, unsubscribe/1, + list/1, latest/1, + fetch/1, key/1, + pending/1, packagers/1, maintainers/1, sysops/1]). +-export([report/1, result/2, notice/2]). -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). @@ -100,59 +101,124 @@ %%% Type Definitions -record(s, - {meta = none :: none | zx:package_meta(), - home = none :: none | file:filename(), - argv = none :: none | [string()], - reqp = none :: none | pid(), - reqm = none :: none | reference(), - action = none :: none | action(), - actions = queue:new() :: queue:queue(action()), - cx = cx_load() :: conn_index()}). + {meta = none :: none | zx:package_meta(), + home = none :: none | file:filename(), + argv = none :: none | [string()], + actions = [] :: [request() | {reference(), request()}], + responses = maps:new() :: #{reference() := request()}, + subs = [] :: [{pid(), zx:package()}], + mx = mx_new() :: monitor_index(), + cx = cx_load() :: conn_index()}). -record(cx, {realms = #{} :: #{zx:realm() := realm_meta()}, assigned = [] :: [{zx:realm(), pid()}], - attempts = [] :: [{pid(), reference(), zx:host()}], - conns = [] :: [{pid(), reference(), zx:host()}]}). + attempts = [] :: [{pid(), zx:host()}], + conns = [] :: [{pid(), zx:host()}]}). -record(rmeta, - {revision = 0 :: non_neg_integer(), - serial = 0 :: non_neg_integer(), - prime = {"zomp.psychobitch.party", 11311} :: zx:host(), - private = [] :: [zx:host()], - mirrors = queue:new() :: queue:queue(zx:host()), - realm_keys = [] :: [zx:key_meta()], - package_keys = [] :: [zx:key_meta()], - sysops = [] :: [zx:sysop_meta()]}). + {revision = 0 :: non_neg_integer(), + serial = 0 :: non_neg_integer(), + prime = {"zomp.tsuriai.jp", 11311} :: zx:host(), + private = [] :: [zx:host()], + mirrors = queue:new() :: queue:queue(zx:host()), + realm_keys = [] :: [zx:key_meta()], + package_keys = [] :: [zx:key_meta()], + sysops = [] :: [zx:sysop_meta()]}). + +%% State Types +-type state() :: #s{}. +-type monitor_index() :: [{reference(), pid(), category()}], +-type conn_index() :: #cx{}. +-type realm_meta() :: #rmeta{}. +-type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()} + | conn_attempt + | conn. + +%% Conn Communication +-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]} + | disconnected. + +%% Subscriber / Requestor Communication +% Incoming Request messages +-type request() :: {Requestor :: pid(), Action :: action() | subunsub()}. +-type action() :: {list, realms | zx:identifier()} + | {latest, zx:identifier()} + | {fetch, zx:package_id()} + | {key, zx:key_id()} + | {pending, zx:package()} + | {packagers, zx:package()} + | {maintainers, zx:package()} + | {sysops, zx:realm()}. +-type subunsub() :: {subscribe, zx:package()} + | {unsubscribe, zx:package()}. + +% Outgoing Result Messages +-type result() :: sub_result() + | list_result() + | latest_result() + | fetch_result() + | key_result() + | pending_result() + | pack_result() + | maint_result() + | sysop_result(). +-type sub_result() :: {subscription, zx:package(), + Message :: {update, zx:package_id()} + | {error, bad_realm | bad_package}}. +-type list_result() :: {list, realms, + Message :: {ok, [zx:realm()]}} + | {list, zx:realm(), + Message :: {ok, [zx:name()]} + | {error, bad_realm | timeout}} + | {list, zx:package(), + Message :: {ok, [zx:version]} + | {error, bad_realm + | bad_package + | timeout}} +-type latest_result() :: {latest, + Package :: zx:package() + | zx:package_id(), + Message :: {ok, zx:package_id()} + | {error, bad_realm + | bad_package + | bad_version + | timeout}}. +-type fetch_result() :: {fetch, zx:package_id(), + Message :: {hops, non_neg_integer()} + | done + | {error, bad_realm + | bad_package + | bad_version + | timeout}}. +-type key_result() :: {key, zx:key_id(), + Message :: {ok, binary()} + | {error, bad_key + | timeout}}. +-type pending_result() :: {pending, zx:package(), + Message :: {ok, [zx:version()]} + | {error, bad_realm + | bad_package + | timeout}}. +-type pack_result() :: {packagers, zx:package(), + Message :: {ok, [zx:user()]} + | {error, bad_realm + | bad_package + | timeout}}. +-type maint_result() :: {maintainers, zx:package(), + Message :: {ok, [zx:user()]} + | {error, bad_realm + | bad_package + | timeout}}. +-type sysop_result() :: {sysops, zx:realm(), + Message :: {ok, [zx:user()]} + | {error, bad_host + | timeout}. --type state() :: #s{}. --type conn_index() :: #cx{}. --type realm_meta() :: #rmeta{}. - --type action() :: {subscribe, zx:package()} - | unsubscribe - | {list, zx:realm()} - | {list, zx:realm(), zx:name()} - | {list, zx:realm(), zx:name(), zx:version()} - | {latest, zx:realm()} - | {latest, zx:realm(), zx:name()} - | {latest, zx:realm(), zx:name(), zx:version()} - | {fetch, zx:package_id()} - | {key, zx:key_id()} - | {pending, zx:package()} - | {packagers, zx:package()} - | {maintainers, zx:package()} - | sysops - | {subscribe, zx:realm(), zx:name()}. --type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]} - | failed - | disconnected. - - -%%% Service Interface +%%% Requestor Interface -spec pass_meta(Meta, Dir, ArgV) -> ok when Meta :: zx:package_meta(), @@ -171,7 +237,7 @@ %% by the launching process (which normally terminates shortly thereafter). pass_meta(Meta, Dir, ArgV) -> - gen_server:call(?MODULE, {pass_meta, Meta, Dir, ArgV}). + gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). -spec subscribe(Package) -> Result @@ -189,83 +255,102 @@ pass_meta(Meta, Dir, ArgV) -> %% Other functions can be used to query the status of a package at an arbitrary time. subscribe(Package) -> - gen_server:call(?MODULE, {subscribe, self(), Package}). + gen_server:cast(?MODULE, {subscribe, self(), Package}). --spec unsubscribe() -> ok. +-spec unsubscribe(zx:package()) -> ok. %% @doc %% Instructs the daemon to unsubscribe if subscribed. Has no effect if not subscribed. -unsubscribe() -> - gen_server:call(?MODULE, unsubscribe). +unsubscribe(Package) -> + gen_server:cast(?MODULE, {unsubscribe, self(), Package}). --spec list_packages(Realm) -> Result - when Realm :: zx:realm(), - Result :: {ok, Packages :: [zx:package()]} - | {error, Reason}, - Reason :: bad_realm - | no_realm - | network. - -list_packages(Realm) -> - gen_server:call(?MODULE, {list, Realm}). - - --spec list_versions(Package) -> Result - when Package :: zx:package(), - Result :: {ok, Versions :: [zx:version()]} - | {error, Reason}, - Reason :: bad_realm - | bad_package - | network. +-spec list(Identifier) -> ok + when Identifier :: realms + | zx:identifier(). %% @doc -%% List all versions of a given package. Useful especially for developers wanting to -%% see a full list of maintained packages to include as dependencies. +%% Requests a list of realms, packages, or package versions depending on the +%% identifier provided. -list_versions(Package) -> - gen_server:call(?MODULE, {list_versions, Package}). +list(Identifier) -> + request({list, Identifier}). --spec query_latest(Object) -> Result - when Object :: zx:package() | zx:package_id(), - Result :: {ok, zx:version()} - | {error, Reason}, - Reason :: bad_realm - | bad_package - | bad_version - | network. +-spec latest(Identifier) -> ok. + when Identifier :: zx:package() | zx:package_id(). %% @doc -%% Check for the latest version of a package, with or without a version provided to -%% indicate subversion limit. Useful mostly for developers checking for a latest -%% version of a package. +%% Request the lastest version of a package within the scope of the identifier. +%% If a package with no version is provided, the absolute latest version will be +%% sent once queried. If a partial version is provided then only the latest version +%% within the provided scope will be returned. %% -%% While this function could be used as a primitive operation in a dynamic dependency -%% upgrade scheme, that is not its intent. You will eventually divide by zero trying -%% to implement such a feature, open a portal to Oblivion, and monsters will consume -%% all you love. See? That's a horrible idea. You have been warned. +%% If package {"foo", "bar"} has [{1,2,3}, {1,2,5}, {1,3,1}, {2,4,5}] available, +%% querying {"foo", "bar"} or {"foo", "bar", {z,z,z}} will result in {2,4,5}, but +%% querying {"foo", "bar", {1,z,z}} will result in {1,3,1}, granted that the realm +%% "foo" is reachable. -query_latest(Object) -> - gen_server:call(?MODULE, {query_latest, Object}). +latest(Identifier) -> + request({latest, Identifier}). --spec fetch(PackageIDs) -> Result - when PackageIDs :: [zx:package_id()], - Result :: {{ok, [zx:package_id()]}, - {error, [{zx:package_id(), Reason}]}}, - Reason :: bad_realm - | bad_package - | bad_version - | network. +-spec fetch(zx:package_id()) -> ok. %% @doc -%% Ensure a list of packages is available locally, fetching any missing packages in -%% the process. This is intended to abstract the task of ensuring that a list of -%% dependencies is available locally prior to building/running a dependent app or lib. +%% Ensure a package is available locally, or queue it for download otherwise. -fetch([]) -> - {{ok, []}, {error, []}}; -fetch(PackageIDs) -> - gen_server:call(?MODULE, {fetch, PackageIDs}). +fetch(PackageID) -> + request({fetch, PackageID}). + + +-spec key(zx:key_id()) -> ok. +%% @doc +%% Request a public key be fetched from its relevant realm. + +key(KeyID) -> + request({key, KeyID}). + + +-spec pending(zx:package()) -> ok. +%% @doc +%% Request the list of versions of a given package that have been submitted but not +%% signed and included in their relevant realm. + +pending(Package) -> + request({pending, Package}). + + +-spec packagers(zx:package()) -> ok. +%% @doc +%% Request a list of packagers assigned to work on a given package. + +packagers(Package) -> + request({packagers, Package}). + + +-spec maintainers(zx:package()) -> ok. +%% @doc +%% Request a list of maintainers assigned to work on a given package. + +maintainers(Package) -> + request({maintainers, Package}). + + +-spec sysops(zx:realm()) -> ok. +%% @doc +%% Request a list of sysops in charge of maintaining a given realm. What this +%% effectively does is request the sysops of the prime host of the given realm. + +sysops(Realm) -> + request({sysops, Realm}). + + +%% Public Caster +-spec request(action()) -> ok. +%% @private +%% Private function to wrap the necessary bits up. + +request(Action) -> + gen_server:cast(?MODULE, {request, make_ref(), self(), Action}), @@ -284,6 +369,25 @@ report(Message) -> gen_server:cast(?MODULE, {report, self(), Message}). +-spec result(reference(), result()) -> ok. +%% @private +%% Return a tagged result back to the daemon to be forwarded to the original requestor. + +result(Reference, Result) -> + gen_server:cast(?MODULE, {result, Reference, Result}). + + +-spec notice(Package, Message) -> ok + when Package :: zx:package(), + Message :: term(). +%% @private +%% Function called by a connection when a subscribed update arrives. + +notify(Package, Message) -> + gen_server:cast(?MODULE, {notice, Package, Message}). + + + %%% Startup -spec start_link() -> {ok, pid()} | {error, term()}. @@ -297,29 +401,33 @@ start_link() -> -spec init(none) -> {ok, state()}. init(none) -> - CX = cx_load(), - {ok, NewCX} = init_connections(CX), - State = #s{cx = NewCX}, + {ok, MX, CX} = init_connections(), + State = #s{mx = MX, cx = NewCX}, {ok, State}. -init_connections(CX) -> +init_connections() -> + CX = cx_load(), + MX = mx_new(), Realms = cx_realms(CX), - init_connections(Realms, CX). + init_connections(Realms, MX, CX). -init_connections([Realm | Realms], CX}) -> +init_connections([Realm | Realms], Monitors, CX) -> {ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX), StartConn = fun(Host) -> - {ok, Pid, Mon} = zx_conn:start_monitor(Host), - {Pid, Mon, Host} + {ok, Pid} = zx_conn:start(Host), + {Pid, Host} end, NewAttempts = lists:map(StartConn, Hosts), - NewCX = lists:map(fun cx_add_attempt/2, NextCX, NewAttempts), - init_connections(Realms, NewCX); -init_connections([], CX) -> - {ok, CX}. + AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, conn_attempt, M) end, + NewMX = lists:foldl(AddMonitor, MX, NewAttempts), + NewCX = lists:foldl(fun cx_add_attempt/2, NextCX, NewAttempts), + init_connections(Realms, NewMX, NewCX); +init_connections([], MX, CX) -> + {ok, MX, CX}. + %%% gen_server @@ -327,18 +435,6 @@ init_connections([], CX) -> %% @private %% gen_server callback for OTP calls -handle_call({pass_meta, Meta, Dir, ArgV}, _, State) -> - {Result, NewState} = do_pass_meta(Requestor, Package, ArgV, State), - {reply, Result, NewState}; -handle_call({subscribe, Requestor, Package}, _, State) -> - {Result, NewState} = do_subscribe(Requestor, Package, State), - {reply, Result, NewState}; -handle_call({query_latest, Object}, _, State) -> - {Result, NewState} = do_query_latest(Object, State), - {reply, Result, NewState}; -handle_call({fetch, Packages}, _, State) -> - {Result, NewState} = do_fetch(Packages, State), - {reply, Result, NewState}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call ~tp: ~tp", [From, Unexpected]), {noreply, State}. @@ -347,11 +443,31 @@ handle_call(Unexpected, From, State) -> %% @private %% gen_server callback for OTP casts -handle_cast(unsubscribe, State) -> - NewState = do_unsubscribe(State), +handle_cast({report, Conn, Message}, State) -> + NextState = do_report(Conn, Message, State), + NewState = eval_queue(NextState), {noreply, NewState}; -handle_cast({report, From, Message}, State) -> - NewState = do_report(From, Message, State), +handle_cast({request, Ref, Requestor, Action}, State) -> + NextState = do_request(Ref, Requestpr, Action, State), + NewState = eval_queue(NextState), + {noreply, NewState}; +handle_cast({result, Ref, Result}, State) -> + NextState = do_result(Ref, Result, State), + NewState = eval_queue(NextState), + {noreply, NewState}; +handle_cast({notice, Package, Update}, State) -> + ok = do_notice(Package, Update, State), + {noreply, State}; +handle_cast({subscribe, Pid, Package}, State) -> + NextState = do_request(subscribe, Pid, Package, State), + NewState = eval_queue(NextState), + {noreply, NewState}; +handle_cast({unsubscribe, Pid, Package}, State) -> + NextState = do_request(unsubscribe, Pid, Package, State), + NewState = eval_queue(NextState), + {noreply, NewState}; +handle_cast({pass_meta, Meta, Dir, ArgV}, State) -> + NewState = do_pass_meta(Meta, Dir, ArgV, State), {noreply, NewState}; handle_cast(Unexpected, State) -> ok = log(warning, "Unexpected cast: ~tp", [Unexpected]), @@ -385,49 +501,236 @@ terminate(_, _) -> %%% Doer Functions --spec do_pass_meta(Meta, Dir, ArgV, State) -> {Result, NewState} +-spec do_pass_meta(Meta, Home, ArgV, State) -> NewState when Meta :: zx:package_meta(), - Dir :: file:filename(), + Home :: file:filename(), ArgV :: [string()], State :: state(), - Result :: ok, - Newstate :: state(). + NewState :: state(). -do_pass_meta(Meta, Dir, ArgV, State) -> - NewState = State#s{meta = Meta, dir = Dir, argv = ArgV}, - {ok, NewState}. +do_pass_meta(Meta, Home, ArgV, State) -> + PackageID = maps:get(package_id, Meta), + {ok, PackageString} = zx_lib:package_string(PackageID), + ok = log(info, "Received meta for ~tp.", [PackageString]), + State#s{meta = Meta, home = Home, argv = ArgV}. --spec do_subscribe(Requestor, Package, State) -> {Result, NewState} - when Requestor :: pid(), - Package :: zx:package(), +-spec do_request(Ref, Req, Action, State) -> NextState + when Ref :: reference(), + Req :: pid(), + Action :: action(), State :: state(), - Result :: ok - | {error, Reason}, - Reason :: illegal_requestor - | {already_subscribed, zx:package()}, - NewState :: state(). + NextState :: state(). +%% @private +%% Enqueue requests and update relevant index. -do_subscribe(Requestor, - {Realm, Name}, - State = #s{meta = none, reqp = none, - hosts = Hosts, serials = Serials}) -> - Monitor = monitor(process, Requestor), - {Host, NewHosts} = select_host(Realm, Hosts), - Serial = - case lists:keyfind(Realm, 1, Serials) of - false -> 0; - {Realm, S} -> S +do_request(subscribe, Req, Package, State = #s{actions = Actions}) -> + NewActions = [{Req, {subscribe, Package}} | Actions], + State#s{actions = NewActions}; +do_request(unsubscribe, Req, Package, State = #s{actions = Actions}) -> + NewActions = [{Req, {unsubscribe, Package}} | Actions], + State#s{actions = NewActions}; +do_request(Ref, Req, Action, State = #s{actions = Actions}) -> + NewActions = [{Ref, Req, Action} | Actions], + State#s{actions = NewActions}. + + +-spec do_report(Conn, Message, State) -> NewState + when Conn :: pid(), + Message :: conn_report(), + State :: state(), + NewState :: state(). +%% @private +%% Receive a report from a connection process and update the connection index and +%% possibly retry connections. + +do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> + {ok, NextMX} = mx_swap_categories(Conn, MX), + {NewMX, NewCX} = + case cx_connected(Realms, Conn, CX) of + {assigned, NextCX} -> + {NextMX, NextCX}; + {unassigned, NextCX} -> + ScrubbedMX = mx_del_monitor(Conn, conn, NextMX), + ok = zx_conn:stop(Conn), + {ScrubbedMX, NextCX} end, - {ok, ConnP} = zx_conn:start(Host), - ConnM = monitor(process, ConnP), - NewState = State#s{realm = Realm, name = Name, - connp = ConnP, connm = ConnM, - reqp = Requestor, reqm = Monitor, - hosts = NewHosts}, - {ok, NewState}; -do_subscribe(_, _, State = #s{realm = Realm, name = Name}) -> - {{error, {already_subscribed, {Realm, Name}}}, State}. + State#s{mx = NewMX, cx = NewCX}; +do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) -> + {Unassigned, NextMX, NextCX} = + case mx_lookup_category(Conn, MX) of + conn_attempt -> + {ok, [], ScrubbedCX} cx_disconnected(Conn, attempt, CX), + ScrubbedMX = mx_del_monitor(Conn, conn_attempt, MX), + {[], ScrubbedMX, ScrubbedCX}; + conn -> + {ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX), + ScrubbedMX = mx_del_monitor(Conn, conn, MX), + {Dropped, ScrubbedMX, ScrubbedCX} + end, + {NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX), + State#s{mx = NewMX, cx = NewCX}. + + +-spec init_connection(Realms, MX, CX) -> {NewMX, NewCX} + when Realms :: [zx:realm()], + MX :: monitor_index() + CX :: conn_index(), + NewMX :: monitor_index(), + NewCX :: conn_index(). +%% @private +%% Initiates a single connection and updates the relevant structures. + +init_connection([Realm | Realms], MX, CX) -> + {ok, Host, NextCX} = cx_next_hosts(Realm, CX), + {ok, Pid} = zx_conn:start(Host), + NewMX = mx_add_monitor(Pid, conn_attempt, MX), + NewCX = cx_add_attempt({Pid, Host}, CX), + init_connection(Realms, NewMX, NewCX); +init_connection([], MX, CX) -> + {MX, CX}. + + +-spec do_result(Ref, Result, State) -> NewState + when Ref :: reference(), + Result :: result(), + State :: state(), + NewState :: state(). +%% @private +%% Receive the result of a sent request and route it back to the original requestor. + +do_result(Ref, Result, State = #s{responses = Responses}) -> + NewResponses = + case maps:take(Ref, Responses) of + {{Req, {Type, Object}}, NextResponses} -> + Req ! {result, {Type, Object, Result}}, + NextResponses; + error -> + ok = log(warning, "Received unqueued result ~tp:~tp", [Ref, Result]), + Responses + end, + State#s{responses = NewResponses}. + + +-spec do_notice(Package, Update, State) -> ok + when Package :: zx:package(), + Update :: term(), + State :: state(). +%% @private +%% Forward an update message to the subscriber. + +do_notice(Package, Update, #s{subs = Subs}) -> + case maps:find(Package, Subs) of + {ok, Subscribers} -> + Notify = fun(P) -> P ! {Package, Update} end, + lists:foreach(Notify, Subscribers); + error -> + Message = "Received package update for 0 subscribers: {~tp, ~tp}", + log(warning, Message, [Package, Update]) + end. + + +-spec eval_queue(State) -> NewState + when State :: state(), + NewState :: state(). +%% @private +%% This is one of the two engines that drives everything, the other being do_report/3. +%% This function must iterate as far as it can into the request queue, adding response +%% entries to the pending response structure as it goes. + +eval_queue(State = #s{actions = Actions}) -> + InOrder = lists:reverse(Actions), + eval_queue(InOrder, State#s{actions = []}). + + +eval_queue([], State) -> + State; +eval_queue([Action = {Pid, {subscribe, Package}} | Rest], + State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) -> + Realm = element(1, Package), + {NewActions, NewSubs, NewMX} = + case cx_resolve(Realm, CX) of + {ok, Conn} -> + ok = zx_conn:subscribe(Package), + NextSubs = [{Pid, Package} | Subs], + NextMX = mx_add_monitor(Pid, subscriber, MX), + {Actions, NextSubs, NextMX}; + unassigned -> + NextActions = [Action | Actions], + NextMX = mx_add_monitor(Pid, subscriber, MX), + {NextActions, Subs, NextMX}; + unconfigured -> + Pid ! {subscription, Realm, {error, bad_realm}}, + {Actions, Subs, MX} + end, + eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX}); +eval_queue([Action = {Pid, {unsubscribe, Package}} | Rest], + State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) -> + NewActions = lists:delete(Action, Actions), + NewSubs = lists:delete({Pid, Package}, Subs), + NewMX = mx_del_monitor(Pid, subscriber, MX), + Realm = element(1, Package), + ok = + case cx_resolve(Realm, CX) of + {ok, Conn} -> cx_conn:unsubscribe(Package); + unassigned -> ok + end, + eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX}); +eval_queue([{Ref, Pid, {list, realms}} | Rest], State = #s{cx = CX}) -> + Realms = cx_realms(CX), + ok = send_result(Pid, Ref, {list, realms, {ok, Realms}}), + eval_queue(Rest, State); +eval_queue([Action = {Ref, Pid, {list, Realm}} | Rest], + State = #s{actions = Actions, responses = Responses, mx = MX, cx = CX}) + when is_list(Realm) -> + {NewActions, NewResponses, NewMX} = + case cx_resolve(Realm, CX) of + {ok, Conn} -> + ok = cx_conn:list_packages(Conn, Ref), + Request = {Pid, list, Realm}, + NextRequests = maps:put(Ref, Request, Requests), + NextMX = mx_add_monitor(Pid, requestor, MX), + {Actions, NextRequests, NextMX}; + unassigned -> + NextMX = mx_add_monitor(Pid, requestor, MX), + NextActions = [Action | Actions], + {NextActions, Responses, NextMX}; + unconfigured -> + Pid ! {Ref, {list, Realm, {error, bad_realm}}}, + {Actions, Responses, MX} + end, + NewState = State#s{actions = NewActions, responses = NewResponses, mx = NewMX}, + eval_queue(Rest, NewState); +%% FIXME: Universalize the cx_resolve result, leave only message form explicit. +eval_queue([Action = {Ref, Pid, Message} | Actions], + State = #s{actions = Actions, responses = Responses, mx = MX, cx = CX}) + Realm = element(1, element(2, Message)), + {NewActions, NewResponses, NewMX} = + case cx_resolve(Realm, CX) of + {ok, Conn} -> + ok = cx_conn:list_packages(Conn, Ref), + NextResponses = maps:put(Ref, {Pid, Message}, Responses), + NextMX = mx_add_monitor(Pid, requestor, MX), + {Actions, NextResponses, NextMX}; + unassigned -> + NextActions = [Action | Actions], + {NextActions, Responses, MX}; + unconfigured -> + Pid ! {Ref, {list, Realm, {error, bad_realm}}} + {Actions, Responses, MX} + end, + NewState = State#s{actions = NewActions, responses = NewResponses, mx = NewMX}, + eval_queue(Rest, NewState). + + +-spec send_result(pid(), reference(), term()) -> ok. +%% @private +%% Send a message by reference to a process. +%% Probably don't need this function. + +send_result(Pid, Ref, Message) -> + Pid ! {Ref, Message}, + ok. -spec do_query_latest(Object, State) -> {Result, NewState} @@ -443,14 +746,14 @@ do_subscribe(_, _, State = #s{realm = Realm, name = Name}) -> %% Queries a zomp realm for the latest version of a package or package %% version (complete or incomplete version number). -query_latest(Socket, {Realm, Name}) -> - ok = send(Socket, {latest, Realm, Name}), +do_query_latest(Socket, {Realm, Name}) -> + ok = zx_net:send(Socket, {latest, Realm, Name}), receive {tcp, Socket, Bin} -> binary_to_term(Bin) after 5000 -> {error, timeout} end; -query_latest(Socket, {Realm, Name, Version}) -> - ok = send(Socket, {latest, Realm, Name, Version}), +do_query_latest(Socket, {Realm, Name, Version}) -> + ok = zx_net:send(Socket, {latest, Realm, Name, Version}), receive {tcp, Socket, Bin} -> binary_to_term(Bin) after 5000 -> {error, timeout} @@ -471,22 +774,6 @@ do_unsubscribe(State = #s{connp = ConnP, connm = ConnM}) -> {ok, NewState}. --spec do_report(From, Message, State) -> NewState - when From :: pid(), - Message :: conn_report(), - State :: state(), - NewState :: state(). - -do_report(From, {connected, Realms}, State) -> - % FIXME - ok = log(info, - "Would do_report(~tp, {connected, ~tp}, ~tp) here", - [From, Realms, State]), - State. - - - - -spec do_fetch(PackageIDs) -> Result when PackageIDs :: [zx:package_id()], Result :: ok @@ -564,7 +851,120 @@ scrub(Deps) -> -%%% Connection Cache ADT Interface Functions +%%% Monitor Index ADT Interface Functions +%%% +%%% Very simple structure, but explicit handling of it becomes bothersome in other +%%% code, so it is all just packed down here. + +-spec mx_new() -> monitor_index(). +%% @private +%% Returns a new, empty monitor index. + +mx_new() -> []. + + +-spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index(). +%% @private +%% Begin monitoring the given Pid, keeping track of its category. + +mx_add_monitor(Pid, subscriber, MX) -> + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, {Subs, Reqs}}, NextMX} -> + [{Ref, Pid, {Subs + 1, Reqs}} | NextMX]; + false -> + Ref = monitor(process, Pid), + [{Ref, Pid, {1, 0}} | MX] + end; +mx_add_monitor(Pid, requestor, MX) -> + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, {Subs, Reqs}}, NextMX} -> + [{Ref, Pid, {Subs, Reqs + 1}} | NextMX]; + false -> + Ref = monitor(process, Pid), + [{Ref, Pid, {0, 1}} | MX] + end; +mx_add_monitor(Pid, conn_attempt, MX) -> + false = lists:keymember(Pid, 2, MX), + Ref = monitor(process, Pid), + [{Ref, Pid, conn_attempt} | MX]; +mx_add_monitor(Pid, conn, MX) -> + {value, {Ref, Pid, conn_attempt}, NextMX} = lists:keytake(Pid, 2, MX), + [{Ref, Pid, conn} | NextMX], + + +-spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index(). +%% @private +%% Drop a monitor category, removing the entire monitor in the case only one category +%% exists. + +mx_del_monitor(Pid, subscriber, MX) -> + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, {1, 0}}, NextMX} -> + true = demonitor(Ref, [flush]), + NextMX; + {value, {Ref, Pid, {Subs, Reqs}}, NextMX} -> + [{Ref, Pid, {Subs - 1, Reqs}} | NextMX]; + false -> + MX + end; +mx_del_monitor(Pid, requestor, MX) -> + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, {0, 1}}, NextMX} -> + true = demonitor(Ref, [flush]), + NextMX; + {value, {Ref, Pid, {Subs, Reqs}}, NextMX} -> + [{Ref, Pid, {Subs, Reqs - 1}} | NextMX]; + false -> + MX + end; +mx_del_monitor(Pid, Category, MX) -> + {value, {Ref, Pid, Category}, NewMX} = lists:keytake(Pid, 2, MX), + true = demonitor(Ref, [flush]), + NewMX. + + +-spec mx_lookup_category(pid(), monitor_index()) -> Result + when Result :: conn_attempt + | conn + | requestor + | subscriber + | sub_req + | error. +%% @private +%% Lookup a monitor's categories. + +mx_lookup_category(Pid, MX) -> + case lists:keyfind(Pid, 2, MX) of + {_, _, conn_attempt} -> conn_attempt; + {_, _, conn} -> conn; + {_, _, {0, _}} -> requestor; + {_, _, {_, 0}} -> subscriber; + {_, _, _} -> sub_req; + false -> error + end. + + +-spec mx_upgrade_conn(Pid, MX) -> Result + when Pid :: pid(), + MX :: monitor_index(), + Result :: {ok, NewMX} + | {error, not_found}, + NewMX :: monitor_index(). +%% @private +%% Upgrade a conn_attempt to a conn. + +mx_upgrade_conn(Pid, MX) -> + case lists:keytake(Pid, 2, MX) of + {value, {Ref, Pid, conn_attempt}, NextMX} -> + NewMX = [{Ref, Pid, conn} | NextMX], + {ok, NewMX}; + false -> + {error, not_found} + end. + + + +%%% Connection Index ADT Interface Functions %%% %%% Functions to manipulate the conn_index() data type are in this section. This %%% data should be treated as abstract by functions outside of this section, as it is @@ -880,7 +1280,7 @@ enqueue_unique(Element, Queue) -> -spec cx_disconnected(Conn, CX) -> Result when Conn :: pid(), CX :: conn_index(), - Result :: {ok, Mon, UnassignedRealms, NewCX} + Result :: {ok, UnassignedRealms, NewCX} | {error, unknown}, Mon :: reference(), UnassignedRealms :: [zx:realm()], @@ -891,12 +1291,20 @@ enqueue_unique(Element, Queue) -> %% realms, and returns the monitor reference of the pid, a list of realms that are now %% unassigned, and an updated connection index. -cx_disconnected(Conn, CX = #cx{assigned = Assigned, conns = Conns}) -> - case lists:keytake(Con, Conns) of - {value, {Pid, Mon, Conn}, NewConns} -> - {UnassignedRealms, NewAssigned} = cx_scrub_assigned(Pid, Assigned), +cx_disconnected(Conn, attempt, #cx{attempts = Attempts}) -> + case lists:keytake(Conn, 1, Attempts) of + {value, {Conn, Host}, NewAttempts} -> + NewCX = CX#cx{attempts = NewAttempts}, + {ok, [], NewCX}; + false -> + {error, unknown} + end; +cx_disconnected(Conn, conn, CX = #cx{assigned = Assigned, conns = Conns}) -> + case lists:keytake(Conn, 1, Conns) of + {value, {Conn, Host}, NewConns} -> + {UnassignedRealms, NewAssigned} = cx_scrub_assigned(Conn, Assigned), NewCX = CX#cx{assigned = NewAssigned, conns = NewConns}, - {ok, Mon, UnassignedRealms, NewCX}; + {ok, UnassignedRealms, NewCX}; false -> {error, unknown} end. @@ -927,13 +1335,19 @@ cx_scrub_assigned(_, [], Unassigned, Assigned) -> when Realm :: zx:realm(), CX :: conn_index(), Result :: {ok, Conn :: pid()} - | unassigned. + | unassigned + | unconfigured. %% @private %% Check the registry of assigned realms and return the pid of the appropriate %% connection, or an `unassigned' indication if the realm is not yet connected. -cx_resolve(Realm, #cx{assigned = Assigned}) -> +cx_resolve(Realm, #cx{realms = Realms, assigned = Assigned}) -> case lists:keyfind(Realm, 1, Assigned) of - {Realm, Conn} -> {ok, Conn}; - false -> unassigned + {Realm, Conn} -> + {ok, Conn}; + false -> + case maps:is_key(Realm, Realms) -> + true -> unassigned; + false -> unconfigured + end end.