Indexing madness

This commit is contained in:
Craig Everett 2018-02-22 08:19:44 +09:00
parent e5308bbad0
commit 107da5f41d
3 changed files with 619 additions and 203 deletions

View File

@ -25,6 +25,7 @@
-export([start/2, stop/1]).
-export_type([serial/0, package_id/0, package/0, realm/0, name/0, version/0,
identifier/0,
option/0,
host/0,
key_id/0, key_name/0,
@ -45,6 +46,7 @@
-type version() :: {Major :: non_neg_integer() | z,
Minor :: non_neg_integer() | z,
Patch :: non_neg_integer() | z}.
-type identifier() :: realm() | package() | package_id().
-type option() :: {string(), term()}.
-type host() :: {string() | inet:ip_address(), inet:port_number()}.
-type key_id() :: {realm(), key_name()}.
@ -55,7 +57,7 @@
-type label() :: [$a..$z | $0..$9 | $_ | $- | $.].
-type package_meta() :: #{package_id := package_id(),
deps := [package_id()],
type := app | lib,}.
type := app | lib}.

View File

@ -99,7 +99,7 @@ connect(Parent, Debug, {Host, Port}) ->
confirm_service(Parent, Debug, Socket);
{error, Error} ->
ok = log(warning, "Connection problem with ~tp: ~tp", [Node, Error]),
ok = zx_daemon:report(failed)
ok = zx_daemon:report(disconnected)
terminate()
end.
@ -174,7 +174,7 @@ loop(Parent, Debug, Socket) ->
loop(Parent, Debug, Socket);
stop ->
ok = zx_net:disconnect(Socket),
terminat();
terminate();
Unexpected ->
ok = log(warning, "Unexpected message: ~tp", [Unexpected]),
loop(Parent, Debug, Socket)

View File

@ -85,10 +85,11 @@
-export([pass_meta/3,
subscribe/1, unsubscribe/0,
list_packages/1, list_versions/1, query_latest/1,
fetch/1]).
-export([report/1]).
subscribe/1, unsubscribe/1,
list/1, latest/1,
fetch/1, key/1,
pending/1, packagers/1, maintainers/1, sysops/1]).
-export([report/1, result/2, notice/2]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
@ -103,56 +104,121 @@
{meta = none :: none | zx:package_meta(),
home = none :: none | file:filename(),
argv = none :: none | [string()],
reqp = none :: none | pid(),
reqm = none :: none | reference(),
action = none :: none | action(),
actions = queue:new() :: queue:queue(action()),
actions = [] :: [request() | {reference(), request()}],
responses = maps:new() :: #{reference() := request()},
subs = [] :: [{pid(), zx:package()}],
mx = mx_new() :: monitor_index(),
cx = cx_load() :: conn_index()}).
-record(cx,
{realms = #{} :: #{zx:realm() := realm_meta()},
assigned = [] :: [{zx:realm(), pid()}],
attempts = [] :: [{pid(), reference(), zx:host()}],
conns = [] :: [{pid(), reference(), zx:host()}]}).
attempts = [] :: [{pid(), zx:host()}],
conns = [] :: [{pid(), zx:host()}]}).
-record(rmeta,
{revision = 0 :: non_neg_integer(),
serial = 0 :: non_neg_integer(),
prime = {"zomp.psychobitch.party", 11311} :: zx:host(),
prime = {"zomp.tsuriai.jp", 11311} :: zx:host(),
private = [] :: [zx:host()],
mirrors = queue:new() :: queue:queue(zx:host()),
realm_keys = [] :: [zx:key_meta()],
package_keys = [] :: [zx:key_meta()],
sysops = [] :: [zx:sysop_meta()]}).
%% State Types
-type state() :: #s{}.
-type monitor_index() :: [{reference(), pid(), category()}],
-type conn_index() :: #cx{}.
-type realm_meta() :: #rmeta{}.
-type category() :: {Subs :: non_neg_integer(), Reqs :: non_neg_integer()}
| conn_attempt
| conn.
-type action() :: {subscribe, zx:package()}
| unsubscribe
| {list, zx:realm()}
| {list, zx:realm(), zx:name()}
| {list, zx:realm(), zx:name(), zx:version()}
| {latest, zx:realm()}
| {latest, zx:realm(), zx:name()}
| {latest, zx:realm(), zx:name(), zx:version()}
%% Conn Communication
-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
| disconnected.
%% Subscriber / Requestor Communication
% Incoming Request messages
-type request() :: {Requestor :: pid(), Action :: action() | subunsub()}.
-type action() :: {list, realms | zx:identifier()}
| {latest, zx:identifier()}
| {fetch, zx:package_id()}
| {key, zx:key_id()}
| {pending, zx:package()}
| {packagers, zx:package()}
| {maintainers, zx:package()}
| sysops
| {subscribe, zx:realm(), zx:name()}.
-type conn_report() :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
| failed
| disconnected.
| {sysops, zx:realm()}.
-type subunsub() :: {subscribe, zx:package()}
| {unsubscribe, zx:package()}.
% Outgoing Result Messages
-type result() :: sub_result()
| list_result()
| latest_result()
| fetch_result()
| key_result()
| pending_result()
| pack_result()
| maint_result()
| sysop_result().
-type sub_result() :: {subscription, zx:package(),
Message :: {update, zx:package_id()}
| {error, bad_realm | bad_package}}.
-type list_result() :: {list, realms,
Message :: {ok, [zx:realm()]}}
| {list, zx:realm(),
Message :: {ok, [zx:name()]}
| {error, bad_realm | timeout}}
| {list, zx:package(),
Message :: {ok, [zx:version]}
| {error, bad_realm
| bad_package
| timeout}}
-type latest_result() :: {latest,
Package :: zx:package()
| zx:package_id(),
Message :: {ok, zx:package_id()}
| {error, bad_realm
| bad_package
| bad_version
| timeout}}.
-type fetch_result() :: {fetch, zx:package_id(),
Message :: {hops, non_neg_integer()}
| done
| {error, bad_realm
| bad_package
| bad_version
| timeout}}.
-type key_result() :: {key, zx:key_id(),
Message :: {ok, binary()}
| {error, bad_key
| timeout}}.
-type pending_result() :: {pending, zx:package(),
Message :: {ok, [zx:version()]}
| {error, bad_realm
| bad_package
| timeout}}.
-type pack_result() :: {packagers, zx:package(),
Message :: {ok, [zx:user()]}
| {error, bad_realm
| bad_package
| timeout}}.
-type maint_result() :: {maintainers, zx:package(),
Message :: {ok, [zx:user()]}
| {error, bad_realm
| bad_package
| timeout}}.
-type sysop_result() :: {sysops, zx:realm(),
Message :: {ok, [zx:user()]}
| {error, bad_host
| timeout}.
%%% Service Interface
%%% Requestor Interface
-spec pass_meta(Meta, Dir, ArgV) -> ok
when Meta :: zx:package_meta(),
@ -171,7 +237,7 @@
%% by the launching process (which normally terminates shortly thereafter).
pass_meta(Meta, Dir, ArgV) ->
gen_server:call(?MODULE, {pass_meta, Meta, Dir, ArgV}).
gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}).
-spec subscribe(Package) -> Result
@ -189,83 +255,102 @@ pass_meta(Meta, Dir, ArgV) ->
%% Other functions can be used to query the status of a package at an arbitrary time.
subscribe(Package) ->
gen_server:call(?MODULE, {subscribe, self(), Package}).
gen_server:cast(?MODULE, {subscribe, self(), Package}).
-spec unsubscribe() -> ok.
-spec unsubscribe(zx:package()) -> ok.
%% @doc
%% Instructs the daemon to unsubscribe if subscribed. Has no effect if not subscribed.
unsubscribe() ->
gen_server:call(?MODULE, unsubscribe).
unsubscribe(Package) ->
gen_server:cast(?MODULE, {unsubscribe, self(), Package}).
-spec list_packages(Realm) -> Result
when Realm :: zx:realm(),
Result :: {ok, Packages :: [zx:package()]}
| {error, Reason},
Reason :: bad_realm
| no_realm
| network.
list_packages(Realm) ->
gen_server:call(?MODULE, {list, Realm}).
-spec list_versions(Package) -> Result
when Package :: zx:package(),
Result :: {ok, Versions :: [zx:version()]}
| {error, Reason},
Reason :: bad_realm
| bad_package
| network.
-spec list(Identifier) -> ok
when Identifier :: realms
| zx:identifier().
%% @doc
%% List all versions of a given package. Useful especially for developers wanting to
%% see a full list of maintained packages to include as dependencies.
%% Requests a list of realms, packages, or package versions depending on the
%% identifier provided.
list_versions(Package) ->
gen_server:call(?MODULE, {list_versions, Package}).
list(Identifier) ->
request({list, Identifier}).
-spec query_latest(Object) -> Result
when Object :: zx:package() | zx:package_id(),
Result :: {ok, zx:version()}
| {error, Reason},
Reason :: bad_realm
| bad_package
| bad_version
| network.
-spec latest(Identifier) -> ok.
when Identifier :: zx:package() | zx:package_id().
%% @doc
%% Check for the latest version of a package, with or without a version provided to
%% indicate subversion limit. Useful mostly for developers checking for a latest
%% version of a package.
%% Request the lastest version of a package within the scope of the identifier.
%% If a package with no version is provided, the absolute latest version will be
%% sent once queried. If a partial version is provided then only the latest version
%% within the provided scope will be returned.
%%
%% While this function could be used as a primitive operation in a dynamic dependency
%% upgrade scheme, that is not its intent. You will eventually divide by zero trying
%% to implement such a feature, open a portal to Oblivion, and monsters will consume
%% all you love. See? That's a horrible idea. You have been warned.
%% If package {"foo", "bar"} has [{1,2,3}, {1,2,5}, {1,3,1}, {2,4,5}] available,
%% querying {"foo", "bar"} or {"foo", "bar", {z,z,z}} will result in {2,4,5}, but
%% querying {"foo", "bar", {1,z,z}} will result in {1,3,1}, granted that the realm
%% "foo" is reachable.
query_latest(Object) ->
gen_server:call(?MODULE, {query_latest, Object}).
latest(Identifier) ->
request({latest, Identifier}).
-spec fetch(PackageIDs) -> Result
when PackageIDs :: [zx:package_id()],
Result :: {{ok, [zx:package_id()]},
{error, [{zx:package_id(), Reason}]}},
Reason :: bad_realm
| bad_package
| bad_version
| network.
-spec fetch(zx:package_id()) -> ok.
%% @doc
%% Ensure a list of packages is available locally, fetching any missing packages in
%% the process. This is intended to abstract the task of ensuring that a list of
%% dependencies is available locally prior to building/running a dependent app or lib.
%% Ensure a package is available locally, or queue it for download otherwise.
fetch([]) ->
{{ok, []}, {error, []}};
fetch(PackageIDs) ->
gen_server:call(?MODULE, {fetch, PackageIDs}).
fetch(PackageID) ->
request({fetch, PackageID}).
-spec key(zx:key_id()) -> ok.
%% @doc
%% Request a public key be fetched from its relevant realm.
key(KeyID) ->
request({key, KeyID}).
-spec pending(zx:package()) -> ok.
%% @doc
%% Request the list of versions of a given package that have been submitted but not
%% signed and included in their relevant realm.
pending(Package) ->
request({pending, Package}).
-spec packagers(zx:package()) -> ok.
%% @doc
%% Request a list of packagers assigned to work on a given package.
packagers(Package) ->
request({packagers, Package}).
-spec maintainers(zx:package()) -> ok.
%% @doc
%% Request a list of maintainers assigned to work on a given package.
maintainers(Package) ->
request({maintainers, Package}).
-spec sysops(zx:realm()) -> ok.
%% @doc
%% Request a list of sysops in charge of maintaining a given realm. What this
%% effectively does is request the sysops of the prime host of the given realm.
sysops(Realm) ->
request({sysops, Realm}).
%% Public Caster
-spec request(action()) -> ok.
%% @private
%% Private function to wrap the necessary bits up.
request(Action) ->
gen_server:cast(?MODULE, {request, make_ref(), self(), Action}),
@ -284,6 +369,25 @@ report(Message) ->
gen_server:cast(?MODULE, {report, self(), Message}).
-spec result(reference(), result()) -> ok.
%% @private
%% Return a tagged result back to the daemon to be forwarded to the original requestor.
result(Reference, Result) ->
gen_server:cast(?MODULE, {result, Reference, Result}).
-spec notice(Package, Message) -> ok
when Package :: zx:package(),
Message :: term().
%% @private
%% Function called by a connection when a subscribed update arrives.
notify(Package, Message) ->
gen_server:cast(?MODULE, {notice, Package, Message}).
%%% Startup
-spec start_link() -> {ok, pid()} | {error, term()}.
@ -297,29 +401,33 @@ start_link() ->
-spec init(none) -> {ok, state()}.
init(none) ->
CX = cx_load(),
{ok, NewCX} = init_connections(CX),
State = #s{cx = NewCX},
{ok, MX, CX} = init_connections(),
State = #s{mx = MX, cx = NewCX},
{ok, State}.
init_connections(CX) ->
init_connections() ->
CX = cx_load(),
MX = mx_new(),
Realms = cx_realms(CX),
init_connections(Realms, CX).
init_connections(Realms, MX, CX).
init_connections([Realm | Realms], CX}) ->
init_connections([Realm | Realms], Monitors, CX) ->
{ok, Hosts, NextCX} = cx_next_hosts(3, Realm, CX),
StartConn =
fun(Host) ->
{ok, Pid, Mon} = zx_conn:start_monitor(Host),
{Pid, Mon, Host}
{ok, Pid} = zx_conn:start(Host),
{Pid, Host}
end,
NewAttempts = lists:map(StartConn, Hosts),
NewCX = lists:map(fun cx_add_attempt/2, NextCX, NewAttempts),
init_connections(Realms, NewCX);
init_connections([], CX) ->
{ok, CX}.
AddMonitor = fun({P, _}, M) -> mx_add_monitor(P, conn_attempt, M) end,
NewMX = lists:foldl(AddMonitor, MX, NewAttempts),
NewCX = lists:foldl(fun cx_add_attempt/2, NextCX, NewAttempts),
init_connections(Realms, NewMX, NewCX);
init_connections([], MX, CX) ->
{ok, MX, CX}.
%%% gen_server
@ -327,18 +435,6 @@ init_connections([], CX) ->
%% @private
%% gen_server callback for OTP calls
handle_call({pass_meta, Meta, Dir, ArgV}, _, State) ->
{Result, NewState} = do_pass_meta(Requestor, Package, ArgV, State),
{reply, Result, NewState};
handle_call({subscribe, Requestor, Package}, _, State) ->
{Result, NewState} = do_subscribe(Requestor, Package, State),
{reply, Result, NewState};
handle_call({query_latest, Object}, _, State) ->
{Result, NewState} = do_query_latest(Object, State),
{reply, Result, NewState};
handle_call({fetch, Packages}, _, State) ->
{Result, NewState} = do_fetch(Packages, State),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
@ -347,11 +443,31 @@ handle_call(Unexpected, From, State) ->
%% @private
%% gen_server callback for OTP casts
handle_cast(unsubscribe, State) ->
NewState = do_unsubscribe(State),
handle_cast({report, Conn, Message}, State) ->
NextState = do_report(Conn, Message, State),
NewState = eval_queue(NextState),
{noreply, NewState};
handle_cast({report, From, Message}, State) ->
NewState = do_report(From, Message, State),
handle_cast({request, Ref, Requestor, Action}, State) ->
NextState = do_request(Ref, Requestpr, Action, State),
NewState = eval_queue(NextState),
{noreply, NewState};
handle_cast({result, Ref, Result}, State) ->
NextState = do_result(Ref, Result, State),
NewState = eval_queue(NextState),
{noreply, NewState};
handle_cast({notice, Package, Update}, State) ->
ok = do_notice(Package, Update, State),
{noreply, State};
handle_cast({subscribe, Pid, Package}, State) ->
NextState = do_request(subscribe, Pid, Package, State),
NewState = eval_queue(NextState),
{noreply, NewState};
handle_cast({unsubscribe, Pid, Package}, State) ->
NextState = do_request(unsubscribe, Pid, Package, State),
NewState = eval_queue(NextState),
{noreply, NewState};
handle_cast({pass_meta, Meta, Dir, ArgV}, State) ->
NewState = do_pass_meta(Meta, Dir, ArgV, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
@ -385,49 +501,236 @@ terminate(_, _) ->
%%% Doer Functions
-spec do_pass_meta(Meta, Dir, ArgV, State) -> {Result, NewState}
-spec do_pass_meta(Meta, Home, ArgV, State) -> NewState
when Meta :: zx:package_meta(),
Dir :: file:filename(),
Home :: file:filename(),
ArgV :: [string()],
State :: state(),
Result :: ok,
Newstate :: state().
do_pass_meta(Meta, Dir, ArgV, State) ->
NewState = State#s{meta = Meta, dir = Dir, argv = ArgV},
{ok, NewState}.
-spec do_subscribe(Requestor, Package, State) -> {Result, NewState}
when Requestor :: pid(),
Package :: zx:package(),
State :: state(),
Result :: ok
| {error, Reason},
Reason :: illegal_requestor
| {already_subscribed, zx:package()},
NewState :: state().
do_subscribe(Requestor,
{Realm, Name},
State = #s{meta = none, reqp = none,
hosts = Hosts, serials = Serials}) ->
Monitor = monitor(process, Requestor),
{Host, NewHosts} = select_host(Realm, Hosts),
Serial =
case lists:keyfind(Realm, 1, Serials) of
false -> 0;
{Realm, S} -> S
do_pass_meta(Meta, Home, ArgV, State) ->
PackageID = maps:get(package_id, Meta),
{ok, PackageString} = zx_lib:package_string(PackageID),
ok = log(info, "Received meta for ~tp.", [PackageString]),
State#s{meta = Meta, home = Home, argv = ArgV}.
-spec do_request(Ref, Req, Action, State) -> NextState
when Ref :: reference(),
Req :: pid(),
Action :: action(),
State :: state(),
NextState :: state().
%% @private
%% Enqueue requests and update relevant index.
do_request(subscribe, Req, Package, State = #s{actions = Actions}) ->
NewActions = [{Req, {subscribe, Package}} | Actions],
State#s{actions = NewActions};
do_request(unsubscribe, Req, Package, State = #s{actions = Actions}) ->
NewActions = [{Req, {unsubscribe, Package}} | Actions],
State#s{actions = NewActions};
do_request(Ref, Req, Action, State = #s{actions = Actions}) ->
NewActions = [{Ref, Req, Action} | Actions],
State#s{actions = NewActions}.
-spec do_report(Conn, Message, State) -> NewState
when Conn :: pid(),
Message :: conn_report(),
State :: state(),
NewState :: state().
%% @private
%% Receive a report from a connection process and update the connection index and
%% possibly retry connections.
do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) ->
{ok, NextMX} = mx_swap_categories(Conn, MX),
{NewMX, NewCX} =
case cx_connected(Realms, Conn, CX) of
{assigned, NextCX} ->
{NextMX, NextCX};
{unassigned, NextCX} ->
ScrubbedMX = mx_del_monitor(Conn, conn, NextMX),
ok = zx_conn:stop(Conn),
{ScrubbedMX, NextCX}
end,
{ok, ConnP} = zx_conn:start(Host),
ConnM = monitor(process, ConnP),
NewState = State#s{realm = Realm, name = Name,
connp = ConnP, connm = ConnM,
reqp = Requestor, reqm = Monitor,
hosts = NewHosts},
{ok, NewState};
do_subscribe(_, _, State = #s{realm = Realm, name = Name}) ->
{{error, {already_subscribed, {Realm, Name}}}, State}.
State#s{mx = NewMX, cx = NewCX};
do_report(Conn, disconnected, State = #s{mx = MX, cx = CX}) ->
{Unassigned, NextMX, NextCX} =
case mx_lookup_category(Conn, MX) of
conn_attempt ->
{ok, [], ScrubbedCX} cx_disconnected(Conn, attempt, CX),
ScrubbedMX = mx_del_monitor(Conn, conn_attempt, MX),
{[], ScrubbedMX, ScrubbedCX};
conn ->
{ok, Dropped, ScrubbedCX} = cx_disconnected(Conn, conn, CX),
ScrubbedMX = mx_del_monitor(Conn, conn, MX),
{Dropped, ScrubbedMX, ScrubbedCX}
end,
{NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX),
State#s{mx = NewMX, cx = NewCX}.
-spec init_connection(Realms, MX, CX) -> {NewMX, NewCX}
when Realms :: [zx:realm()],
MX :: monitor_index()
CX :: conn_index(),
NewMX :: monitor_index(),
NewCX :: conn_index().
%% @private
%% Initiates a single connection and updates the relevant structures.
init_connection([Realm | Realms], MX, CX) ->
{ok, Host, NextCX} = cx_next_hosts(Realm, CX),
{ok, Pid} = zx_conn:start(Host),
NewMX = mx_add_monitor(Pid, conn_attempt, MX),
NewCX = cx_add_attempt({Pid, Host}, CX),
init_connection(Realms, NewMX, NewCX);
init_connection([], MX, CX) ->
{MX, CX}.
-spec do_result(Ref, Result, State) -> NewState
when Ref :: reference(),
Result :: result(),
State :: state(),
NewState :: state().
%% @private
%% Receive the result of a sent request and route it back to the original requestor.
do_result(Ref, Result, State = #s{responses = Responses}) ->
NewResponses =
case maps:take(Ref, Responses) of
{{Req, {Type, Object}}, NextResponses} ->
Req ! {result, {Type, Object, Result}},
NextResponses;
error ->
ok = log(warning, "Received unqueued result ~tp:~tp", [Ref, Result]),
Responses
end,
State#s{responses = NewResponses}.
-spec do_notice(Package, Update, State) -> ok
when Package :: zx:package(),
Update :: term(),
State :: state().
%% @private
%% Forward an update message to the subscriber.
do_notice(Package, Update, #s{subs = Subs}) ->
case maps:find(Package, Subs) of
{ok, Subscribers} ->
Notify = fun(P) -> P ! {Package, Update} end,
lists:foreach(Notify, Subscribers);
error ->
Message = "Received package update for 0 subscribers: {~tp, ~tp}",
log(warning, Message, [Package, Update])
end.
-spec eval_queue(State) -> NewState
when State :: state(),
NewState :: state().
%% @private
%% This is one of the two engines that drives everything, the other being do_report/3.
%% This function must iterate as far as it can into the request queue, adding response
%% entries to the pending response structure as it goes.
eval_queue(State = #s{actions = Actions}) ->
InOrder = lists:reverse(Actions),
eval_queue(InOrder, State#s{actions = []}).
eval_queue([], State) ->
State;
eval_queue([Action = {Pid, {subscribe, Package}} | Rest],
State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) ->
Realm = element(1, Package),
{NewActions, NewSubs, NewMX} =
case cx_resolve(Realm, CX) of
{ok, Conn} ->
ok = zx_conn:subscribe(Package),
NextSubs = [{Pid, Package} | Subs],
NextMX = mx_add_monitor(Pid, subscriber, MX),
{Actions, NextSubs, NextMX};
unassigned ->
NextActions = [Action | Actions],
NextMX = mx_add_monitor(Pid, subscriber, MX),
{NextActions, Subs, NextMX};
unconfigured ->
Pid ! {subscription, Realm, {error, bad_realm}},
{Actions, Subs, MX}
end,
eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX});
eval_queue([Action = {Pid, {unsubscribe, Package}} | Rest],
State = #s{actions = Actions, subs = Subs, mx = MX, cx = CX}) ->
NewActions = lists:delete(Action, Actions),
NewSubs = lists:delete({Pid, Package}, Subs),
NewMX = mx_del_monitor(Pid, subscriber, MX),
Realm = element(1, Package),
ok =
case cx_resolve(Realm, CX) of
{ok, Conn} -> cx_conn:unsubscribe(Package);
unassigned -> ok
end,
eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX});
eval_queue([{Ref, Pid, {list, realms}} | Rest], State = #s{cx = CX}) ->
Realms = cx_realms(CX),
ok = send_result(Pid, Ref, {list, realms, {ok, Realms}}),
eval_queue(Rest, State);
eval_queue([Action = {Ref, Pid, {list, Realm}} | Rest],
State = #s{actions = Actions, responses = Responses, mx = MX, cx = CX})
when is_list(Realm) ->
{NewActions, NewResponses, NewMX} =
case cx_resolve(Realm, CX) of
{ok, Conn} ->
ok = cx_conn:list_packages(Conn, Ref),
Request = {Pid, list, Realm},
NextRequests = maps:put(Ref, Request, Requests),
NextMX = mx_add_monitor(Pid, requestor, MX),
{Actions, NextRequests, NextMX};
unassigned ->
NextMX = mx_add_monitor(Pid, requestor, MX),
NextActions = [Action | Actions],
{NextActions, Responses, NextMX};
unconfigured ->
Pid ! {Ref, {list, Realm, {error, bad_realm}}},
{Actions, Responses, MX}
end,
NewState = State#s{actions = NewActions, responses = NewResponses, mx = NewMX},
eval_queue(Rest, NewState);
%% FIXME: Universalize the cx_resolve result, leave only message form explicit.
eval_queue([Action = {Ref, Pid, Message} | Actions],
State = #s{actions = Actions, responses = Responses, mx = MX, cx = CX})
Realm = element(1, element(2, Message)),
{NewActions, NewResponses, NewMX} =
case cx_resolve(Realm, CX) of
{ok, Conn} ->
ok = cx_conn:list_packages(Conn, Ref),
NextResponses = maps:put(Ref, {Pid, Message}, Responses),
NextMX = mx_add_monitor(Pid, requestor, MX),
{Actions, NextResponses, NextMX};
unassigned ->
NextActions = [Action | Actions],
{NextActions, Responses, MX};
unconfigured ->
Pid ! {Ref, {list, Realm, {error, bad_realm}}}
{Actions, Responses, MX}
end,
NewState = State#s{actions = NewActions, responses = NewResponses, mx = NewMX},
eval_queue(Rest, NewState).
-spec send_result(pid(), reference(), term()) -> ok.
%% @private
%% Send a message by reference to a process.
%% Probably don't need this function.
send_result(Pid, Ref, Message) ->
Pid ! {Ref, Message},
ok.
-spec do_query_latest(Object, State) -> {Result, NewState}
@ -443,14 +746,14 @@ do_subscribe(_, _, State = #s{realm = Realm, name = Name}) ->
%% Queries a zomp realm for the latest version of a package or package
%% version (complete or incomplete version number).
query_latest(Socket, {Realm, Name}) ->
ok = send(Socket, {latest, Realm, Name}),
do_query_latest(Socket, {Realm, Name}) ->
ok = zx_net:send(Socket, {latest, Realm, Name}),
receive
{tcp, Socket, Bin} -> binary_to_term(Bin)
after 5000 -> {error, timeout}
end;
query_latest(Socket, {Realm, Name, Version}) ->
ok = send(Socket, {latest, Realm, Name, Version}),
do_query_latest(Socket, {Realm, Name, Version}) ->
ok = zx_net:send(Socket, {latest, Realm, Name, Version}),
receive
{tcp, Socket, Bin} -> binary_to_term(Bin)
after 5000 -> {error, timeout}
@ -471,22 +774,6 @@ do_unsubscribe(State = #s{connp = ConnP, connm = ConnM}) ->
{ok, NewState}.
-spec do_report(From, Message, State) -> NewState
when From :: pid(),
Message :: conn_report(),
State :: state(),
NewState :: state().
do_report(From, {connected, Realms}, State) ->
% FIXME
ok = log(info,
"Would do_report(~tp, {connected, ~tp}, ~tp) here",
[From, Realms, State]),
State.
-spec do_fetch(PackageIDs) -> Result
when PackageIDs :: [zx:package_id()],
Result :: ok
@ -564,7 +851,120 @@ scrub(Deps) ->
%%% Connection Cache ADT Interface Functions
%%% Monitor Index ADT Interface Functions
%%%
%%% Very simple structure, but explicit handling of it becomes bothersome in other
%%% code, so it is all just packed down here.
-spec mx_new() -> monitor_index().
%% @private
%% Returns a new, empty monitor index.
mx_new() -> [].
-spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index().
%% @private
%% Begin monitoring the given Pid, keeping track of its category.
mx_add_monitor(Pid, subscriber, MX) ->
case lists:keytake(Pid, 2, MX) of
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} ->
[{Ref, Pid, {Subs + 1, Reqs}} | NextMX];
false ->
Ref = monitor(process, Pid),
[{Ref, Pid, {1, 0}} | MX]
end;
mx_add_monitor(Pid, requestor, MX) ->
case lists:keytake(Pid, 2, MX) of
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} ->
[{Ref, Pid, {Subs, Reqs + 1}} | NextMX];
false ->
Ref = monitor(process, Pid),
[{Ref, Pid, {0, 1}} | MX]
end;
mx_add_monitor(Pid, conn_attempt, MX) ->
false = lists:keymember(Pid, 2, MX),
Ref = monitor(process, Pid),
[{Ref, Pid, conn_attempt} | MX];
mx_add_monitor(Pid, conn, MX) ->
{value, {Ref, Pid, conn_attempt}, NextMX} = lists:keytake(Pid, 2, MX),
[{Ref, Pid, conn} | NextMX],
-spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index().
%% @private
%% Drop a monitor category, removing the entire monitor in the case only one category
%% exists.
mx_del_monitor(Pid, subscriber, MX) ->
case lists:keytake(Pid, 2, MX) of
{value, {Ref, Pid, {1, 0}}, NextMX} ->
true = demonitor(Ref, [flush]),
NextMX;
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} ->
[{Ref, Pid, {Subs - 1, Reqs}} | NextMX];
false ->
MX
end;
mx_del_monitor(Pid, requestor, MX) ->
case lists:keytake(Pid, 2, MX) of
{value, {Ref, Pid, {0, 1}}, NextMX} ->
true = demonitor(Ref, [flush]),
NextMX;
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} ->
[{Ref, Pid, {Subs, Reqs - 1}} | NextMX];
false ->
MX
end;
mx_del_monitor(Pid, Category, MX) ->
{value, {Ref, Pid, Category}, NewMX} = lists:keytake(Pid, 2, MX),
true = demonitor(Ref, [flush]),
NewMX.
-spec mx_lookup_category(pid(), monitor_index()) -> Result
when Result :: conn_attempt
| conn
| requestor
| subscriber
| sub_req
| error.
%% @private
%% Lookup a monitor's categories.
mx_lookup_category(Pid, MX) ->
case lists:keyfind(Pid, 2, MX) of
{_, _, conn_attempt} -> conn_attempt;
{_, _, conn} -> conn;
{_, _, {0, _}} -> requestor;
{_, _, {_, 0}} -> subscriber;
{_, _, _} -> sub_req;
false -> error
end.
-spec mx_upgrade_conn(Pid, MX) -> Result
when Pid :: pid(),
MX :: monitor_index(),
Result :: {ok, NewMX}
| {error, not_found},
NewMX :: monitor_index().
%% @private
%% Upgrade a conn_attempt to a conn.
mx_upgrade_conn(Pid, MX) ->
case lists:keytake(Pid, 2, MX) of
{value, {Ref, Pid, conn_attempt}, NextMX} ->
NewMX = [{Ref, Pid, conn} | NextMX],
{ok, NewMX};
false ->
{error, not_found}
end.
%%% Connection Index ADT Interface Functions
%%%
%%% Functions to manipulate the conn_index() data type are in this section. This
%%% data should be treated as abstract by functions outside of this section, as it is
@ -880,7 +1280,7 @@ enqueue_unique(Element, Queue) ->
-spec cx_disconnected(Conn, CX) -> Result
when Conn :: pid(),
CX :: conn_index(),
Result :: {ok, Mon, UnassignedRealms, NewCX}
Result :: {ok, UnassignedRealms, NewCX}
| {error, unknown},
Mon :: reference(),
UnassignedRealms :: [zx:realm()],
@ -891,12 +1291,20 @@ enqueue_unique(Element, Queue) ->
%% realms, and returns the monitor reference of the pid, a list of realms that are now
%% unassigned, and an updated connection index.
cx_disconnected(Conn, CX = #cx{assigned = Assigned, conns = Conns}) ->
case lists:keytake(Con, Conns) of
{value, {Pid, Mon, Conn}, NewConns} ->
{UnassignedRealms, NewAssigned} = cx_scrub_assigned(Pid, Assigned),
cx_disconnected(Conn, attempt, #cx{attempts = Attempts}) ->
case lists:keytake(Conn, 1, Attempts) of
{value, {Conn, Host}, NewAttempts} ->
NewCX = CX#cx{attempts = NewAttempts},
{ok, [], NewCX};
false ->
{error, unknown}
end;
cx_disconnected(Conn, conn, CX = #cx{assigned = Assigned, conns = Conns}) ->
case lists:keytake(Conn, 1, Conns) of
{value, {Conn, Host}, NewConns} ->
{UnassignedRealms, NewAssigned} = cx_scrub_assigned(Conn, Assigned),
NewCX = CX#cx{assigned = NewAssigned, conns = NewConns},
{ok, Mon, UnassignedRealms, NewCX};
{ok, UnassignedRealms, NewCX};
false ->
{error, unknown}
end.
@ -927,13 +1335,19 @@ cx_scrub_assigned(_, [], Unassigned, Assigned) ->
when Realm :: zx:realm(),
CX :: conn_index(),
Result :: {ok, Conn :: pid()}
| unassigned.
| unassigned
| unconfigured.
%% @private
%% Check the registry of assigned realms and return the pid of the appropriate
%% connection, or an `unassigned' indication if the realm is not yet connected.
cx_resolve(Realm, #cx{assigned = Assigned}) ->
cx_resolve(Realm, #cx{realms = Realms, assigned = Assigned}) ->
case lists:keyfind(Realm, 1, Assigned) of
{Realm, Conn} -> {ok, Conn};
false -> unassigned
{Realm, Conn} ->
{ok, Conn};
false ->
case maps:is_key(Realm, Realms) ->
true -> unassigned;
false -> unconfigured
end
end.