diff --git a/priv/static/js/dist/webrtc.d.ts b/priv/static/js/dist/webrtc.d.ts index aab59c5..2bed83b 100644 --- a/priv/static/js/dist/webrtc.d.ts +++ b/priv/static/js/dist/webrtc.d.ts @@ -10,5 +10,7 @@ * @module */ declare function main(): Promise; +type ws_msg = ["username", string] | ["users", Array]; +declare function handle_ws_msg(message: ws_msg, roster_ul: HTMLUListElement, whoami: HTMLHeadingElement): void; declare function handle_join(init: HTMLDivElement, init_name: HTMLInputElement, peers: HTMLDivElement, ws: WebSocket): Promise; declare function ws_send_json(ws: WebSocket, x: any): void; diff --git a/priv/static/js/dist/webrtc.js b/priv/static/js/dist/webrtc.js index d42e869..e9d70a3 100644 --- a/priv/static/js/dist/webrtc.js +++ b/priv/static/js/dist/webrtc.js @@ -16,18 +16,39 @@ async function main() { let ws = new WebSocket('/ws/webrtc'); // grab document elements let init = document.getElementById('init'); - let peers = document.getElementById('peers'); + let roster = document.getElementById('roster'); + let roster_ul = document.getElementById('roster-ul'); + let whoami = document.getElementById('whoami'); let init_name = document.getElementById('init-name'); let init_join = document.getElementById('init-join'); // handle button click init_join.addEventListener('click', function () { - handle_join(init, init_name, peers, ws); + handle_join(init, init_name, roster, ws); }); // handle message from ws ws.onopen = function (e) { console.log('ws open:', e); }; - ws.onclose = function (e) { console.log('ws closed:', e); }; + ws.onclose = function (e) { console.warn('ws closed:', e); }; ws.onerror = function (e) { console.error('ws error:', e); }; - ws.onmessage = function (e) { console.log('ws message', e); }; + ws.onmessage = + function (e) { + // console.log('ws message:', e.data); + let message = JSON.parse(e.data); + handle_ws_msg(message, roster_ul, whoami); + }; +} +function handle_ws_msg(message, roster_ul, whoami) { + switch (message[0]) { + case "username": + whoami.innerText = 'Whoami: ' + message[1]; + break; + case "users": + for (let uname of message[1]) { + let thisli = document.createElement('li'); + thisli.innerText = uname; + roster_ul.appendChild(thisli); + } + break; + } } async function handle_join(init, init_name, peers, ws) { console.log('connecting...'); diff --git a/priv/static/js/dist/webrtc.js.map b/priv/static/js/dist/webrtc.js.map index e8e036a..54edfe3 100644 --- a/priv/static/js/dist/webrtc.js.map +++ b/priv/static/js/dist/webrtc.js.map @@ -1 +1 @@ -{"version":3,"file":"webrtc.js","sourceRoot":"","sources":["../ts/webrtc.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;GAUG;AAEH,IAAI,EAAE,CAAC;AAEP,KAAK,UACL,IAAI;IAIA,8BAA8B;IAC9B,IAAI,EAAE,GAAG,IAAI,SAAS,CAAC,YAAY,CAAC,CAAC;IAErC,yBAAyB;IACzB,IAAI,IAAI,GAAI,QAAQ,CAAC,cAAc,CAAC,MAAM,CAAsB,CAAC;IACjE,IAAI,KAAK,GAAG,QAAQ,CAAC,cAAc,CAAC,OAAO,CAAqB,CAAC;IAEjE,IAAI,SAAS,GAAG,QAAQ,CAAC,cAAc,CAAC,WAAW,CAAqB,CAAC;IACzE,IAAI,SAAS,GAAG,QAAQ,CAAC,cAAc,CAAC,WAAW,CAAsB,CAAC;IAE1E,sBAAsB;IACtB,SAAS,CAAC,gBAAgB,CAAC,OAAO,EAC9B;QACI,WAAW,CAAC,IAAI,EAAE,SAAS,EAAE,KAAK,EAAE,EAAE,CAAC,CAAC;IAC5C,CAAC,CACJ,CAAC;IAEF,yBAAyB;IACzB,EAAE,CAAC,MAAM,GAAM,UAAS,CAAC,IAAI,OAAO,CAAC,GAAG,CAAC,UAAU,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;IAC3D,EAAE,CAAC,OAAO,GAAK,UAAS,CAAC,IAAI,OAAO,CAAC,GAAG,CAAC,YAAY,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;IAC7D,EAAE,CAAC,OAAO,GAAK,UAAS,CAAC,IAAI,OAAO,CAAC,KAAK,CAAC,WAAW,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;IAC9D,EAAE,CAAC,SAAS,GAAG,UAAS,CAAC,IAAI,OAAO,CAAC,GAAG,CAAC,YAAY,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;AACjE,CAAC;AAED,KAAK,UACL,WAAW,CACN,IAA0B,EAC1B,SAA4B,EAC5B,KAA0B,EAC1B,EAAqB;IAGtB,OAAO,CAAC,GAAG,CAAC,eAAe,CAAC,CAAC;IAC7B,IAAI,SAAS,GAAW,SAAS,CAAC,KAAK,CAAC,IAAI,EAAE,CAAC;IAC/C,OAAO,CAAC,GAAG,CAAC,WAAW,EAAE,SAAS,CAAC,CAAC;IAEpC,YAAY,CAAC,EAAE,EAAE,CAAC,UAAU,EAAE,SAAS,CAAC,CAAC,CAAC;IAE1C,IAAI,CAAC,MAAM,GAAG,IAAI,CAAC;IACnB,KAAK,CAAC,MAAM,GAAG,KAAK,CAAC;AACzB,CAAC;AAGD,SACA,YAAY,CACP,EAAc,EACd,CAAQ;IAGT,IAAI,CAAC,GAAW,IAAI,CAAC,SAAS,CAAC,CAAC,EAAE,SAAS,EAAE,CAAC,CAAC,CAAC;IAChD,OAAO,CAAC,GAAG,CAAC,YAAY,EAAE,CAAC,CAAC,CAAC;IAE7B,EAAE,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC;AACf,CAAC"} \ No newline at end of file +{"version":3,"file":"webrtc.js","sourceRoot":"","sources":["../ts/webrtc.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;GAUG;AAEH,IAAI,EAAE,CAAC;AAEP,KAAK,UACL,IAAI;IAIA,8BAA8B;IAC9B,IAAI,EAAE,GAAG,IAAI,SAAS,CAAC,YAAY,CAAC,CAAC;IAErC,yBAAyB;IACzB,IAAI,IAAI,GAAQ,QAAQ,CAAC,cAAc,CAAC,MAAM,CAAwB,CAAC;IACvE,IAAI,MAAM,GAAM,QAAQ,CAAC,cAAc,CAAC,QAAQ,CAAsB,CAAC;IACvE,IAAI,SAAS,GAAG,QAAQ,CAAC,cAAc,CAAC,WAAW,CAAqB,CAAC;IACzE,IAAI,MAAM,GAAM,QAAQ,CAAC,cAAc,CAAC,QAAQ,CAA0B,CAAC;IAE3E,IAAI,SAAS,GAAG,QAAQ,CAAC,cAAc,CAAC,WAAW,CAAqB,CAAC;IACzE,IAAI,SAAS,GAAG,QAAQ,CAAC,cAAc,CAAC,WAAW,CAAsB,CAAC;IAE1E,sBAAsB;IACtB,SAAS,CAAC,gBAAgB,CAAC,OAAO,EAC9B;QACI,WAAW,CAAC,IAAI,EAAE,SAAS,EAAE,MAAM,EAAE,EAAE,CAAC,CAAC;IAC7C,CAAC,CACJ,CAAC;IAEF,yBAAyB;IACzB,EAAE,CAAC,MAAM,GAAM,UAAS,CAAC,IAAI,OAAO,CAAC,GAAG,CAAC,UAAU,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;IAC3D,EAAE,CAAC,OAAO,GAAK,UAAS,CAAC,IAAI,OAAO,CAAC,IAAI,CAAC,YAAY,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;IAC9D,EAAE,CAAC,OAAO,GAAK,UAAS,CAAC,IAAI,OAAO,CAAC,KAAK,CAAC,WAAW,EAAE,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;IAC9D,EAAE,CAAC,SAAS;QACR,UAAS,CAAC;YACN,sCAAsC;YACtC,IAAI,OAAO,GAAG,IAAI,CAAC,KAAK,CAAC,CAAC,CAAC,IAAI,CAAW,CAAC;YAC3C,aAAa,CAAC,OAAO,EAAE,SAAS,EAAE,MAAM,CAAC,CAAC;QAC9C,CAAC,CAAC;AACV,CAAC;AAKD,SACA,aAAa,CACR,OAAkB,EAClB,SAA4B,EAC5B,MAA8B;IAG/B,QAAO,OAAO,CAAC,CAAC,CAAC,EAAE,CAAC;QAChB,KAAK,UAAU;YACX,MAAM,CAAC,SAAS,GAAG,UAAU,GAAG,OAAO,CAAC,CAAC,CAAC,CAAC;YAC3C,MAAM;QACV,KAAK,OAAO;YACR,KAAK,IAAI,KAAK,IAAI,OAAO,CAAC,CAAC,CAAC,EAAE,CAAC;gBAC3B,IAAI,MAAM,GAAG,QAAQ,CAAC,aAAa,CAAC,IAAI,CAAC,CAAC;gBAC1C,MAAM,CAAC,SAAS,GAAG,KAAK,CAAC;gBACzB,SAAS,CAAC,WAAW,CAAC,MAAM,CAAC,CAAC;YAClC,CAAC;YACD,MAAM;IACd,CAAC;AAEL,CAAC;AAID,KAAK,UACL,WAAW,CACN,IAA0B,EAC1B,SAA4B,EAC5B,KAA0B,EAC1B,EAAqB;IAGtB,OAAO,CAAC,GAAG,CAAC,eAAe,CAAC,CAAC;IAC7B,IAAI,SAAS,GAAW,SAAS,CAAC,KAAK,CAAC,IAAI,EAAE,CAAC;IAC/C,OAAO,CAAC,GAAG,CAAC,WAAW,EAAE,SAAS,CAAC,CAAC;IAEpC,YAAY,CAAC,EAAE,EAAE,CAAC,UAAU,EAAE,SAAS,CAAC,CAAC,CAAC;IAE1C,IAAI,CAAC,MAAM,GAAG,IAAI,CAAC;IACnB,KAAK,CAAC,MAAM,GAAG,KAAK,CAAC;AACzB,CAAC;AAGD,SACA,YAAY,CACP,EAAc,EACd,CAAQ;IAGT,IAAI,CAAC,GAAW,IAAI,CAAC,SAAS,CAAC,CAAC,EAAE,SAAS,EAAE,CAAC,CAAC,CAAC;IAChD,OAAO,CAAC,GAAG,CAAC,YAAY,EAAE,CAAC,CAAC,CAAC;IAE7B,EAAE,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC;AACf,CAAC"} \ No newline at end of file diff --git a/priv/static/js/ts/webrtc.ts b/priv/static/js/ts/webrtc.ts index a5c1adb..85a4c5a 100644 --- a/priv/static/js/ts/webrtc.ts +++ b/priv/static/js/ts/webrtc.ts @@ -21,8 +21,10 @@ main let ws = new WebSocket('/ws/webrtc'); // grab document elements - let init = document.getElementById('init') as HTMLDivElement; - let peers = document.getElementById('peers') as HTMLDivElement; + let init = document.getElementById('init') as HTMLDivElement; + let roster = document.getElementById('roster') as HTMLDivElement; + let roster_ul = document.getElementById('roster-ul') as HTMLUListElement; + let whoami = document.getElementById('whoami') as HTMLHeadingElement; let init_name = document.getElementById('init-name') as HTMLInputElement; let init_join = document.getElementById('init-join') as HTMLButtonElement; @@ -30,17 +32,49 @@ main // handle button click init_join.addEventListener('click', function() { - handle_join(init, init_name, peers, ws); + handle_join(init, init_name, roster, ws); } ); // handle message from ws ws.onopen = function(e) { console.log('ws open:', e); }; - ws.onclose = function(e) { console.log('ws closed:', e); }; + ws.onclose = function(e) { console.warn('ws closed:', e); }; ws.onerror = function(e) { console.error('ws error:', e); }; - ws.onmessage = function(e) { console.log('ws message', e); }; + ws.onmessage = + function(e) { + // console.log('ws message:', e.data); + let message = JSON.parse(e.data) as ws_msg; + handle_ws_msg(message, roster_ul, whoami); + }; } +type ws_msg = ["username", string] + | ["users", Array]; + +function +handle_ws_msg + (message : ws_msg, + roster_ul : HTMLUListElement, + whoami : HTMLHeadingElement) + : void +{ + switch(message[0]) { + case "username": + whoami.innerText = 'Whoami: ' + message[1]; + break; + case "users": + for (let uname of message[1]) { + let thisli = document.createElement('li'); + thisli.innerText = uname; + roster_ul.appendChild(thisli); + } + break; + } + +} + + + async function handle_join (init : HTMLDivElement, diff --git a/priv/static/webrtc.html b/priv/static/webrtc.html index fd9603e..4e02488 100644 --- a/priv/static/webrtc.html +++ b/priv/static/webrtc.html @@ -22,10 +22,12 @@ - diff --git a/src/fd_httpd.erl b/src/fd_httpd.erl index 7d2c3b6..fbf4bae 100644 --- a/src/fd_httpd.erl +++ b/src/fd_httpd.erl @@ -23,6 +23,12 @@ start_link() -> init([]) -> RestartStrategy = {one_for_one, 1, 60}, + WebRTC = {fd_httpd_webrtc, + {fd_httpd_webrtc, start_link, []}, + permanent, + 5000, + worker, + [fd_httpd_webrtc]}, FileCache = {fd_httpd_sfc, {fd_httpd_sfc, start_link, []}, permanent, @@ -35,5 +41,6 @@ init([]) -> 5000, supervisor, [fd_httpd_clients]}, - Children = [FileCache, Clients], + Children = [WebRTC, FileCache, Clients], + %Children = [FileCache, Clients], {ok, {RestartStrategy, Children}}. diff --git a/src/fd_httpd_client.erl b/src/fd_httpd_client.erl index 87550b8..e1bdd76 100644 --- a/src/fd_httpd_client.erl +++ b/src/fd_httpd_client.erl @@ -135,8 +135,16 @@ loop(Parent, Debug, State = #s{socket = Socket, next = Next0}) -> Received = <>, case qhl:parse(Socket, Received) of {ok, Req, Next1} -> - Next2 = handle_request(Socket, Req, Next1), - NewState = State#s{next = Next2}, + %% FIXME: unfuck received logic here + %% handle_request should eventually call back into + %% loop/3 or close the socket + Next2 = + case Next1 of + none -> <<>>; + Bin -> Bin + end, + Next3 = handle_request(Socket, Req, Next2), + NewState = State#s{next = Next3}, loop(Parent, Debug, NewState); Error -> %% should trigger bad request @@ -233,7 +241,7 @@ handle_request(Sock, R = #request{method = M, path = P}, Received) when M =/= un route(Sock, get, Route, Request, Received) -> case Route of <<"/ws/echo">> -> ws_echo(Sock, Request) , Received; - <<"/ws/webrtc">> -> ws_webrtc(Sock, Request) , Received; + <<"/ws/webrtc">> -> ws_webrtc(Sock, Request, Received) , Received; <<"/">> -> route_static(Sock, <<"/index.html">>) , Received; _ -> route_static(Sock, Route) , Received end; @@ -284,13 +292,19 @@ respond_static(Sock, not_found) -> %% webrtc %% ------------------------------ +-record(rs, + {socket :: gen_tcp:socket(), + received = <<>> :: binary(), + username = undefined :: undefined | string()}). -ws_webrtc(Sock, Request) -> +-type webrtc_state() :: #rs{}. + +ws_webrtc(Sock, Request, Received) -> try case qhl_ws:handshake(Request) of {ok, Response} -> fd_httpd_utils:respond(Sock, Response), - ws_webrtc_loop(Sock); + ws_webrtc_loop(#rs{socket = Sock, received = Received}); Error -> tell("ws_webrtc: error: ~tp", [Error]), fd_httpd_utils:http_err(Sock, 400) @@ -301,12 +315,79 @@ ws_webrtc(Sock, Request) -> fd_httpd_utils:http_err(Sock, 500) end. --record(rs, - {ident = username = undefined :: undefined | string(), - peers = undefined :: undefined | [ --type webrtc_state() :: {username -ws_webrtc_loop( +-define(WEBRTC_TIMEOUT, 30*qhl_ws:min()). + + + +-spec ws_webrtc_loop(webrtc_state()) -> no_return(). + +%% first thing is to get username +ws_webrtc_loop(State = #rs{socket = Socket, + received = Recv, + username = undefined}) -> + {ok, ["username", Username], NewRecv} = + qhl_ws:recv_json(Socket, Recv, ?WEBRTC_TIMEOUT), + tell("~p ws_webrtc_loop: request username: ~p", [self(), Username]), + {ok, ActualUsername} = fd_httpd_webrtc:join(Username), + ok = qhl_ws:send_json(Socket, ["username", ActualUsername]), + NewState = State#rs{received = NewRecv, + username = ActualUsername}, + ws_webrtc_loop(NewState); +% we have no tcp bytes waiting to be parsed +ws_webrtc_loop(State = #rs{socket = Socket, + received = <<>>}) -> + ok = inet:setopts(Socket, [{active, once}]), + receive + {tcp, Socket, Message} -> + NewState = State#rs{received = Message}, + ws_webrtc_loop(NewState); + {webrtc, Message} -> + NewState = ws_webrtc_handle_webrtc(Message, State), + ws_webrtc_loop(NewState); + {tcp_closed, Socket} -> + ok = tell("~p Socket closed, retiring.~n", [self()]), + exit(normal) + end; +% we have TCP bytes sitting and waiting to be parsed +ws_webrtc_loop(State = #rs{socket = Socket, + received = R}) -> + {ok, Message, NewR} = qhl_ws:recv_json(Socket, R, 5000), + NewState = ws_webrtc_handle_json(Message, State#rs{received = NewR}), + ws_webrtc_loop(NewState). + + + +-spec ws_webrtc_handle_webrtc(Message, State) -> NewState + when Message :: any(), + State :: webrtc_state(), + NewState :: webrtc_state(). +% @private handle a message from the server + +ws_webrtc_handle_webrtc(Message, State = #rs{socket = Sock}) -> + tell("~p received message from webrtc: ~p", [self(), Message]), + NewState = + case Message of + {users, Users} -> + ok = qhl_ws:send_json(Sock, ["users", Users]), + State; + _ -> + tell("~p unknown webrtc message: ~p", [self, Message]) + end, + NewState. + + + +-spec ws_webrtc_handle_json(Message, State) -> NewState + when Message :: zj:value(), + State :: webrtc_state(), + NewState :: webrtc_state(). +% @private handle a message from the client + +ws_webrtc_handle_json(Message, State) -> + tell("~p received json message from client: ~p", [self(), Message]), + State. + %% ------------------------------ %% echo diff --git a/src/fd_httpd_webrtc.erl b/src/fd_httpd_webrtc.erl new file mode 100644 index 0000000..2b5001a --- /dev/null +++ b/src/fd_httpd_webrtc.erl @@ -0,0 +1,174 @@ +% @doc webrtc peer pool +-module(fd_httpd_webrtc). + +-behavior(gen_server). + +%% api (caller context) +-export([ + join/1 +]). + +% startup/gen_server callbacks +-export([ + % caller context + start_link/0, + % process context + init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2 +]). + +-include("$zx_include/zx_logger.hrl"). + + +-record(u, + {pid :: pid(), + username :: string()}). + +-type user() :: #u{}. + +-record(s, + {users = [] :: [user()]}). + +-type state() :: #s{}. + + +%%----------------------------------------------------------------------------- +%% caller context +%%----------------------------------------------------------------------------- + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, none, []). + + + +-spec join(Username :: string()) -> {ok, ActualUsername :: string()} | {error, any()}. + +join(Username) -> + gen_server:call(?MODULE, {join, Username}). + + + +%%----------------------------------------------------------------------------- +%% process context +%%----------------------------------------------------------------------------- + +%% gen_server callbacks + +init(none) -> + tell("starting fd_httpd_webrtc", []), + InitState = #s{}, + {ok, InitState}. + + +handle_call({join, Username}, _From = {PID, _Tag}, State) -> + {Reply, NewState} = do_join(Username, PID, State), + {reply, Reply, NewState}; +handle_call(Unexpected, From, State) -> + tell("~tp: unexpected call from ~tp: ~tp", [?MODULE, Unexpected, From]), + {noreply, State}. + + +handle_cast(Unexpected, State) -> + tell("~tp: unexpected cast: ~tp", [?MODULE, Unexpected]), + {noreply, State}. + + +handle_info({'DOWN', _Ref, process, PID, _Reason}, State) -> + NewState = do_down(PID, State), + {noreply, NewState}; +handle_info(Unexpected, State) -> + tell("~tp: unexpected info: ~tp", [?MODULE, Unexpected]), + {noreply, State}. + + +code_change(_, State, _) -> + {ok, State}. + +terminate(_, _) -> + ok. + + +%%--------------------- +%% doers +%%--------------------- + +-spec do_join(Username, PID, State) -> {Reply, NewState} + when Username :: string(), + PID :: pid(), + State :: state(), + Reply :: {ok, ActualUsername :: string()} + | {error, any()}, + NewState :: state(). +% @private join a user to a pool + +do_join(Username, PID, State = #s{users = Users}) -> + % see if pid is already there + case lists:keymember(PID, #u.pid, Users) of + true -> {error, already_joined}; + false -> do_join2(Username, PID, State) + end. + +do_join2(Username, PID, State = #s{users = Users}) -> + monitor(process, PID), + ActualUsername = unique_username(Username, Users), + NewUser = #u{pid = PID, username = ActualUsername}, + NewRoster = usort([NewUser | Users]), + NewState = State#s{users = NewRoster}, + ok = gossip_roster(NewState), + {{ok, ActualUsername}, NewState}. + +usort(Users) -> + lists:keysort(#u.username, Users). + +unique_username(Username, Users) -> + tell("~p unique_username(~p, ~p)", [?MODULE, Username, Users]), + case lists:keymember(Username, #u.username, Users) of + true -> unique_username(Username, 1, Users); + false -> Username + end. + +unique_username(Username, N, Users) -> + tell("~p unique_username(~p, ~p ~p)", [?MODULE, Username, N, Users]), + U = Username ++ integer_to_list(N), + case lists:keymember(U, #u.username, Users) of + true -> unique_username(Username, N + 1, Users); + false -> U + end. + + + +-spec do_down(PID, State) -> NewState + when PID :: pid(), + State :: state(), + NewState :: state(). +% @private handle a user dying + +do_down(PID, State = #s{users = Users}) -> + % remove from users + NewUsers = usort(lists:keydelete(PID, #u.pid, Users)), + NewState = State#s{users = NewUsers}, + % broadcast username + ok = gossip_roster(State), + NewState. + + + +-spec gossip_roster(state()) -> ok. +% @private gossip the roster to everyone + +gossip_roster(State = #s{users = Users}) -> + Usernames = [Username || #u{username = Username} <- Users], + gossip({users, Usernames}, State). + + + +-spec gossip(any(), state()) -> ok. +% @private gossip a message to everyone + +gossip(Message, #s{users = Users}) -> + GossipTo = + fun(#u{pid = PID}) -> + PID ! {webrtc, Message} + end, + lists:foreach(GossipTo, Users). diff --git a/src/qhl_ws.erl b/src/qhl_ws.erl index f14ab86..2b67bce 100644 --- a/src/qhl_ws.erl +++ b/src/qhl_ws.erl @@ -16,7 +16,9 @@ %% porcelain handshake/1, recv/3, recv/4, - send/2 + recv_strict/3, recv_json/3, + send/2, + send_dwim/2, send_json/2 ]). -include("http.hrl"). @@ -259,6 +261,54 @@ response_token(ChallengeToken) when is_binary(ChallengeToken) -> base64:encode(Sha1). +-spec recv_json(Socket, Received, TimeoutMS) -> Result + when Socket :: gen_tcp:socket(), + Received :: binary(), + TimeoutMS :: non_neg_integer(), + Result :: {ok, zj:value(), Remainder} + | {error, Reason}, + Remainder :: binary(), + Reason :: any(). +% @doc +% asserts response is text + +recv_json(Sock, Recv, TimeoutMS) -> + case recv_strict(Sock, Recv, TimeoutMS) of + {ok, {Type, Payload}, NewRecv} when Type =:= text orelse + Type =:= binary -> + case zj:decode(Payload) of + {ok, Value} -> + io:format("~p value: ~p~n", [self(), Value]), + {ok, Value, NewRecv}; + Error -> {error, {zj_decode, Error}} + end; + Error -> + Error + end. + + +-spec recv_strict(Socket, Received, TimeoutMS) -> Result + when Socket :: gen_tcp:socket(), + Received :: binary(), + TimeoutMS :: non_neg_integer(), + Result :: {ok, Message, Remainder} + | {error, Reason}, + Message :: ws_msg(), + Remainder :: binary(), + Reason :: any(). +% @doc +% Almost equivalent to recv/3, but asserts resulting frames are empty + +recv_strict(Sock, Recv, TimeoutMS) -> + case recv(Sock, Recv, TimeoutMS) of + {ok, Message, [], Remainder} -> + {ok, Message, Remainder}; + Illegal = {ok, _, _NonEmptyFrames, _} -> + {error, {bad_frame_stack, Illegal}}; + Error -> + Error + end. + -spec recv(Socket, Received, TimeoutMS) -> Result when Socket :: gen_tcp:socket(), @@ -604,10 +654,15 @@ recv_frame(Frame = #frame{payload_length = Len, payload = none}, Sock, Received, %% factoring this out into a function to reduce repetition recv_frame_await(Frame, Sock, Received, Timeout) -> + io:format("~p called: recv_frame_await(~p, ~p, ~p, ~p)~n", + [self(), Frame, Sock, Received, Timeout]), case inet:setopts(Sock, [{active, once}]) of ok -> receive - {tcp, Sock, Bin} -> recv_frame(Frame, Sock, <>, Timeout); + {tcp, Sock, Bin} -> + io:format("~p calling: recv_frame(~p, ~p, ~p, ~p)~n", + [self(),Frame, Sock, <>, Timeout]), + recv_frame(Frame, Sock, <>, Timeout); {tcp_closed, Sock} -> {error, tcp_closed}; {tcp_error, Sock, Reason} -> {error, {tcp_error, Reason}} after Timeout -> @@ -618,6 +673,34 @@ recv_frame_await(Frame, Sock, Received, Timeout) -> end. +send_json(Socket, X) -> + send_dwim(Socket, {json, X}). + + +-spec send_dwim(Socket, Message) -> Result + when Socket :: gen_tcp:socket(), + Message :: string() + | binary() + | {json, zj:value()} + | ws_msg(), + Result :: ok + | {error, Reason}, + Reason :: any(). +% @doc +% equivalent to send/2 but assumes iolists/strings are meant to be `text`, and +% naked binaries are meant to be `binary` +% +% lists are assumed to be unicode iolists and are converted to strings via +% unicode:characters_to_list +% +% json is encoded as text and sent as such +% @end + +send_dwim(Socket, X) when is_list(X) -> send(Socket, {text, X}); +send_dwim(Socket, X) when is_binary(X) -> send(Socket, {binary, X}); +send_dwim(Socket, {json, X}) -> send(Socket, {text, zj:encode(X)}); +send_dwim(Socket, {Type, Payload}) -> send(Socket, {Type, Payload}). + -spec send(Socket, Message) -> Result when Socket :: gen_tcp:socket(),