blah blah blah
This commit is contained in:
parent
62385cd088
commit
3dfa14d20c
56
TODO
56
TODO
@ -2,16 +2,50 @@
|
|||||||
On redirect a list of hosts of the form [{zx:host(), [zx:realm()]}] must be provided to the downstream client.
|
On redirect a list of hosts of the form [{zx:host(), [zx:realm()]}] must be provided to the downstream client.
|
||||||
This is the only way that downstream clients can determine which redirect hosts are useful to it.
|
This is the only way that downstream clients can determine which redirect hosts are useful to it.
|
||||||
|
|
||||||
- The ZX daemon should be able to retry requests that were submitted but did not receive a response before the relevant
|
- Connect attempts and established connections should have different report statuses.
|
||||||
connection was terminated for whatever reason.
|
An established connection does not necessarily have other attempts being made concurrently, so a termination should initiate 3 new connect attempts as init.
|
||||||
The most obvious way to do this would be to keep a set or queue of references in each connection monitor's section,
|
An attempt is just an attempt. It can fall flat and be replaced only 1::1.
|
||||||
clearing them when they receive responses, and pushing them back into the action queue (from the responses reference map)
|
|
||||||
when they fail.
|
|
||||||
|
|
||||||
- The same issue as the one above, but with subscriptions. Currently there is no obvious way to track what subscriptions flow
|
|
||||||
through which connections, and on termination or change of a connection there is no way to ensure that the subscription request
|
|
||||||
finds its way back into the action queue to resubmission once a realm becomes available again.
|
|
||||||
|
|
||||||
- The request tracking ref list is currently passing through the MX record. That is exactly the wrong place to put it.
|
|
||||||
It should DEFINITELY be in the CX record.
|
New Feature: ZX Universal lock
|
||||||
MOVE IT.
|
Cross-instance communication
|
||||||
|
We don't want multiple zx_daemons doing funny things to the filesystem at the same time.
|
||||||
|
We DO want to be able to run multiple ZX programs to be able to run at the same time.
|
||||||
|
The solution is to guarantee that there is only ever one zx_daemon running at a given time.
|
||||||
|
IPC is messy to get straight across various host systems, but network sockets to localhost work in a normal way.
|
||||||
|
The first zx_daemon to start up will check for a lock file in the $ZOMP_HOME directory.
|
||||||
|
If there is no lock file present it will write a lock file with a timestamp, begin listening on a local port, and then update the lock file with the listening port number.
|
||||||
|
If it finds a lock file in $ZOMP_HOME it will attempt to connect to the running zx_daemon on the port indicated in the lock file. If only a timestamp exists with no port number then it will wait two seconds from the time indicated in the timestamp in the lock file before re-reading it -- if the second read still contains no port number it will remove the lock file and assume the primary role for the system itself.
|
||||||
|
If a connection cannot be established then it will assume the instance that had written the lock file failed to delete it for Bad Reasons, remove the lock file, open a port, and write its own lock file, taking over as the lead zx_daemon.
|
||||||
|
|
||||||
|
Any new zx_daemons that come up in the system will establish a connection to the zx_daemon that wrote the lock file, and proxy all requests through that primary zx_daemon.
|
||||||
|
When a zx_daemon that is acting as primary for the system retires it will complete all ongoing actions first, then begin queueing requests without acting on them, designate the oldest peer as the new leader, get confirmation that the new leader has a port open, update the original lock file with the new port number, redirect all connections to the new zx_daemon, and then retire.
|
||||||
|
|
||||||
|
|
||||||
|
init(Args) ->
|
||||||
|
Stuff = do_stuff(),
|
||||||
|
Tries = 3,
|
||||||
|
Path = lock_file(),
|
||||||
|
case check_for_leader(Tries, Path) of
|
||||||
|
no_leader ->
|
||||||
|
{found, Socket} ->
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
check_for_leader(0, _) ->
|
||||||
|
no_leader;
|
||||||
|
check_for_leader(Tries, Path) ->
|
||||||
|
case file:open(Path, [write, exclusive]) of
|
||||||
|
{ok, FD} -> become_leader(FD);
|
||||||
|
{error, eexist} -> contact_leader()
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
case file:consult(Path) of
|
||||||
|
{ok, Data} ->
|
||||||
|
|
||||||
|
{error, enoexist} ->
|
||||||
|
check_file(Path);
|
||||||
|
end
|
||||||
|
end,
|
||||||
@ -105,7 +105,7 @@
|
|||||||
fetch/1, key/1,
|
fetch/1, key/1,
|
||||||
pending/1, packagers/1, maintainers/1, sysops/1]).
|
pending/1, packagers/1, maintainers/1, sysops/1]).
|
||||||
-export([report/1, result/2, notify/2]).
|
-export([report/1, result/2, notify/2]).
|
||||||
-export([start_link/0]).
|
-export([start_link/0, stop/0]).
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
code_change/3, terminate/2]).
|
code_change/3, terminate/2]).
|
||||||
|
|
||||||
@ -119,7 +119,7 @@
|
|||||||
{meta = none :: none | zx:package_meta(),
|
{meta = none :: none | zx:package_meta(),
|
||||||
home = none :: none | file:filename(),
|
home = none :: none | file:filename(),
|
||||||
argv = none :: none | [string()],
|
argv = none :: none | [string()],
|
||||||
actions = [] :: [request() | {reference(), request()}],
|
actions = [] :: [{pid(), subunsub()} | {reference(), request()}],
|
||||||
requests = maps:new() :: #{reference() := request()},
|
requests = maps:new() :: #{reference() := request()},
|
||||||
subs = [] :: [{pid(), zx:package()}],
|
subs = [] :: [{pid(), zx:package()}],
|
||||||
mx = mx_new() :: monitor_index(),
|
mx = mx_new() :: monitor_index(),
|
||||||
@ -155,7 +155,8 @@
|
|||||||
|
|
||||||
%% State Types
|
%% State Types
|
||||||
-type state() :: #s{}.
|
-type state() :: #s{}.
|
||||||
-type monitor_index() :: [{reference(), pid(), category()}].
|
%-type monitor_index() :: [{reference(), pid(), category()}].
|
||||||
|
-type monitor_index() :: #{pid() := {reference(), category()}}.
|
||||||
-type conn_index() :: #cx{}.
|
-type conn_index() :: #cx{}.
|
||||||
-type realm_meta() :: #rmeta{}.
|
-type realm_meta() :: #rmeta{}.
|
||||||
-type connection() :: #conn{}.
|
-type connection() :: #conn{}.
|
||||||
@ -398,7 +399,7 @@ request(Action) ->
|
|||||||
-spec report(Message) -> ok
|
-spec report(Message) -> ok
|
||||||
when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
|
when Message :: {connected, Realms :: [{zx:realm(), zx:serial()}]}
|
||||||
| {redirect, Hosts :: [{zx:host(), [zx:realm()]}]}
|
| {redirect, Hosts :: [{zx:host(), [zx:realm()]}]}
|
||||||
| failed
|
| aborted
|
||||||
| disconnected.
|
| disconnected.
|
||||||
%% @private
|
%% @private
|
||||||
%% Should only be called by a zx_conn. This function is how a zx_conn reports its
|
%% Should only be called by a zx_conn. This function is how a zx_conn reports its
|
||||||
@ -413,7 +414,7 @@ report(Message) ->
|
|||||||
%% Return a tagged result back to the daemon to be forwarded to the original requestor.
|
%% Return a tagged result back to the daemon to be forwarded to the original requestor.
|
||||||
|
|
||||||
result(Reference, Result) ->
|
result(Reference, Result) ->
|
||||||
gen_server:cast(?MODULE, {result, self(), Reference, Result}).
|
gen_server:cast(?MODULE, {result, Reference, Result}).
|
||||||
|
|
||||||
|
|
||||||
-spec notify(Package, Message) -> ok
|
-spec notify(Package, Message) -> ok
|
||||||
@ -490,6 +491,17 @@ init_connections([], MX, CX) ->
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%%% Shutdown
|
||||||
|
|
||||||
|
-spec stop() -> ok.
|
||||||
|
%% @doc
|
||||||
|
%% A polite way to shut down the daemon without doing a bunch of vile things.
|
||||||
|
|
||||||
|
stop() ->
|
||||||
|
gen_server:cast(?MODULE, stop).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%% gen_server
|
%%% gen_server
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -522,13 +534,15 @@ handle_cast({report, Conn, Message}, State) ->
|
|||||||
NextState = do_report(Conn, Message, State),
|
NextState = do_report(Conn, Message, State),
|
||||||
NewState = eval_queue(NextState),
|
NewState = eval_queue(NextState),
|
||||||
{noreply, NewState};
|
{noreply, NewState};
|
||||||
handle_cast({result, Conn, Ref, Result}, State) ->
|
handle_cast({result, Ref, Result}, State) ->
|
||||||
NextState = do_result(Conn, Ref, Result, State),
|
NextState = do_result(Ref, Result, State),
|
||||||
NewState = eval_queue(NextState),
|
NewState = eval_queue(NextState),
|
||||||
{noreply, NewState};
|
{noreply, NewState};
|
||||||
handle_cast({notice, Package, Update}, State) ->
|
handle_cast({notice, Package, Update}, State) ->
|
||||||
ok = do_notice(Package, Update, State),
|
ok = do_notice(Package, Update, State),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
handle_cast(stop, State) ->
|
||||||
|
{stop, normal, State};
|
||||||
handle_cast(Unexpected, State) ->
|
handle_cast(Unexpected, State) ->
|
||||||
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
@ -554,8 +568,15 @@ code_change(_, State, _) ->
|
|||||||
%% gen_server callback to handle shutdown/cleanup tasks on receipt of a clean
|
%% gen_server callback to handle shutdown/cleanup tasks on receipt of a clean
|
||||||
%% termination request.
|
%% termination request.
|
||||||
|
|
||||||
terminate(_, _) ->
|
terminate(normal, #s{cx = CX}) ->
|
||||||
ok.
|
ok = log(info, "zx_daemon shutting down..."),
|
||||||
|
case cx_store_cache(CX) of
|
||||||
|
ok ->
|
||||||
|
log(info, "Cache written.");
|
||||||
|
{error, Reason} ->
|
||||||
|
Message = "Cache write failed with ~tp",
|
||||||
|
log(error, Message, [Reason])
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -641,28 +662,49 @@ do_report(Conn, {redirect, Hosts}, State = #s{mx = MX, cx = CX}) ->
|
|||||||
{Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX),
|
{Unassigned, NextCX} = cx_redirect(Conn, Hosts, CX),
|
||||||
{NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX),
|
{NewMX, NewCX} = init_connection(Unassigned, NextMX, NextCX),
|
||||||
State#s{mx = NewMX, cx = NewCX};
|
State#s{mx = NewMX, cx = NewCX};
|
||||||
do_report(Conn,
|
do_report(Conn, aborted, State = #s{mx = MX, cx = CX}) ->
|
||||||
disconnected,
|
|
||||||
State = #s{actions = Actions, requests = Requests, mx = MX, cx = CX}) ->
|
|
||||||
case mx_lookup_category(Conn, MX) of
|
|
||||||
attempt ->
|
|
||||||
{ok, NewMX} = mx_del_monitor(Conn, attempt, MX),
|
{ok, NewMX} = mx_del_monitor(Conn, attempt, MX),
|
||||||
NewCX = cx_failed(Conn, CX),
|
{Realms, NextCX} = cx_failed(Conn, CX),
|
||||||
|
{NewMX, NewCX} = init_connection(Realms, NextCX),
|
||||||
State#s{mx = NewMX, cx = NewCX};
|
State#s{mx = NewMX, cx = NewCX};
|
||||||
conn ->
|
do_report(Conn, disconnected, State) ->
|
||||||
{ok, ScrubbedMX, ReqRefs} = mx_del_monitor(Conn, conn, MX),
|
ScrubbedMX = mx_del_monitor(Conn, conn, MX),
|
||||||
ScrubbedCX = cx_disconnected(Conn, CX),
|
{Pending, LostSubs, ScrubbedCX} = cx_disconnected(Conn, CX),
|
||||||
Unassigned = cx_unassigned(ScrubbedCX),
|
Unassigned = cx_unassigned(ScrubbedCX),
|
||||||
NewActions = maps:to_list(maps:with(ReqRefs, Requests)) ++ Actions,
|
UnSub = fun(S) -> lists:member(element(2, S), LostSubs) end,
|
||||||
NewRequests = maps:without(ReqRefs, Requests),
|
{UnSubbed, NewSubs} = lists:partition(UnSub, Subs),
|
||||||
|
ReSubs = [{S, {subscribe, P}} || {S, P} <- UnSubbed],
|
||||||
|
{Dequeued, NewRequests} = maps:fold(dequeue(Pending), {#{}, #{}}, Requests),
|
||||||
|
ReReqs = maps:to_list(Dequeued),
|
||||||
|
NewActions = ReReqs ++ ReSubs ++ Actions,
|
||||||
{NewMX, NewCX} = init_connection(Unassigned, ScrubbedMX, ScrubbedCX),
|
{NewMX, NewCX} = init_connection(Unassigned, ScrubbedMX, ScrubbedCX),
|
||||||
State#s{actions = NewActions,
|
State#s{actions = NewActions,
|
||||||
requests = NewRequests,
|
requests = NewRequests,
|
||||||
|
subs = NewSubs,
|
||||||
mx = NewMX,
|
mx = NewMX,
|
||||||
cx = NewCX}
|
cx = NewCX}.
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
|
-spec dequeue(Pending) -> fun(K, V, {D, R}) -> {NewD, NewR}
|
||||||
|
when Pending :: [reference()],
|
||||||
|
K :: reference(),
|
||||||
|
V :: request(),
|
||||||
|
D :: #{reference(), request()},
|
||||||
|
R :: #{reference(), request()},
|
||||||
|
NewD :: #{reference(), request()},
|
||||||
|
NewR :: #{reference(), request()}.
|
||||||
|
%% @private
|
||||||
|
%% Return a function that partitions the current Request map into two maps, one that
|
||||||
|
%% matches the closed `Pending' list of references and ones that don't.
|
||||||
|
|
||||||
|
dequeue(Pending) ->
|
||||||
|
fun(K, V, {D, R}) ->
|
||||||
|
case lists:member(K, Pending) of
|
||||||
|
true -> {maps:put(K, V, D), R};
|
||||||
|
false -> {D, maps:put(K, V, R)}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec init_connection(Realms, MX, CX) -> {NewMX, NewCX}
|
-spec init_connection(Realms, MX, CX) -> {NewMX, NewCX}
|
||||||
when Realms :: [zx:realm()],
|
when Realms :: [zx:realm()],
|
||||||
@ -692,27 +734,26 @@ init_connection([], MX, CX) ->
|
|||||||
{MX, CX}.
|
{MX, CX}.
|
||||||
|
|
||||||
|
|
||||||
-spec do_result(Conn, Ref, Result, State) -> NewState
|
-spec do_result(Reference, Result, State) -> NewState
|
||||||
when Conn :: pid(),
|
when Reference :: reference(),
|
||||||
Ref :: reference(),
|
|
||||||
Result :: result(),
|
Result :: result(),
|
||||||
State :: state(),
|
State :: state(),
|
||||||
NewState :: state().
|
NewState :: state().
|
||||||
%% @private
|
%% @private
|
||||||
%% Receive the result of a sent request and route it back to the original requestor.
|
%% Receive the result of a sent request and route it back to the original requestor.
|
||||||
|
|
||||||
do_result(Conn, Ref, Result, State = #s{requests = Requests, mx = MX}) ->
|
do_result(Reference, Result, State = #s{requests = Requests}) ->
|
||||||
NewRequests =
|
NewRequests =
|
||||||
case maps:take(Ref, Requests) of
|
case maps:take(Reference, Requests) of
|
||||||
{{Req, {Type, Object}}, NextRequests} ->
|
{{Requestor, {Type, Object}}, NextRequests} ->
|
||||||
Req ! {result, {Type, Object, Result}},
|
Requestor ! {result, {Type, Object, Result}},
|
||||||
NextRequests;
|
NextRequests;
|
||||||
error ->
|
error ->
|
||||||
ok = log(warning, "Received unqueued result ~tp:~tp", [Ref, Result]),
|
Message = "Received unqueued result ~tp: ~tp",
|
||||||
|
ok = log(warning, Message, [Reference, Result]),
|
||||||
Requests
|
Requests
|
||||||
end,
|
end,
|
||||||
NewMX = mx_clear_request(Conn, Ref, MX),
|
State#s{requests = NewRequests}.
|
||||||
State#s{requests = NewRequests, mx = NewMX}.
|
|
||||||
|
|
||||||
|
|
||||||
-spec do_notice(Package, Update, State) -> ok
|
-spec do_notice(Package, Update, State) -> ok
|
||||||
@ -754,7 +795,7 @@ eval_queue([Action = {Pid, {subscribe, Package}} | Rest],
|
|||||||
{NewActions, NewSubs, NewMX} =
|
{NewActions, NewSubs, NewMX} =
|
||||||
case cx_resolve(Realm, CX) of
|
case cx_resolve(Realm, CX) of
|
||||||
{ok, Conn} ->
|
{ok, Conn} ->
|
||||||
ok = zx_conn:subscribe(Package),
|
ok = zx_conn:subscribe(Conn, Package),
|
||||||
NextSubs = [{Pid, Package} | Subs],
|
NextSubs = [{Pid, Package} | Subs],
|
||||||
NextMX = mx_add_monitor(Pid, subscriber, MX),
|
NextMX = mx_add_monitor(Pid, subscriber, MX),
|
||||||
{Actions, NextSubs, NextMX};
|
{Actions, NextSubs, NextMX};
|
||||||
@ -775,7 +816,7 @@ eval_queue([Action = {Pid, {unsubscribe, Package}} | Rest],
|
|||||||
Realm = element(1, Package),
|
Realm = element(1, Package),
|
||||||
ok =
|
ok =
|
||||||
case cx_resolve(Realm, CX) of
|
case cx_resolve(Realm, CX) of
|
||||||
{ok, Conn} -> cx_conn:unsubscribe(Package);
|
{ok, Conn} -> cx_conn:unsubscribe(Conn, Package);
|
||||||
unassigned -> ok
|
unassigned -> ok
|
||||||
end,
|
end,
|
||||||
eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX});
|
eval_queue(Rest, State#s{actions = NewActions, subs = NewSubs, mx = NewMX});
|
||||||
@ -836,121 +877,109 @@ send_result(Pid, Ref, Message) ->
|
|||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
||||||
-spec do_query_latest(Object, State) -> {Result, NewState}
|
%-spec do_query_latest(Object, State) -> {Result, NewState}
|
||||||
when Object :: zx:package() | zx:package_id(),
|
% when Object :: zx:package() | zx:package_id(),
|
||||||
State :: state(),
|
% State :: state(),
|
||||||
Result :: {ok, zx:version()}
|
% Result :: {ok, zx:version()}
|
||||||
| {error, Reason},
|
% | {error, Reason},
|
||||||
Reason :: bad_realm
|
% Reason :: bad_realm
|
||||||
| bad_package
|
% | bad_package
|
||||||
| bad_version,
|
% | bad_version,
|
||||||
NewState :: state().
|
% NewState :: state().
|
||||||
%% @private
|
%% @private
|
||||||
%% Queries a zomp realm for the latest version of a package or package
|
%% Queries a zomp realm for the latest version of a package or package
|
||||||
%% version (complete or incomplete version number).
|
%% version (complete or incomplete version number).
|
||||||
|
%
|
||||||
do_query_latest(Socket, {Realm, Name}) ->
|
%do_query_latest(Socket, {Realm, Name}) ->
|
||||||
ok = zx_net:send(Socket, {latest, Realm, Name}),
|
% ok = zx_net:send(Socket, {latest, Realm, Name}),
|
||||||
receive
|
% receive
|
||||||
{tcp, Socket, Bin} -> binary_to_term(Bin)
|
% {tcp, Socket, Bin} -> binary_to_term(Bin)
|
||||||
after 5000 -> {error, timeout}
|
% after 5000 -> {error, timeout}
|
||||||
end;
|
% end;
|
||||||
do_query_latest(Socket, {Realm, Name, Version}) ->
|
%do_query_latest(Socket, {Realm, Name, Version}) ->
|
||||||
ok = zx_net:send(Socket, {latest, Realm, Name, Version}),
|
% ok = zx_net:send(Socket, {latest, Realm, Name, Version}),
|
||||||
receive
|
% receive
|
||||||
{tcp, Socket, Bin} -> binary_to_term(Bin)
|
% {tcp, Socket, Bin} -> binary_to_term(Bin)
|
||||||
after 5000 -> {error, timeout}
|
% after 5000 -> {error, timeout}
|
||||||
end.
|
% end.
|
||||||
|
|
||||||
|
|
||||||
-spec do_unsubscribe(State) -> {ok, NewState}
|
%-spec do_fetch(PackageIDs, State) -> NewState
|
||||||
when State :: state(),
|
% when PackageIDs :: [zx:package_id()],
|
||||||
NewState :: state().
|
% State :: state(),
|
||||||
|
% NewState :: state(),
|
||||||
do_unsubscribe(State = #s{connp = none}) ->
|
% Result :: ok
|
||||||
{ok, State};
|
% | {error, Reason},
|
||||||
do_unsubscribe(State = #s{connp = ConnP, connm = ConnM}) ->
|
% Reason :: bad_realm
|
||||||
true = demonitor(ConnM),
|
% | bad_package
|
||||||
ok = zx_conn:stop(ConnP),
|
% | bad_version
|
||||||
NewState = State#s{realm = none, name = none, version = none,
|
% | network.
|
||||||
connp = ConnP, connm = ConnM},
|
|
||||||
{ok, NewState}.
|
|
||||||
|
|
||||||
|
|
||||||
-spec do_fetch(PackageIDs) -> Result
|
|
||||||
when PackageIDs :: [zx:package_id()],
|
|
||||||
Result :: ok
|
|
||||||
| {error, Reason},
|
|
||||||
Reason :: bad_realm
|
|
||||||
| bad_package
|
|
||||||
| bad_version
|
|
||||||
| network.
|
|
||||||
%% @private
|
%% @private
|
||||||
%%
|
%%
|
||||||
|
%
|
||||||
do_fetch(PackageIDs, State) ->
|
%do_fetch(PackageIDs, State) ->
|
||||||
% FIXME: Need to create a job queue divided by realm and dispatched to connectors,
|
% FIXME: Need to create a job queue divided by realm and dispatched to connectors,
|
||||||
% and cleared from the master pending queue kept here by the daemon as the
|
% and cleared from the master pending queue kept here by the daemon as the
|
||||||
% workers succeed. Basic task queue management stuff... which never existed
|
% workers succeed. Basic task queue management stuff... which never existed
|
||||||
% in ZX before... grrr...
|
% in ZX before... grrr...
|
||||||
case scrub(PackageIDs) of
|
% case scrub(PackageIDs) of
|
||||||
[] ->
|
% [] ->
|
||||||
ok;
|
% ok;
|
||||||
Needed ->
|
% Needed ->
|
||||||
Partitioned = partition_by_realm(Needed),
|
% Partitioned = partition_by_realm(Needed),
|
||||||
EnsureDeps =
|
% EnsureDeps =
|
||||||
fun({Realm, Packages}) ->
|
% fun({Realm, Packages}) ->
|
||||||
ok = zx_conn:queue_package(Pid, Realm, Packages),
|
% ok = zx_conn:queue_package(Pid, Realm, Packages),
|
||||||
log(info, "Disconnecting from realm: ~ts", [Realm])
|
% log(info, "Disconnecting from realm: ~ts", [Realm])
|
||||||
end,
|
% end,
|
||||||
lists:foreach(EnsureDeps, Partitioned)
|
% lists:foreach(EnsureDeps, Partitioned)
|
||||||
end.
|
% end.
|
||||||
|
%
|
||||||
|
%
|
||||||
partition_by_realm(PackageIDs) ->
|
%partition_by_realm(PackageIDs) ->
|
||||||
PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs),
|
% PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs),
|
||||||
maps:to_list(PartitionMap).
|
% maps:to_list(PartitionMap).
|
||||||
|
%
|
||||||
|
%
|
||||||
partition_by_realm({R, P, V}, M) ->
|
%partition_by_realm({R, P, V}, M) ->
|
||||||
maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M).
|
% maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M).
|
||||||
|
%
|
||||||
|
%
|
||||||
ensure_deps(_, _, []) ->
|
%ensure_deps(_, _, []) ->
|
||||||
ok;
|
% ok;
|
||||||
ensure_deps(Socket, Realm, [{Name, Version} | Rest]) ->
|
%ensure_deps(Socket, Realm, [{Name, Version} | Rest]) ->
|
||||||
ok = ensure_dep(Socket, {Realm, Name, Version}),
|
% ok = ensure_dep(Socket, {Realm, Name, Version}),
|
||||||
ensure_deps(Socket, Realm, Rest).
|
% ensure_deps(Socket, Realm, Rest).
|
||||||
|
%
|
||||||
|
%
|
||||||
-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return().
|
%-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return().
|
||||||
%% @private
|
%% @private
|
||||||
%% Given an PackageID as an argument, check whether its package file exists in the
|
%% Given an PackageID as an argument, check whether its package file exists in the
|
||||||
%% system cache, and if not download it. Should return `ok' whenever the file is
|
%% system cache, and if not download it. Should return `ok' whenever the file is
|
||||||
%% sourced, but exit with an error if it cannot locate or acquire the package.
|
%% sourced, but exit with an error if it cannot locate or acquire the package.
|
||||||
|
%
|
||||||
ensure_dep(Socket, PackageID) ->
|
%ensure_dep(Socket, PackageID) ->
|
||||||
ZrpFile = filename:join("zrp", namify_zrp(PackageID)),
|
% ZrpFile = filename:join("zrp", namify_zrp(PackageID)),
|
||||||
ok =
|
% ok =
|
||||||
case filelib:is_regular(ZrpFile) of
|
% case filelib:is_regular(ZrpFile) of
|
||||||
true -> ok;
|
% true -> ok;
|
||||||
false -> fetch(Socket, PackageID)
|
% false -> fetch(Socket, PackageID)
|
||||||
end,
|
% end,
|
||||||
ok = install(PackageID),
|
% ok = install(PackageID),
|
||||||
build(PackageID).
|
% build(PackageID).
|
||||||
|
%
|
||||||
|
%
|
||||||
-spec scrub(Deps) -> Scrubbed
|
%-spec scrub(Deps) -> Scrubbed
|
||||||
when Deps :: [package_id()],
|
% when Deps :: [package_id()],
|
||||||
Scrubbed :: [package_id()].
|
% Scrubbed :: [package_id()].
|
||||||
%% @private
|
%% @private
|
||||||
%% Take a list of dependencies and return a list of dependencies that are not yet
|
%% Take a list of dependencies and return a list of dependencies that are not yet
|
||||||
%% installed on the system.
|
%% installed on the system.
|
||||||
|
%
|
||||||
scrub([]) ->
|
%scrub([]) ->
|
||||||
[];
|
% [];
|
||||||
scrub(Deps) ->
|
%scrub(Deps) ->
|
||||||
lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps).
|
% lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -963,33 +992,40 @@ scrub(Deps) ->
|
|||||||
%% @private
|
%% @private
|
||||||
%% Returns a new, empty monitor index.
|
%% Returns a new, empty monitor index.
|
||||||
|
|
||||||
mx_new() -> [].
|
mx_new() ->
|
||||||
|
maps:new().
|
||||||
|
|
||||||
|
|
||||||
-spec mx_add_monitor(pid(), category(), monitor_index()) -> monitor_index().
|
-spec mx_add_monitor(Pid, Category, MX) -> NewMX
|
||||||
|
when Pid :: pid(),
|
||||||
|
Category :: subscriber
|
||||||
|
| requestor
|
||||||
|
| attempt,
|
||||||
|
MX :: monitor_index(),
|
||||||
|
NewMX :: monitor_index().
|
||||||
%% @private
|
%% @private
|
||||||
%% Begin monitoring the given Pid, keeping track of its category.
|
%% Begin monitoring the given Pid, keeping track of its category.
|
||||||
|
|
||||||
mx_add_monitor(Pid, subscriber, MX) ->
|
mx_add_monitor(Pid, subscriber, MX) ->
|
||||||
case lists:keytake(Pid, 2, MX) of
|
case maps:take(Pid, MX) of
|
||||||
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} ->
|
{{Ref, {Subs, Reqs}}, NextMX} ->
|
||||||
[{Ref, Pid, {Subs + 1, Reqs}} | NextMX];
|
maps:put(Pid, {Ref, {Subs + 1, Reqs}}, NextMX);
|
||||||
false ->
|
error ->
|
||||||
Ref = monitor(process, Pid),
|
Ref = monitor(process, Pid),
|
||||||
[{Ref, Pid, {1, 0}} | MX]
|
maps:put(Pid, {Ref, {1, 0}}, MX)
|
||||||
end;
|
end;
|
||||||
mx_add_monitor(Pid, requestor, MX) ->
|
mx_add_monitor(Pid, requestor, MX) ->
|
||||||
case lists:keytake(Pid, 2, MX) of
|
case maps:take(Pid, MX) of
|
||||||
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} ->
|
{{Ref, {Subs, Reqs}}, NextMX} ->
|
||||||
[{Ref, Pid, {Subs, Reqs + 1}} | NextMX];
|
maps:put(Pid, {Ref, {Subs, Reqs + 1}}, NextMX);
|
||||||
false ->
|
error ->
|
||||||
Ref = monitor(process, Pid),
|
Ref = monitor(process, Pid),
|
||||||
[{Ref, Pid, {0, 1}} | MX]
|
maps:put(Pid, {Ref, {0, 1}}, MX)
|
||||||
end;
|
end;
|
||||||
mx_add_monitor(Pid, attempt, MX) ->
|
mx_add_monitor(Pid, attempt, MX) ->
|
||||||
false = lists:keymember(Pid, 2, MX),
|
false = maps:is_key(Pid, MX),
|
||||||
Ref = monitor(process, Pid),
|
Ref = monitor(process, Pid),
|
||||||
[{Ref, Pid, attempt} | MX].
|
maps:put(Pid, {Ref, attempt}, MX).
|
||||||
|
|
||||||
|
|
||||||
-spec mx_upgrade_conn(Pid, MX) -> NewMX
|
-spec mx_upgrade_conn(Pid, MX) -> NewMX
|
||||||
@ -1000,88 +1036,49 @@ mx_add_monitor(Pid, attempt, MX) ->
|
|||||||
%% Upgrade an `attempt' monitor to a `conn' monitor.
|
%% Upgrade an `attempt' monitor to a `conn' monitor.
|
||||||
|
|
||||||
mx_upgrade_conn(Pid, MX) ->
|
mx_upgrade_conn(Pid, MX) ->
|
||||||
{value, {Ref, Pid, attempt}, NextMX} = lists:keytake(Pid, 2, MX),
|
{{Ref, attempt}, NextMX} = maps:take(Pid, MX),
|
||||||
[{Ref, Pid, {conn, []}} | NextMX].
|
maps:put(Pid, {Ref, conn}, NextMX).
|
||||||
|
|
||||||
|
|
||||||
-spec mx_del_monitor(Conn, Category, MX) -> Result
|
-spec mx_del_monitor(Conn, Category, MX) -> NewMX
|
||||||
when Conn :: pid(),
|
when Conn :: pid(),
|
||||||
Category :: subscriber
|
Category :: subscriber
|
||||||
| requestor
|
| requestor
|
||||||
| attempt
|
| attempt
|
||||||
| conn,
|
| conn,
|
||||||
MX :: monitor_index(),
|
MX :: monitor_index(),
|
||||||
Result :: {ok, NewMX}
|
NewMX :: monitor_index().
|
||||||
| {ok, NewMX, RequestRefs},
|
|
||||||
NewMX :: monitor_index(),
|
|
||||||
RequestRefs :: [reference()].
|
|
||||||
|
|
||||||
-spec mx_del_monitor(pid(), category(), monitor_index()) -> monitor_index().
|
|
||||||
%% @private
|
%% @private
|
||||||
%% Drop a monitor category, removing the entire monitor in the case only one category
|
%% Drop a monitor category, removing the entire monitor in the case only one category
|
||||||
%% exists. Returns a tuple including the remaining request references in the case of
|
%% exists. Returns a tuple including the remaining request references in the case of
|
||||||
%% a conn type.
|
%% a conn type.
|
||||||
|
|
||||||
mx_del_monitor(Pid, subscriber, MX) ->
|
mx_del_monitor(Pid, subscriber, MX) ->
|
||||||
NewMX =
|
case maps:take(Pid, MX) of
|
||||||
case lists:keytake(Pid, 2, MX) of
|
{{Ref, {1, 0}}, NewMX} ->
|
||||||
{value, {Ref, Pid, {1, 0}}, NextMX} ->
|
|
||||||
true = demonitor(Ref, [flush]),
|
true = demonitor(Ref, [flush]),
|
||||||
NextMX;
|
NewMX;
|
||||||
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} when Subs > 0 ->
|
{{Ref, {Subs, Reqs}}, NextMX} when Subs > 0 ->
|
||||||
[{Ref, Pid, {Subs - 1, Reqs}} | NextMX]
|
maps:put(Pid, {Ref, {Subs - 1, Reqs}}, NextMX)
|
||||||
end,
|
end;
|
||||||
{ok, NewMX};
|
|
||||||
mx_del_monitor(Pid, requestor, MX) ->
|
mx_del_monitor(Pid, requestor, MX) ->
|
||||||
NewMX =
|
case maps:take(Pid, MX) of
|
||||||
case lists:keytake(Pid, 2, MX) of
|
{{Ref, {0, 1}}, NewMX} ->
|
||||||
{value, {Ref, Pid, {0, 1}}, NextMX} ->
|
|
||||||
true = demonitor(Ref, [flush]),
|
true = demonitor(Ref, [flush]),
|
||||||
NextMX;
|
NewMX;
|
||||||
{value, {Ref, Pid, {Subs, Reqs}}, NextMX} when Reqs > 0 ->
|
{{Ref, {Subs, Reqs}}, NextMX} when Reqs > 0 ->
|
||||||
[{Ref, Pid, {Subs, Reqs - 1}} | NextMX]
|
maps:put(Pid, {Ref, {Subs, Reqs - 1}}, NextMX)
|
||||||
end,
|
end;
|
||||||
{ok, NewMX};
|
mx_del_monitor(Pid, Category, MX) ->
|
||||||
mx_del_monitor(Pid, attempt, MX) ->
|
{{Ref, Category}, NewMX} = maps;take(Pid, MX),
|
||||||
{value, {Ref, Pid, attempt}, NewMX} = lists:keytake(Pid, 2, MX),
|
|
||||||
true = demonitor(Ref, [flush]),
|
true = demonitor(Ref, [flush]),
|
||||||
{ok, NewMX};
|
NewMX.
|
||||||
mx_del_monitor(Pid, conn, MX) ->
|
|
||||||
{value, {Ref, Pid, {conn, RequestRefs}}, NewMX} = lists:keytake(Pid, 2, MX),
|
|
||||||
true = demonitor(Ref, [flush]),
|
|
||||||
{ok, NewMX, RequestRefs}.
|
|
||||||
|
|
||||||
|
|
||||||
-spec mx_stash_request(Conn, RequestRef, MX) -> NewMX
|
-spec mx_lookup_category(Pid, MX) -> Result
|
||||||
when Conn :: pid(),
|
when Pid :: pid(),
|
||||||
RequestRef :: reference(),
|
|
||||||
MX :: monitor_index(),
|
MX :: monitor_index(),
|
||||||
NewMX :: monitor_index().
|
Result :: attempt
|
||||||
%% @private
|
|
||||||
%% Add a pending request reference to the connection monitor's record.
|
|
||||||
|
|
||||||
mx_stash_request(Conn, RequestRef, MX) ->
|
|
||||||
{value, {Ref, Conn, {conn, Pending}}, NextMX} = lists:keytake(Conn, 2, MX),
|
|
||||||
NewPending = [RequestRef | Pending],
|
|
||||||
[{Ref, Conn, {conn, NewPending}} | NextMX].
|
|
||||||
|
|
||||||
|
|
||||||
-spec mx_clear_request(Conn, RequestRef, MX) -> NewMX
|
|
||||||
when Conn :: pid(),
|
|
||||||
RequestRef :: reference(),
|
|
||||||
MX :: monitor_index(),
|
|
||||||
NewMX :: monitor_index().
|
|
||||||
%% @private
|
|
||||||
%% Remove a pending request reference from the connection monitor's record.
|
|
||||||
|
|
||||||
mx_clear_request(Conn, RequestRef, MX) ->
|
|
||||||
{value, {Ref, Conn, {conn, Pending}}, NextMX} = lists:keytake(Conn, 2, MX),
|
|
||||||
NewPending = lists:delete(RequestRef, Pending),
|
|
||||||
[{Ref, Conn, {conn, NewPending}} | NextMX].
|
|
||||||
|
|
||||||
|
|
||||||
-spec mx_lookup_category(pid(), monitor_index()) -> Result
|
|
||||||
when Result :: attempt
|
|
||||||
| conn
|
| conn
|
||||||
| requestor
|
| requestor
|
||||||
| subscriber
|
| subscriber
|
||||||
@ -1090,10 +1087,10 @@ mx_clear_request(Conn, RequestRef, MX) ->
|
|||||||
%% Lookup a monitor's categories.
|
%% Lookup a monitor's categories.
|
||||||
|
|
||||||
mx_lookup_category(Pid, MX) ->
|
mx_lookup_category(Pid, MX) ->
|
||||||
Monitor = lists:keyfind(Pid, 2, MX),
|
Monitor = maps:get(Pid, MX),
|
||||||
case element(3, Monitor) of
|
case element(2, Monitor) of
|
||||||
attempt -> attempt;
|
attempt -> attempt;
|
||||||
{conn, _} -> conn;
|
conn -> conn;
|
||||||
{0, _} -> requestor;
|
{0, _} -> requestor;
|
||||||
{_, 0} -> subscriber;
|
{_, 0} -> subscriber;
|
||||||
{_, _} -> sub_req
|
{_, _} -> sub_req
|
||||||
@ -1119,8 +1116,8 @@ mx_lookup_category(Pid, MX) ->
|
|||||||
|
|
||||||
cx_load() ->
|
cx_load() ->
|
||||||
case cx_populate() of
|
case cx_populate() of
|
||||||
{ok, CX} ->
|
{ok, Realms} ->
|
||||||
CX;
|
#cx{realms = maps:from_list(Realms)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Message = "Realm data and host cache load failed with : ~tp",
|
Message = "Realm data and host cache load failed with : ~tp",
|
||||||
ok = log(error, Message, [Reason]),
|
ok = log(error, Message, [Reason]),
|
||||||
@ -1146,41 +1143,42 @@ cx_populate() ->
|
|||||||
Pattern = filename:join(Home, "*.realm"),
|
Pattern = filename:join(Home, "*.realm"),
|
||||||
case filelib:wildcard(Pattern) of
|
case filelib:wildcard(Pattern) of
|
||||||
[] -> {error, no_realms};
|
[] -> {error, no_realms};
|
||||||
RealmFiles -> {ok, cx_populate(RealmFiles, #cx{})}
|
RealmFiles -> {ok, cx_populate(RealmFiles, [])}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_populate(RealmFiles, CX) -> NewCX
|
-spec cx_populate(RealmFiles, Realms) -> NewRealms
|
||||||
when RealmFiles :: file:filename(),
|
when RealmFiles :: file:filename(),
|
||||||
CX :: conn_index(),
|
Realms :: [{zx:realm(), realm_meta()}],
|
||||||
NewCX :: conn_index().
|
NewRealms :: [{zx:realm(), realm_meta()}].
|
||||||
%% @private
|
%% @private
|
||||||
%% Pack an initially empty conn_index() with realm meta and host cache data.
|
%% Pack an initially empty conn_index() with realm meta and host cache data.
|
||||||
%% Should not halt on a corrupted, missing, malformed, etc. realm file but will log
|
%% Should not halt on a corrupted, missing, malformed, etc. realm file but will log
|
||||||
%% any loading errors.
|
%% any loading errors.
|
||||||
|
|
||||||
cx_populate([File | Files], CX) ->
|
cx_populate([File | Files], Realms) ->
|
||||||
|
NewRealms =
|
||||||
case file:consult(File) of
|
case file:consult(File) of
|
||||||
{ok, Meta} ->
|
{ok, Meta} ->
|
||||||
NewCX = cx_load_realm_meta(Meta, CX),
|
Realm = cx_load_realm_meta(Meta),
|
||||||
cx_populate(Files, NewCX);
|
[Realm | Realms];
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Message = "Realm file ~tp could not be read. Failed with: ~tp. Skipping.",
|
Message = "Loading realm file ~tp failed with: ~tp. Skipping...",
|
||||||
ok = log(warning, Message, [File, Reason]),
|
ok = log(warning, Message, [File, Reason]),
|
||||||
cx_populate(Files, CX)
|
Realms
|
||||||
end;
|
end,
|
||||||
cx_populate([], CX) ->
|
cx_populate(Files, NewRealms);
|
||||||
CX.
|
cx_populate([], Realms) ->
|
||||||
|
Realms.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_load_realm_meta(Meta, CX) -> NewCX
|
-spec cx_load_realm_meta(Meta) -> Result
|
||||||
when Meta :: [{Key :: atom(), Value :: term()}],
|
when Meta :: [{Key :: atom(), Value :: term()}],
|
||||||
CX :: conn_index(),
|
Result :: {zx:realm(), realm_meta()}.
|
||||||
NewCX :: conn_index().
|
|
||||||
%% @private
|
%% @private
|
||||||
%% This function MUST adhere to the realmfile definition found at.
|
%% This function MUST adhere to the realmfile definition found at.
|
||||||
|
|
||||||
cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) ->
|
cx_load_realm_meta(Meta) ->
|
||||||
{realm, Realm} = lists:keyfind(realm, 1, Meta),
|
{realm, Realm} = lists:keyfind(realm, 1, Meta),
|
||||||
{revision, Revision} = lists:keyfind(revision, 1, Meta),
|
{revision, Revision} = lists:keyfind(revision, 1, Meta),
|
||||||
{prime, Prime} = lists:keyfind(prime, 1, Meta),
|
{prime, Prime} = lists:keyfind(prime, 1, Meta),
|
||||||
@ -1189,14 +1187,12 @@ cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) ->
|
|||||||
{sysops, Sysops} = lists:keyfind(sysops, 1, Meta),
|
{sysops, Sysops} = lists:keyfind(sysops, 1, Meta),
|
||||||
Basic =
|
Basic =
|
||||||
#rmeta{revision = Revision,
|
#rmeta{revision = Revision,
|
||||||
serial = Serial,
|
|
||||||
prime = Prime,
|
prime = Prime,
|
||||||
realm_keys = RealmKeys,
|
realm_keys = RealmKeys,
|
||||||
package_keys = PackageKeys,
|
package_keys = PackageKeys,
|
||||||
sysops = Sysops},
|
sysops = Sysops},
|
||||||
Complete = cx_load_cache(Realm, Basic),
|
Complete = cx_load_cache(Realm, Basic),
|
||||||
NewRealms = maps:put(Realm, Complete, Realms),
|
{Realm, Complete}.
|
||||||
CX#cx{realms = NewRealms}.
|
|
||||||
|
|
||||||
|
|
||||||
-spec cx_load_cache(Realm, Basic) -> Complete
|
-spec cx_load_cache(Realm, Basic) -> Complete
|
||||||
@ -1213,6 +1209,7 @@ cx_load_realm_meta(Meta, CX = #cx{realms = Realms}) ->
|
|||||||
%% Better file contention and parallel-executing system handling should
|
%% Better file contention and parallel-executing system handling should
|
||||||
%% eventually be implemented, especially if zx becomes common for end-user
|
%% eventually be implemented, especially if zx becomes common for end-user
|
||||||
%% GUI programs.
|
%% GUI programs.
|
||||||
|
%% NOTE: This "fixme" will only apply until the zx universal lock is implemented.
|
||||||
|
|
||||||
cx_load_cache(Realm, Basic) ->
|
cx_load_cache(Realm, Basic) ->
|
||||||
CacheFile = cx_cache_file(Realm),
|
CacheFile = cx_cache_file(Realm),
|
||||||
@ -1242,6 +1239,7 @@ cx_store_cache(#cx{realms = Realms}) ->
|
|||||||
-spec cx_write_cache({zx:realm(), realm_meta()}) -> ok.
|
-spec cx_write_cache({zx:realm(), realm_meta()}) -> ok.
|
||||||
%% @private
|
%% @private
|
||||||
%% FIXME: The same concerns as noted in the cx_load_cache/2 FIXME comment apply here.
|
%% FIXME: The same concerns as noted in the cx_load_cache/2 FIXME comment apply here.
|
||||||
|
%% NOTE: This "fixme" will only apply until the zx universal lock is implemented.
|
||||||
|
|
||||||
cx_write_cache({Realm,
|
cx_write_cache({Realm,
|
||||||
#rmeta{serial = Serial, private = Private, mirrors = Mirrors}}) ->
|
#rmeta{serial = Serial, private = Private, mirrors = Mirrors}}) ->
|
||||||
@ -1270,62 +1268,53 @@ cx_realms(#cx{realms = Realms}) ->
|
|||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
Result :: {ok, Next, NewCX}
|
Result :: {ok, Next, NewCX}
|
||||||
| {prime, Prime, NewCX}
|
| {prime, Prime, NewCX}
|
||||||
| {error, Reason},
|
| {error, Reason, NewCX},
|
||||||
Next :: zx:host(),
|
Next :: zx:host(),
|
||||||
Prime :: zx:host(),
|
Prime :: zx:host(),
|
||||||
NewCX :: conn_index(),
|
NewCX :: conn_index(),
|
||||||
Reason :: bad_realm
|
Reason :: bad_realm
|
||||||
| connected.
|
| connected.
|
||||||
%% @private
|
%% @private
|
||||||
%% Given a realm, retun the next cached host location to which to connect. Returns an
|
%% Given a realm, retun the next cached host location to which to connect. Returns
|
||||||
%% error if the realm is already assigned or if the realm is not configured.
|
%% error if the realm is already assigned, if it is available but should have been
|
||||||
|
%% assigned, or if the realm is not configured.
|
||||||
|
%% If all cached mirrors are exhausted it will return the realm's prime host and
|
||||||
|
%% reload the mirrors queue with private mirrors.
|
||||||
|
|
||||||
cx_next_host(Realm, CX = #cx{assigned = Assigned}) ->
|
cx_next_host(Realm, CX = #cx{realms = Realms}) ->
|
||||||
case lists:keymember(Realm, 1, Assigned) of
|
|
||||||
false -> cx_next_host2(Realm, CX);
|
|
||||||
true -> {error, connected}
|
|
||||||
end.
|
|
||||||
|
|
||||||
cx_next_host2(Realm, CX = #cx{realm = Realms}) ->
|
|
||||||
case maps:find(Realm, Realms) of
|
case maps:find(Realm, Realms) of
|
||||||
{ok, Meta} ->
|
{ok, Meta = #rmeta{assigned = none, available = [Pid | Pids]}} ->
|
||||||
{Result, Host, NewMeta} = cx_next_host3(Meta),
|
ok = log(warning, "Call to cx_next_host/2 when connection available."),
|
||||||
|
NewMeta = Meta#rmeta{assigned = Pid, available = Pids},
|
||||||
NewRealms = maps:put(Realm, NewMeta, Realms),
|
NewRealms = maps:put(Realm, NewMeta, Realms),
|
||||||
{Result, Host, CX#cx{realms = NewRealms}};
|
NewCX = CX#cx{realms = NewRealms},
|
||||||
|
{error, connected, NewCX};
|
||||||
|
{ok, Meta = #rmeta{assigned = none, available = []}} ->
|
||||||
|
{Outcome, Host, NewMeta} = cx_next_host(Meta),
|
||||||
|
NewRealms = maps:put(Realm, NewMeta, Realms),
|
||||||
|
NewCX = CX#cx{realms = NewRealms},
|
||||||
|
{Outcome, Host, NewCX};
|
||||||
|
{ok, Meta = #rmeta{assigned = Conn}} when is_pid(Conn) ->
|
||||||
|
ok = log(warning, "Call to cx_next_host/2 when connection assigned."),
|
||||||
|
{error, connected, CX};
|
||||||
error ->
|
error ->
|
||||||
{error, bad_realm}
|
{error, bad_realm, CX}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_next_host3(RealmMeta) -> Result
|
-spec cx_next_host(Meta) -> Result
|
||||||
when RealmMeta :: realm_meta(),
|
when Meta :: realm_meta(),
|
||||||
Result :: {ok, Next, NewRealmMeta}
|
Result :: {ok, Next, NewMeta}
|
||||||
| {prime, Prime, NewRealmMeta},
|
| {prime, Prime, NewMeta},
|
||||||
Next :: zx:host(),
|
Next :: zx:host(),
|
||||||
Prime :: zx:host(),
|
Prime :: zx:host(),
|
||||||
NewRealmMeta :: realm_meta().
|
NewMeta :: realm_meta().
|
||||||
%% @private
|
|
||||||
%% Match on the important success cases first.
|
|
||||||
%% If there is a locally configured set of private mirrors (usually on the same LAN,
|
|
||||||
%% or public ones an organization hosts for its own users) then try those first.
|
|
||||||
%% Trying "first" is a relative concept in long-lived systems that experience a high
|
|
||||||
%% frequency of network churn. When the daemon is initialized a queue is created from
|
|
||||||
%% the known public mirrors, and then the private mirrors are enqueued at the head of
|
|
||||||
%% the mirrors so they will be encountered first.
|
|
||||||
%% If the entire combined mirrors list is exhausted then the prime node will be
|
|
||||||
%% returned, but also as a consequence the prime mirrors will be reloaded at the head
|
|
||||||
%% once again so if the prime fails (or causes a redirect) the private mirrors will
|
|
||||||
%% once again occur in the queue.
|
|
||||||
%% If the prime node is returned it is indicated with the atom `prime', which should
|
|
||||||
%% indicate to the caller that any iterative host scanning or connection procedures
|
|
||||||
%% should treat this as the last value of interest (and stop spawning connections).
|
|
||||||
|
|
||||||
cx_next_host3(Meta = #rmeta{prime = Prime, private = Private, mirrors = Mirrors}) ->
|
cx_next_host(Meta = #rmeta{prime = Prime, private = Private, mirrors = Mirrors}) ->
|
||||||
case queue:is_empty(Mirrors) of
|
case queue:out(Mirrors) of
|
||||||
false ->
|
{{value, Next}, NewMirrors} ->
|
||||||
{{value, Next}, NewMirrors} = queue:out(Mirrors),
|
|
||||||
{ok, Next, Meta#rmeta{mirrors = NewMirrors}};
|
{ok, Next, Meta#rmeta{mirrors = NewMirrors}};
|
||||||
true ->
|
{empty, Mirrors} ->
|
||||||
Enqueue = fun(H, Q) -> queue:in(H, Q) end,
|
Enqueue = fun(H, Q) -> queue:in(H, Q) end,
|
||||||
NewMirrors = lists:foldl(Enqueue, Private, Mirrors),
|
NewMirrors = lists:foldl(Enqueue, Private, Mirrors),
|
||||||
{prime, Prime, Meta#rmeta{mirrors = NewMirrors}}
|
{prime, Prime, Meta#rmeta{mirrors = NewMirrors}}
|
||||||
@ -1364,7 +1353,7 @@ cx_next_hosts2(N, Realm, Meta, CX = #cx{realms = Realms}) ->
|
|||||||
cx_next_hosts3(N, Hosts, Meta) when N < 1 ->
|
cx_next_hosts3(N, Hosts, Meta) when N < 1 ->
|
||||||
{ok, Hosts, Meta};
|
{ok, Hosts, Meta};
|
||||||
cx_next_hosts3(N, Hosts, Meta) ->
|
cx_next_hosts3(N, Hosts, Meta) ->
|
||||||
case cx_next_host3(Meta) of
|
case cx_next_host(Meta) of
|
||||||
{ok, Host, NewMeta} -> cx_next_hosts3(N - 1, [Host | Hosts], NewMeta);
|
{ok, Host, NewMeta} -> cx_next_hosts3(N - 1, [Host | Hosts], NewMeta);
|
||||||
{prime, Prime, NewMeta} -> {ok, [Prime | Hosts], NewMeta}
|
{prime, Prime, NewMeta} -> {ok, [Prime | Hosts], NewMeta}
|
||||||
end.
|
end.
|
||||||
@ -1400,9 +1389,9 @@ cx_add_attempt(Pid, Host, Realm, CX = #cx{attempts = Attempts}) ->
|
|||||||
CX#cx{attempts = [{Pid, Host, [Realm]} | Attempts]}.
|
CX#cx{attempts = [{Pid, Host, [Realm]} | Attempts]}.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_connected(Available, Conn, CX) -> Result
|
-spec cx_connected(Available, Pid, CX) -> Result
|
||||||
when Available :: [{zx:realm(), zx:serial()}],
|
when Available :: [{zx:realm(), zx:serial()}],
|
||||||
Conn :: pid(),
|
Pid :: pid(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
Result :: {Assignment, NewCX},
|
Result :: {Assignment, NewCX},
|
||||||
Assignment :: assigned | unassigned,
|
Assignment :: assigned | unassigned,
|
||||||
@ -1415,11 +1404,13 @@ cx_add_attempt(Pid, Host, Realm, CX = #cx{attempts = Attempts}) ->
|
|||||||
%% The return value is a tuple that indicates whether the new connection was assigned
|
%% The return value is a tuple that indicates whether the new connection was assigned
|
||||||
%% or not and the updated CX data value.
|
%% or not and the updated CX data value.
|
||||||
|
|
||||||
cx_connected(Available, Conn, CX = #cx{attempts = Attempts}) ->
|
cx_connected(Available, Pid, CX = #cx{attempts = Attempts}) ->
|
||||||
{value, {Pid, Host, _}, NewAttempts} = lists:keytake(Conn, 1, Attempts),
|
{value, Attempt, NewAttempts} = lists:keytake(Pid, 1, Attempts),
|
||||||
Realms = [R || {R, _} <- Available],
|
Host = element(2, Attempt),
|
||||||
|
Realms = [element(1, R) || R <- Available],
|
||||||
NewCX = CX#cx{attempts = NewAttempts},
|
NewCX = CX#cx{attempts = NewAttempts},
|
||||||
cx_connected(unassigned, Available, {Pid, Host, Realms}, NewCX).
|
Conn = #conn{pid = Pid, host = Host, realms = Realms},
|
||||||
|
cx_connected(unassigned, Available, Conn, NewCX).
|
||||||
|
|
||||||
|
|
||||||
-spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX}
|
-spec cx_connected(A, Available, Conn, CX) -> {NewA, NewCX}
|
||||||
@ -1433,10 +1424,12 @@ cx_connected(Available, Conn, CX = #cx{attempts = Attempts}) ->
|
|||||||
%% Only accept a new realm as available if the reported serial is equal or newer to the
|
%% Only accept a new realm as available if the reported serial is equal or newer to the
|
||||||
%% highest known serial.
|
%% highest known serial.
|
||||||
|
|
||||||
cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) ->
|
cx_connected(A,
|
||||||
|
[{Realm, Serial} | Rest],
|
||||||
|
Conn = #conn{pid = Pid},
|
||||||
|
CX = #cx{realms = Realms}) ->
|
||||||
case maps:find(Realm, Realms) of
|
case maps:find(Realm, Realms) of
|
||||||
{ok, Meta = #rmeta{serial = S, available = Available}} when S =< Serial ->
|
{ok, Meta = #rmeta{serial = S, available = Available}} when S =< Serial ->
|
||||||
Pid = element(1, Conn),
|
|
||||||
NewMeta = Meta#rmeta{serial = Serial, available = [Pid | Available]},
|
NewMeta = Meta#rmeta{serial = Serial, available = [Pid | Available]},
|
||||||
{NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX),
|
{NewA, NewCX} = cx_connected(A, Realm, Conn, NewMeta, CX),
|
||||||
cx_connected(NewA, Rest, Conn, NewCX);
|
cx_connected(NewA, Rest, Conn, NewCX);
|
||||||
@ -1445,7 +1438,10 @@ cx_connected(A, [{Realm, Serial} | Rest], Conn, CX = #cx{realms = Realms}) ->
|
|||||||
error ->
|
error ->
|
||||||
cx_connected(A, Rest, Conn, CX)
|
cx_connected(A, Rest, Conn, CX)
|
||||||
end;
|
end;
|
||||||
cx_connected(A, [], Conn, CX = #cx{conns = Conns}) ->
|
cx_connected(A,
|
||||||
|
[],
|
||||||
|
Conn,
|
||||||
|
CX = #cx{conns = Conns}) ->
|
||||||
{A, CX#cx{conns = [Conn | Conns]}}.
|
{A, CX#cx{conns = [Conn | Conns]}}.
|
||||||
|
|
||||||
|
|
||||||
@ -1465,7 +1461,7 @@ cx_connected(A, [], Conn, CX = #cx{conns = Conns}) ->
|
|||||||
|
|
||||||
cx_connected(A,
|
cx_connected(A,
|
||||||
Realm,
|
Realm,
|
||||||
{Pid, Prime, _},
|
#conn{pid = Pid, host = Prime},
|
||||||
Meta = #rmeta{prime = Prime, assigned = none},
|
Meta = #rmeta{prime = Prime, assigned = none},
|
||||||
CX = #cx{realms = Realms}) ->
|
CX = #cx{realms = Realms}) ->
|
||||||
NewMeta = Meta#rmeta{assigned = Pid},
|
NewMeta = Meta#rmeta{assigned = Pid},
|
||||||
@ -1474,67 +1470,70 @@ cx_connected(A,
|
|||||||
{assigned, NewCX};
|
{assigned, NewCX};
|
||||||
cx_connected(A,
|
cx_connected(A,
|
||||||
Realm,
|
Realm,
|
||||||
{Pid, Host, _},
|
#conn{pid = Pid, host = Host},
|
||||||
Meta = #rmeta{mirrors = Mirrors, assigned = none},
|
Meta = #rmeta{mirrors = Mirrors, assigned = none},
|
||||||
CX = #cx{realms = Realms, attempts = Attempts}) ->
|
CX = #cx{realms = Realms, attempts = Attempts}) ->
|
||||||
NewMirrors = enqueue_unique(Host, Mirrors),
|
NewMirrors = cx_enqueue_unique(Host, Mirrors),
|
||||||
NewMeta = Meta#rmeta{mirrors = NewMirrors, assigned = Pid},
|
NewMeta = Meta#rmeta{mirrors = NewMirrors, assigned = Pid},
|
||||||
NewRealms = maps:put(Realm, NewMeta, Realms),
|
NewRealms = maps:put(Realm, NewMeta, Realms),
|
||||||
NewCX = CX#cx{realms = NewRealms},
|
NewCX = CX#cx{realms = NewRealms},
|
||||||
{assigned, NewCX};
|
{assigned, NewCX};
|
||||||
cx_connected(A,
|
cx_connected(A,
|
||||||
_,
|
_,
|
||||||
{_, Prime, _},
|
#conn{host = Prime},
|
||||||
#rmeta{prime = Prime},
|
#rmeta{prime = Prime},
|
||||||
CX) ->
|
CX) ->
|
||||||
{A, CX};
|
{A, CX};
|
||||||
cx_connected(A,
|
cx_connected(A,
|
||||||
Realm,
|
Realm,
|
||||||
{_, Host, _},
|
#conn{host = Host},
|
||||||
Meta = #rmeta{mirrors = Mirrors},
|
Meta = #rmeta{mirrors = Mirrors},
|
||||||
CX = #cx{realms = Realms}) ->
|
CX = #cx{realms = Realms}) ->
|
||||||
NewMirrors = enqueue_unique(Host, Mirrors),
|
NewMirrors = cx_enqueue_unique(Host, Mirrors),
|
||||||
NetMeta = Meta#rmeta{mirrors = NewMirrors},
|
NewMeta = Meta#rmeta{mirrors = NewMirrors},
|
||||||
NewRealms = maps:put(Realm, NewMeta, Realms),
|
NewRealms = maps:put(Realm, NewMeta, Realms),
|
||||||
NewCX = CX#cx{realms = NewRealms},
|
NewCX = CX#cx{realms = NewRealms},
|
||||||
{A, NewCX}.
|
{A, NewCX}.
|
||||||
|
|
||||||
|
|
||||||
-spec enqueue_unique(term(), queue:queue()) -> queue:queue().
|
-spec cx_enqueue_unique(term(), queue:queue()) -> queue:queue().
|
||||||
%% @private
|
%% @private
|
||||||
%% Simple function to ensure that only unique elements are added to a queue. Obviously
|
%% Simple function to ensure that only unique elements are added to a queue. Obviously
|
||||||
%% this operation is extremely general and O(n) in complexity due to the use of
|
%% this operation is extremely general and O(n) in complexity due to the use of
|
||||||
%% queue:member/2.
|
%% queue:member/2.
|
||||||
|
|
||||||
enqueue_unique(Element, Queue) ->
|
cx_enqueue_unique(Element, Queue) ->
|
||||||
case queue:member(Element, Queue) of
|
case queue:member(Element, Queue) of
|
||||||
true -> Queue;
|
true -> Queue;
|
||||||
false -> queue:in(Element, Queue)
|
false -> queue:in(Element, Queue)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_failed(Conn, CX) -> NewCX
|
-spec cx_failed(Conn, CX) -> {Realms, NewCX}
|
||||||
when Conn :: pid(),
|
when Conn :: pid(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
|
Realms :: [zx:realm()],
|
||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
%% @private
|
%% @private
|
||||||
%% Remove a failed attempt and all its associations.
|
%% Remove a failed attempt and all its associations.
|
||||||
|
|
||||||
cx_failed(Conn, #cx{attempts = Attempts}) ->
|
cx_failed(Conn, CX = #cx{attempts = Attempts}) ->
|
||||||
NewAttempts = lists:keydelete(Conn, 1, Attempts),
|
{value, Attempt, NewAttempts = lists:keytake(Conn, 1, Attempts),
|
||||||
CX#cx{attempts = NewAttempts}.
|
Realms = element(3, Attempt),
|
||||||
|
{Realms, CX#cx{attempts = NewAttempts}}.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_redirect(Conn, Hosts, CX) -> {Unassigned, NextCX}
|
-spec cx_redirect(Conn, Hosts, CX) -> {Unassigned, NewCX}
|
||||||
when Conn :: pid(),
|
when Conn :: pid(),
|
||||||
Hosts :: [{zx:host(), [zx:realm()]}],
|
Hosts :: [{zx:host(), [zx:realm()]}],
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
Unassigned :: [zx:realm()],
|
Unassigned :: [zx:realm()],
|
||||||
NextCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
|
|
||||||
cx_redirect(Conn, Hosts, CX) ->
|
cx_redirect(Conn, Hosts, CX = #cx{attempts = Attempts}) ->
|
||||||
NextCX = cx_failed(Conn, CX),
|
NewAttempts = lists:keydelete(Conn, 1, Attempts),
|
||||||
NewCX = #cx{realms = Realms} = cx_redirect(Hosts, NextCX),
|
NextCX = CX#cx{attempts = NewAttempts},
|
||||||
|
NewCX = cx_redirect(Hosts, NextCX),
|
||||||
Unassigned = cx_unassigned(NewCX),
|
Unassigned = cx_unassigned(NewCX),
|
||||||
{Unassigned, NewCX}.
|
{Unassigned, NewCX}.
|
||||||
|
|
||||||
@ -1542,7 +1541,7 @@ cx_redirect(Conn, Hosts, CX) ->
|
|||||||
-spec cx_redirect(Hosts, CX) -> NewCX
|
-spec cx_redirect(Hosts, CX) -> NewCX
|
||||||
when Hosts :: [{zx:host(), [zx:realm()]}],
|
when Hosts :: [{zx:host(), [zx:realm()]}],
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
NextCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
%% @private
|
%% @private
|
||||||
%% Add the host to any realm mirror queues that it provides.
|
%% Add the host to any realm mirror queues that it provides.
|
||||||
|
|
||||||
@ -1551,7 +1550,7 @@ cx_redirect([{Host, Provided} | Rest], CX = #cx{realms = Realms}) ->
|
|||||||
fun(R, Rs) ->
|
fun(R, Rs) ->
|
||||||
case maps:find(R, Rs) of
|
case maps:find(R, Rs) of
|
||||||
{ok, Meta = #rmeta{mirrors = Mirrors}} ->
|
{ok, Meta = #rmeta{mirrors = Mirrors}} ->
|
||||||
NewMirrors = enqueue_unique(Host, Mirrors),
|
NewMirrors = cx_enqueue_unique(Host, Mirrors),
|
||||||
NewMeta = Meta#rmeta{mirrors = NewMirrors},
|
NewMeta = Meta#rmeta{mirrors = NewMirrors},
|
||||||
maps:put(R, NewMeta, Rs);
|
maps:put(R, NewMeta, Rs);
|
||||||
error ->
|
error ->
|
||||||
@ -1582,9 +1581,11 @@ cx_unassigned(#cx{realms = Realms}) ->
|
|||||||
maps:fold(NotAssigned, [], Realms).
|
maps:fold(NotAssigned, [], Realms).
|
||||||
|
|
||||||
|
|
||||||
-spec cx_disconnected(Conn, CX) -> NewCX
|
-spec cx_disconnected(Conn, CX) -> {Requests, Subs, NewCX}
|
||||||
when Conn :: pid(),
|
when Conn :: pid(),
|
||||||
CX :: conn_index(),
|
CX :: conn_index(),
|
||||||
|
Requests :: [reference()],
|
||||||
|
Subs :: [zx:package()],
|
||||||
NewCX :: conn_index().
|
NewCX :: conn_index().
|
||||||
%% @private
|
%% @private
|
||||||
%% An abstract data handler which is called whenever a connection terminates.
|
%% An abstract data handler which is called whenever a connection terminates.
|
||||||
@ -1592,10 +1593,12 @@ cx_unassigned(#cx{realms = Realms}) ->
|
|||||||
%% realms, and returns the monitor reference of the pid, a list of realms that are now
|
%% realms, and returns the monitor reference of the pid, a list of realms that are now
|
||||||
%% unassigned, and an updated connection index.
|
%% unassigned, and an updated connection index.
|
||||||
|
|
||||||
cx_disconnected(Conn, CX = #cx{realms = Realms, conns = Conns}) ->
|
cx_disconnected(Pid, CX = #cx{realms = Realms, conns = Conns}) ->
|
||||||
{value, {Pid, Host, _}, NewConns} = lists:keytake(Conn, 1, Conns),
|
{value, Conn, NewConns} = lists:keytake(Pid, #conn.pid, Conns),
|
||||||
|
#conn{host = Host, requests = Requests, subs = Subs} = Conn,
|
||||||
NewRealms = cx_scrub_assigned(Pid, Host, Realms),
|
NewRealms = cx_scrub_assigned(Pid, Host, Realms),
|
||||||
CX#cx{realms = NewRealms, conns = NewConns}.
|
NewCX = CX#cx{realms = NewRealms, conns = NewConns},
|
||||||
|
{Requests, Subs, NewCX}.
|
||||||
|
|
||||||
|
|
||||||
-spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms
|
-spec cx_scrub_assigned(Pid, Host, Realms) -> NewRealms
|
||||||
@ -1611,11 +1614,11 @@ cx_scrub_assigned(Pid, Host, Realms) ->
|
|||||||
Scrub =
|
Scrub =
|
||||||
fun
|
fun
|
||||||
(_, V = #rmeta{mirrors = M, available = A, assigned = C}) when C == Pid ->
|
(_, V = #rmeta{mirrors = M, available = A, assigned = C}) when C == Pid ->
|
||||||
V#rmeta{mirrors = enqueue_unique(Host, M),
|
V#rmeta{mirrors = cx_enqueue_unique(Host, M),
|
||||||
available = lists:delete(Pid, A),
|
available = lists:delete(Pid, A),
|
||||||
assigned = none};
|
assigned = none};
|
||||||
(_, V = #rmeta{mirrors = M, available = A}) ->
|
(_, V = #rmeta{mirrors = M, available = A}) ->
|
||||||
V#rmeta{mirrors = enqueue_unique(Host, M),
|
V#rmeta{mirrors = cx_enqueue_unique(Host, M),
|
||||||
available = lists:delete(Pid, A)}
|
available = lists:delete(Pid, A)}
|
||||||
end,
|
end,
|
||||||
maps:map(Scrub, Realms).
|
maps:map(Scrub, Realms).
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user