From 78ea3fc85e39b0fad51bfc42ab927f00b96fad94 Mon Sep 17 00:00:00 2001 From: Jarvis Carroll Date: Wed, 29 Oct 2025 11:56:44 +0000 Subject: [PATCH] handle socket race conditions better Now the channle manager takes full responsibility for dispatch as soon as it is initialised, and only passes that responsibility on to the router process once things have been cleaned up. What a complex refactor just to pass a socket to a new process! --- src/msp_channel.erl | 7 ++- src/msp_channel_man.erl | 120 +++++++++++++++++++++++++++++++++------- src/msp_router.erl | 35 ++++++------ src/msp_tests.erl | 4 +- 4 files changed, 125 insertions(+), 41 deletions(-) diff --git a/src/msp_channel.erl b/src/msp_channel.erl index d340503..7bedf94 100644 --- a/src/msp_channel.erl +++ b/src/msp_channel.erl @@ -125,6 +125,7 @@ handle_call(Unexpected, From, State) -> handle_cast({configure_channel, Id, Socket, Peer}, none) -> + log(debug, "Opening channel ~p~n", [Id]), State = #s{chan_id = Id, socket = Socket, peer = Peer}, @@ -170,7 +171,7 @@ do_send(Message, Mod, State = #s{chan_id = Id, peer = Peer, socket = Sock, seq = Seq}) -> - io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]), + log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]), IsNewChannel = ChanMode == closed, {Op, NewChanMode} = choose_opcode(IsNewChannel, Mod), Datagram = <>, @@ -203,12 +204,12 @@ do_handle_datagram(State, <>) -> % did get through and how it was interpreted. do_handle_ack_cancel(State, Data); do_handle_datagram(State, <>) -> - io:format("Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]), + log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]), % Reliable stream operations need to be ordered first. NewState = do_buffer_reliable(State, Op, Seq, Rest), do_execute_buffer(NewState); do_handle_datagram(State, Unexpected) -> - io:format("Got datagram ~p which is too short.~n", [Unexpected]), + log(warning, "Got datagram ~p which is too short.~n", [Unexpected]), State. do_handle_lossy(State, Data) -> diff --git a/src/msp_channel_man.erl b/src/msp_channel_man.erl index 681fe0c..e10b85f 100644 --- a/src/msp_channel_man.erl +++ b/src/msp_channel_man.erl @@ -9,7 +9,7 @@ -copyright("Jarvis Carroll "). -license("MIT"). --export([add_route/4, create_channel/3, dispatch_fallback/5]). +-export([add_route/4, create_channel/3, dispatch_fallback/4]). %% Worker interface -export([enroll/0]). %% gen_server @@ -31,6 +31,7 @@ -record(route_info, {router :: pid(), socket :: gen_udp:socket(), + side :: 0 | 1, unused_channels :: [integer()], used_channels :: #{integer() => #channel{}}}). @@ -41,14 +42,37 @@ %%% Service Interface +add_route(Sock, TheirIP, TheirPort, Side) -> + case add_passive_route(Sock, TheirIP, TheirPort, Side) of + ok -> + transfer_socket(Sock, TheirIP, TheirPort); + {error, Reason} -> + {error, Reason} + end. + +add_passive_route(Sock, TheirIP, TheirPort, Side) -> + log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]), + gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}). + % A pretty complex function for an interface, but we need to transfer ownership % of the socket. -add_route(Sock, TheirIP, TheirPort, Side) -> - % Transfer the socket to the manager. We will then transfer it on to the - % router, once it is spawned. +transfer_socket(Sock, TheirIP, TheirPort) -> case gen_udp:controlling_process(Sock, whereis(?MODULE)) of ok -> - gen_server:cast(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}); + transfer_socket2(Sock, {TheirIP, TheirPort}); + {error, Reason} -> + {error, Reason} + end. + +transfer_socket2(Sock, Peer) -> + % Since it was transferred, any backlog of datagrams will now be in the + % channel manager's inbox. Turn it off from active, so that the manager + % knows that it will be fully caught up *before* the socket_transferred + % notification arrives. + case inet:setopts(Sock, [{active, false}]) of + ok -> + % Now the socket is all cleaned up, let the manager know. + gen_server:call(?MODULE, {socket_transferred, Sock, Peer}); {error, Reason} -> {error, Reason} end. @@ -59,9 +83,8 @@ create_channel(OurPort, TheirIP, TheirPort) -> % Deliver messages on behalf of msp_router, if it doesn't know where to send % something. -dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) -> - Route = #route{from = OurPort, to = {TheirIP, TheirPort}}, - gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}). +dispatch_fallback(Socket, Peer, ID, Packet) -> + gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}). %%% Worker Interface @@ -89,6 +112,12 @@ init(none) -> {ok, State}. +handle_call({add_route, Sock, Route, Side}, _, State) -> + {Result, NewState} = do_add_route(Sock, Route, Side, State), + {reply, Result, NewState}; +handle_call({socket_transferred, Sock, Peer}, _, State) -> + Result = do_socket_transferred(Sock, Peer, State), + {reply, Result, State}; handle_call({create_channel, Route}, _, State) -> {Result, NewState} = do_create_channel(State, Route), {reply, Result, NewState}; @@ -97,11 +126,8 @@ handle_call(Unexpected, From, State) -> {noreply, State}. -handle_cast({add_route, Sock, Route, Side}, State) -> - NewState = do_add_route(Sock, Route, Side, State), - {noreply, NewState}; -handle_cast({dispatch, Route, ID, Packet}, State) -> - NewState = do_dispatch(Route, ID, Packet, State), +handle_cast({dispatch, Socket, Peer, ID, Packet}, State) -> + NewState = do_dispatch(Socket, Peer, ID, Packet, State), {noreply, NewState}; handle_cast({enroll, PID}, State) -> NewState = do_enroll(PID, State), @@ -114,10 +140,23 @@ handle_cast(Unexpected, State) -> handle_info({'DOWN', Mon, process, PID, Reason}, State) -> NewState = handle_down(Mon, PID, Reason, State), {noreply, NewState}; +handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) -> + NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State), + {noreply, NewState}; handle_info(Unexpected, State) -> ok = log(warning, "Unexpected info: ~tp", [Unexpected]), {noreply, State}. +handle_udp(Socket, Peer, <>, State) -> + do_dispatch(Socket, Peer, ID, Packet, State); +handle_udp(Socket, Peer, [ID | PacketList], State) -> + Packet = list_to_binary(PacketList), + do_dispatch(Socket, Peer, ID, Packet, State); +handle_udp(_, _, <<>>, State) -> + % Empty datagram... Ignore. + State; +handle_udp(_, _, [], State) -> + State. handle_down(Mon, PID, Reason, State) -> % TODO @@ -161,17 +200,53 @@ do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) -> {ok, OurPort} = inet:port(Sock), Route = #route{from = OurPort, to = Peer}, % Check that this route isn't registered already... - % TODO: Maybe detect this case before we transfer the socket? It's all - % annoying though. - false = maps:is_key(Route, Conns), + case maps:is_key(Route, Conns) of + true -> + Result = {error, already_added}, + {Result, State}; + false -> + do_add_route2(Sock, Route, Side, State) + end. + +do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) -> {ok, Router} = msp_router:start_link(), - ok = msp_router:begin_routing(Router, Sock, Peer, Side), NewConn = #route_info{router = Router, socket = Sock, + side = Side, unused_channels = lists:seq(0, 255), used_channels = #{}}, NewConns = maps:put(Route, NewConn, Conns), - State#s{connections = NewConns}. + NewState = State#s{connections = NewConns}, + {ok, NewState}. + +do_socket_transferred(Sock, Peer, #s{connections = Conns}) -> + {ok, OurPort} = inet:port(Sock), + Route = #route{from = OurPort, to = Peer}, + % Check that this route is actually registered. + case maps:find(Route, Conns) of + {ok, Conn} -> + do_socket_transferred2(Sock, Peer, Conn); + error -> + {error, unknown_connection} + end. + +do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels}) +-> + msp_router:begin_routing(Router, Sock, Peer), + channel_catchup(Router, maps:iterator(Channels)); +do_socket_transferred2(_, _, _) -> + {error, wrong_socket}. + +% If we have created channels without spawning the router process, then at some +% point we will need to tell the router process what channels currently exist. +channel_catchup(Router, Iter) -> + case maps:next(Iter) of + {ChanID, #channel{chan_pid = Chan}, NewIter} -> + msp_router:add_channel(Router, ChanID, Chan), + channel_catchup(Router, NewIter); + none -> + ok + end. do_create_channel(State = #s{connections = Conns}, Route) -> case maps:find(Route, Conns) of @@ -209,11 +284,18 @@ do_create_channel3(State, Route, Info, ChanID) -> {Result, NewState}. -do_dispatch(Route, ID, Packet, State) -> +do_dispatch(Socket, Peer, ID, Packet, State) -> + {ok, OurPort} = inet:port(Socket), + Route = #route{from = OurPort, to = Peer}, + do_dispatch2(Route, ID, Packet, State). + +do_dispatch2(Route, ID, Packet, State) -> case maps:find(Route, State#s.connections) of {ok, Info} -> do_dispatch2(Route, ID, Packet, State, Info); error -> + #route{from = From, to = {{X1, X2, X3, X4}, To}} = Route, + ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]), State end. diff --git a/src/msp_router.erl b/src/msp_router.erl index 6a9f4f5..9f7f477 100644 --- a/src/msp_router.erl +++ b/src/msp_router.erl @@ -9,7 +9,7 @@ -copyright("Jarvis Carroll "). -license("MIT"). --export([begin_routing/4, add_channel/3]). +-export([begin_routing/3, add_channel/3]). %% gen_server -export([start_link/0]). @@ -28,7 +28,6 @@ -record(s, {socket :: gen_udp:socket(), peer :: endpoint(), - side :: 0 | 1, connections = #{} :: #{integer() => channel()}}). -type state() :: none | #s{}. @@ -38,13 +37,13 @@ % msp does not negotiate new connections, but once you have a % socket, and a host that knows you will be talking MSP, then a % new msp_router can be initialized. -begin_routing(Router, OurSock, {TheirIP, TheirPort}, OurSide) -> +begin_routing(Router, OurSock, {TheirIP, TheirPort}) -> % Transfer the socket to the gen_server. If it is % active then a bunch of messages will be received % at once, but that is fine. case gen_udp:controlling_process(OurSock, Router) of ok -> - gen_server:cast(Router, {begin_routing, OurSock, {TheirIP, TheirPort}, OurSide}); + gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}}); {error, Reason} -> {error, Reason} end. @@ -67,14 +66,17 @@ init(none) -> {ok, none}. +handle_call({begin_routing, Sock, Peer}, _, none) -> + {Result, State} = do_begin_routing(Sock, Peer), + {reply, Result, State}; +handle_call({begin_routing, _, _, _}, _, State = #s{}) -> + Result = {error, already_routing}, + {reply, Result, State}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]), {reply, undef, State}. -handle_cast({begin_routing, Sock, Peer, Side}, none) -> - State = do_begin_routing(Sock, Peer, Side), - {noreply, State}; handle_cast({add_channel, ChanID, ChanPID}, State) -> NewState = do_add_channel(ChanID, ChanPID, State), {noreply, NewState}; @@ -84,11 +86,11 @@ handle_cast(Unexpected, State) -> handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) -> - io:format("Got packet: ~p~n", [Packet]), + log(debug, "Got packet: ~p~n", [Packet]), do_dispatch(State, Packet), {noreply, State}; handle_info(Unexpected, State) -> - ok = io:format("Unexpected info: ~tp~n", [Unexpected]), + ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]), {noreply, State}. @@ -111,13 +113,14 @@ terminate(_, _) -> %%% Doer Functions -do_begin_routing(Sock, Peer, Side) -> +do_begin_routing(Sock, Peer) -> ok = inet:setopts(Sock, [{active, once}, {mode, binary}]), - State = #s{socket = Sock, - peer = Peer, - side = Side}, - State. + State = #s{socket = Sock, peer = Peer}, + {ok, State}. +do_add_channel(_, _, none) -> + % Do nothing... We will need to be told about these channels again later. + none; do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) -> NewConns = maps:put(ChanID, ChanPID, Conns), State#s{connections = NewConns}. @@ -128,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, < erlang:send(Conn, {msp_fragment, Packet}); error -> - {ok, OurPort} = inet:port(Sock), - {TheirIP, TheirPort} = Peer, - msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) + msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet) end; do_dispatch(_, <<>>) -> % Empty datagram? diff --git a/src/msp_tests.erl b/src/msp_tests.erl index f6f0855..e8e455d 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -22,8 +22,8 @@ run_tests_protected() -> run_tests() -> %ok = send_test(), - ok = out_of_order_test(), - %ok = race_test(), + %ok = out_of_order_test(), + ok = race_test(), ok. make_connection(OurPort, TheirIP, TheirPort, Side) ->