diff --git a/zomp/lib/otpr-zx/0.1.0/src/.zx_daemon.erl.swp b/zomp/lib/otpr-zx/0.1.0/src/.zx_daemon.erl.swp new file mode 100644 index 0000000..1b7e5a7 Binary files /dev/null and b/zomp/lib/otpr-zx/0.1.0/src/.zx_daemon.erl.swp differ diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl index 4cefbad..ec6644f 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_conn.erl @@ -303,3 +303,109 @@ handle_timeout(Socket) -> terminate() -> exit(normal). + + + +%-spec do_query_latest(Object, State) -> {Result, NewState} +% when Object :: zx:package() | zx:package_id(), +% State :: state(), +% Result :: {ok, zx:version()} +% | {error, Reason}, +% Reason :: bad_realm +% | bad_package +% | bad_version, +% NewState :: state(). +%% @private +%% Queries a zomp realm for the latest version of a package or package +%% version (complete or incomplete version number). +% +%do_query_latest(Socket, {Realm, Name}) -> +% ok = zx_net:send(Socket, {latest, Realm, Name}), +% receive +% {tcp, Socket, Bin} -> binary_to_term(Bin) +% after 5000 -> {error, timeout} +% end; +%do_query_latest(Socket, {Realm, Name, Version}) -> +% ok = zx_net:send(Socket, {latest, Realm, Name, Version}), +% receive +% {tcp, Socket, Bin} -> binary_to_term(Bin) +% after 5000 -> {error, timeout} +% end. + + +%-spec do_fetch(PackageIDs, State) -> NewState +% when PackageIDs :: [zx:package_id()], +% State :: state(), +% NewState :: state(), +% Result :: ok +% | {error, Reason}, +% Reason :: bad_realm +% | bad_package +% | bad_version +% | network. +%% @private +%% +% +%do_fetch(PackageIDs, State) -> +% FIXME: Need to create a job queue divided by realm and dispatched to connectors, +% and cleared from the master pending queue kept here by the daemon as the +% workers succeed. Basic task queue management stuff... which never existed +% in ZX before... grrr... +% case scrub(PackageIDs) of +% [] -> +% ok; +% Needed -> +% Partitioned = partition_by_realm(Needed), +% EnsureDeps = +% fun({Realm, Packages}) -> +% ok = zx_conn:queue_package(Pid, Realm, Packages), +% log(info, "Disconnecting from realm: ~ts", [Realm]) +% end, +% lists:foreach(EnsureDeps, Partitioned) +% end. +% +% +%partition_by_realm(PackageIDs) -> +% PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs), +% maps:to_list(PartitionMap). +% +% +%partition_by_realm({R, P, V}, M) -> +% maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M). +% +% +%ensure_deps(_, _, []) -> +% ok; +%ensure_deps(Socket, Realm, [{Name, Version} | Rest]) -> +% ok = ensure_dep(Socket, {Realm, Name, Version}), +% ensure_deps(Socket, Realm, Rest). +% +% +%-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return(). +%% @private +%% Given an PackageID as an argument, check whether its package file exists in the +%% system cache, and if not download it. Should return `ok' whenever the file is +%% sourced, but exit with an error if it cannot locate or acquire the package. +% +%ensure_dep(Socket, PackageID) -> +% ZrpFile = filename:join("zrp", namify_zrp(PackageID)), +% ok = +% case filelib:is_regular(ZrpFile) of +% true -> ok; +% false -> fetch(Socket, PackageID) +% end, +% ok = install(PackageID), +% build(PackageID). +% +% +%-spec scrub(Deps) -> Scrubbed +% when Deps :: [package_id()], +% Scrubbed :: [package_id()]. +%% @private +%% Take a list of dependencies and return a list of dependencies that are not yet +%% installed on the system. +% +%scrub([]) -> +% []; +%scrub(Deps) -> +% lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps). diff --git a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl index caf10fe..b078f0b 100644 --- a/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl +++ b/zomp/lib/otpr-zx/0.1.0/src/zx_daemon.erl @@ -1,30 +1,31 @@ %%% @doc %%% ZX Daemon %%% -%%% Resident execution daemon and runtime interface to Zomp. +%%% Resident task daemon and runtime interface to Zomp. %%% %%% The daemon resides in the background once started and awaits query requests and -%%% subscriptions from from other processes. The daemon is only capable of handling +%%% subscriptions from other processes. The daemon is only capable of handling %%% unprivileged (user) actions. %%% %%% %%% Discrete state and local abstract data types %%% %%% The daemon must keep track of requestors, subscribers, and zx_conn processes by -%%% using monitors, and because the various types of clients are found in different -%%% locations the monitors are maintained in a data type called monitor_index(), +%%% using monitors. Because these various types of clients are found in different +%%% structures the monitors are maintained in a data type called monitor_index(), %%% shortened to "mx" throughout the module. This structure is treated as an opaque %%% data type and is handled by a set of functions defined toward the end of the module %%% as mx_*/N. %%% -%%% Node connections (cx_conn processes) must also be tracked for status and realm +%%% Node connections (zx_conn processes) must also be tracked for status and realm %%% availability. This is done using a type called conn_index(), shortened to "cx" %%% throughout the module. conn_index() is treated as an abstract, opaque datatype %%% throughout the module, and is handled via a set of cx_*/N functions (after the %%% mx_*/N section). %%% %%% Do NOT directly access data within these structures, use (or write) an accessor -%%% function that does what you want. +%%% function that does what you want. Accessor functions MUST be pure with the sole +%%% exception of the mx_* functions that create and destroy monitors. %%% %%% %%% Connection handling @@ -40,7 +41,7 @@ %%% determine what realms must be available and what cached Zomp nodes it is aware of. %%% It populates the CX (conn_index(), mentioned above) with realm config and host %%% cache data, and then immediately initiates three connection attempts to cached -%%% nodes for each realm configured (if possible; see init_connections/0). +%%% nodes for each realm configured if possible (see init_connections/0). %%% %%% Once connection attempts have been initiated the daemon waits in receive for %%% either a connection report (success or failure) or an action request from @@ -90,6 +91,44 @@ %%% a blocking way, establishing its own receive timeouts or implementing either an %%% asynchronous or synchronous interface library atop zx_daemon interface function, %%% but leaving zx_daemon and zx_conn alone to work asynchronously with one another. +%%% +%%% +%%% Race Avoidance +%%% +%%% Each runtime can only have one zx_daemon alive at a time, and each system can +%%% only have one zx_daemon directly performing actions at a time. This is to prevent +%%% problems where multiple zx instances are running at the same time using the same +%%% home directory and might clash with one another (overwriting each other's data, +%%% corrupting package or key files, etc.). OTP makes running a single registered +%%% process simple within a single runtime, but there is no standard cross-platform +%%% method for ensuring a given process is the only one of its type in a given scope +%%% within a host system. +%%% +%%% When zx starts the daemon will attempt an exclusive write to a lock file called +%%% $ZOMP_HOME/zomp.lock using file:open(LockFile, [exclusive]), writing a system +%%% timestamp. If the write succeeds then the daemon knows it is the master for the +%%% system and will begin initiating connections as described above as well as open a +%%% local socket to listen for other zx instances which will need to proxy their own +%%% actions through the master. Once the socket is open, the lock file is updated with +%%% the local port number. If the write fails then the file is read and if a port +%%% number is indicated then the daemon connects to the master zx_daemon and proxies +%%% its requests through it. If no port number exists then the daemon waits 5 seconds, +%%% checks again, and if there is still no port number then it checks whether the +%%% timestamp is more than 5 seconds old or in the future. If the timestamp is more +%%% than 5 seconds old or in the future then the file is deleted and the process +%%% of master identification starts again. +%%% +%%% If a master daemon's runtime is shutting down it will designate its oldest peer +%%% daemon connection as the new master. At that point the new master will open a port +%%% and rewrite the lock file. Once written the old master will drop all its node +%%% connections and dequeue all current requests, then pass a local redirect message +%%% to the subordinate daemons telling them the new port to which they should connect. +%%% +%%% Even if there is considrable churn within a system from, for example, scripted +%%% initiation of several small utilities that have never been executed before, the +%%% longest-living daemon should always become the master. This is not the most +%%% efficient procedure, but it is the easiest to understand and debug across various +%%% platforms. %%% @end -module(zx_daemon). @@ -274,12 +313,6 @@ %% the filesystem. This step allows running development code from any location in %% the filesystem against installed dependencies without requiring any magical %% references. -%% -%% This call blocks specifically so that we can be certain that the target application -%% cannot be started before the impact of this call has taken full effect. It cannot -%% be known whether the very first thing the target application will do is send this -%% process an async message. That implies that this should only ever be called once, -%% by the launching process (which normally terminates shortly thereafter). pass_meta(Meta, Dir, ArgV) -> gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}). @@ -288,11 +321,11 @@ pass_meta(Meta, Dir, ArgV) -> -spec subscribe(Package) -> ok when Package :: zx:package(). %% @doc -%% Subscribe to update notifications for a for a particular package. +%% Subscribe to update notifications for a for a package. %% The caller will receive update notifications of type `sub_message()' as Erlang %% messages whenever an update occurs. %% Crashes the caller if the Realm or Name of the Package argument are illegal -%% `zx:lower0_9' strings. +%% `zx:lower0_9()' strings. subscribe(Package = {Realm, Name}) -> true = zx_lib:valid_lower0_9(Realm), @@ -558,6 +591,8 @@ start_link() -> -spec init(none) -> {ok, state()}. +%% @private +%% TODO: Implement lockfile checking and master lock acquisition. init(none) -> Blank = blank_state(), @@ -674,7 +709,8 @@ handle_cast({result, Ref, Result}, State) -> {noreply, NewState}; handle_cast({notify, Conn, Package, Update}, State) -> ok = do_notify(Conn, Package, Update, State), - {noreply, State}; + NewState = eval_queue(State), + {noreply, NewState}; handle_cast(stop, State) -> {stop, normal, State}; handle_cast(Unexpected, State) -> @@ -704,6 +740,7 @@ code_change(_, State, _) -> %% @private %% gen_server callback to handle shutdown/cleanup tasks on receipt of a clean %% termination request. +%% TODO: Implement new master selection, dequeuing and request queue passing. terminate(normal, #s{cx = CX}) -> ok = log(info, "zx_daemon shutting down..."), @@ -778,7 +815,7 @@ do_request(Requestor, Action, State = #s{id = ID, actions = Actions}) -> State :: state(), NewState :: state(). %% @private -%% Receive a report from a connection process, and update the connection index and +%% Receive a report from a connection process, update the connection index and %% possibly retry connections. do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> @@ -788,7 +825,7 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> {assigned, NextCX} -> {NextMX, NextCX}; {unassigned, NextCX} -> - {ok, ScrubbedMX} = mx_del_monitor(Conn, conn, NextMX), + ScrubbedMX = mx_del_monitor(Conn, conn, NextMX), ok = zx_conn:stop(Conn), {ScrubbedMX, NextCX} end, @@ -796,9 +833,9 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) -> do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) -> NextMX = mx_del_monitor(Conn, attempt, MX), {Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX), - {NewMX, NewCX} = ensure_connection(Unassigned, NextMX, NextCX), + {NewMX, NewCX} = ensure_connections(Unassigned, NextMX, NextCX), State#s{mx = NewMX, cx = NewCX}; -do_report(Conn, failed, State = #s{mx = MX, cx = CX}) -> +do_report(Conn, failed, State = #s{mx = MX}) -> NewMX = mx_del_monitor(Conn, attempt, MX), failed(Conn, State#s{mx = NewMX}); do_report(Conn, disconnected, State = #s{mx = MX}) -> @@ -806,21 +843,30 @@ do_report(Conn, disconnected, State = #s{mx = MX}) -> disconnected(Conn, State#s{mx = NewMX}). -failed(Conn, State#s{mx = MX, cx = CX}) -> +-spec failed(Conn, State) -> NewState + when Conn :: pid(), + State :: state(), + NewState :: state(). + +failed(Conn, State = #s{mx = MX, cx = CX}) -> {Realms, NextCX} = cx_failed(Conn, CX), - {NewMX, NewCX} = ensure_connection(Realms, MX, NextCX), + {NewMX, NewCX} = ensure_connections(Realms, MX, NextCX), State#s{mx = NewMX, cx = NewCX}. - + + +-spec disconnected(Conn, State) -> NewState + when Conn :: pid(), + State :: state(), + NewState :: state(). disconnected(Conn, State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) -> - {Pending, LostSubs, ScrubbedCX} = cx_disconnected(Conn, CX), - Unassigned = cx_unassigned(ScrubbedCX), + {Pending, LostSubs, Unassigned, ScrubbedCX} = cx_disconnected(Conn, CX), ReSubs = [{S, {subscribe, P}} || {S, P} <- LostSubs], {Dequeued, NewRequests} = maps:fold(dequeue(Pending), {#{}, #{}}, Requests), ReReqs = maps:to_list(Dequeued), NewActions = ReReqs ++ ReSubs ++ Actions, - {NewMX, NewCX} = ensure_connection(Unassigned, ScrubbedMX, ScrubbedCX), + {NewMX, NewCX} = ensure_connections(Unassigned, MX, ScrubbedCX), State#s{actions = NewActions, requests = NewRequests, mx = NewMX, @@ -848,32 +894,48 @@ dequeue(Pending) -> end. --spec ensure_connection(Realms, MX, CX) -> {NewMX, NewCX} +-spec ensure_connections(Realms, MX, CX) -> {NewMX, NewCX} when Realms :: [zx:realm()], MX :: monitor_index(), CX :: conn_index(), NewMX :: monitor_index(), NewCX :: conn_index(). %% @private -%% Initiates a connection to a single realm at a time, taking into account whether -%% connected but unassigned nodes are alterantive providers of the needed realm. -%% Returns updated monitor and connection indices. +%% Check the list of unprovided realms with all available connections that can provide +%% it, and allocate accordingly. If any realms cannot be provided by existing +%% connections, new connections are initiated for all unprovided realms. -ensure_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) -> - {NewMX, NewCX} = +ensure_connections(Realms, MX, CX) -> + {NextCX, Unavailable} = reassign_conns(Realms, CX, []), + {ok, NewMX, NewCX} = init_connections(Unavailable, MX, NextCX), + {NewMX, NewCX}. + + +-spec reassign_conns(Realms, CX, Unavailable) -> {NewCX, NewUnavailable} + when Realms :: [zx:realm()], + CX :: conn_index(), + Unavailable :: [zx:realm()], + NewCX :: conn_index(), + NewUnavailable :: [zx:realm()]. +%% @private +%% Finds connections that provide a requested realm and assigns that connection to +%% take over realm provision. Returns the updated CX and a list of all the realms +%% that could not be provided by any available connection. + +reassign_conns([Realm | Realms], CX = #cx{realms = RMetas}, Unassigned) -> + {NewUnassigned, NewCX} = case maps:get(Realm, RMetas) of #rmeta{available = []} -> - {ok, NextMX, NextCX} = init_connections([Realm], MX, CX), - {NextMX, NextCX}; + {[Realm | Unassigned], CX}; Meta = #rmeta{available = [Conn | Conns]} -> NewMeta = Meta#rmeta{assigned = Conn, available = Conns}, NewRMetas = maps:put(Realm, NewMeta, RMetas), NextCX = CX#cx{realms = NewRMetas}, - {MX, NextCX} + {Unassigned, NextCX} end, - ensure_connection(Realms, NewMX, NewCX); -ensure_connection([], MX, CX) -> - {MX, CX}. + reassign_conns(Realms, NewCX, NewUnassigned); +reassign_conns([], CX, Unassigned) -> + {CX, Unassigned}. -spec do_result(ID, Result, State) -> NewState @@ -915,7 +977,7 @@ handle_orphan_result(ID, Result, Dropped) -> ok = log(info, Message, [ID, Request, Result]), NewDropped; error -> - Message = "Received unknown request result ~tp: ~tp", + Message = "Received untracked request result ~tp: ~tp", ok = log(warning, Message, [ID, Result]), Dropped end. @@ -954,6 +1016,16 @@ eval_queue(State = #s{actions = Actions}) -> eval_queue(InOrder, State#s{actions = []}). +-spec eval_queue(Actions, State) -> NewState + when Actions :: [action()], + State :: state(), + NewState :: state(). +%% @private +%% This is essentially a big, gnarly fold over the action list with State as the +%% accumulator. It repacks the State#s.actions list with whatever requests were not +%% able to be handled and updates State in whatever way necessary according to the +%% handled requests. + eval_queue([], State) -> State; eval_queue([Action = {request, Pid, ID, Message} | Rest], @@ -966,7 +1038,7 @@ eval_queue([Action = {request, Pid, ID, Message} | Rest], {Actions, NextRequests, NextMX, NextCX}; {result, Response} -> Pid ! Response, - {Actions, Requests, CX}; + {Actions, Requests, MX, CX}; wait -> NextActions = [Action | Actions], NextMX = mx_add_monitor(Pid, requestor, MX), @@ -986,7 +1058,7 @@ eval_queue([Action = {subscribe, Pid, Package} | Rest], ok = zx_conn:subscribe(Conn, Package), NextMX = mx_add_monitor(Pid, subscriber, MX), {Actions, NextMX, NextCX}; - {have_sub, NextCX} + {have_sub, NextCX} -> NextMX = mx_add_monitor(Pid, subscriber, MX), {Actions, NextMX, NextCX}; unassigned -> @@ -1002,8 +1074,8 @@ eval_queue([{unsubscribe, Pid, Package} | Rest], {ok, NewMX} = mx_del_monitor(Pid, {subscription, Package}, MX), NewCX = case cx_del_sub(Pid, Package, CX) of - {drop_sub, NextCX} -> - ok = zx_conn:unsubscribe(Conn, Package), + {{drop_sub, ConnPid}, NextCX} -> + ok = zx_conn:unsubscribe(ConnPid, Package), NextCX; {keep_sub, NextCX} -> NextCX; @@ -1026,10 +1098,13 @@ eval_queue([{unsubscribe, Pid, Package} | Rest], | wait, NewCX :: conn_index(), Response :: result(). +%% @private +%% Routes a request to the correct realm connector, if it is available. If it is not +%% available but configured it will return `wait' indicating that the caller should +%% repack the request and attempt to re-evaluate it later. If the realm is not +%% configured at all, the process is short-circuited by forming an error response +%% directly. -dispatch_request(list, ID, CX) -> - Realms = cx_realms(CX), - {result, ID, Realms}; dispatch_request(Action, ID, CX) -> Realm = element(2, Action), case cx_pre_send(Realm, ID, CX) of @@ -1062,27 +1137,20 @@ clear_monitor(Pid, mx = MX, cx = CX}) -> case mx_crashed_monitor(Pid, MX) of - {attempt, NextMX} -> - failed( - {conn, NextMX} -> - disconnected(Pid, State#s{mx = NextMX}); - {{Reqs, Subs}, NextMX} -> - case mx_lookup_category(Pid, MX) of - attempt -> - do_report(Pid, failed, State); - conn -> - do_report(Pid, disconnected, State); - {Reqs, Subs} -> + {attempt, NewMX} -> + failed(Pid, State#s{mx = NewMX}); + {conn, NewMX} -> + disconnected(Pid, State#s{mx = NewMX}); + {{Reqs, Subs}, NewMX} -> NewActions = drop_actions(Pid, Actions), {NewDropped, NewRequests} = drop_requests(Pid, Dropped, Requests), NewCX = cx_clear_client(Pid, Reqs, Subs, CX), - NewMX = mx_del_monitor(Pid, crashed, MX), State#s{actions = NewActions, requests = NewRequests, dropped = NewDropped, mx = NewMX, cx = NewCX}; - error -> + unknown -> Unexpected = {'DOWN', Ref, process, Pid, Reason}, ok = log(warning, "Unexpected info: ~tp", [Unexpected]), State @@ -1121,116 +1189,8 @@ drop_requests(ReqIDs, Dropped, Requests) -> lists:fold(Partition, {Dropped, Requests}, ReqIDs). -%-spec do_query_latest(Object, State) -> {Result, NewState} -% when Object :: zx:package() | zx:package_id(), -% State :: state(), -% Result :: {ok, zx:version()} -% | {error, Reason}, -% Reason :: bad_realm -% | bad_package -% | bad_version, -% NewState :: state(). -%% @private -%% Queries a zomp realm for the latest version of a package or package -%% version (complete or incomplete version number). -% -%do_query_latest(Socket, {Realm, Name}) -> -% ok = zx_net:send(Socket, {latest, Realm, Name}), -% receive -% {tcp, Socket, Bin} -> binary_to_term(Bin) -% after 5000 -> {error, timeout} -% end; -%do_query_latest(Socket, {Realm, Name, Version}) -> -% ok = zx_net:send(Socket, {latest, Realm, Name, Version}), -% receive -% {tcp, Socket, Bin} -> binary_to_term(Bin) -% after 5000 -> {error, timeout} -% end. - - -%-spec do_fetch(PackageIDs, State) -> NewState -% when PackageIDs :: [zx:package_id()], -% State :: state(), -% NewState :: state(), -% Result :: ok -% | {error, Reason}, -% Reason :: bad_realm -% | bad_package -% | bad_version -% | network. -%% @private -%% -% -%do_fetch(PackageIDs, State) -> -% FIXME: Need to create a job queue divided by realm and dispatched to connectors, -% and cleared from the master pending queue kept here by the daemon as the -% workers succeed. Basic task queue management stuff... which never existed -% in ZX before... grrr... -% case scrub(PackageIDs) of -% [] -> -% ok; -% Needed -> -% Partitioned = partition_by_realm(Needed), -% EnsureDeps = -% fun({Realm, Packages}) -> -% ok = zx_conn:queue_package(Pid, Realm, Packages), -% log(info, "Disconnecting from realm: ~ts", [Realm]) -% end, -% lists:foreach(EnsureDeps, Partitioned) -% end. -% -% -%partition_by_realm(PackageIDs) -> -% PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs), -% maps:to_list(PartitionMap). -% -% -%partition_by_realm({R, P, V}, M) -> -% maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M). -% -% -%ensure_deps(_, _, []) -> -% ok; -%ensure_deps(Socket, Realm, [{Name, Version} | Rest]) -> -% ok = ensure_dep(Socket, {Realm, Name, Version}), -% ensure_deps(Socket, Realm, Rest). -% -% -%-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return(). -%% @private -%% Given an PackageID as an argument, check whether its package file exists in the -%% system cache, and if not download it. Should return `ok' whenever the file is -%% sourced, but exit with an error if it cannot locate or acquire the package. -% -%ensure_dep(Socket, PackageID) -> -% ZrpFile = filename:join("zrp", namify_zrp(PackageID)), -% ok = -% case filelib:is_regular(ZrpFile) of -% true -> ok; -% false -> fetch(Socket, PackageID) -% end, -% ok = install(PackageID), -% build(PackageID). -% -% -%-spec scrub(Deps) -> Scrubbed -% when Deps :: [package_id()], -% Scrubbed :: [package_id()]. -%% @private -%% Take a list of dependencies and return a list of dependencies that are not yet -%% installed on the system. -% -%scrub([]) -> -% []; -%scrub(Deps) -> -% lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps). - - %%% Monitor Index ADT Interface Functions -%%% -%%% Very simple structure, but explicit handling of it becomes bothersome in other -%%% code, so it is all just packed down here. -spec mx_new() -> monitor_index(). %% @private @@ -1288,7 +1248,7 @@ mx_upgrade_conn(Pid, MX) -> when Conn :: pid(), Category :: attempt | conn - | {requestor, id()}, + | {requestor, id()} | {subscriber, Sub :: tuple()}, MX :: monitor_index(), NewMX :: monitor_index(). @@ -1311,12 +1271,12 @@ mx_del_monitor(Pid, {requestor, ID}, MX) -> true = demonitor(Ref, [flush]), NextMX; {{Ref, {Reqs, Subs}}, NextMX} when Reqs > 0 -> - NewReqs = lists:subtract(ID, Reqs), - maps:put(Pid, {NewReqs, Subs}, NextMX) - end, + NewReqs = lists:delete(ID, Reqs), + maps:put(Pid, {Ref, {NewReqs, Subs}}, NextMX) + end; mx_del_monitor(Pid, {subscriber, Sub}, MX) -> case maps:take(Pid, MX) of - {{Ref, {[], [Package]}}, NextMX} -> + {{Ref, {[], [Sub]}}, NextMX} -> true = demonitor(Ref, [flush]), NextMX; {{Ref, {Reqs, Subs}}, NextMX} when Subs > 0 -> @@ -1338,27 +1298,10 @@ mx_del_monitor(Pid, {subscriber, Sub}, MX) -> mx_crashed_monitor(Pid, MX) -> case maps:take(Pid, MX) of {{Ref, Type}, NewMX} -> - true = demonitor(Mon, [flush]), + true = demonitor(Ref, [flush]), {Type, NewMX}; error -> - error - end. - - --spec mx_lookup_category(Pid, MX) -> Result - when Pid :: pid(), - MX :: monitor_index(), - Result :: attempt - | conn - | {Reqs :: [reference()], Subs :: [zx:package()]} - | error. -%% @private -%% Lookup a monitor's categories. - -mx_lookup_category(Pid, MX) -> - case maps:find(Pid, MX) of - {ok, Mon} -> element(2, Mon); - error -> error + unknown end. @@ -1672,7 +1615,7 @@ cx_connected(Available, Pid, CX = #cx{attempts = Attempts}) -> -spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX} when A :: unassigned | assigned, Available :: [{zx:realm(), zx:serial()}], - Conn :: {pid(), zx:host(), [zx:realm()]}, + Conn :: connection(), CX :: conn_index(), NewA :: unassigned | assigned, NewCX :: conn_index(). @@ -1704,7 +1647,7 @@ cx_connected(A, -spec cx_connected(A, Realm, Conn, Meta, CX) -> {NewA, NewCX} when A :: unassigned | assigned, Realm :: zx:host(), - Conn :: {pid(), zx:host(), [zx:realm()]}, + Conn :: connection(), Meta :: realm_meta(), CX :: conn_index(), NewA :: unassigned | assigned, @@ -1785,6 +1728,12 @@ cx_failed(Conn, CX = #cx{attempts = Attempts}) -> CX :: conn_index(), Unassigned :: [zx:realm()], NewCX :: conn_index(). +%% @private +%% Remove a redirected connection attempt from CX, add its redirect hosts to the +%% mirror queue, and proceed to make further connection atempts to all unassigned +%% realms. This can cause an inflationary number of new connection attempts, but this +%% is considered preferrable because the more mirrors fail the longer the user is +%% waiting and the more urgent the need to discover a working node becomes. cx_redirect(Conn, Hosts, CX = #cx{attempts = Attempts}) -> NewAttempts = lists:keydelete(Conn, 1, Attempts), @@ -1810,7 +1759,7 @@ cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) -> NewMeta = Meta#rmeta{mirrors = NewMirrors}, maps:put(R, NewMeta, Rs); error -> - R + Rs end end, NewRealms = lists:foldl(Apply, Realms, Provided), @@ -1823,7 +1772,7 @@ cx_redirect([], CX) -> when CX :: conn_index(), Unassigned :: [zx:realm()]. %% @private -%% Scan the CX record for unassigned realms;return a list of all unassigned +%% Scan CX#cx.realms for unassigned realms and return a list of all unassigned %% realm names. cx_unassigned(#cx{realms = Realms}) -> @@ -1837,12 +1786,13 @@ cx_unassigned(#cx{realms = Realms}) -> maps:fold(NotAssigned, [], Realms). --spec cx_disconnected(Conn, CX) -> {Requests, Subs, NewCX} - when Conn :: pid(), - CX :: conn_index(), - Requests :: [reference()], - Subs :: [zx:package()], - NewCX :: conn_index(). +-spec cx_disconnected(Conn, CX) -> {Requests, Subs, Unassigned, NewCX} + when Conn :: pid(), + CX :: conn_index(), + Requests :: [id()], + Subs :: [zx:package()], + Unassigned :: [zx:realm()], + NewCX :: conn_index(). %% @private %% An abstract data handler which is called whenever a connection terminates. %% This function removes all data related to the disconnected pid and its assigned @@ -1854,7 +1804,8 @@ cx_disconnected(Pid, CX = #cx{realms = Realms, conns = Conns}) -> #conn{host = Host, requests = Requests, subs = Subs} = Conn, NewRealms = cx_scrub_assigned(Pid, Host, Realms), NewCX = CX#cx{realms = NewRealms, conns = NewConns}, - {Requests, Subs, NewCX}. + Unassigned = cx_unassigned(NewCX), + {Requests, Subs, Unassigned, NewCX}. -spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms @@ -1960,11 +1911,11 @@ cx_add_sub(Subscriber, Channel, CX = #cx{conns = Conns}) -> NewCX :: conn_index(). cx_maybe_new_sub(Conn = #conn{pid = ConnPid, subs = Subs}, - Sub = {Subscriber, Channel}, + Sub = {_, Channel}, CX = #cx{conns = Conns}) -> - NewSubs = [{Subscriber, Channel} | Subs], + NewSubs = [Sub | Subs], NewConn = Conn#conn{subs = NewSubs}, - NewConns = [NewConn | NextConns], + NewConns = [NewConn | Conns], NewCX = CX#cx{conns = NewConns}, case lists:keymember(Channel, 2, Subs) of false -> {need_sub, ConnPid, NewCX}; @@ -1990,25 +1941,35 @@ cx_del_sub(Subscriber, Channel, CX = #cx{conns = Conns}) -> case cx_resolve(Realm, CX) of {ok, Pid} -> {value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns), - cx_maybe_last_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns}; + cx_maybe_last_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns}); Other -> Other end. +-spec cx_maybe_last_sub(Conn, Sub, CX) -> {Verdict, NewCX} + when Conn :: connection(), + Sub :: {pid(), term()}, + CX :: conn_index(), + Verdict :: {drop_sub, Conn :: pid()} | keep_sub, + NewCX :: conn_index(). +%% @private +%% Tells us whether a sub is still valid for any clients. If a sub is unsubbed by all +%% then it needs to be unsubscribed at the upstream node. + cx_maybe_last_sub(Conn = #conn{pid = ConnPid, subs = Subs}, - Sub = {Subscriber, Channel}, + Sub = {_, Channel}, CX = #cx{conns = Conns}) -> NewSubs = lists:delete(Sub, Subs), NewConn = Conn#conn{subs = NewSubs}, - NewConns = [NewConn | NextConns], + NewConns = [NewConn | Conns], NewCX = CX#cx{conns = NewConns}, - MaybeDrop = + Verdict = case lists:keymember(Channel, 2, NewSubs) of - false -> drop_sub; + false -> {drop_sub, ConnPid}; true -> keep_sub end, - {MaybeDrop, NewCX}. + {Verdict, NewCX}. -spec cx_get_subscribers(Conn, Channel, CX) -> Subscribers @@ -2022,6 +1983,16 @@ cx_get_subscribers(Conn, Channel, #cx{conns = Conns}) -> lists:fold(registered_to(Channel), [], Subs). +-spec registered_to(Channel) -> fun(({P, C}, A) -> NewA) + when Channel :: term(), + P :: pid(), + C :: term(), + A :: [pid()], + NewA :: [pid()]. +%% @private +%% Matching function that closes over a given channel in a subscriber list. +%% This function exists mostly to make its parent function read nicely. + registered_to(Channel) -> fun({P, C}, A) -> case C == Channel of @@ -2031,8 +2002,15 @@ registered_to(Channel) -> end. +-spec cx_clear_client(Pid, DeadReqs, DeadSubs, CX) -> NewCX + when Pid :: pid(), + DeadReqs :: [id()], + DeadSubs :: [term()], + CX :: conn_index(), + NewCX :: conn_index(). + cx_clear_client(Pid, DeadReqs, DeadSubs, CX = #cx{conns = Conns}) -> - DropSubs = [{S, Pid} || S <- DeadSubs], + DropSubs = [{Pid, Sub} || Sub <- DeadSubs], Clear = fun(C = #conn{requests = Requests, subs = Subs}) -> NewSubs = lists:subtract(Subs, DropSubs),