%%% @private %%% Hakuzaru Request Manager for Erlang %%% %%% This process is responsible for remembering the configured nodes and dispatching %%% requests to them. Request dispatch is made in a round-robin fashion with forwarded %%% gen_server return `From' values passed to the request worker instead of being %%% responded to directly by the manager itself (despite requests being generated as %%% gen_server:call/3s. %%% @end -module(hz_man). -vsn("0.5.1"). -behavior(gen_server). -author("Craig Everett "). -copyright("Craig Everett "). -license("MIT"). %% Admin functions -export([tls/0, tls/1, chain_nodes/0, chain_nodes/1, timeout/0, timeout/1]). %% The whole point of this module: -export([request/1, request/2]). %% gen_server goo -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). %%% Type and Record Definitions -record(fetcher, {pid = none :: none | pid(), mon = none :: none | reference(), time = none :: none | integer(), % nanosecond timestamp node = none :: none | hz:chain_node(), from = none :: none | gen_server:from(), req = none :: none | binary()}). -record(s, {tls = false :: boolean(), chain_nodes = {[], []} :: {[hz:chain_node()], [hz:chain_node()]}, sticky = none :: none | hz:chain_node(), fetchers = [] :: [#fetcher{}], timeout = 5000 :: pos_integer()}). -type state() :: #s{}. %%% Service Interface -spec tls() -> boolean(). tls() -> gen_server:call(?MODULE, tls). -spec tls(boolean()) -> ok. tls(Boolean) -> gen_server:cast(?MODULE, {tls, Boolean}). -spec chain_nodes() -> Used when Used :: [hz:chain_node()]. chain_nodes() -> gen_server:call(?MODULE, chain_nodes). -spec chain_nodes(ToUse) -> ok when ToUse :: [hz:chain_node()]. chain_nodes(ToUse) -> gen_server:cast(?MODULE, {chain_nodes, ToUse}). -spec timeout() -> Value when Value :: pos_integer(). timeout() -> gen_server:call(?MODULE, timeout). -spec timeout(Value) -> ok when Value :: pos_integer(). timeout(Value) when 0 < Value, Value =< 120000 -> gen_server:cast(?MODULE, {timeout, Value}). -spec request(Path) -> {ok, Value} | {error, Reason} when Path :: unicode:charlist(), Value :: map(), Reason :: hz:chain_error(). request(Path) -> gen_server:call(?MODULE, {request, {get, Path}}, infinity). -spec request(Path, Data) -> {ok, Value} | {error, Reason} when Path :: unicode:charlist(), Data :: unicode:charlist(), Value :: map(), Reason :: hz:chain_error(). request(Path, Data) -> gen_server:call(?MODULE, {request, {post, Path, Data}}, infinity). %%% Startup Functions -spec start_link() -> Result when Result :: {ok, pid()} | {error, Reason :: term()}. %% @private %% This should only ever be called by v_clients (the service-level supervisor). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, none, []). -spec init(none) -> {ok, state()}. %% @private %% Called by the supervisor process to give the process a chance to perform any %% preparatory work necessary for proper function. init(none) -> ok = io:format("hz_man starting.~n"), State = #s{}, {ok, State}. %%% gen_server Message Handling Callbacks handle_call({request, Request}, From, State) -> NewState = do_request(Request, From, State), {noreply, NewState}; handle_call(tls, _, State = #s{tls = TLS}) -> {reply, TLS, State}; handle_call(chain_nodes, _, State = #s{chain_nodes = {Wait, Used}}) -> Nodes = lists:append(Wait, Used), {reply, Nodes, State}; handle_call(timeout, _, State = #s{timeout = Value}) -> {reply, Value, State}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call from ~tp: ~tp~n", [From, Unexpected]), {noreply, State}. handle_cast({tls, Boolean}, State) -> NewState = do_tls(Boolean, State), {noreply, NewState}; handle_cast({chain_nodes, []}, State) -> {noreply, State#s{chain_nodes = {[], []}}}; handle_cast({chain_nodes, ToUse}, State) -> {noreply, State#s{chain_nodes = {ToUse, []}}}; handle_cast({timeout, Value}, State) -> {noreply, State#s{timeout = Value}}; handle_cast(Unexpected, State) -> ok = log(warning, "Unexpected cast: ~tp~n", [Unexpected]), {noreply, State}. handle_info({'DOWN', Mon, process, PID, Info}, State) -> NewState = handle_down(PID, Mon, Info, State), {noreply, NewState}; handle_info(Unexpected, State) -> ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]), {noreply, State}. handle_down(_, Mon, normal, State = #s{fetchers = Fetchers}) -> NewFetchers = lists:keydelete(Mon, #fetcher.mon, Fetchers), State#s{fetchers = NewFetchers}; handle_down(PID, Mon, Info, State = #s{fetchers = Fetchers}) -> case lists:keytake(Mon, #fetcher.mon, Fetchers) of {value, #fetcher{time = Time, node = Node, from = From, req = R}, Remaining} -> TS = calendar:system_time_to_rfc3339(Time, [{unit, nanosecond}]), Format = "ERROR ~ts: Fetcher process ~130tp exited while making request to ~130tp~n" "Exit reason:~n" "~tp~n" "Request contents:~n" "~tp~n~n", Formatted = io_lib:format(Format, [TS, PID, Node, Info, R]), Message = unicode:characters_to_list(Formatted), ok = gen_server:reply(From, {error, Message}), State#s{fetchers = Remaining}; false -> Unexpected = {'DOWN', Mon, process, PID, Info}, ok = log(warning, "Unexpected info: ~w", [Unexpected]), State end. %%% OTP Service Functions code_change(_, State, _) -> {ok, State}. terminate(_, _) -> ok. %%% Doer Functions do_tls(true, State) -> ok = ssl:start(), State#s{tls = true}; do_tls(false, State) -> State#s{tls = false}; do_tls(_, State) -> State. do_request(_, From, State = #s{chain_nodes = {[], []}}) -> ok = gen_server:reply(From, {error, no_nodes}), State; do_request(Request, From, State = #s{tls = false, fetchers = Fetchers, chain_nodes = {[Node | Rest], Used}, timeout = Timeout}) -> Now = erlang:system_time(nanosecond), Fetcher = fun() -> hz_fetcher:connect(Node, Request, From, Timeout) end, {PID, Mon} = spawn_monitor(Fetcher), New = #fetcher{pid = PID, mon = Mon, time = Now, node = Node, from = From, req = Request}, State#s{fetchers = [New | Fetchers], chain_nodes = {Rest, [Node | Used]}}; do_request(Request, From, State = #s{tls = true, fetchers = Fetchers, chain_nodes = {[Node | Rest], Used}, timeout = Timeout}) -> Now = erlang:system_time(nanosecond), Fetcher = fun() -> hz_fetcher:slowly_connect(Node, Request, From, Timeout) end, {PID, Mon} = spawn_monitor(Fetcher), New = #fetcher{pid = PID, mon = Mon, time = Now, node = Node, from = From, req = Request}, State#s{fetchers = [New | Fetchers], chain_nodes = {Rest, [Node | Used]}}; do_request(Request, From, State = #s{chain_nodes = {[], Used}}) -> Fresh = lists:reverse(Used), do_request(Request, From, State#s{chain_nodes = {Fresh, []}}). log(Level, Format, Args) -> Raw = io_lib:format("~w ~w: " ++ Format, [?MODULE, self() | Args]), Entry = unicode:characters_to_list(Raw), logger:log(Level, Entry).