gmhive_client/src/gmhc_handler.erl

197 lines
6.1 KiB
Erlang

-module(gmhc_handler).
-vsn("0.6.1").
-behavior(gen_server).
-export([ start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([ call/1
, async_call/1
, notify/1
, pool_connected/2
, from_pool/1 ]).
-record(pool, { id :: gmhc_connector:id()
, pid :: pid() | 'undefined'
, mref :: reference()
, connected = false :: boolean()
, keep = true :: boolean()
, host :: string()
, port :: pos_integer()
, opts = #{} :: map() }).
-record(st, {pools = [], opts = #{}}).
-define(CALL_TIMEOUT, 5000).
-include_lib("kernel/include/logger.hrl").
call(Req) ->
try gen_server:call(?MODULE, {call, Req}, ?CALL_TIMEOUT)
catch
exit:Reason ->
{error, Reason}
end.
async_call(Req) ->
try gen_server:call(?MODULE, {async_call, Req}, ?CALL_TIMEOUT)
catch
exit:Reason ->
{error, Reason}
end.
notify(Msg) ->
gen_server:cast(?MODULE, {notify, Msg}).
pool_connected(Id, Opts) ->
gen_server:cast(?MODULE, {pool_connected, Id, self(), Opts}).
from_pool(Msg) ->
ToSend = {from_pool, Msg},
?MODULE ! ToSend.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({call, Req}, _From, #st{} = S) ->
{reply, call_connector(Req), S};
handle_call({async_call, Req}, _From, #st{} = S) ->
{reply, call_connector(Req, false), S};
handle_call(_Req, _From, S) ->
{reply, {error, unknown_method}, S}.
handle_cast({pool_connected, Id, Pid, Opts}, #st{pools = Pools} = S) ->
MRef = erlang:monitor(process, Pid),
case lists:keyfind(Id, #pool.id, Pools) of
#pool{} = P ->
P1 = P#pool{ connected = true
, pid = Pid
, mref = MRef },
Pools1 = lists:keyreplace(Id, #pool.id, Pools, P1),
{noreply, S#st{pools = Pools1}};
false ->
P = #pool{ id = Id
, pid = Pid
, mref = MRef
, host = maps:get(host, Opts)
, port = maps:get(port, Opts)
, opts = Opts },
{noreply, S#st{pools = [P | Pools]}}
end;
handle_cast({notify, Msg}, #st{} = S) ->
notify_connector(Msg),
{noreply, S};
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info({from_pool, Msg}, S) ->
maybe_publish(Msg),
case Msg of
#{notification := #{new_server := #{ host := _
, port := _
, keep := _ } = Server}} ->
{noreply, start_pool_connector(Server, S)};
_ ->
gmhc_server:from_pool(Msg),
{noreply, S}
end;
handle_info(Msg, S) ->
?LOG_DEBUG("Unknown msg: ~p", [Msg]),
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_FromVsn, S, _Extra) ->
{ok, S}.
maybe_publish(#{notification := Msg} = N) ->
Info = maybe_via(N, #{msg => Msg}),
gmhc_events:publish(pool_notification, Info),
if map_size(Msg) == 1 ->
[Tag] = maps:keys(Msg),
gmhc_events:publish({pool_notification, Tag}, Info);
true ->
ok
end;
maybe_publish(_) ->
ok.
maybe_via(#{via := Via}, Info) ->
Info#{via => Via}.
call_connector(Req) ->
call_connector(Req, true).
call_connector(Req0, Wait) ->
{ViaId, Req} = maps:take(via, Req0),
case gmhc_connector:whereis_id(ViaId) of
undefined ->
{error, no_connection};
Pid when is_pid(Pid) ->
Id = erlang:unique_integer(),
MRef = case Wait of
true -> erlang:monitor(process, Pid);
false -> none
end,
gmhc_connector:send(ViaId, #{call => Req#{ id => Id }}),
case Wait of
true ->
receive
{from_pool, #{reply := #{ id := Id
, result := Result }}} ->
erlang:demonitor(MRef),
Result;
{from_pool, #{error := #{ id := Id } = Error}} ->
erlang:demonitor(MRef),
{error, maps:remove(id, Error)};
{'DOWN', MRef, _, _, _} ->
{error, no_connection}
after 5000 ->
erlang:demonitor(MRef),
{error, {timeout, process_info(self(), messages)}}
end;
false ->
ok
end
end.
notify_connector(Msg0) ->
{Via, Msg} = maps:take(via, Msg0),
gmhc_connector:send(Via, #{notification => #{msg => Msg}}).
start_pool_connector(#{ host := Host
, port := Port
, keep := Keep }, #st{pools = Pools, opts = Opts} = S) ->
case [P || #pool{host = H1, port = P1} = P <- Pools,
H1 == Host,
P1 == Port] of
[] ->
case gmhive_client:connect(Opts#{ host => Host
, port => Port
, nowait => true }) of
{ok, CId} ->
Pid = gmhc_connector:whereis_id(CId),
MRef = erlang:monitor(process, Pid),
P = #pool{ id = CId, host = Host, port = Port,
keep = Keep, pid = Pid, mref = MRef },
S#st{pools = [P | Pools]};
{error, _} = Error ->
?LOG_WARNING("Could not start connector to ~p:~p - ~p",
[Host, Port, Error]),
S
end;
[#pool{} = _P] ->
?LOG_DEBUG("Already have a pool entry for ~p:~p", [Host, Port]),
S
end.