Merge branch 'master' into bin_sig

This commit is contained in:
2025-10-29 15:53:03 +09:00
3 changed files with 136 additions and 53 deletions
+74 -23
View File
@@ -21,7 +21,7 @@
timeout/0, timeout/1]).
%% The whole point of this module:
-export([request/1, request/2]).
-export([request_sticky/1, request_sticky/2, request/1, request/2]).
%% gen_server goo
-export([start_link/0]).
@@ -40,11 +40,11 @@
req = none :: none | binary()}).
-record(s,
{tls = false :: boolean(),
chain_nodes = {[], []} :: {[hz:chain_node()], [hz:chain_node()]},
sticky = none :: none | hz:chain_node(),
fetchers = [] :: [#fetcher{}],
timeout = 5000 :: pos_integer()}).
{tls = false :: boolean(),
chain_nodes = {[], []} :: {[hz:chain_node()], [hz:chain_node()]},
sticky = none :: none | hz:chain_node(),
fetchers = [] :: [#fetcher{}],
timeout = 5000 :: pos_integer()}).
-type state() :: #s{}.
@@ -94,6 +94,25 @@ timeout(Value) when 0 < Value, Value =< 120000 ->
gen_server:cast(?MODULE, {timeout, Value}).
-spec request_sticky(Path) -> {ok, Value} | {error, Reason}
when Path :: unicode:charlist(),
Value :: map(),
Reason :: hz:chain_error().
request_sticky(Path) ->
gen_server:call(?MODULE, {request_sticky, {get, Path}}, infinity).
-spec request_sticky(Path, Data) -> {ok, Value} | {error, Reason}
when Path :: unicode:charlist(),
Data :: unicode:charlist(),
Value :: map(),
Reason :: hz:chain_error().
request_sticky(Path, Data) ->
gen_server:call(?MODULE, {request_sticky, {post, Path, Data}}, infinity).
-spec request(Path) -> {ok, Value} | {error, Reason}
when Path :: unicode:charlist(),
Value :: map(),
@@ -145,10 +164,13 @@ init(none) ->
handle_call({request, Request}, From, State) ->
NewState = do_request(Request, From, State),
{noreply, NewState};
handle_call({request_sticky, Request}, From, State) ->
NewState = do_request_sticky(Request, From, State),
{noreply, NewState};
handle_call(tls, _, State = #s{tls = TLS}) ->
{reply, TLS, State};
handle_call(chain_nodes, _, State = #s{chain_nodes = {Wait, Used}}) ->
Nodes = lists:append(Wait, Used),
handle_call(chain_nodes, _, State) ->
Nodes = do_chain_nodes(State),
{reply, Nodes, State};
handle_call(timeout, _, State = #s{timeout = Value}) ->
{reply, Value, State};
@@ -160,10 +182,9 @@ handle_call(Unexpected, From, State) ->
handle_cast({tls, Boolean}, State) ->
NewState = do_tls(Boolean, State),
{noreply, NewState};
handle_cast({chain_nodes, []}, State) ->
{noreply, State#s{chain_nodes = {[], []}}};
handle_cast({chain_nodes, ToUse}, State) ->
{noreply, State#s{chain_nodes = {ToUse, []}}};
handle_cast({chain_nodes, List}, State) ->
NewState = do_chain_nodes(List, State),
{noreply, NewState};
handle_cast({timeout, Value}, State) ->
{noreply, State#s{timeout = Value}};
handle_cast(Unexpected, State) ->
@@ -218,6 +239,23 @@ terminate(_, _) ->
%%% Doer Functions
do_chain_nodes(#s{sticky = none, chain_nodes = {Wait, Used}}) ->
lists:append(Wait, Used);
do_chain_nodes(#s{sticky = Sticky, chain_nodes = {Wait, Used}}) ->
case lists:append(Wait, Used) of
[Sticky] -> [Sticky];
Nodes -> [Sticky | Nodes]
end.
do_chain_nodes([], State) ->
State#s{sticky = none, chain_nodes = {[], []}};
do_chain_nodes(List = [Sticky], State) ->
State#s{sticky = Sticky, chain_nodes = {List, []}};
do_chain_nodes([Sticky | List], State) ->
State#s{sticky = Sticky, chain_nodes = {List, []}}.
do_tls(true, State) ->
ok = ssl:start(),
State#s{tls = true};
@@ -227,17 +265,21 @@ do_tls(_, State) ->
State.
do_request(_, From, State = #s{chain_nodes = {[], []}}) ->
do_request_sticky(_, From, State = #s{sticky = none}) ->
ok = gen_server:reply(From, {error, no_nodes}),
State;
do_request(Request,
From,
State = #s{tls = false,
fetchers = Fetchers,
chain_nodes = {[Node | Rest], Used},
timeout = Timeout}) ->
do_request_sticky(Request,
From,
State = #s{tls = TLS,
fetchers = Fetchers,
sticky = Node,
timeout = Timeout}) ->
Now = erlang:system_time(nanosecond),
Fetcher = fun() -> hz_fetcher:connect(Node, Request, From, Timeout) end,
Fetcher =
case TLS of
true -> fun() -> hz_fetcher:connect_slowly(Node, Request, From, Timeout) end;
false -> fun() -> hz_fetcher:connect(Node, Request, From, Timeout) end
end,
{PID, Mon} = spawn_monitor(Fetcher),
New = #fetcher{pid = PID,
mon = Mon,
@@ -245,15 +287,24 @@ do_request(Request,
node = Node,
from = From,
req = Request},
State#s{fetchers = [New | Fetchers], chain_nodes = {Rest, [Node | Used]}};
State#s{fetchers = [New | Fetchers]}.
do_request(_, From, State = #s{chain_nodes = {[], []}}) ->
ok = gen_server:reply(From, {error, no_nodes}),
State;
do_request(Request,
From,
State = #s{tls = true,
State = #s{tls = TLS,
fetchers = Fetchers,
chain_nodes = {[Node | Rest], Used},
timeout = Timeout}) ->
Now = erlang:system_time(nanosecond),
Fetcher = fun() -> hz_fetcher:slowly_connect(Node, Request, From, Timeout) end,
Fetcher =
case TLS of
true -> fun() -> hz_fetcher:connect_slowly(Node, Request, From, Timeout) end;
false -> fun() -> hz_fetcher:connect(Node, Request, From, Timeout) end
end,
{PID, Mon} = spawn_monitor(Fetcher),
New = #fetcher{pid = PID,
mon = Mon,