diff --git a/src/msp_channel.erl b/src/msp_channel.erl index d340503..7bedf94 100644 --- a/src/msp_channel.erl +++ b/src/msp_channel.erl @@ -125,6 +125,7 @@ handle_call(Unexpected, From, State) -> handle_cast({configure_channel, Id, Socket, Peer}, none) -> + log(debug, "Opening channel ~p~n", [Id]), State = #s{chan_id = Id, socket = Socket, peer = Peer}, @@ -170,7 +171,7 @@ do_send(Message, Mod, State = #s{chan_id = Id, peer = Peer, socket = Sock, seq = Seq}) -> - io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]), + log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]), IsNewChannel = ChanMode == closed, {Op, NewChanMode} = choose_opcode(IsNewChannel, Mod), Datagram = <>, @@ -203,12 +204,12 @@ do_handle_datagram(State, <>) -> % did get through and how it was interpreted. do_handle_ack_cancel(State, Data); do_handle_datagram(State, <>) -> - io:format("Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]), + log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]), % Reliable stream operations need to be ordered first. NewState = do_buffer_reliable(State, Op, Seq, Rest), do_execute_buffer(NewState); do_handle_datagram(State, Unexpected) -> - io:format("Got datagram ~p which is too short.~n", [Unexpected]), + log(warning, "Got datagram ~p which is too short.~n", [Unexpected]), State. do_handle_lossy(State, Data) -> diff --git a/src/msp_channel_man.erl b/src/msp_channel_man.erl index 681fe0c..e10b85f 100644 --- a/src/msp_channel_man.erl +++ b/src/msp_channel_man.erl @@ -9,7 +9,7 @@ -copyright("Jarvis Carroll "). -license("MIT"). --export([add_route/4, create_channel/3, dispatch_fallback/5]). +-export([add_route/4, create_channel/3, dispatch_fallback/4]). %% Worker interface -export([enroll/0]). %% gen_server @@ -31,6 +31,7 @@ -record(route_info, {router :: pid(), socket :: gen_udp:socket(), + side :: 0 | 1, unused_channels :: [integer()], used_channels :: #{integer() => #channel{}}}). @@ -41,14 +42,37 @@ %%% Service Interface +add_route(Sock, TheirIP, TheirPort, Side) -> + case add_passive_route(Sock, TheirIP, TheirPort, Side) of + ok -> + transfer_socket(Sock, TheirIP, TheirPort); + {error, Reason} -> + {error, Reason} + end. + +add_passive_route(Sock, TheirIP, TheirPort, Side) -> + log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]), + gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}). + % A pretty complex function for an interface, but we need to transfer ownership % of the socket. -add_route(Sock, TheirIP, TheirPort, Side) -> - % Transfer the socket to the manager. We will then transfer it on to the - % router, once it is spawned. +transfer_socket(Sock, TheirIP, TheirPort) -> case gen_udp:controlling_process(Sock, whereis(?MODULE)) of ok -> - gen_server:cast(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}); + transfer_socket2(Sock, {TheirIP, TheirPort}); + {error, Reason} -> + {error, Reason} + end. + +transfer_socket2(Sock, Peer) -> + % Since it was transferred, any backlog of datagrams will now be in the + % channel manager's inbox. Turn it off from active, so that the manager + % knows that it will be fully caught up *before* the socket_transferred + % notification arrives. + case inet:setopts(Sock, [{active, false}]) of + ok -> + % Now the socket is all cleaned up, let the manager know. + gen_server:call(?MODULE, {socket_transferred, Sock, Peer}); {error, Reason} -> {error, Reason} end. @@ -59,9 +83,8 @@ create_channel(OurPort, TheirIP, TheirPort) -> % Deliver messages on behalf of msp_router, if it doesn't know where to send % something. -dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) -> - Route = #route{from = OurPort, to = {TheirIP, TheirPort}}, - gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}). +dispatch_fallback(Socket, Peer, ID, Packet) -> + gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}). %%% Worker Interface @@ -89,6 +112,12 @@ init(none) -> {ok, State}. +handle_call({add_route, Sock, Route, Side}, _, State) -> + {Result, NewState} = do_add_route(Sock, Route, Side, State), + {reply, Result, NewState}; +handle_call({socket_transferred, Sock, Peer}, _, State) -> + Result = do_socket_transferred(Sock, Peer, State), + {reply, Result, State}; handle_call({create_channel, Route}, _, State) -> {Result, NewState} = do_create_channel(State, Route), {reply, Result, NewState}; @@ -97,11 +126,8 @@ handle_call(Unexpected, From, State) -> {noreply, State}. -handle_cast({add_route, Sock, Route, Side}, State) -> - NewState = do_add_route(Sock, Route, Side, State), - {noreply, NewState}; -handle_cast({dispatch, Route, ID, Packet}, State) -> - NewState = do_dispatch(Route, ID, Packet, State), +handle_cast({dispatch, Socket, Peer, ID, Packet}, State) -> + NewState = do_dispatch(Socket, Peer, ID, Packet, State), {noreply, NewState}; handle_cast({enroll, PID}, State) -> NewState = do_enroll(PID, State), @@ -114,10 +140,23 @@ handle_cast(Unexpected, State) -> handle_info({'DOWN', Mon, process, PID, Reason}, State) -> NewState = handle_down(Mon, PID, Reason, State), {noreply, NewState}; +handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) -> + NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State), + {noreply, NewState}; handle_info(Unexpected, State) -> ok = log(warning, "Unexpected info: ~tp", [Unexpected]), {noreply, State}. +handle_udp(Socket, Peer, <>, State) -> + do_dispatch(Socket, Peer, ID, Packet, State); +handle_udp(Socket, Peer, [ID | PacketList], State) -> + Packet = list_to_binary(PacketList), + do_dispatch(Socket, Peer, ID, Packet, State); +handle_udp(_, _, <<>>, State) -> + % Empty datagram... Ignore. + State; +handle_udp(_, _, [], State) -> + State. handle_down(Mon, PID, Reason, State) -> % TODO @@ -161,17 +200,53 @@ do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) -> {ok, OurPort} = inet:port(Sock), Route = #route{from = OurPort, to = Peer}, % Check that this route isn't registered already... - % TODO: Maybe detect this case before we transfer the socket? It's all - % annoying though. - false = maps:is_key(Route, Conns), + case maps:is_key(Route, Conns) of + true -> + Result = {error, already_added}, + {Result, State}; + false -> + do_add_route2(Sock, Route, Side, State) + end. + +do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) -> {ok, Router} = msp_router:start_link(), - ok = msp_router:begin_routing(Router, Sock, Peer, Side), NewConn = #route_info{router = Router, socket = Sock, + side = Side, unused_channels = lists:seq(0, 255), used_channels = #{}}, NewConns = maps:put(Route, NewConn, Conns), - State#s{connections = NewConns}. + NewState = State#s{connections = NewConns}, + {ok, NewState}. + +do_socket_transferred(Sock, Peer, #s{connections = Conns}) -> + {ok, OurPort} = inet:port(Sock), + Route = #route{from = OurPort, to = Peer}, + % Check that this route is actually registered. + case maps:find(Route, Conns) of + {ok, Conn} -> + do_socket_transferred2(Sock, Peer, Conn); + error -> + {error, unknown_connection} + end. + +do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels}) +-> + msp_router:begin_routing(Router, Sock, Peer), + channel_catchup(Router, maps:iterator(Channels)); +do_socket_transferred2(_, _, _) -> + {error, wrong_socket}. + +% If we have created channels without spawning the router process, then at some +% point we will need to tell the router process what channels currently exist. +channel_catchup(Router, Iter) -> + case maps:next(Iter) of + {ChanID, #channel{chan_pid = Chan}, NewIter} -> + msp_router:add_channel(Router, ChanID, Chan), + channel_catchup(Router, NewIter); + none -> + ok + end. do_create_channel(State = #s{connections = Conns}, Route) -> case maps:find(Route, Conns) of @@ -209,11 +284,18 @@ do_create_channel3(State, Route, Info, ChanID) -> {Result, NewState}. -do_dispatch(Route, ID, Packet, State) -> +do_dispatch(Socket, Peer, ID, Packet, State) -> + {ok, OurPort} = inet:port(Socket), + Route = #route{from = OurPort, to = Peer}, + do_dispatch2(Route, ID, Packet, State). + +do_dispatch2(Route, ID, Packet, State) -> case maps:find(Route, State#s.connections) of {ok, Info} -> do_dispatch2(Route, ID, Packet, State, Info); error -> + #route{from = From, to = {{X1, X2, X3, X4}, To}} = Route, + ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]), State end. diff --git a/src/msp_router.erl b/src/msp_router.erl index 6a9f4f5..9f7f477 100644 --- a/src/msp_router.erl +++ b/src/msp_router.erl @@ -9,7 +9,7 @@ -copyright("Jarvis Carroll "). -license("MIT"). --export([begin_routing/4, add_channel/3]). +-export([begin_routing/3, add_channel/3]). %% gen_server -export([start_link/0]). @@ -28,7 +28,6 @@ -record(s, {socket :: gen_udp:socket(), peer :: endpoint(), - side :: 0 | 1, connections = #{} :: #{integer() => channel()}}). -type state() :: none | #s{}. @@ -38,13 +37,13 @@ % msp does not negotiate new connections, but once you have a % socket, and a host that knows you will be talking MSP, then a % new msp_router can be initialized. -begin_routing(Router, OurSock, {TheirIP, TheirPort}, OurSide) -> +begin_routing(Router, OurSock, {TheirIP, TheirPort}) -> % Transfer the socket to the gen_server. If it is % active then a bunch of messages will be received % at once, but that is fine. case gen_udp:controlling_process(OurSock, Router) of ok -> - gen_server:cast(Router, {begin_routing, OurSock, {TheirIP, TheirPort}, OurSide}); + gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}}); {error, Reason} -> {error, Reason} end. @@ -67,14 +66,17 @@ init(none) -> {ok, none}. +handle_call({begin_routing, Sock, Peer}, _, none) -> + {Result, State} = do_begin_routing(Sock, Peer), + {reply, Result, State}; +handle_call({begin_routing, _, _, _}, _, State = #s{}) -> + Result = {error, already_routing}, + {reply, Result, State}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]), {reply, undef, State}. -handle_cast({begin_routing, Sock, Peer, Side}, none) -> - State = do_begin_routing(Sock, Peer, Side), - {noreply, State}; handle_cast({add_channel, ChanID, ChanPID}, State) -> NewState = do_add_channel(ChanID, ChanPID, State), {noreply, NewState}; @@ -84,11 +86,11 @@ handle_cast(Unexpected, State) -> handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) -> - io:format("Got packet: ~p~n", [Packet]), + log(debug, "Got packet: ~p~n", [Packet]), do_dispatch(State, Packet), {noreply, State}; handle_info(Unexpected, State) -> - ok = io:format("Unexpected info: ~tp~n", [Unexpected]), + ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]), {noreply, State}. @@ -111,13 +113,14 @@ terminate(_, _) -> %%% Doer Functions -do_begin_routing(Sock, Peer, Side) -> +do_begin_routing(Sock, Peer) -> ok = inet:setopts(Sock, [{active, once}, {mode, binary}]), - State = #s{socket = Sock, - peer = Peer, - side = Side}, - State. + State = #s{socket = Sock, peer = Peer}, + {ok, State}. +do_add_channel(_, _, none) -> + % Do nothing... We will need to be told about these channels again later. + none; do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) -> NewConns = maps:put(ChanID, ChanPID, Conns), State#s{connections = NewConns}. @@ -128,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, < erlang:send(Conn, {msp_fragment, Packet}); error -> - {ok, OurPort} = inet:port(Sock), - {TheirIP, TheirPort} = Peer, - msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) + msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet) end; do_dispatch(_, <<>>) -> % Empty datagram? diff --git a/src/msp_tests.erl b/src/msp_tests.erl index f6f0855..e8e455d 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -22,8 +22,8 @@ run_tests_protected() -> run_tests() -> %ok = send_test(), - ok = out_of_order_test(), - %ok = race_test(), + %ok = out_of_order_test(), + ok = race_test(), ok. make_connection(OurPort, TheirIP, TheirPort, Side) ->