blah
This commit is contained in:
parent
c26f6737dc
commit
08972f0160
BIN
zomp/lib/otpr-zx/0.1.0/src/.zx_daemon.erl.swp
Normal file
BIN
zomp/lib/otpr-zx/0.1.0/src/.zx_daemon.erl.swp
Normal file
Binary file not shown.
@ -303,3 +303,109 @@ handle_timeout(Socket) ->
|
|||||||
|
|
||||||
terminate() ->
|
terminate() ->
|
||||||
exit(normal).
|
exit(normal).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%-spec do_query_latest(Object, State) -> {Result, NewState}
|
||||||
|
% when Object :: zx:package() | zx:package_id(),
|
||||||
|
% State :: state(),
|
||||||
|
% Result :: {ok, zx:version()}
|
||||||
|
% | {error, Reason},
|
||||||
|
% Reason :: bad_realm
|
||||||
|
% | bad_package
|
||||||
|
% | bad_version,
|
||||||
|
% NewState :: state().
|
||||||
|
%% @private
|
||||||
|
%% Queries a zomp realm for the latest version of a package or package
|
||||||
|
%% version (complete or incomplete version number).
|
||||||
|
%
|
||||||
|
%do_query_latest(Socket, {Realm, Name}) ->
|
||||||
|
% ok = zx_net:send(Socket, {latest, Realm, Name}),
|
||||||
|
% receive
|
||||||
|
% {tcp, Socket, Bin} -> binary_to_term(Bin)
|
||||||
|
% after 5000 -> {error, timeout}
|
||||||
|
% end;
|
||||||
|
%do_query_latest(Socket, {Realm, Name, Version}) ->
|
||||||
|
% ok = zx_net:send(Socket, {latest, Realm, Name, Version}),
|
||||||
|
% receive
|
||||||
|
% {tcp, Socket, Bin} -> binary_to_term(Bin)
|
||||||
|
% after 5000 -> {error, timeout}
|
||||||
|
% end.
|
||||||
|
|
||||||
|
|
||||||
|
%-spec do_fetch(PackageIDs, State) -> NewState
|
||||||
|
% when PackageIDs :: [zx:package_id()],
|
||||||
|
% State :: state(),
|
||||||
|
% NewState :: state(),
|
||||||
|
% Result :: ok
|
||||||
|
% | {error, Reason},
|
||||||
|
% Reason :: bad_realm
|
||||||
|
% | bad_package
|
||||||
|
% | bad_version
|
||||||
|
% | network.
|
||||||
|
%% @private
|
||||||
|
%%
|
||||||
|
%
|
||||||
|
%do_fetch(PackageIDs, State) ->
|
||||||
|
% FIXME: Need to create a job queue divided by realm and dispatched to connectors,
|
||||||
|
% and cleared from the master pending queue kept here by the daemon as the
|
||||||
|
% workers succeed. Basic task queue management stuff... which never existed
|
||||||
|
% in ZX before... grrr...
|
||||||
|
% case scrub(PackageIDs) of
|
||||||
|
% [] ->
|
||||||
|
% ok;
|
||||||
|
% Needed ->
|
||||||
|
% Partitioned = partition_by_realm(Needed),
|
||||||
|
% EnsureDeps =
|
||||||
|
% fun({Realm, Packages}) ->
|
||||||
|
% ok = zx_conn:queue_package(Pid, Realm, Packages),
|
||||||
|
% log(info, "Disconnecting from realm: ~ts", [Realm])
|
||||||
|
% end,
|
||||||
|
% lists:foreach(EnsureDeps, Partitioned)
|
||||||
|
% end.
|
||||||
|
%
|
||||||
|
%
|
||||||
|
%partition_by_realm(PackageIDs) ->
|
||||||
|
% PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs),
|
||||||
|
% maps:to_list(PartitionMap).
|
||||||
|
%
|
||||||
|
%
|
||||||
|
%partition_by_realm({R, P, V}, M) ->
|
||||||
|
% maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M).
|
||||||
|
%
|
||||||
|
%
|
||||||
|
%ensure_deps(_, _, []) ->
|
||||||
|
% ok;
|
||||||
|
%ensure_deps(Socket, Realm, [{Name, Version} | Rest]) ->
|
||||||
|
% ok = ensure_dep(Socket, {Realm, Name, Version}),
|
||||||
|
% ensure_deps(Socket, Realm, Rest).
|
||||||
|
%
|
||||||
|
%
|
||||||
|
%-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return().
|
||||||
|
%% @private
|
||||||
|
%% Given an PackageID as an argument, check whether its package file exists in the
|
||||||
|
%% system cache, and if not download it. Should return `ok' whenever the file is
|
||||||
|
%% sourced, but exit with an error if it cannot locate or acquire the package.
|
||||||
|
%
|
||||||
|
%ensure_dep(Socket, PackageID) ->
|
||||||
|
% ZrpFile = filename:join("zrp", namify_zrp(PackageID)),
|
||||||
|
% ok =
|
||||||
|
% case filelib:is_regular(ZrpFile) of
|
||||||
|
% true -> ok;
|
||||||
|
% false -> fetch(Socket, PackageID)
|
||||||
|
% end,
|
||||||
|
% ok = install(PackageID),
|
||||||
|
% build(PackageID).
|
||||||
|
%
|
||||||
|
%
|
||||||
|
%-spec scrub(Deps) -> Scrubbed
|
||||||
|
% when Deps :: [package_id()],
|
||||||
|
% Scrubbed :: [package_id()].
|
||||||
|
%% @private
|
||||||
|
%% Take a list of dependencies and return a list of dependencies that are not yet
|
||||||
|
%% installed on the system.
|
||||||
|
%
|
||||||
|
%scrub([]) ->
|
||||||
|
% [];
|
||||||
|
%scrub(Deps) ->
|
||||||
|
% lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps).
|
||||||
|
|||||||
@ -1,30 +1,31 @@
|
|||||||
%%% @doc
|
%%% @doc
|
||||||
%%% ZX Daemon
|
%%% ZX Daemon
|
||||||
%%%
|
%%%
|
||||||
%%% Resident execution daemon and runtime interface to Zomp.
|
%%% Resident task daemon and runtime interface to Zomp.
|
||||||
%%%
|
%%%
|
||||||
%%% The daemon resides in the background once started and awaits query requests and
|
%%% The daemon resides in the background once started and awaits query requests and
|
||||||
%%% subscriptions from from other processes. The daemon is only capable of handling
|
%%% subscriptions from other processes. The daemon is only capable of handling
|
||||||
%%% unprivileged (user) actions.
|
%%% unprivileged (user) actions.
|
||||||
%%%
|
%%%
|
||||||
%%%
|
%%%
|
||||||
%%% Discrete state and local abstract data types
|
%%% Discrete state and local abstract data types
|
||||||
%%%
|
%%%
|
||||||
%%% The daemon must keep track of requestors, subscribers, and zx_conn processes by
|
%%% The daemon must keep track of requestors, subscribers, and zx_conn processes by
|
||||||
%%% using monitors, and because the various types of clients are found in different
|
%%% using monitors. Because these various types of clients are found in different
|
||||||
%%% locations the monitors are maintained in a data type called monitor_index(),
|
%%% structures the monitors are maintained in a data type called monitor_index(),
|
||||||
%%% shortened to "mx" throughout the module. This structure is treated as an opaque
|
%%% shortened to "mx" throughout the module. This structure is treated as an opaque
|
||||||
%%% data type and is handled by a set of functions defined toward the end of the module
|
%%% data type and is handled by a set of functions defined toward the end of the module
|
||||||
%%% as mx_*/N.
|
%%% as mx_*/N.
|
||||||
%%%
|
%%%
|
||||||
%%% Node connections (cx_conn processes) must also be tracked for status and realm
|
%%% Node connections (zx_conn processes) must also be tracked for status and realm
|
||||||
%%% availability. This is done using a type called conn_index(), shortened to "cx"
|
%%% availability. This is done using a type called conn_index(), shortened to "cx"
|
||||||
%%% throughout the module. conn_index() is treated as an abstract, opaque datatype
|
%%% throughout the module. conn_index() is treated as an abstract, opaque datatype
|
||||||
%%% throughout the module, and is handled via a set of cx_*/N functions (after the
|
%%% throughout the module, and is handled via a set of cx_*/N functions (after the
|
||||||
%%% mx_*/N section).
|
%%% mx_*/N section).
|
||||||
%%%
|
%%%
|
||||||
%%% Do NOT directly access data within these structures, use (or write) an accessor
|
%%% Do NOT directly access data within these structures, use (or write) an accessor
|
||||||
%%% function that does what you want.
|
%%% function that does what you want. Accessor functions MUST be pure with the sole
|
||||||
|
%%% exception of the mx_* functions that create and destroy monitors.
|
||||||
%%%
|
%%%
|
||||||
%%%
|
%%%
|
||||||
%%% Connection handling
|
%%% Connection handling
|
||||||
@ -40,7 +41,7 @@
|
|||||||
%%% determine what realms must be available and what cached Zomp nodes it is aware of.
|
%%% determine what realms must be available and what cached Zomp nodes it is aware of.
|
||||||
%%% It populates the CX (conn_index(), mentioned above) with realm config and host
|
%%% It populates the CX (conn_index(), mentioned above) with realm config and host
|
||||||
%%% cache data, and then immediately initiates three connection attempts to cached
|
%%% cache data, and then immediately initiates three connection attempts to cached
|
||||||
%%% nodes for each realm configured (if possible; see init_connections/0).
|
%%% nodes for each realm configured if possible (see init_connections/0).
|
||||||
%%%
|
%%%
|
||||||
%%% Once connection attempts have been initiated the daemon waits in receive for
|
%%% Once connection attempts have been initiated the daemon waits in receive for
|
||||||
%%% either a connection report (success or failure) or an action request from
|
%%% either a connection report (success or failure) or an action request from
|
||||||
@ -90,6 +91,44 @@
|
|||||||
%%% a blocking way, establishing its own receive timeouts or implementing either an
|
%%% a blocking way, establishing its own receive timeouts or implementing either an
|
||||||
%%% asynchronous or synchronous interface library atop zx_daemon interface function,
|
%%% asynchronous or synchronous interface library atop zx_daemon interface function,
|
||||||
%%% but leaving zx_daemon and zx_conn alone to work asynchronously with one another.
|
%%% but leaving zx_daemon and zx_conn alone to work asynchronously with one another.
|
||||||
|
%%%
|
||||||
|
%%%
|
||||||
|
%%% Race Avoidance
|
||||||
|
%%%
|
||||||
|
%%% Each runtime can only have one zx_daemon alive at a time, and each system can
|
||||||
|
%%% only have one zx_daemon directly performing actions at a time. This is to prevent
|
||||||
|
%%% problems where multiple zx instances are running at the same time using the same
|
||||||
|
%%% home directory and might clash with one another (overwriting each other's data,
|
||||||
|
%%% corrupting package or key files, etc.). OTP makes running a single registered
|
||||||
|
%%% process simple within a single runtime, but there is no standard cross-platform
|
||||||
|
%%% method for ensuring a given process is the only one of its type in a given scope
|
||||||
|
%%% within a host system.
|
||||||
|
%%%
|
||||||
|
%%% When zx starts the daemon will attempt an exclusive write to a lock file called
|
||||||
|
%%% $ZOMP_HOME/zomp.lock using file:open(LockFile, [exclusive]), writing a system
|
||||||
|
%%% timestamp. If the write succeeds then the daemon knows it is the master for the
|
||||||
|
%%% system and will begin initiating connections as described above as well as open a
|
||||||
|
%%% local socket to listen for other zx instances which will need to proxy their own
|
||||||
|
%%% actions through the master. Once the socket is open, the lock file is updated with
|
||||||
|
%%% the local port number. If the write fails then the file is read and if a port
|
||||||
|
%%% number is indicated then the daemon connects to the master zx_daemon and proxies
|
||||||
|
%%% its requests through it. If no port number exists then the daemon waits 5 seconds,
|
||||||
|
%%% checks again, and if there is still no port number then it checks whether the
|
||||||
|
%%% timestamp is more than 5 seconds old or in the future. If the timestamp is more
|
||||||
|
%%% than 5 seconds old or in the future then the file is deleted and the process
|
||||||
|
%%% of master identification starts again.
|
||||||
|
%%%
|
||||||
|
%%% If a master daemon's runtime is shutting down it will designate its oldest peer
|
||||||
|
%%% daemon connection as the new master. At that point the new master will open a port
|
||||||
|
%%% and rewrite the lock file. Once written the old master will drop all its node
|
||||||
|
%%% connections and dequeue all current requests, then pass a local redirect message
|
||||||
|
%%% to the subordinate daemons telling them the new port to which they should connect.
|
||||||
|
%%%
|
||||||
|
%%% Even if there is considrable churn within a system from, for example, scripted
|
||||||
|
%%% initiation of several small utilities that have never been executed before, the
|
||||||
|
%%% longest-living daemon should always become the master. This is not the most
|
||||||
|
%%% efficient procedure, but it is the easiest to understand and debug across various
|
||||||
|
%%% platforms.
|
||||||
%%% @end
|
%%% @end
|
||||||
|
|
||||||
-module(zx_daemon).
|
-module(zx_daemon).
|
||||||
@ -274,12 +313,6 @@
|
|||||||
%% the filesystem. This step allows running development code from any location in
|
%% the filesystem. This step allows running development code from any location in
|
||||||
%% the filesystem against installed dependencies without requiring any magical
|
%% the filesystem against installed dependencies without requiring any magical
|
||||||
%% references.
|
%% references.
|
||||||
%%
|
|
||||||
%% This call blocks specifically so that we can be certain that the target application
|
|
||||||
%% cannot be started before the impact of this call has taken full effect. It cannot
|
|
||||||
%% be known whether the very first thing the target application will do is send this
|
|
||||||
%% process an async message. That implies that this should only ever be called once,
|
|
||||||
%% by the launching process (which normally terminates shortly thereafter).
|
|
||||||
|
|
||||||
pass_meta(Meta, Dir, ArgV) ->
|
pass_meta(Meta, Dir, ArgV) ->
|
||||||
gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}).
|
gen_server:cast(?MODULE, {pass_meta, Meta, Dir, ArgV}).
|
||||||
@ -288,11 +321,11 @@ pass_meta(Meta, Dir, ArgV) ->
|
|||||||
-spec subscribe(Package) -> ok
|
-spec subscribe(Package) -> ok
|
||||||
when Package :: zx:package().
|
when Package :: zx:package().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Subscribe to update notifications for a for a particular package.
|
%% Subscribe to update notifications for a for a package.
|
||||||
%% The caller will receive update notifications of type `sub_message()' as Erlang
|
%% The caller will receive update notifications of type `sub_message()' as Erlang
|
||||||
%% messages whenever an update occurs.
|
%% messages whenever an update occurs.
|
||||||
%% Crashes the caller if the Realm or Name of the Package argument are illegal
|
%% Crashes the caller if the Realm or Name of the Package argument are illegal
|
||||||
%% `zx:lower0_9' strings.
|
%% `zx:lower0_9()' strings.
|
||||||
|
|
||||||
subscribe(Package = {Realm, Name}) ->
|
subscribe(Package = {Realm, Name}) ->
|
||||||
true = zx_lib:valid_lower0_9(Realm),
|
true = zx_lib:valid_lower0_9(Realm),
|
||||||
@ -558,6 +591,8 @@ start_link() ->
|
|||||||
|
|
||||||
|
|
||||||
-spec init(none) -> {ok, state()}.
|
-spec init(none) -> {ok, state()}.
|
||||||
|
%% @private
|
||||||
|
%% TODO: Implement lockfile checking and master lock acquisition.
|
||||||
|
|
||||||
init(none) ->
|
init(none) ->
|
||||||
Blank = blank_state(),
|
Blank = blank_state(),
|
||||||
@ -674,7 +709,8 @@ handle_cast({result, Ref, Result}, State) ->
|
|||||||
{noreply, NewState};
|
{noreply, NewState};
|
||||||
handle_cast({notify, Conn, Package, Update}, State) ->
|
handle_cast({notify, Conn, Package, Update}, State) ->
|
||||||
ok = do_notify(Conn, Package, Update, State),
|
ok = do_notify(Conn, Package, Update, State),
|
||||||
{noreply, State};
|
NewState = eval_queue(State),
|
||||||
|
{noreply, NewState};
|
||||||
handle_cast(stop, State) ->
|
handle_cast(stop, State) ->
|
||||||
{stop, normal, State};
|
{stop, normal, State};
|
||||||
handle_cast(Unexpected, State) ->
|
handle_cast(Unexpected, State) ->
|
||||||
@ -704,6 +740,7 @@ code_change(_, State, _) ->
|
|||||||
%% @private
|
%% @private
|
||||||
%% gen_server callback to handle shutdown/cleanup tasks on receipt of a clean
|
%% gen_server callback to handle shutdown/cleanup tasks on receipt of a clean
|
||||||
%% termination request.
|
%% termination request.
|
||||||
|
%% TODO: Implement new master selection, dequeuing and request queue passing.
|
||||||
|
|
||||||
terminate(normal, #s{cx = CX}) ->
|
terminate(normal, #s{cx = CX}) ->
|
||||||
ok = log(info, "zx_daemon shutting down..."),
|
ok = log(info, "zx_daemon shutting down..."),
|
||||||
@ -778,7 +815,7 @@ do_request(Requestor, Action, State = #s{id = ID, actions = Actions}) ->
|
|||||||
State :: state(),
|
State :: state(),
|
||||||
NewState :: state().
|
NewState :: state().
|
||||||
%% @private
|
%% @private
|
||||||
%% Receive a report from a connection process, and update the connection index and
|
%% Receive a report from a connection process, update the connection index and
|
||||||
%% possibly retry connections.
|
%% possibly retry connections.
|
||||||
|
|
||||||
do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) ->
|
do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) ->
|
||||||
@ -788,7 +825,7 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) ->
|
|||||||
{assigned, NextCX} ->
|
{assigned, NextCX} ->
|
||||||
{NextMX, NextCX};
|
{NextMX, NextCX};
|
||||||
{unassigned, NextCX} ->
|
{unassigned, NextCX} ->
|
||||||
{ok, ScrubbedMX} = mx_del_monitor(Conn, conn, NextMX),
|
ScrubbedMX = mx_del_monitor(Conn, conn, NextMX),
|
||||||
ok = zx_conn:stop(Conn),
|
ok = zx_conn:stop(Conn),
|
||||||
{ScrubbedMX, NextCX}
|
{ScrubbedMX, NextCX}
|
||||||
end,
|
end,
|
||||||
@ -796,9 +833,9 @@ do_report(Conn, {connected, Realms}, State = #s{mx = MX, cx = CX}) ->
|
|||||||
do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) ->
|
do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) ->
|
||||||
NextMX = mx_del_monitor(Conn, attempt, MX),
|
NextMX = mx_del_monitor(Conn, attempt, MX),
|
||||||
{Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX),
|
{Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX),
|
||||||
{NewMX, NewCX} = ensure_connection(Unassigned, NextMX, NextCX),
|
{NewMX, NewCX} = ensure_connections(Unassigned, NextMX, NextCX),
|
||||||
State#s{mx = NewMX, cx = NewCX};
|
State#s{mx = NewMX, cx = NewCX};
|
||||||
do_report(Conn, failed, State = #s{mx = MX, cx = CX}) ->
|
do_report(Conn, failed, State = #s{mx = MX}) ->
|
||||||
NewMX = mx_del_monitor(Conn, attempt, MX),
|
NewMX = mx_del_monitor(Conn, attempt, MX),
|
||||||
failed(Conn, State#s{mx = NewMX});
|
failed(Conn, State#s{mx = NewMX});
|
||||||
do_report(Conn, disconnected, State = #s{mx = MX}) ->
|
do_report(Conn, disconnected, State = #s{mx = MX}) ->
|
||||||
@ -806,21 +843,30 @@ do_report(Conn, disconnected, State = #s{mx = MX}) ->
|
|||||||
disconnected(Conn, State#s{mx = NewMX}).
|
disconnected(Conn, State#s{mx = NewMX}).
|
||||||
|
|
||||||
|
|
||||||
failed(Conn, State#s{mx = MX, cx = CX}) ->
|
-spec failed(Conn, State) -> NewState
|
||||||
|
when Conn :: pid(),
|
||||||
|
State :: state(),
|
||||||
|
NewState :: state().
|
||||||
|
|
||||||
|
failed(Conn, State = #s{mx = MX, cx = CX}) ->
|
||||||
{Realms, NextCX} = cx_failed(Conn, CX),
|
{Realms, NextCX} = cx_failed(Conn, CX),
|
||||||
{NewMX, NewCX} = ensure_connection(Realms, MX, NextCX),
|
{NewMX, NewCX} = ensure_connections(Realms, MX, NextCX),
|
||||||
State#s{mx = NewMX, cx = NewCX}.
|
State#s{mx = NewMX, cx = NewCX}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec disconnected(Conn, State) -> NewState
|
||||||
|
when Conn :: pid(),
|
||||||
|
State :: state(),
|
||||||
|
NewState :: state().
|
||||||
|
|
||||||
disconnected(Conn,
|
disconnected(Conn,
|
||||||
State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) ->
|
State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) ->
|
||||||
{Pending, LostSubs, ScrubbedCX} = cx_disconnected(Conn, CX),
|
{Pending, LostSubs, Unassigned, ScrubbedCX} = cx_disconnected(Conn, CX),
|
||||||
Unassigned = cx_unassigned(ScrubbedCX),
|
|
||||||
ReSubs = [{S, {subscribe, P}} || {S, P} <- LostSubs],
|
ReSubs = [{S, {subscribe, P}} || {S, P} <- LostSubs],
|
||||||
{Dequeued, NewRequests} = maps:fold(dequeue(Pending), {#{}, #{}}, Requests),
|
{Dequeued, NewRequests} = maps:fold(dequeue(Pending), {#{}, #{}}, Requests),
|
||||||
ReReqs = maps:to_list(Dequeued),
|
ReReqs = maps:to_list(Dequeued),
|
||||||
NewActions = ReReqs ++ ReSubs ++ Actions,
|
NewActions = ReReqs ++ ReSubs ++ Actions,
|
||||||
{NewMX, NewCX} = ensure_connection(Unassigned, ScrubbedMX, ScrubbedCX),
|
{NewMX, NewCX} = ensure_connections(Unassigned, MX, ScrubbedCX),
|
||||||
State#s{actions = NewActions,
|
State#s{actions = NewActions,
|
||||||
requests = NewRequests,
|
requests = NewRequests,
|
||||||
mx = NewMX,
|
mx = NewMX,
|
||||||
@ -848,32 +894,48 @@ dequeue(Pending) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec ensure_connection(Realms, MX, CX) -> {NewMX, NewCX}
|
-spec ensure_connections(Realms, MX, CX) -> {NewMX, NewCX}
|
||||||
when Realms :: [zx:realm()],
|
when Realms :: [zx:realm()],
|
||||||
MX :: monitor_index(),
|
MX :: monitor_index(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
NewMX :: monitor_index(),
|
NewMX :: monitor_index(),
|
||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
%% @private
|
%% @private
|
||||||
%% Initiates a connection to a single realm at a time, taking into account whether
|
%% Check the list of unprovided realms with all available connections that can provide
|
||||||
%% connected but unassigned nodes are alterantive providers of the needed realm.
|
%% it, and allocate accordingly. If any realms cannot be provided by existing
|
||||||
%% Returns updated monitor and connection indices.
|
%% connections, new connections are initiated for all unprovided realms.
|
||||||
|
|
||||||
ensure_connection([Realm | Realms], MX, CX = #cx{realms = RMetas}) ->
|
ensure_connections(Realms, MX, CX) ->
|
||||||
{NewMX, NewCX} =
|
{NextCX, Unavailable} = reassign_conns(Realms, CX, []),
|
||||||
|
{ok, NewMX, NewCX} = init_connections(Unavailable, MX, NextCX),
|
||||||
|
{NewMX, NewCX}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec reassign_conns(Realms, CX, Unavailable) -> {NewCX, NewUnavailable}
|
||||||
|
when Realms :: [zx:realm()],
|
||||||
|
CX :: conn_index(),
|
||||||
|
Unavailable :: [zx:realm()],
|
||||||
|
NewCX :: conn_index(),
|
||||||
|
NewUnavailable :: [zx:realm()].
|
||||||
|
%% @private
|
||||||
|
%% Finds connections that provide a requested realm and assigns that connection to
|
||||||
|
%% take over realm provision. Returns the updated CX and a list of all the realms
|
||||||
|
%% that could not be provided by any available connection.
|
||||||
|
|
||||||
|
reassign_conns([Realm | Realms], CX = #cx{realms = RMetas}, Unassigned) ->
|
||||||
|
{NewUnassigned, NewCX} =
|
||||||
case maps:get(Realm, RMetas) of
|
case maps:get(Realm, RMetas) of
|
||||||
#rmeta{available = []} ->
|
#rmeta{available = []} ->
|
||||||
{ok, NextMX, NextCX} = init_connections([Realm], MX, CX),
|
{[Realm | Unassigned], CX};
|
||||||
{NextMX, NextCX};
|
|
||||||
Meta = #rmeta{available = [Conn | Conns]} ->
|
Meta = #rmeta{available = [Conn | Conns]} ->
|
||||||
NewMeta = Meta#rmeta{assigned = Conn, available = Conns},
|
NewMeta = Meta#rmeta{assigned = Conn, available = Conns},
|
||||||
NewRMetas = maps:put(Realm, NewMeta, RMetas),
|
NewRMetas = maps:put(Realm, NewMeta, RMetas),
|
||||||
NextCX = CX#cx{realms = NewRMetas},
|
NextCX = CX#cx{realms = NewRMetas},
|
||||||
{MX, NextCX}
|
{Unassigned, NextCX}
|
||||||
end,
|
end,
|
||||||
ensure_connection(Realms, NewMX, NewCX);
|
reassign_conns(Realms, NewCX, NewUnassigned);
|
||||||
ensure_connection([], MX, CX) ->
|
reassign_conns([], CX, Unassigned) ->
|
||||||
{MX, CX}.
|
{CX, Unassigned}.
|
||||||
|
|
||||||
|
|
||||||
-spec do_result(ID, Result, State) -> NewState
|
-spec do_result(ID, Result, State) -> NewState
|
||||||
@ -915,7 +977,7 @@ handle_orphan_result(ID, Result, Dropped) ->
|
|||||||
ok = log(info, Message, [ID, Request, Result]),
|
ok = log(info, Message, [ID, Request, Result]),
|
||||||
NewDropped;
|
NewDropped;
|
||||||
error ->
|
error ->
|
||||||
Message = "Received unknown request result ~tp: ~tp",
|
Message = "Received untracked request result ~tp: ~tp",
|
||||||
ok = log(warning, Message, [ID, Result]),
|
ok = log(warning, Message, [ID, Result]),
|
||||||
Dropped
|
Dropped
|
||||||
end.
|
end.
|
||||||
@ -954,6 +1016,16 @@ eval_queue(State = #s{actions = Actions}) ->
|
|||||||
eval_queue(InOrder, State#s{actions = []}).
|
eval_queue(InOrder, State#s{actions = []}).
|
||||||
|
|
||||||
|
|
||||||
|
-spec eval_queue(Actions, State) -> NewState
|
||||||
|
when Actions :: [action()],
|
||||||
|
State :: state(),
|
||||||
|
NewState :: state().
|
||||||
|
%% @private
|
||||||
|
%% This is essentially a big, gnarly fold over the action list with State as the
|
||||||
|
%% accumulator. It repacks the State#s.actions list with whatever requests were not
|
||||||
|
%% able to be handled and updates State in whatever way necessary according to the
|
||||||
|
%% handled requests.
|
||||||
|
|
||||||
eval_queue([], State) ->
|
eval_queue([], State) ->
|
||||||
State;
|
State;
|
||||||
eval_queue([Action = {request, Pid, ID, Message} | Rest],
|
eval_queue([Action = {request, Pid, ID, Message} | Rest],
|
||||||
@ -966,7 +1038,7 @@ eval_queue([Action = {request, Pid, ID, Message} | Rest],
|
|||||||
{Actions, NextRequests, NextMX, NextCX};
|
{Actions, NextRequests, NextMX, NextCX};
|
||||||
{result, Response} ->
|
{result, Response} ->
|
||||||
Pid ! Response,
|
Pid ! Response,
|
||||||
{Actions, Requests, CX};
|
{Actions, Requests, MX, CX};
|
||||||
wait ->
|
wait ->
|
||||||
NextActions = [Action | Actions],
|
NextActions = [Action | Actions],
|
||||||
NextMX = mx_add_monitor(Pid, requestor, MX),
|
NextMX = mx_add_monitor(Pid, requestor, MX),
|
||||||
@ -986,7 +1058,7 @@ eval_queue([Action = {subscribe, Pid, Package} | Rest],
|
|||||||
ok = zx_conn:subscribe(Conn, Package),
|
ok = zx_conn:subscribe(Conn, Package),
|
||||||
NextMX = mx_add_monitor(Pid, subscriber, MX),
|
NextMX = mx_add_monitor(Pid, subscriber, MX),
|
||||||
{Actions, NextMX, NextCX};
|
{Actions, NextMX, NextCX};
|
||||||
{have_sub, NextCX}
|
{have_sub, NextCX} ->
|
||||||
NextMX = mx_add_monitor(Pid, subscriber, MX),
|
NextMX = mx_add_monitor(Pid, subscriber, MX),
|
||||||
{Actions, NextMX, NextCX};
|
{Actions, NextMX, NextCX};
|
||||||
unassigned ->
|
unassigned ->
|
||||||
@ -1002,8 +1074,8 @@ eval_queue([{unsubscribe, Pid, Package} | Rest],
|
|||||||
{ok, NewMX} = mx_del_monitor(Pid, {subscription, Package}, MX),
|
{ok, NewMX} = mx_del_monitor(Pid, {subscription, Package}, MX),
|
||||||
NewCX =
|
NewCX =
|
||||||
case cx_del_sub(Pid, Package, CX) of
|
case cx_del_sub(Pid, Package, CX) of
|
||||||
{drop_sub, NextCX} ->
|
{{drop_sub, ConnPid}, NextCX} ->
|
||||||
ok = zx_conn:unsubscribe(Conn, Package),
|
ok = zx_conn:unsubscribe(ConnPid, Package),
|
||||||
NextCX;
|
NextCX;
|
||||||
{keep_sub, NextCX} ->
|
{keep_sub, NextCX} ->
|
||||||
NextCX;
|
NextCX;
|
||||||
@ -1026,10 +1098,13 @@ eval_queue([{unsubscribe, Pid, Package} | Rest],
|
|||||||
| wait,
|
| wait,
|
||||||
NewCX :: conn_index(),
|
NewCX :: conn_index(),
|
||||||
Response :: result().
|
Response :: result().
|
||||||
|
%% @private
|
||||||
|
%% Routes a request to the correct realm connector, if it is available. If it is not
|
||||||
|
%% available but configured it will return `wait' indicating that the caller should
|
||||||
|
%% repack the request and attempt to re-evaluate it later. If the realm is not
|
||||||
|
%% configured at all, the process is short-circuited by forming an error response
|
||||||
|
%% directly.
|
||||||
|
|
||||||
dispatch_request(list, ID, CX) ->
|
|
||||||
Realms = cx_realms(CX),
|
|
||||||
{result, ID, Realms};
|
|
||||||
dispatch_request(Action, ID, CX) ->
|
dispatch_request(Action, ID, CX) ->
|
||||||
Realm = element(2, Action),
|
Realm = element(2, Action),
|
||||||
case cx_pre_send(Realm, ID, CX) of
|
case cx_pre_send(Realm, ID, CX) of
|
||||||
@ -1062,27 +1137,20 @@ clear_monitor(Pid,
|
|||||||
mx = MX,
|
mx = MX,
|
||||||
cx = CX}) ->
|
cx = CX}) ->
|
||||||
case mx_crashed_monitor(Pid, MX) of
|
case mx_crashed_monitor(Pid, MX) of
|
||||||
{attempt, NextMX} ->
|
{attempt, NewMX} ->
|
||||||
failed(
|
failed(Pid, State#s{mx = NewMX});
|
||||||
{conn, NextMX} ->
|
{conn, NewMX} ->
|
||||||
disconnected(Pid, State#s{mx = NextMX});
|
disconnected(Pid, State#s{mx = NewMX});
|
||||||
{{Reqs, Subs}, NextMX} ->
|
{{Reqs, Subs}, NewMX} ->
|
||||||
case mx_lookup_category(Pid, MX) of
|
|
||||||
attempt ->
|
|
||||||
do_report(Pid, failed, State);
|
|
||||||
conn ->
|
|
||||||
do_report(Pid, disconnected, State);
|
|
||||||
{Reqs, Subs} ->
|
|
||||||
NewActions = drop_actions(Pid, Actions),
|
NewActions = drop_actions(Pid, Actions),
|
||||||
{NewDropped, NewRequests} = drop_requests(Pid, Dropped, Requests),
|
{NewDropped, NewRequests} = drop_requests(Pid, Dropped, Requests),
|
||||||
NewCX = cx_clear_client(Pid, Reqs, Subs, CX),
|
NewCX = cx_clear_client(Pid, Reqs, Subs, CX),
|
||||||
NewMX = mx_del_monitor(Pid, crashed, MX),
|
|
||||||
State#s{actions = NewActions,
|
State#s{actions = NewActions,
|
||||||
requests = NewRequests,
|
requests = NewRequests,
|
||||||
dropped = NewDropped,
|
dropped = NewDropped,
|
||||||
mx = NewMX,
|
mx = NewMX,
|
||||||
cx = NewCX};
|
cx = NewCX};
|
||||||
error ->
|
unknown ->
|
||||||
Unexpected = {'DOWN', Ref, process, Pid, Reason},
|
Unexpected = {'DOWN', Ref, process, Pid, Reason},
|
||||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||||
State
|
State
|
||||||
@ -1121,116 +1189,8 @@ drop_requests(ReqIDs, Dropped, Requests) ->
|
|||||||
lists:fold(Partition, {Dropped, Requests}, ReqIDs).
|
lists:fold(Partition, {Dropped, Requests}, ReqIDs).
|
||||||
|
|
||||||
|
|
||||||
%-spec do_query_latest(Object, State) -> {Result, NewState}
|
|
||||||
% when Object :: zx:package() | zx:package_id(),
|
|
||||||
% State :: state(),
|
|
||||||
% Result :: {ok, zx:version()}
|
|
||||||
% | {error, Reason},
|
|
||||||
% Reason :: bad_realm
|
|
||||||
% | bad_package
|
|
||||||
% | bad_version,
|
|
||||||
% NewState :: state().
|
|
||||||
%% @private
|
|
||||||
%% Queries a zomp realm for the latest version of a package or package
|
|
||||||
%% version (complete or incomplete version number).
|
|
||||||
%
|
|
||||||
%do_query_latest(Socket, {Realm, Name}) ->
|
|
||||||
% ok = zx_net:send(Socket, {latest, Realm, Name}),
|
|
||||||
% receive
|
|
||||||
% {tcp, Socket, Bin} -> binary_to_term(Bin)
|
|
||||||
% after 5000 -> {error, timeout}
|
|
||||||
% end;
|
|
||||||
%do_query_latest(Socket, {Realm, Name, Version}) ->
|
|
||||||
% ok = zx_net:send(Socket, {latest, Realm, Name, Version}),
|
|
||||||
% receive
|
|
||||||
% {tcp, Socket, Bin} -> binary_to_term(Bin)
|
|
||||||
% after 5000 -> {error, timeout}
|
|
||||||
% end.
|
|
||||||
|
|
||||||
|
|
||||||
%-spec do_fetch(PackageIDs, State) -> NewState
|
|
||||||
% when PackageIDs :: [zx:package_id()],
|
|
||||||
% State :: state(),
|
|
||||||
% NewState :: state(),
|
|
||||||
% Result :: ok
|
|
||||||
% | {error, Reason},
|
|
||||||
% Reason :: bad_realm
|
|
||||||
% | bad_package
|
|
||||||
% | bad_version
|
|
||||||
% | network.
|
|
||||||
%% @private
|
|
||||||
%%
|
|
||||||
%
|
|
||||||
%do_fetch(PackageIDs, State) ->
|
|
||||||
% FIXME: Need to create a job queue divided by realm and dispatched to connectors,
|
|
||||||
% and cleared from the master pending queue kept here by the daemon as the
|
|
||||||
% workers succeed. Basic task queue management stuff... which never existed
|
|
||||||
% in ZX before... grrr...
|
|
||||||
% case scrub(PackageIDs) of
|
|
||||||
% [] ->
|
|
||||||
% ok;
|
|
||||||
% Needed ->
|
|
||||||
% Partitioned = partition_by_realm(Needed),
|
|
||||||
% EnsureDeps =
|
|
||||||
% fun({Realm, Packages}) ->
|
|
||||||
% ok = zx_conn:queue_package(Pid, Realm, Packages),
|
|
||||||
% log(info, "Disconnecting from realm: ~ts", [Realm])
|
|
||||||
% end,
|
|
||||||
% lists:foreach(EnsureDeps, Partitioned)
|
|
||||||
% end.
|
|
||||||
%
|
|
||||||
%
|
|
||||||
%partition_by_realm(PackageIDs) ->
|
|
||||||
% PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs),
|
|
||||||
% maps:to_list(PartitionMap).
|
|
||||||
%
|
|
||||||
%
|
|
||||||
%partition_by_realm({R, P, V}, M) ->
|
|
||||||
% maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M).
|
|
||||||
%
|
|
||||||
%
|
|
||||||
%ensure_deps(_, _, []) ->
|
|
||||||
% ok;
|
|
||||||
%ensure_deps(Socket, Realm, [{Name, Version} | Rest]) ->
|
|
||||||
% ok = ensure_dep(Socket, {Realm, Name, Version}),
|
|
||||||
% ensure_deps(Socket, Realm, Rest).
|
|
||||||
%
|
|
||||||
%
|
|
||||||
%-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return().
|
|
||||||
%% @private
|
|
||||||
%% Given an PackageID as an argument, check whether its package file exists in the
|
|
||||||
%% system cache, and if not download it. Should return `ok' whenever the file is
|
|
||||||
%% sourced, but exit with an error if it cannot locate or acquire the package.
|
|
||||||
%
|
|
||||||
%ensure_dep(Socket, PackageID) ->
|
|
||||||
% ZrpFile = filename:join("zrp", namify_zrp(PackageID)),
|
|
||||||
% ok =
|
|
||||||
% case filelib:is_regular(ZrpFile) of
|
|
||||||
% true -> ok;
|
|
||||||
% false -> fetch(Socket, PackageID)
|
|
||||||
% end,
|
|
||||||
% ok = install(PackageID),
|
|
||||||
% build(PackageID).
|
|
||||||
%
|
|
||||||
%
|
|
||||||
%-spec scrub(Deps) -> Scrubbed
|
|
||||||
% when Deps :: [package_id()],
|
|
||||||
% Scrubbed :: [package_id()].
|
|
||||||
%% @private
|
|
||||||
%% Take a list of dependencies and return a list of dependencies that are not yet
|
|
||||||
%% installed on the system.
|
|
||||||
%
|
|
||||||
%scrub([]) ->
|
|
||||||
% [];
|
|
||||||
%scrub(Deps) ->
|
|
||||||
% lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps).
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%% Monitor Index ADT Interface Functions
|
%%% Monitor Index ADT Interface Functions
|
||||||
%%%
|
|
||||||
%%% Very simple structure, but explicit handling of it becomes bothersome in other
|
|
||||||
%%% code, so it is all just packed down here.
|
|
||||||
|
|
||||||
-spec mx_new() -> monitor_index().
|
-spec mx_new() -> monitor_index().
|
||||||
%% @private
|
%% @private
|
||||||
@ -1288,7 +1248,7 @@ mx_upgrade_conn(Pid, MX) ->
|
|||||||
when Conn :: pid(),
|
when Conn :: pid(),
|
||||||
Category :: attempt
|
Category :: attempt
|
||||||
| conn
|
| conn
|
||||||
| {requestor, id()},
|
| {requestor, id()}
|
||||||
| {subscriber, Sub :: tuple()},
|
| {subscriber, Sub :: tuple()},
|
||||||
MX :: monitor_index(),
|
MX :: monitor_index(),
|
||||||
NewMX :: monitor_index().
|
NewMX :: monitor_index().
|
||||||
@ -1311,12 +1271,12 @@ mx_del_monitor(Pid, {requestor, ID}, MX) ->
|
|||||||
true = demonitor(Ref, [flush]),
|
true = demonitor(Ref, [flush]),
|
||||||
NextMX;
|
NextMX;
|
||||||
{{Ref, {Reqs, Subs}}, NextMX} when Reqs > 0 ->
|
{{Ref, {Reqs, Subs}}, NextMX} when Reqs > 0 ->
|
||||||
NewReqs = lists:subtract(ID, Reqs),
|
NewReqs = lists:delete(ID, Reqs),
|
||||||
maps:put(Pid, {NewReqs, Subs}, NextMX)
|
maps:put(Pid, {Ref, {NewReqs, Subs}}, NextMX)
|
||||||
end,
|
end;
|
||||||
mx_del_monitor(Pid, {subscriber, Sub}, MX) ->
|
mx_del_monitor(Pid, {subscriber, Sub}, MX) ->
|
||||||
case maps:take(Pid, MX) of
|
case maps:take(Pid, MX) of
|
||||||
{{Ref, {[], [Package]}}, NextMX} ->
|
{{Ref, {[], [Sub]}}, NextMX} ->
|
||||||
true = demonitor(Ref, [flush]),
|
true = demonitor(Ref, [flush]),
|
||||||
NextMX;
|
NextMX;
|
||||||
{{Ref, {Reqs, Subs}}, NextMX} when Subs > 0 ->
|
{{Ref, {Reqs, Subs}}, NextMX} when Subs > 0 ->
|
||||||
@ -1338,27 +1298,10 @@ mx_del_monitor(Pid, {subscriber, Sub}, MX) ->
|
|||||||
mx_crashed_monitor(Pid, MX) ->
|
mx_crashed_monitor(Pid, MX) ->
|
||||||
case maps:take(Pid, MX) of
|
case maps:take(Pid, MX) of
|
||||||
{{Ref, Type}, NewMX} ->
|
{{Ref, Type}, NewMX} ->
|
||||||
true = demonitor(Mon, [flush]),
|
true = demonitor(Ref, [flush]),
|
||||||
{Type, NewMX};
|
{Type, NewMX};
|
||||||
error ->
|
error ->
|
||||||
error
|
unknown
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
-spec mx_lookup_category(Pid, MX) -> Result
|
|
||||||
when Pid :: pid(),
|
|
||||||
MX :: monitor_index(),
|
|
||||||
Result :: attempt
|
|
||||||
| conn
|
|
||||||
| {Reqs :: [reference()], Subs :: [zx:package()]}
|
|
||||||
| error.
|
|
||||||
%% @private
|
|
||||||
%% Lookup a monitor's categories.
|
|
||||||
|
|
||||||
mx_lookup_category(Pid, MX) ->
|
|
||||||
case maps:find(Pid, MX) of
|
|
||||||
{ok, Mon} -> element(2, Mon);
|
|
||||||
error -> error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
@ -1672,7 +1615,7 @@ cx_connected(Available, Pid, CX = #cx{attempts = Attempts}) ->
|
|||||||
-spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX}
|
-spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX}
|
||||||
when A :: unassigned | assigned,
|
when A :: unassigned | assigned,
|
||||||
Available :: [{zx:realm(), zx:serial()}],
|
Available :: [{zx:realm(), zx:serial()}],
|
||||||
Conn :: {pid(), zx:host(), [zx:realm()]},
|
Conn :: connection(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
NewA :: unassigned | assigned,
|
NewA :: unassigned | assigned,
|
||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
@ -1704,7 +1647,7 @@ cx_connected(A,
|
|||||||
-spec cx_connected(A, Realm, Conn, Meta, CX) -> {NewA, NewCX}
|
-spec cx_connected(A, Realm, Conn, Meta, CX) -> {NewA, NewCX}
|
||||||
when A :: unassigned | assigned,
|
when A :: unassigned | assigned,
|
||||||
Realm :: zx:host(),
|
Realm :: zx:host(),
|
||||||
Conn :: {pid(), zx:host(), [zx:realm()]},
|
Conn :: connection(),
|
||||||
Meta :: realm_meta(),
|
Meta :: realm_meta(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
NewA :: unassigned | assigned,
|
NewA :: unassigned | assigned,
|
||||||
@ -1785,6 +1728,12 @@ cx_failed(Conn, CX = #cx{attempts = Attempts}) ->
|
|||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
Unassigned :: [zx:realm()],
|
Unassigned :: [zx:realm()],
|
||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
|
%% @private
|
||||||
|
%% Remove a redirected connection attempt from CX, add its redirect hosts to the
|
||||||
|
%% mirror queue, and proceed to make further connection atempts to all unassigned
|
||||||
|
%% realms. This can cause an inflationary number of new connection attempts, but this
|
||||||
|
%% is considered preferrable because the more mirrors fail the longer the user is
|
||||||
|
%% waiting and the more urgent the need to discover a working node becomes.
|
||||||
|
|
||||||
cx_redirect(Conn, Hosts, CX = #cx{attempts = Attempts}) ->
|
cx_redirect(Conn, Hosts, CX = #cx{attempts = Attempts}) ->
|
||||||
NewAttempts = lists:keydelete(Conn, 1, Attempts),
|
NewAttempts = lists:keydelete(Conn, 1, Attempts),
|
||||||
@ -1810,7 +1759,7 @@ cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) ->
|
|||||||
NewMeta = Meta#rmeta{mirrors = NewMirrors},
|
NewMeta = Meta#rmeta{mirrors = NewMirrors},
|
||||||
maps:put(R, NewMeta, Rs);
|
maps:put(R, NewMeta, Rs);
|
||||||
error ->
|
error ->
|
||||||
R
|
Rs
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
NewRealms = lists:foldl(Apply, Realms, Provided),
|
NewRealms = lists:foldl(Apply, Realms, Provided),
|
||||||
@ -1823,7 +1772,7 @@ cx_redirect([], CX) ->
|
|||||||
when CX :: conn_index(),
|
when CX :: conn_index(),
|
||||||
Unassigned :: [zx:realm()].
|
Unassigned :: [zx:realm()].
|
||||||
%% @private
|
%% @private
|
||||||
%% Scan the CX record for unassigned realms;return a list of all unassigned
|
%% Scan CX#cx.realms for unassigned realms and return a list of all unassigned
|
||||||
%% realm names.
|
%% realm names.
|
||||||
|
|
||||||
cx_unassigned(#cx{realms = Realms}) ->
|
cx_unassigned(#cx{realms = Realms}) ->
|
||||||
@ -1837,11 +1786,12 @@ cx_unassigned(#cx{realms = Realms}) ->
|
|||||||
maps:fold(NotAssigned, [], Realms).
|
maps:fold(NotAssigned, [], Realms).
|
||||||
|
|
||||||
|
|
||||||
-spec cx_disconnected(Conn, CX) -> {Requests, Subs, NewCX}
|
-spec cx_disconnected(Conn, CX) -> {Requests, Subs, Unassigned, NewCX}
|
||||||
when Conn :: pid(),
|
when Conn :: pid(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
Requests :: [reference()],
|
Requests :: [id()],
|
||||||
Subs :: [zx:package()],
|
Subs :: [zx:package()],
|
||||||
|
Unassigned :: [zx:realm()],
|
||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
%% @private
|
%% @private
|
||||||
%% An abstract data handler which is called whenever a connection terminates.
|
%% An abstract data handler which is called whenever a connection terminates.
|
||||||
@ -1854,7 +1804,8 @@ cx_disconnected(Pid, CX = #cx{realms = Realms, conns = Conns}) ->
|
|||||||
#conn{host = Host, requests = Requests, subs = Subs} = Conn,
|
#conn{host = Host, requests = Requests, subs = Subs} = Conn,
|
||||||
NewRealms = cx_scrub_assigned(Pid, Host, Realms),
|
NewRealms = cx_scrub_assigned(Pid, Host, Realms),
|
||||||
NewCX = CX#cx{realms = NewRealms, conns = NewConns},
|
NewCX = CX#cx{realms = NewRealms, conns = NewConns},
|
||||||
{Requests, Subs, NewCX}.
|
Unassigned = cx_unassigned(NewCX),
|
||||||
|
{Requests, Subs, Unassigned, NewCX}.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms
|
-spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms
|
||||||
@ -1960,11 +1911,11 @@ cx_add_sub(Subscriber, Channel, CX = #cx{conns = Conns}) ->
|
|||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
|
|
||||||
cx_maybe_new_sub(Conn = #conn{pid = ConnPid, subs = Subs},
|
cx_maybe_new_sub(Conn = #conn{pid = ConnPid, subs = Subs},
|
||||||
Sub = {Subscriber, Channel},
|
Sub = {_, Channel},
|
||||||
CX = #cx{conns = Conns}) ->
|
CX = #cx{conns = Conns}) ->
|
||||||
NewSubs = [{Subscriber, Channel} | Subs],
|
NewSubs = [Sub | Subs],
|
||||||
NewConn = Conn#conn{subs = NewSubs},
|
NewConn = Conn#conn{subs = NewSubs},
|
||||||
NewConns = [NewConn | NextConns],
|
NewConns = [NewConn | Conns],
|
||||||
NewCX = CX#cx{conns = NewConns},
|
NewCX = CX#cx{conns = NewConns},
|
||||||
case lists:keymember(Channel, 2, Subs) of
|
case lists:keymember(Channel, 2, Subs) of
|
||||||
false -> {need_sub, ConnPid, NewCX};
|
false -> {need_sub, ConnPid, NewCX};
|
||||||
@ -1990,25 +1941,35 @@ cx_del_sub(Subscriber, Channel, CX = #cx{conns = Conns}) ->
|
|||||||
case cx_resolve(Realm, CX) of
|
case cx_resolve(Realm, CX) of
|
||||||
{ok, Pid} ->
|
{ok, Pid} ->
|
||||||
{value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns),
|
{value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns),
|
||||||
cx_maybe_last_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns};
|
cx_maybe_last_sub(Conn, {Subscriber, Channel}, CX#cx{conns = NewConns});
|
||||||
Other ->
|
Other ->
|
||||||
Other
|
Other
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
-spec cx_maybe_last_sub(Conn, Sub, CX) -> {Verdict, NewCX}
|
||||||
|
when Conn :: connection(),
|
||||||
|
Sub :: {pid(), term()},
|
||||||
|
CX :: conn_index(),
|
||||||
|
Verdict :: {drop_sub, Conn :: pid()} | keep_sub,
|
||||||
|
NewCX :: conn_index().
|
||||||
|
%% @private
|
||||||
|
%% Tells us whether a sub is still valid for any clients. If a sub is unsubbed by all
|
||||||
|
%% then it needs to be unsubscribed at the upstream node.
|
||||||
|
|
||||||
cx_maybe_last_sub(Conn = #conn{pid = ConnPid, subs = Subs},
|
cx_maybe_last_sub(Conn = #conn{pid = ConnPid, subs = Subs},
|
||||||
Sub = {Subscriber, Channel},
|
Sub = {_, Channel},
|
||||||
CX = #cx{conns = Conns}) ->
|
CX = #cx{conns = Conns}) ->
|
||||||
NewSubs = lists:delete(Sub, Subs),
|
NewSubs = lists:delete(Sub, Subs),
|
||||||
NewConn = Conn#conn{subs = NewSubs},
|
NewConn = Conn#conn{subs = NewSubs},
|
||||||
NewConns = [NewConn | NextConns],
|
NewConns = [NewConn | Conns],
|
||||||
NewCX = CX#cx{conns = NewConns},
|
NewCX = CX#cx{conns = NewConns},
|
||||||
MaybeDrop =
|
Verdict =
|
||||||
case lists:keymember(Channel, 2, NewSubs) of
|
case lists:keymember(Channel, 2, NewSubs) of
|
||||||
false -> drop_sub;
|
false -> {drop_sub, ConnPid};
|
||||||
true -> keep_sub
|
true -> keep_sub
|
||||||
end,
|
end,
|
||||||
{MaybeDrop, NewCX}.
|
{Verdict, NewCX}.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_get_subscribers(Conn, Channel, CX) -> Subscribers
|
-spec cx_get_subscribers(Conn, Channel, CX) -> Subscribers
|
||||||
@ -2022,6 +1983,16 @@ cx_get_subscribers(Conn, Channel, #cx{conns = Conns}) ->
|
|||||||
lists:fold(registered_to(Channel), [], Subs).
|
lists:fold(registered_to(Channel), [], Subs).
|
||||||
|
|
||||||
|
|
||||||
|
-spec registered_to(Channel) -> fun(({P, C}, A) -> NewA)
|
||||||
|
when Channel :: term(),
|
||||||
|
P :: pid(),
|
||||||
|
C :: term(),
|
||||||
|
A :: [pid()],
|
||||||
|
NewA :: [pid()].
|
||||||
|
%% @private
|
||||||
|
%% Matching function that closes over a given channel in a subscriber list.
|
||||||
|
%% This function exists mostly to make its parent function read nicely.
|
||||||
|
|
||||||
registered_to(Channel) ->
|
registered_to(Channel) ->
|
||||||
fun({P, C}, A) ->
|
fun({P, C}, A) ->
|
||||||
case C == Channel of
|
case C == Channel of
|
||||||
@ -2031,8 +2002,15 @@ registered_to(Channel) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
-spec cx_clear_client(Pid, DeadReqs, DeadSubs, CX) -> NewCX
|
||||||
|
when Pid :: pid(),
|
||||||
|
DeadReqs :: [id()],
|
||||||
|
DeadSubs :: [term()],
|
||||||
|
CX :: conn_index(),
|
||||||
|
NewCX :: conn_index().
|
||||||
|
|
||||||
cx_clear_client(Pid, DeadReqs, DeadSubs, CX = #cx{conns = Conns}) ->
|
cx_clear_client(Pid, DeadReqs, DeadSubs, CX = #cx{conns = Conns}) ->
|
||||||
DropSubs = [{S, Pid} || S <- DeadSubs],
|
DropSubs = [{Pid, Sub} || Sub <- DeadSubs],
|
||||||
Clear =
|
Clear =
|
||||||
fun(C = #conn{requests = Requests, subs = Subs}) ->
|
fun(C = #conn{requests = Requests, subs = Subs}) ->
|
||||||
NewSubs = lists:subtract(Subs, DropSubs),
|
NewSubs = lists:subtract(Subs, DropSubs),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user