diff --git a/src/fd_httpd.erl b/src/fd_httpd.erl index 7d2c3b6..fbf4bae 100644 --- a/src/fd_httpd.erl +++ b/src/fd_httpd.erl @@ -23,6 +23,12 @@ start_link() -> init([]) -> RestartStrategy = {one_for_one, 1, 60}, + WebRTC = {fd_httpd_webrtc, + {fd_httpd_webrtc, start_link, []}, + permanent, + 5000, + worker, + [fd_httpd_webrtc]}, FileCache = {fd_httpd_sfc, {fd_httpd_sfc, start_link, []}, permanent, @@ -35,5 +41,6 @@ init([]) -> 5000, supervisor, [fd_httpd_clients]}, - Children = [FileCache, Clients], + Children = [WebRTC, FileCache, Clients], + %Children = [FileCache, Clients], {ok, {RestartStrategy, Children}}. diff --git a/src/fd_httpd_client.erl b/src/fd_httpd_client.erl index 39b28b0..e1bdd76 100644 --- a/src/fd_httpd_client.erl +++ b/src/fd_httpd_client.erl @@ -293,9 +293,9 @@ respond_static(Sock, not_found) -> %% ------------------------------ -record(rs, - {received = <<>> :: binary(), - username = undefined :: undefined | string(), - peers = undefined :: undefined | [string()]}). + {socket :: gen_tcp:socket(), + received = <<>> :: binary(), + username = undefined :: undefined | string()}). -type webrtc_state() :: #rs{}. @@ -304,7 +304,7 @@ ws_webrtc(Sock, Request, Received) -> case qhl_ws:handshake(Request) of {ok, Response} -> fd_httpd_utils:respond(Sock, Response), - ws_webrtc_loop(Sock, #rs{received = Received}); + ws_webrtc_loop(#rs{socket = Sock, received = Received}); Error -> tell("ws_webrtc: error: ~tp", [Error]), fd_httpd_utils:http_err(Sock, 400) @@ -318,20 +318,75 @@ ws_webrtc(Sock, Request, Received) -> -define(WEBRTC_TIMEOUT, 30*qhl_ws:min()). + + +-spec ws_webrtc_loop(webrtc_state()) -> no_return(). + %% first thing is to get username -ws_webrtc_loop(Socket, State = #rs{received = Recv, - username = undefined}) -> - %Foo = qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT), - %tell("~p recv_json: ~p", [self(), Foo]), - %error(ur_mom); - {ok, ["username", Username], NewRecv} = qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT), +ws_webrtc_loop(State = #rs{socket = Socket, + received = Recv, + username = undefined}) -> + {ok, ["username", Username], NewRecv} = + qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT), tell("~p ws_webrtc_loop: request username: ~p", [self(), Username]), + {ok, ActualUsername} = fd_httpd_webrtc:join(Username), + ok = qhl_ws:send_json(Socket, ["username", ActualUsername]), NewState = State#rs{received = NewRecv, - username = Username}, - ws_webrtc_loop(Socket, NewState); -ws_webrtc_loop(Socket, State = _) -> - tell("~p ws_webrtc_loop nyi state: ~p", [self(), State]), - error(nyi). + username = ActualUsername}, + ws_webrtc_loop(NewState); +% we have no tcp bytes waiting to be parsed +ws_webrtc_loop(State = #rs{socket = Socket, + received = <<>>}) -> + ok = inet:setopts(Socket, [{active, once}]), + receive + {tcp, Socket, Message} -> + NewState = State#rs{received = Message}, + ws_webrtc_loop(NewState); + {webrtc, Message} -> + NewState = ws_webrtc_handle_webrtc(Message, State), + ws_webrtc_loop(NewState); + {tcp_closed, Socket} -> + ok = tell("~p Socket closed, retiring.~n", [self()]), + exit(normal) + end; +% we have TCP bytes sitting and waiting to be parsed +ws_webrtc_loop(State = #rs{socket = Socket, + received = R}) -> + {ok, Message, NewR} = qhl_ws:recv_json(Socket, R, 5000), + NewState = ws_webrtc_handle_json(Message, State#rs{received = NewR}), + ws_webrtc_loop(NewState). + + + +-spec ws_webrtc_handle_webrtc(Message, State) -> NewState + when Message :: any(), + State :: webrtc_state(), + NewState :: webrtc_state(). +% @private handle a message from the server + +ws_webrtc_handle_webrtc(Message, State = #rs{socket = Sock}) -> + tell("~p received message from webrtc: ~p", [self(), Message]), + NewState = + case Message of + {users, Users} -> + ok = qhl_ws:send_json(Sock, ["users", Users]), + State; + _ -> + tell("~p unknown webrtc message: ~p", [self, Message]) + end, + NewState. + + + +-spec ws_webrtc_handle_json(Message, State) -> NewState + when Message :: zj:value(), + State :: webrtc_state(), + NewState :: webrtc_state(). +% @private handle a message from the client + +ws_webrtc_handle_json(Message, State) -> + tell("~p received json message from client: ~p", [self(), Message]), + State. %% ------------------------------ diff --git a/src/fd_httpd_webrtc.erl b/src/fd_httpd_webrtc.erl new file mode 100644 index 0000000..2b5001a --- /dev/null +++ b/src/fd_httpd_webrtc.erl @@ -0,0 +1,174 @@ +% @doc webrtc peer pool +-module(fd_httpd_webrtc). + +-behavior(gen_server). + +%% api (caller context) +-export([ + join/1 +]). + +% startup/gen_server callbacks +-export([ + % caller context + start_link/0, + % process context + init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2 +]). + +-include("$zx_include/zx_logger.hrl"). + + +-record(u, + {pid :: pid(), + username :: string()}). + +-type user() :: #u{}. + +-record(s, + {users = [] :: [user()]}). + +-type state() :: #s{}. + + +%%----------------------------------------------------------------------------- +%% caller context +%%----------------------------------------------------------------------------- + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, none, []). + + + +-spec join(Username :: string()) -> {ok, ActualUsername :: string()} | {error, any()}. + +join(Username) -> + gen_server:call(?MODULE, {join, Username}). + + + +%%----------------------------------------------------------------------------- +%% process context +%%----------------------------------------------------------------------------- + +%% gen_server callbacks + +init(none) -> + tell("starting fd_httpd_webrtc", []), + InitState = #s{}, + {ok, InitState}. + + +handle_call({join, Username}, _From = {PID, _Tag}, State) -> + {Reply, NewState} = do_join(Username, PID, State), + {reply, Reply, NewState}; +handle_call(Unexpected, From, State) -> + tell("~tp: unexpected call from ~tp: ~tp", [?MODULE, Unexpected, From]), + {noreply, State}. + + +handle_cast(Unexpected, State) -> + tell("~tp: unexpected cast: ~tp", [?MODULE, Unexpected]), + {noreply, State}. + + +handle_info({'DOWN', _Ref, process, PID, _Reason}, State) -> + NewState = do_down(PID, State), + {noreply, NewState}; +handle_info(Unexpected, State) -> + tell("~tp: unexpected info: ~tp", [?MODULE, Unexpected]), + {noreply, State}. + + +code_change(_, State, _) -> + {ok, State}. + +terminate(_, _) -> + ok. + + +%%--------------------- +%% doers +%%--------------------- + +-spec do_join(Username, PID, State) -> {Reply, NewState} + when Username :: string(), + PID :: pid(), + State :: state(), + Reply :: {ok, ActualUsername :: string()} + | {error, any()}, + NewState :: state(). +% @private join a user to a pool + +do_join(Username, PID, State = #s{users = Users}) -> + % see if pid is already there + case lists:keymember(PID, #u.pid, Users) of + true -> {error, already_joined}; + false -> do_join2(Username, PID, State) + end. + +do_join2(Username, PID, State = #s{users = Users}) -> + monitor(process, PID), + ActualUsername = unique_username(Username, Users), + NewUser = #u{pid = PID, username = ActualUsername}, + NewRoster = usort([NewUser | Users]), + NewState = State#s{users = NewRoster}, + ok = gossip_roster(NewState), + {{ok, ActualUsername}, NewState}. + +usort(Users) -> + lists:keysort(#u.username, Users). + +unique_username(Username, Users) -> + tell("~p unique_username(~p, ~p)", [?MODULE, Username, Users]), + case lists:keymember(Username, #u.username, Users) of + true -> unique_username(Username, 1, Users); + false -> Username + end. + +unique_username(Username, N, Users) -> + tell("~p unique_username(~p, ~p ~p)", [?MODULE, Username, N, Users]), + U = Username ++ integer_to_list(N), + case lists:keymember(U, #u.username, Users) of + true -> unique_username(Username, N + 1, Users); + false -> U + end. + + + +-spec do_down(PID, State) -> NewState + when PID :: pid(), + State :: state(), + NewState :: state(). +% @private handle a user dying + +do_down(PID, State = #s{users = Users}) -> + % remove from users + NewUsers = usort(lists:keydelete(PID, #u.pid, Users)), + NewState = State#s{users = NewUsers}, + % broadcast username + ok = gossip_roster(State), + NewState. + + + +-spec gossip_roster(state()) -> ok. +% @private gossip the roster to everyone + +gossip_roster(State = #s{users = Users}) -> + Usernames = [Username || #u{username = Username} <- Users], + gossip({users, Usernames}, State). + + + +-spec gossip(any(), state()) -> ok. +% @private gossip a message to everyone + +gossip(Message, #s{users = Users}) -> + GossipTo = + fun(#u{pid = PID}) -> + PID ! {webrtc, Message} + end, + lists:foreach(GossipTo, Users).