Fixed up zx_conn a bit

This commit is contained in:
Craig Everett 2018-06-05 18:56:14 +09:00
parent 3be0803721
commit 44d3bb37fc
8 changed files with 226 additions and 424 deletions

75
TODO
View File

@ -1,75 +0,0 @@
- Make the create project command fail if the realm is invalid or the project bame already exists.
- Add a module name conflict checker to ZX
- Check whether the "repo name doesn't have to be the same as package, package doesn't have to be the same as main interface module" statement is true.
- Make the "create user" bundle command check whether a user already exists at that realm.
- Define the user bundle file contents: .zuf
- Make it possible for a site administrator to declare specific versions for programs executed by clients.
Site adminisration via an organizational mirror should not be ricket science.
- zomp nodes must report the realms they provide to upstream nodes to which they connect.
On redirect a list of hosts of the form [{zx:host(), [zx:realm()]}] must be provided to the downstream client.
This is the only way that downstream clients can determine which redirect hosts are useful to it.
- Write a logging process.
Pick a log rotation scheme.
Eventually make it so that it can shed log loads in the event they get out of hand.
- Create zx_daemon primes.
See below
- Create persistent Windows alias for "zx" using the doskey command and adding it to the localuser's registry.
The installation escript will need to be updated to update the windows registry for HKEY_CURRENT_USER\Software\Microsoft\Command Processor
AutoRun will need to be set for %USERPROFILE%\zomp_alias.cmd
zomp_alias.cmd will need to do something like:
`doskey zx="werl.exe -pa %zx_dir%/ebin -run zx start $*"`
https://stackoverflow.com/questions/20530996/aliases-in-windows-command-prompt
Whatever happens, local users must be able to do `zx $@` and get the expected results.
Somewhat more tricky is how to make the creation of shortcuts less taxing on the lay user... ?
New Feature: ZX Universal lock
Cross-instance communication
We don't want multiple zx_daemons doing funny things to the filesystem at the same time.
We DO want to be able to run multiple ZX programs to be able to run at the same time.
The solution is to guarantee that there is only ever one zx_daemon running at a given time.
IPC is messy to get straight across various host systems, but network sockets to localhost work in a normal way.
The first zx_daemon to start up will check for a lock file in the $ZOMP_HOME directory.
If there is no lock file present it will write a lock file with a timestamp, begin listening on a local port, and then update the lock file with the listening port number.
If it finds a lock file in $ZOMP_HOME it will attempt to connect to the running zx_daemon on the port indicated in the lock file. If only a timestamp exists with no port number then it will wait two seconds from the time indicated in the timestamp in the lock file before re-reading it -- if the second read still contains no port number it will remove the lock file and assume the primary role for the system itself.
If a connection cannot be established then it will assume the instance that had written the lock file failed to delete it for Bad Reasons, remove the lock file, open a port, and write its own lock file, taking over as the lead zx_daemon.
Any new zx_daemons that come up in the system will establish a connection to the zx_daemon that wrote the lock file, and proxy all requests through that primary zx_daemon.
When a zx_daemon that is acting as primary for the system retires it will complete all ongoing actions first, then begin queueing requests without acting on them, designate the oldest peer as the new leader, get confirmation that the new leader has a port open, update the original lock file with the new port number, redirect all connections to the new zx_daemon, and then retire.
init(Args) ->
Stuff = do_stuff(),
Tries = 3,
Path = lock_file(),
case check_for_leader(Tries, Path) of
no_leader ->
{found, Socket} ->
end.
check_for_leader(0, _) ->
no_leader;
check_for_leader(Tries, Path) ->
case file:open(Path, [write, exclusive]) of
{ok, FD} -> become_leader(FD);
{error, eexist} -> contact_leader()
end.
case file:consult(Path) of
{ok, Data} ->
{error, enoexist} ->
check_file(Path);
end
end,

View File

@ -119,6 +119,9 @@ do(["list", "packages", Realm]) ->
do(["list", "versions", PackageName]) -> do(["list", "versions", PackageName]) ->
ok = start(), ok = start(),
done(zx_local:list_versions(PackageName)); done(zx_local:list_versions(PackageName));
do(["latest", PackageString]) ->
ok = start(),
done(zx_local:latest(PackageString));
do(["import", "realm", RealmFile]) -> do(["import", "realm", RealmFile]) ->
done(zx_local:import_realm(RealmFile)); done(zx_local:import_realm(RealmFile));
do(["drop", "dep", PackageString]) -> do(["drop", "dep", PackageString]) ->
@ -389,12 +392,12 @@ install(PackageString) ->
install(PackageString, ID) -> install(PackageString, ID) ->
receive receive
{z_result, ID, done} -> {result, ID, done} ->
ok; ok;
{z_result, ID, {hops, Count}} -> {result, ID, {hops, Count}} ->
ok = log(info, "~ts ~w hops away.", [PackageString, Count]), ok = log(info, "~ts ~w hops away.", [PackageString, Count]),
install(PackageString, ID); install(PackageString, ID);
{z_result, ID, {error, Reason}} -> {result, ID, {error, Reason}} ->
{error, Reason, 1} {error, Reason, 1}
after 60000 -> after 60000 ->
{error, timeout, 62} {error, timeout, 62}
@ -625,6 +628,8 @@ usage() ->
" zx accept PackageID~n" " zx accept PackageID~n"
" zx create realm~n" " zx create realm~n"
" zx create realmfile Realm~n" " zx create realmfile Realm~n"
" zx takeover Realm~n"
" zx abdicate Realm~n"
"~n" "~n"
"Where~n" "Where~n"
" PackageID :: A string of the form Realm-Name[-Version]~n" " PackageID :: A string of the form Realm-Name[-Version]~n"

View File

@ -11,9 +11,6 @@
-copyright("Craig Everett <zxq9@zxq9.com>"). -copyright("Craig Everett <zxq9@zxq9.com>").
-license("GPL-3.0"). -license("GPL-3.0").
% FIXME
-compile(export_all).
-export([subscribe/2, unsubscribe/2, fetch/3, query/3]). -export([subscribe/2, unsubscribe/2, fetch/3, query/3]).
-export([start/1, stop/1]). -export([start/1, stop/1]).
-export([start_link/1, init/2]). -export([start_link/1, init/2]).
@ -21,13 +18,6 @@
-include("zx_logger.hrl"). -include("zx_logger.hrl").
%%% Types
%-type incoming() :: ping
% | {sub, Channel :: term(), Message :: term()}
% | term().
%%% Interface %%% Interface
-spec subscribe(Conn, Package) -> ok -spec subscribe(Conn, Package) -> ok
@ -52,6 +42,10 @@ unsubscribe(Conn, Package) ->
when Conn :: pid(), when Conn :: pid(),
ID :: zx_daemon:id(), ID :: zx_daemon:id(),
Object :: zx_daemon:object(). Object :: zx_daemon:object().
%% @doc
%% This requests a package be fetched from upstream.
%% Only to be called by zx_daemon.
%% Results must be returned with the ID via zx_daemon:result/2.
fetch(Conn, ID, Object) -> fetch(Conn, ID, Object) ->
Conn ! {fetch, ID, Object}, Conn ! {fetch, ID, Object},
@ -62,6 +56,10 @@ fetch(Conn, ID, Object) ->
when Conn :: pid(), when Conn :: pid(),
ID :: zx_daemon:id(), ID :: zx_daemon:id(),
Action :: zx_daemon:action(). Action :: zx_daemon:action().
%% @doc
%% Wraps any legal query.
%% Only to be called by zx_daemon.
%% Results must be returned with the ID via zx_daemon:result/2.
query(Conn, ID, Action) -> query(Conn, ID, Action) ->
Conn ! {query, ID, Action}, Conn ! {query, ID, Action},
@ -142,43 +140,30 @@ connect(Parent, Debug, {Host, Port}) ->
Debug :: [sys:dbg_opt()], Debug :: [sys:dbg_opt()],
Socket :: gen_tcp:socket(). Socket :: gen_tcp:socket().
%% @private %% @private
%% Confirm the zomp node can handle "OTPR USER 1" and is accepting connections or try %% Confirm the zomp node can handle "LEAF 1:" and is accepting connections or try
%% another node. %% another node.
confirm_service(Parent, Debug, Socket) -> confirm_service(Parent, Debug, Socket) ->
ok = gen_tcp:send(Socket, <<"OTPR USER 1">>), ok = gen_tcp:send(Socket, <<"LEAF 1:">>),
receive receive
{tcp, Socket, Bin} -> {tcp, Socket, <<0:8, RealmsBin/binary>>} ->
case binary_to_term(Bin, [safe]) of {ok, Realms} = zx_lib:b_to_ts(RealmsBin),
ok ->
query_realms(Parent, Debug, Socket);
{redirect, Hosts} ->
ok = zx_daemon:report({redirect, Hosts}),
ok = zx_net:disconnect(Socket),
terminate()
end;
{tcp_closed, Socket} ->
handle_unexpected_close()
after 5000 ->
handle_timeout(Socket)
end.
-spec query_realms(Parent, Debug, Socket) -> no_return()
when Parent :: pid(),
Debug :: [sys:dbg_opt()],
Socket :: gen_tcp:socket().
%% @private
%% Confirm that the connected host has a valid serial for the realm zx is trying to
%% reach, and if not retry on another node.
query_realms(Parent, Debug, Socket) ->
ok = zx_net:send(Socket, list),
receive
{tcp, Socket, Bin} ->
{ok, Realms} = binary_to_term(Bin, [safe]),
ok = zx_daemon:report({connected, Realms}), ok = zx_daemon:report({connected, Realms}),
loop(Parent, Debug, Socket); loop(Parent, Debug, Socket);
{tcp, Socket, <<1:8, HostsBin/binary>>} ->
{ok, Hosts} = zx_lib:b_to_ts(HostsBin),
ok = zx_daemon:report({redirect, Hosts}),
ok = zx_net:disconnect(Socket),
terminate();
{tcp, Socket, <<2:8>>} ->
ok = zx_daemon:report({use_version, no_version}),
terminate();
{tcp, Socket, <<2:8, Version:16>>} ->
ok = zx_daemon:report({use_version, Version}),
terminate();
{tcp, Socket, <<3:8, Reason:16>>} ->
ok = zx_daemon:report({no_service, Reason}),
terminate();
{tcp_closed, Socket} -> {tcp_closed, Socket} ->
handle_unexpected_close() handle_unexpected_close()
after 5000 -> after 5000 ->
@ -200,28 +185,40 @@ query_realms(Parent, Debug, Socket) ->
%% of order or else whatever sequenced communication was happening will be corrupted. %% of order or else whatever sequenced communication was happening will be corrupted.
loop(Parent, Debug, Socket) -> loop(Parent, Debug, Socket) ->
receive
{tcp, Socket, Bin} ->
ok = handle_message(Socket, Bin),
ok = inet:setopts(Socket, [{active, once}]), ok = inet:setopts(Socket, [{active, once}]),
receive
{tcp, Socket, <<1:1, 0:7>>} ->
ok = pong(Socket),
loop(Parent, Debug, Socket); loop(Parent, Debug, Socket);
{tcp, Socket, <<1:1, 1:7, SigSize:24, Sig:SigSize/binary, 9:8, Bin/binary>>} ->
ok = handle_package_update(Sig, Bin),
loop(Parent, Debug, Socket);
{tcp, Socket, <<1:1, 2:7, Bin/binary>>} ->
{ok, {Realm, Serial}} = zx_lib:b_to_ts(Bin),
ok = zx_daemon:report({serial_update, Realm, Serial}),
loop(Parent, Debug, Socket);
{tcp, Socket, Unexpected} ->
ok = log(warning, "Funky data from node: ~tp", [Unexpected]),
ok = zx_net:disconnect(Socket),
terminate();
{subscribe, Package} -> {subscribe, Package} ->
ok = zx_net:send(Socket, {subscribe, Package}), ok = do_subscribe(Socket, Package),
loop(Parent, Debug, Socket); loop(Parent, Debug, Socket);
{unsubscribe, Package} -> {unsubscribe, Package} ->
ok = zx_net:send(Socket, {unsubscribe, Package}), ok = do_unsubscribe(Socket, Package),
loop(Parent, Debug, Socket);
{fetch, ID, Object} ->
{ok, Outcome} = handle_fetch(Socket, Object),
ok = zx_daemon:result(ID, Outcome),
loop(Parent, Debug, Socket); loop(Parent, Debug, Socket);
{query, ID, Action} -> {query, ID, Action} ->
{ok, Outcome} = handle_query(Socket, Action), Result = do_query(Socket, Action),
ok = zx_daemon:result(ID, Outcome), ok = zx_daemon:result(ID, Result),
loop(Parent, Debug, Socket);
{fetch, ID, PackageID} ->
ok = do_fetch(Socket, ID, PackageID),
loop(Parent, Debug, Socket); loop(Parent, Debug, Socket);
stop -> stop ->
ok = zx_net:disconnect(Socket), ok = zx_net:disconnect(Socket),
terminate(); terminate();
{tcp_closed, Socket} ->
handle_unexpected_close();
Unexpected -> Unexpected ->
ok = log(warning, "Unexpected message: ~tp", [Unexpected]), ok = log(warning, "Unexpected message: ~tp", [Unexpected]),
loop(Parent, Debug, Socket) loop(Parent, Debug, Socket)
@ -229,184 +226,162 @@ loop(Parent, Debug, Socket) ->
%%% Idle Incoming Upstream Messages %% TODO: Pull in missing key chains.
handle_package_update(Sig, Bin) ->
-spec handle_message(Socket, Bin) -> ok | no_return() Message = {_, _, _, KeyID, {Realm, Name} = PackageID} = zx_lib:b_to_ts(Bin),
when Socket :: gen_tcp:socket(), Package = {Realm, Name},
Bin :: binary(). {ok, Key} = zx_key:load(KeyID),
%% @private case zx_key:verify(Bin, Sig, Key) of
%% Single point to convert a binary message to a safe internal message. Actual handling true ->
%% of the converted message occurs in dispatch/2. zx_daemon:notify(Package, {update, PackageID});
false ->
handle_message(Socket, Bin) -> ok = log(error, "Received an unverified update message: ~tp", [Message]),
Message = binary_to_term(Bin, [safe]),
ok = log(info, "Received network message: ~tp", [Message]),
case binary_to_term(Bin, [safe]) of
ping ->
zx_net:send(Socket, pong);
{sub, Channel, Message} ->
log("Sub: ~tp - ~tp", [Channel, Message]);
{update, Message} ->
log("Update: ~tp", [Message]);
{redirect, Nodes} ->
log("Redirected to ~tp", [Nodes]);
Invalid ->
{ok, {Addr, Port}} = zomp:peername(Socket),
Host = inet:ntoa(Addr),
Warning = "Invalid message from ~s:~p: ~tp",
ok = log(warning, Warning, [Host, Port, Invalid]),
ok = zx_net:disconnect(Socket),
terminate() terminate()
end. end.
%%% Incoming Request Actions %%% Incoming Request Actions
-spec handle_request(Socket, Action) -> Result -spec do_subscribe(gen_tcp:socket(), zx:package()) -> ok | no_return().
when Socket :: gen_tcp:socket(),
Action :: term(),
Result :: {ok, Outcome :: term()}.
handle_request(Socket, Action) -> do_subscribe(Socket, Package) ->
ok = zx_net:send(Socket, Action), Reference = term_to_binary(Package),
Response = Message = <<0:1, 1:7, Reference/binary>>,
case element(1, Action) of ok = gen_tcp:send(Socket, Message),
list -> wait_ok(Socket).
do_list(Action, Socket);
latest ->
do_latest(Action, Socket);
fetch ->
do_fetch(Action, Socket);
key ->
do_key(Action, Socket);
pending ->
do_pending(Action, Socket);
packagers ->
do_packagers(Action, Socket);
maintainers ->
do_maintainers(Action, Socket);
sysops ->
do_sysops(Action, Socket)
end,
handle_response(Socket, Response).
handle_fetch(_, _) -> {error, nyi}. -spec do_unsubscribe(gen_tcp:socket(), zx:package()) -> ok | no_return().
handle_query(_, _) -> {error, nyi}. do_unsubscribe(Socket, Package) ->
Reference = term_to_binary(Package),
do_list(_, _) -> {error, nyi}. Message = <<0:1, 2:7, Reference/binary>>,
ok = gen_tcp:send(Socket, Message),
do_latest(_, _) -> {error, nyi}. wait_ok(Socket).
do_fetch(_, _) -> {error, nyi}.
do_key(_, _) -> {error, nyi}.
do_pending(_, _) -> {error, nyi}.
do_packagers(_, _) -> {error, nyi}.
do_maintainers(_, _) -> {error, nyi}.
do_sysops(_, _) -> {error, nyi}.
do_query(Socket, {list, Realm}) ->
Reference = term_to_binary(Realm),
Message = <<0:1, 3:7, Reference/binary>>,
send_query(Socket, Message);
do_query(Socket, {list, Realm, Name}) ->
Reference = term_to_binary({Realm, Name}),
Message = <<0:1, 4:7, Reference/binary>>,
send_query(Socket, Message);
do_query(Socket, {list, Realm, Name, Version}) ->
Reference = term_to_binary({Realm, Name, Version}),
Message = <<0:1, 4:7, Reference/binary>>,
send_query(Socket, Message);
do_query(Socket, {latest, Realm, Name}) ->
Reference = term_to_binary({Realm, Name}),
Message = <<0:1, 6:7, Reference/binary>>,
send_query(Socket, Message);
do_query(Socket, {latest, Realm, Name, Version}) ->
Reference = term_to_binary({Realm, Name, Version}),
Message = <<0:1, 6:7, Reference/binary>>,
send_query(Socket, Message).
handle_response(Socket, Command) ->
send_query(Socket, Message) ->
ok = gen_tcp:send(Socket, Message),
wait_query(Socket).
wait_query(Socket) ->
ok = inet:setopts(Socket, [{active, once}]),
receive receive
{tcp, Socket, Bin} -> {tcp, Socket, <<1:1, 0:7>>} ->
Outcome = binary_to_term(Bin, [safe]), ok = pong(Socket),
interpret_response(Socket, Outcome, Command); wait_query(Socket);
{tcp_closed, Socket} -> {tcp, Socket, <<0:1, 0:7, Bin/binary>>} ->
handle_unexpected_close() {ok, Result} = zx_lib:b_to_ts(Bin),
Result;
{tcp, Socket, <<0:1, 1:7>>} ->
ok = zx_daemon:report(failed),
terminate();
{tcp, Socket, <<0:1, 2:7>>} ->
{error, bad_realm};
{tcp, Socket, <<0:1, 3:7>>} ->
{error, bad_package};
{tcp, Socket, <<0:1, 4:7>>} ->
{error, bad_version}
after 5000 -> after 5000 ->
handle_timeout(Socket) handle_timeout(Socket)
end. end.
interpret_response(Socket, ping, Command) -> pong(Socket) ->
ok = zx_net:send(Socket, pong), gen_tcp:send(Socket, <<1:1, 0:7>>).
handle_response(Socket, Command);
interpret_response(Socket, {sub, Channel, Message}, Command) ->
ok = zx_daemon:notify(Channel, Message),
handle_response(Socket, Command).
%interpret_response(Socket, {update, Message}, Command) ->
%interpret_response(Socket, Response, list) ->
%interpret_response(Socket, Response, latest) ->
%interpret_response(Socket, Response, fetch) ->
%interpret_response(Socket, Response, key) ->
%interpret_response(Socket, Response, pending) ->
%interpret_response(Socket, Response, packagers) ->
%interpret_response(Socket, Response, maintainers) ->
%interpret_response(Socket, Response, sysops) ->
%
%
%
% case element(1, Action) of
% end,
-spec fetch(Socket, PackageID) -> Result -spec do_fetch(Socket, ID, PackageID) -> Result
when Socket :: gen_tcp:socket(), when Socket :: gen_tcp:socket(),
ID :: zx_daemon:id(),
PackageID :: zx:package_id(), PackageID :: zx:package_id(),
Result :: ok. Result :: ok.
%% @private %% @private
%% Download a package to the local cache. %% Download a package to the local cache.
fetch(Socket, PackageID) -> do_fetch(Socket, ID, PackageID) ->
{ok, LatestID} = request_zsp(Socket, PackageID), Reference = binary_to_term(PackageID),
ok = receive_zsp(Socket, LatestID), Message = <<0:1, 7:7, Reference/binary>>,
Latest = zx_lib:package_string(LatestID), ok = gen_tcp:send(Socket, Message),
log(info, "Fetched ~ts", [Latest]). ok = wait_hops(Socket, ID),
{ok, Bin} = receive_zsp(Socket),
zx_daemon:result(ID, {done, Bin}).
-spec request_zsp(Socket, PackageID) -> Result wait_hops(Socket, ID) ->
when Socket :: gen_tcp:socket(), ok = inet:setopts(Socket, [{active, once}]),
PackageID :: zx:package_id(),
Result :: {ok, Latest :: zx:package_id()}
| {error, Reason :: timeout | term()}.
request_zsp(Socket, PackageID) ->
ok = zx_net:send(Socket, {fetch, PackageID}),
receive receive
{tcp, Socket, Bin} -> {tcp, Socket, <<0:1, 0:7, 0:8>>} ->
case binary_to_term(Bin) of ok = inet:setopts(Socket, [{packet, 0}]),
{sending, LatestID} -> gen_tcp:send(Socket, <<0:1, 0:7>>);
{ok, LatestID}; {tcp, Socket, <<0:1, 0:7, Distance:8>>} ->
Error = {error, Reason} -> ok = zx_daemon:result(ID, {hops, Distance}),
PackageString = zx_lib:package_string(PackageID), wait_hops(Socket, ID);
Message = "Error receiving package ~ts: ~tp", {tcp, Socket, <<0:1, 1:7>>} ->
ok = log(info, Message, [PackageString, Reason]), {error, bad_message};
Error {tcp, Socket, <<0:1, 2:7>>} ->
{error, bad_realm};
{tcp, Socket, <<0:1, 3:7>>} ->
{error, bad_package};
{tcp, Socket, <<0:1, 4:7>>} ->
{error, bad_version};
{tcp, Socket, <<0:1, 5:7>>} ->
handle_timeout(Socket)
after 60000 ->
handle_timeout(Socket)
end.
receive_zsp(Socket) ->
ok = inet:setopts(Socket, [{active, once}]),
receive
{tcp, Socket, <<Size:32, Bin:Size/binary>>} ->
{ok, Bin};
{tcp, Socket, <<Total:32, Bin/binary>>} ->
Size = byte_size(Bin),
receive_zsp(Socket, Total, Size, Bin)
after 5000 ->
handle_timeout(Socket)
end.
receive_zsp(Socket, Total, SoFar, Bin) when Total > SoFar ->
ok = inet:setopts(Socket, [{active, once}]),
receive
{tcp, Socket, New} ->
Size = byte_size(New),
NewBin = <<Bin/binary, New/binary>>,
receive_zsp(Socket, Total, Size + SoFar, NewBin)
after 5000 ->
handle_timeout(Socket)
end; end;
{tcp_closed, Socket} -> receive_zsp(Socket, Total, Total, Bin) ->
handle_unexpected_close() ok = inet:setopts(Socket, [{packet, 4}]),
after 60000 -> ok = gen_tcp:send(Socket, <<0:1, 0:7>>),
{error, timeout} {ok, Bin}.
end.
-spec receive_zsp(Socket, PackageID) -> Result
when Socket :: gen_tcp:socket(),
PackageID :: zx:package_id(),
Result :: ok | {error, timeout}.
receive_zsp(Socket, PackageID) ->
receive
{tcp, Socket, Bin} ->
ZrpPath = zx_lib:zsp_path(PackageID),
ok = file:write_file(ZrpPath, Bin),
ok = zx_net:send(Socket, ok),
log(info, "Wrote ~ts", [ZrpPath]);
{tcp_closed, Socket} ->
handle_unexpected_close()
after 60000 ->
ok = log(error, "Timeout in socket receive for ~tp", [PackageID]),
{error, timeout}
end.
%%% Terminal handlers %%% Terminal handlers
@ -426,6 +401,15 @@ handle_timeout(Socket) ->
terminate(). terminate().
-spec wait_ok(gen_tcp:socket()) -> ok | no_return().
wait_ok(Socket) ->
receive
{tcp, Socket, <<0:1, 0:7>>} -> ok
after 5000 -> handle_timeout(Socket)
end.
-spec terminate() -> no_return(). -spec terminate() -> no_return().
%% @private %% @private
%% Convenience wrapper around the suicide call. %% Convenience wrapper around the suicide call.
@ -435,109 +419,3 @@ handle_timeout(Socket) ->
terminate() -> terminate() ->
exit(normal). exit(normal).
%-spec do_query_latest(Object, State) -> {Result, NewState}
% when Object :: zx:package() | zx:package_id(),
% State :: state(),
% Result :: {ok, zx:version()}
% | {error, Reason},
% Reason :: bad_realm
% | bad_package
% | bad_version,
% NewState :: state().
%% @private
%% Queries a zomp realm for the latest version of a package or package
%% version (complete or incomplete version number).
%
%do_query_latest(Socket, {Realm, Name}) ->
% ok = zx_net:send(Socket, {latest, Realm, Name}),
% receive
% {tcp, Socket, Bin} -> binary_to_term(Bin)
% after 5000 -> {error, timeout}
% end;
%do_query_latest(Socket, {Realm, Name, Version}) ->
% ok = zx_net:send(Socket, {latest, Realm, Name, Version}),
% receive
% {tcp, Socket, Bin} -> binary_to_term(Bin)
% after 5000 -> {error, timeout}
% end.
%-spec do_fetch(PackageIDs, State) -> NewState
% when PackageIDs :: [zx:package_id()],
% State :: state(),
% NewState :: state(),
% Result :: ok
% | {error, Reason},
% Reason :: bad_realm
% | bad_package
% | bad_version
% | network.
%% @private
%%
%
%do_fetch(PackageIDs, State) ->
% FIXME: Need to create a job queue divided by realm and dispatched to connectors,
% and cleared from the master pending queue kept here by the daemon as the
% workers succeed. Basic task queue management stuff... which never existed
% in ZX before... grrr...
% case scrub(PackageIDs) of
% [] ->
% ok;
% Needed ->
% Partitioned = partition_by_realm(Needed),
% EnsureDeps =
% fun({Realm, Packages}) ->
% ok = zx_conn:queue_package(Pid, Realm, Packages),
% log(info, "Disconnecting from realm: ~ts", [Realm])
% end,
% lists:foreach(EnsureDeps, Partitioned)
% end.
%
%
%partition_by_realm(PackageIDs) ->
% PartitionMap = lists:foldl(fun partition_by_realm/2, #{}, PackageIDs),
% maps:to_list(PartitionMap).
%
%
%partition_by_realm({R, P, V}, M) ->
% maps:update_with(R, fun(Ps) -> [{P, V} | Ps] end, [{P, V}], M).
%
%
%ensure_deps(_, _, []) ->
% ok;
%ensure_deps(Socket, Realm, [{Name, Version} | Rest]) ->
% ok = ensure_dep(Socket, {Realm, Name, Version}),
% ensure_deps(Socket, Realm, Rest).
%
%
%-spec ensure_dep(gen_tcp:socket(), package_id()) -> ok | no_return().
%% @private
%% Given an PackageID as an argument, check whether its package file exists in the
%% system cache, and if not download it. Should return `ok' whenever the file is
%% sourced, but exit with an error if it cannot locate or acquire the package.
%
%ensure_dep(Socket, PackageID) ->
% ZrpFile = zx_lib:zsp_path(PackageID),
% ok =
% case filelib:is_regular(ZrpFile) of
% true -> ok;
% false -> fetch(Socket, PackageID)
% end,
% ok = install(PackageID),
% build(PackageID).
%
%
%-spec scrub(Deps) -> Scrubbed
% when Deps :: [package_id()],
% Scrubbed :: [package_id()].
%% @private
%% Take a list of dependencies and return a list of dependencies that are not yet
%% installed on the system.
%
%scrub([]) ->
% [];
%scrub(Deps) ->
% lists:filter(fun(PackageID) -> not zx_lib:installed(PackageID) end, Deps).

View File

@ -235,14 +235,10 @@
| {list, zx:realm()} | {list, zx:realm()}
| {list, zx:realm(), zx:name()} | {list, zx:realm(), zx:name()}
| {list, zx:realm(), zx:name(), zx:version()} | {list, zx:realm(), zx:name(), zx:version()}
| {latest, zx:realm(), zx:name()}
| {latest, zx:realm(), zx:name(), zx:version()} | {latest, zx:realm(), zx:name(), zx:version()}
| {fetch, zx:realm(), zx:name(), zx:version()} | {fetch, zx:realm(), zx:name(), zx:version()}
| {fetchkey, zx:realm(), zx:key_name()} | {fetchkey, zx:realm(), zx:key_name()}.
| {pending, zx:realm(), zx:name()}
| {resigns, zx:realm()}
| {packagers, zx:realm(), zx:name()}
| {maintainers, zx:realm(), zx:name()}
| {sysops, zx:realm()}.
% Outgoing Result Messages % Outgoing Result Messages
% %
@ -251,7 +247,7 @@
% %
% Subscription messages are a separate type below. % Subscription messages are a separate type below.
-type result() :: {z_result, -type result() :: {result,
RequestID :: id(), RequestID :: id(),
Message :: realm_list() Message :: realm_list()
| package_list() | package_list()
@ -581,8 +577,8 @@ report(Message) ->
%% @private %% @private
%% Return a tagged result back to the daemon to be forwarded to the original requestor. %% Return a tagged result back to the daemon to be forwarded to the original requestor.
result(Reference, Result) -> result(ID, Result) ->
gen_server:cast(?MODULE, {result, Reference, Result}). gen_server:cast(?MODULE, {result, ID, Result}).
-spec notify(Package, Message) -> ok -spec notify(Package, Message) -> ok
@ -681,8 +677,8 @@ handle_cast({report, Conn, Message}, State) ->
NextState = do_report(Conn, Message, State), NextState = do_report(Conn, Message, State),
NewState = eval_queue(NextState), NewState = eval_queue(NextState),
{noreply, NewState}; {noreply, NewState};
handle_cast({result, Ref, Result}, State) -> handle_cast({result, ID, Result}, State) ->
NextState = do_result(Ref, Result, State), NextState = do_result(ID, Result, State),
NewState = eval_queue(NextState), NewState = eval_queue(NextState),
{noreply, NewState}; {noreply, NewState};
handle_cast({notify, Conn, Package, Update}, State) -> handle_cast({notify, Conn, Package, Update}, State) ->
@ -982,7 +978,7 @@ do_result(ID, Result, State = #s{requests = Requests, dropped = Dropped, mx = MX
{Dropped, NextR, NextMX}; {Dropped, NextR, NextMX};
{Request, Rest} -> {Request, Rest} ->
Requestor = element(1, Request), Requestor = element(1, Request),
Requestor ! {z_result, ID, Result}, Requestor ! {result, ID, Result},
NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX), NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX),
{Dropped, Rest, NextMX}; {Dropped, Rest, NextMX};
error -> error ->
@ -998,14 +994,14 @@ handle_fetch_result(ID, {done, Bin}, {Requestor, _}, Requests, MX) ->
ok -> done; ok -> done;
Error -> Error Error -> Error
end, end,
Requestor ! {z_result, ID, Result}, Requestor ! {result, ID, Result},
NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX), NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX),
{NextMX, Requests}; {NextMX, Requests};
handle_fetch_result(ID, Hops = {hops, _}, Request = {Requestor, _}, Requests, MX) -> handle_fetch_result(ID, Hops = {hops, _}, Request = {Requestor, _}, Requests, MX) ->
Requestor ! {z_result, ID, Hops}, Requestor ! {result, ID, Hops},
{MX, maps:put(ID, Request, Requests)}; {MX, maps:put(ID, Request, Requests)};
handle_fetch_result(ID, Outcome, {Requestor, _}, Requests, MX) -> handle_fetch_result(ID, Outcome, {Requestor, _}, Requests, MX) ->
Requestor ! {z_result, ID, Outcome}, Requestor ! {result, ID, Outcome},
NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX), NextMX = mx_del_monitor(Requestor, {requestor, ID}, MX),
{NextMX, Requests}. {NextMX, Requests}.
@ -1252,10 +1248,10 @@ do_fetch(PackageID, Requestor, State = #s{id = ID}) ->
{ok, Bin} -> {ok, Bin} ->
case do_import_package(Bin) of case do_import_package(Bin) of
ok -> ok ->
Requestor ! {z_result, ID, ok}, Requestor ! {result, ID, ok},
{ok, State}; {ok, State};
Error -> Error ->
Requestor ! {z_result, ID, Error}, Requestor ! {result, ID, Error},
{ok, State} {ok, State}
end; end;
{error, enoent} -> {error, enoent} ->
@ -1263,7 +1259,7 @@ do_fetch(PackageID, Requestor, State = #s{id = ID}) ->
Action = {fetch, Realm, Name, Version}, Action = {fetch, Realm, Name, Version},
do_request(Requestor, Action, State); do_request(Requestor, Action, State);
Error -> Error ->
Requestor ! {z_result, ID, Error} Requestor ! {result, ID, Error}
end. end.

View File

@ -17,7 +17,7 @@
-export([zomp_dir/0, find_zomp_dir/0, -export([zomp_dir/0, find_zomp_dir/0,
path/1, path/2, path/3, path/4, ppath/2, path/1, path/2, path/3, path/4, ppath/2,
force_dir/1, mktemp_dir/1, force_dir/1, mktemp_dir/1,
list_realms/0, list_realms/0, realm_exists/1,
get_prime/1, realm_meta/1, get_prime/1, realm_meta/1,
read_project_meta/0, read_project_meta/1, read_package_meta/1, read_project_meta/0, read_project_meta/1, read_package_meta/1,
write_project_meta/1, write_project_meta/2, write_project_meta/1, write_project_meta/2,
@ -192,6 +192,12 @@ list_realms() ->
[filename:basename(filename:dirname(C)) || C <- filelib:wildcard(Pattern)]. [filename:basename(filename:dirname(C)) || C <- filelib:wildcard(Pattern)].
-spec realm_exists(zx:realm()) -> boolean().
realm_exists(Realm) ->
lists:member(Realm, list_realms()).
-spec get_prime(Realm) -> Result -spec get_prime(Realm) -> Result
when Realm :: zx:realm(), when Realm :: zx:realm(),
Result :: {ok, zx:host()} Result :: {ok, zx:host()}

View File

@ -335,7 +335,8 @@ list_realms() ->
%% them to stdout. %% them to stdout.
list_packages(Realm) -> list_packages(Realm) ->
case zx_daemon:list_packages(Realm) of {ok, ID} = zx_daemon:list(Realm),
case wait_result(ID) of
{ok, []} -> {ok, []} ->
io:format("Realm ~tp has no packages available.~n", [Realm]); io:format("Realm ~tp has no packages available.~n", [Realm]);
{ok, Packages} -> {ok, Packages} ->
@ -364,15 +365,16 @@ list_versions(PackageString) ->
end. end.
list_versions2(PackageID) -> list_versions2({Realm, Name, Version}) ->
case zx_daemon:list_versions(PackageID) of {ok, ID} = zx_daemon:list(Realm, Name, Version),
case wait_result(ID) of
{ok, []} -> {ok, []} ->
io:format("No versions available.~n"); io:format("No versions available.~n");
{ok, Versions} -> {ok, Versions} ->
Print = Print =
fun(Version) -> fun(V) ->
{ok, VersionString} = zx_lib:version_to_string(Version), {ok, VS} = zx_lib:version_to_string(V),
io:format("~ts~n", [VersionString]) io:format("~ts~n", [VS])
end, end,
lists:foreach(Print, Versions); lists:foreach(Print, Versions);
{error, bad_realm} -> {error, bad_realm} ->
@ -385,6 +387,10 @@ list_versions2(PackageID) ->
end. end.
wait_result(ID) ->
receive {result, ID, Result} -> Result end.
-spec import_realm(Path) -> zx:outcome() -spec import_realm(Path) -> zx:outcome()
when Path :: file:filename(). when Path :: file:filename().
%% @private %% @private

View File

@ -9,25 +9,11 @@
-copyright("Craig Everett <zxq9@zxq9.com>"). -copyright("Craig Everett <zxq9@zxq9.com>").
-license("GPL-3.0"). -license("GPL-3.0").
-export([send/2, disconnect/1]). -export([disconnect/1]).
-include("zx_logger.hrl"). -include("zx_logger.hrl").
-spec send(Socket, Message) -> Result
when Socket :: gen_tcp:socket(),
Message :: term(),
Result :: ok
| {error, Reason},
Reason :: closed | inet:posix().
%% @doc
%% Packages an Erlang term and sends it to the indicated socket.
send(Socket, Message) ->
BinMessage = term_to_binary(Message),
gen_tcp:send(Socket, BinMessage).
-spec disconnect(Socket) -> ok -spec disconnect(Socket) -> ok
when Socket :: gen_tcp:socket(). when Socket :: gen_tcp:socket().
%% @doc %% @doc

View File

@ -76,7 +76,7 @@ populate_data(List) ->
Retries = Retries =
case proplists:get_value(retries, List, 3) of case proplists:get_value(retries, List, 3) of
RT when is_integer(RT) and RT > 0 -> {RT, RT}; RT when is_integer(RT) and RT > 0 -> {RT, RT};
_ -> 3 _ -> {3, 3}
end, end,
MaxConn = MaxConn =
case proplists:get_value(maxconn, List, 5) of case proplists:get_value(maxconn, List, 5) of