roster is broadcast
This commit is contained in:
parent
25a775ee96
commit
6028bb5850
@ -23,6 +23,12 @@ start_link() ->
|
||||
|
||||
init([]) ->
|
||||
RestartStrategy = {one_for_one, 1, 60},
|
||||
WebRTC = {fd_httpd_webrtc,
|
||||
{fd_httpd_webrtc, start_link, []},
|
||||
permanent,
|
||||
5000,
|
||||
worker,
|
||||
[fd_httpd_webrtc]},
|
||||
FileCache = {fd_httpd_sfc,
|
||||
{fd_httpd_sfc, start_link, []},
|
||||
permanent,
|
||||
@ -35,5 +41,6 @@ init([]) ->
|
||||
5000,
|
||||
supervisor,
|
||||
[fd_httpd_clients]},
|
||||
Children = [FileCache, Clients],
|
||||
Children = [WebRTC, FileCache, Clients],
|
||||
%Children = [FileCache, Clients],
|
||||
{ok, {RestartStrategy, Children}}.
|
||||
|
||||
@ -293,9 +293,9 @@ respond_static(Sock, not_found) ->
|
||||
%% ------------------------------
|
||||
|
||||
-record(rs,
|
||||
{received = <<>> :: binary(),
|
||||
username = undefined :: undefined | string(),
|
||||
peers = undefined :: undefined | [string()]}).
|
||||
{socket :: gen_tcp:socket(),
|
||||
received = <<>> :: binary(),
|
||||
username = undefined :: undefined | string()}).
|
||||
|
||||
-type webrtc_state() :: #rs{}.
|
||||
|
||||
@ -304,7 +304,7 @@ ws_webrtc(Sock, Request, Received) ->
|
||||
case qhl_ws:handshake(Request) of
|
||||
{ok, Response} ->
|
||||
fd_httpd_utils:respond(Sock, Response),
|
||||
ws_webrtc_loop(Sock, #rs{received = Received});
|
||||
ws_webrtc_loop(#rs{socket = Sock, received = Received});
|
||||
Error ->
|
||||
tell("ws_webrtc: error: ~tp", [Error]),
|
||||
fd_httpd_utils:http_err(Sock, 400)
|
||||
@ -318,20 +318,75 @@ ws_webrtc(Sock, Request, Received) ->
|
||||
|
||||
-define(WEBRTC_TIMEOUT, 30*qhl_ws:min()).
|
||||
|
||||
|
||||
|
||||
-spec ws_webrtc_loop(webrtc_state()) -> no_return().
|
||||
|
||||
%% first thing is to get username
|
||||
ws_webrtc_loop(Socket, State = #rs{received = Recv,
|
||||
username = undefined}) ->
|
||||
%Foo = qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT),
|
||||
%tell("~p recv_json: ~p", [self(), Foo]),
|
||||
%error(ur_mom);
|
||||
{ok, ["username", Username], NewRecv} = qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT),
|
||||
ws_webrtc_loop(State = #rs{socket = Socket,
|
||||
received = Recv,
|
||||
username = undefined}) ->
|
||||
{ok, ["username", Username], NewRecv} =
|
||||
qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT),
|
||||
tell("~p ws_webrtc_loop: request username: ~p", [self(), Username]),
|
||||
{ok, ActualUsername} = fd_httpd_webrtc:join(Username),
|
||||
ok = qhl_ws:send_json(Socket, ["username", ActualUsername]),
|
||||
NewState = State#rs{received = NewRecv,
|
||||
username = Username},
|
||||
ws_webrtc_loop(Socket, NewState);
|
||||
ws_webrtc_loop(Socket, State = _) ->
|
||||
tell("~p ws_webrtc_loop nyi state: ~p", [self(), State]),
|
||||
error(nyi).
|
||||
username = ActualUsername},
|
||||
ws_webrtc_loop(NewState);
|
||||
% we have no tcp bytes waiting to be parsed
|
||||
ws_webrtc_loop(State = #rs{socket = Socket,
|
||||
received = <<>>}) ->
|
||||
ok = inet:setopts(Socket, [{active, once}]),
|
||||
receive
|
||||
{tcp, Socket, Message} ->
|
||||
NewState = State#rs{received = Message},
|
||||
ws_webrtc_loop(NewState);
|
||||
{webrtc, Message} ->
|
||||
NewState = ws_webrtc_handle_webrtc(Message, State),
|
||||
ws_webrtc_loop(NewState);
|
||||
{tcp_closed, Socket} ->
|
||||
ok = tell("~p Socket closed, retiring.~n", [self()]),
|
||||
exit(normal)
|
||||
end;
|
||||
% we have TCP bytes sitting and waiting to be parsed
|
||||
ws_webrtc_loop(State = #rs{socket = Socket,
|
||||
received = R}) ->
|
||||
{ok, Message, NewR} = qhl_ws:recv_json(Socket, R, 5000),
|
||||
NewState = ws_webrtc_handle_json(Message, State#rs{received = NewR}),
|
||||
ws_webrtc_loop(NewState).
|
||||
|
||||
|
||||
|
||||
-spec ws_webrtc_handle_webrtc(Message, State) -> NewState
|
||||
when Message :: any(),
|
||||
State :: webrtc_state(),
|
||||
NewState :: webrtc_state().
|
||||
% @private handle a message from the server
|
||||
|
||||
ws_webrtc_handle_webrtc(Message, State = #rs{socket = Sock}) ->
|
||||
tell("~p received message from webrtc: ~p", [self(), Message]),
|
||||
NewState =
|
||||
case Message of
|
||||
{users, Users} ->
|
||||
ok = qhl_ws:send_json(Sock, ["users", Users]),
|
||||
State;
|
||||
_ ->
|
||||
tell("~p unknown webrtc message: ~p", [self, Message])
|
||||
end,
|
||||
NewState.
|
||||
|
||||
|
||||
|
||||
-spec ws_webrtc_handle_json(Message, State) -> NewState
|
||||
when Message :: zj:value(),
|
||||
State :: webrtc_state(),
|
||||
NewState :: webrtc_state().
|
||||
% @private handle a message from the client
|
||||
|
||||
ws_webrtc_handle_json(Message, State) ->
|
||||
tell("~p received json message from client: ~p", [self(), Message]),
|
||||
State.
|
||||
|
||||
|
||||
%% ------------------------------
|
||||
|
||||
174
src/fd_httpd_webrtc.erl
Normal file
174
src/fd_httpd_webrtc.erl
Normal file
@ -0,0 +1,174 @@
|
||||
% @doc webrtc peer pool
|
||||
-module(fd_httpd_webrtc).
|
||||
|
||||
-behavior(gen_server).
|
||||
|
||||
%% api (caller context)
|
||||
-export([
|
||||
join/1
|
||||
]).
|
||||
|
||||
% startup/gen_server callbacks
|
||||
-export([
|
||||
% caller context
|
||||
start_link/0,
|
||||
% process context
|
||||
init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
code_change/3, terminate/2
|
||||
]).
|
||||
|
||||
-include("$zx_include/zx_logger.hrl").
|
||||
|
||||
|
||||
-record(u,
|
||||
{pid :: pid(),
|
||||
username :: string()}).
|
||||
|
||||
-type user() :: #u{}.
|
||||
|
||||
-record(s,
|
||||
{users = [] :: [user()]}).
|
||||
|
||||
-type state() :: #s{}.
|
||||
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% caller context
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
|
||||
|
||||
|
||||
|
||||
-spec join(Username :: string()) -> {ok, ActualUsername :: string()} | {error, any()}.
|
||||
|
||||
join(Username) ->
|
||||
gen_server:call(?MODULE, {join, Username}).
|
||||
|
||||
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% process context
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
||||
%% gen_server callbacks
|
||||
|
||||
init(none) ->
|
||||
tell("starting fd_httpd_webrtc", []),
|
||||
InitState = #s{},
|
||||
{ok, InitState}.
|
||||
|
||||
|
||||
handle_call({join, Username}, _From = {PID, _Tag}, State) ->
|
||||
{Reply, NewState} = do_join(Username, PID, State),
|
||||
{reply, Reply, NewState};
|
||||
handle_call(Unexpected, From, State) ->
|
||||
tell("~tp: unexpected call from ~tp: ~tp", [?MODULE, Unexpected, From]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_cast(Unexpected, State) ->
|
||||
tell("~tp: unexpected cast: ~tp", [?MODULE, Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_info({'DOWN', _Ref, process, PID, _Reason}, State) ->
|
||||
NewState = do_down(PID, State),
|
||||
{noreply, NewState};
|
||||
handle_info(Unexpected, State) ->
|
||||
tell("~tp: unexpected info: ~tp", [?MODULE, Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
code_change(_, State, _) ->
|
||||
{ok, State}.
|
||||
|
||||
terminate(_, _) ->
|
||||
ok.
|
||||
|
||||
|
||||
%%---------------------
|
||||
%% doers
|
||||
%%---------------------
|
||||
|
||||
-spec do_join(Username, PID, State) -> {Reply, NewState}
|
||||
when Username :: string(),
|
||||
PID :: pid(),
|
||||
State :: state(),
|
||||
Reply :: {ok, ActualUsername :: string()}
|
||||
| {error, any()},
|
||||
NewState :: state().
|
||||
% @private join a user to a pool
|
||||
|
||||
do_join(Username, PID, State = #s{users = Users}) ->
|
||||
% see if pid is already there
|
||||
case lists:keymember(PID, #u.pid, Users) of
|
||||
true -> {error, already_joined};
|
||||
false -> do_join2(Username, PID, State)
|
||||
end.
|
||||
|
||||
do_join2(Username, PID, State = #s{users = Users}) ->
|
||||
monitor(process, PID),
|
||||
ActualUsername = unique_username(Username, Users),
|
||||
NewUser = #u{pid = PID, username = ActualUsername},
|
||||
NewRoster = usort([NewUser | Users]),
|
||||
NewState = State#s{users = NewRoster},
|
||||
ok = gossip_roster(NewState),
|
||||
{{ok, ActualUsername}, NewState}.
|
||||
|
||||
usort(Users) ->
|
||||
lists:keysort(#u.username, Users).
|
||||
|
||||
unique_username(Username, Users) ->
|
||||
tell("~p unique_username(~p, ~p)", [?MODULE, Username, Users]),
|
||||
case lists:keymember(Username, #u.username, Users) of
|
||||
true -> unique_username(Username, 1, Users);
|
||||
false -> Username
|
||||
end.
|
||||
|
||||
unique_username(Username, N, Users) ->
|
||||
tell("~p unique_username(~p, ~p ~p)", [?MODULE, Username, N, Users]),
|
||||
U = Username ++ integer_to_list(N),
|
||||
case lists:keymember(U, #u.username, Users) of
|
||||
true -> unique_username(Username, N + 1, Users);
|
||||
false -> U
|
||||
end.
|
||||
|
||||
|
||||
|
||||
-spec do_down(PID, State) -> NewState
|
||||
when PID :: pid(),
|
||||
State :: state(),
|
||||
NewState :: state().
|
||||
% @private handle a user dying
|
||||
|
||||
do_down(PID, State = #s{users = Users}) ->
|
||||
% remove from users
|
||||
NewUsers = usort(lists:keydelete(PID, #u.pid, Users)),
|
||||
NewState = State#s{users = NewUsers},
|
||||
% broadcast username
|
||||
ok = gossip_roster(State),
|
||||
NewState.
|
||||
|
||||
|
||||
|
||||
-spec gossip_roster(state()) -> ok.
|
||||
% @private gossip the roster to everyone
|
||||
|
||||
gossip_roster(State = #s{users = Users}) ->
|
||||
Usernames = [Username || #u{username = Username} <- Users],
|
||||
gossip({users, Usernames}, State).
|
||||
|
||||
|
||||
|
||||
-spec gossip(any(), state()) -> ok.
|
||||
% @private gossip a message to everyone
|
||||
|
||||
gossip(Message, #s{users = Users}) ->
|
||||
GossipTo =
|
||||
fun(#u{pid = PID}) ->
|
||||
PID ! {webrtc, Message}
|
||||
end,
|
||||
lists:foreach(GossipTo, Users).
|
||||
Loading…
x
Reference in New Issue
Block a user