handle socket race conditions better

Now the channle manager takes full responsibility for dispatch as
soon as it is initialised, and only passes that responsibility on
to the router process once things have been cleaned up.

What a complex refactor just to pass a socket to a new process!
This commit is contained in:
Jarvis Carroll 2025-10-29 11:56:44 +00:00
parent e8ef0b4304
commit 78ea3fc85e
4 changed files with 125 additions and 41 deletions

View File

@ -125,6 +125,7 @@ handle_call(Unexpected, From, State) ->
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
log(debug, "Opening channel ~p~n", [Id]),
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
@ -170,7 +171,7 @@ do_send(Message, Mod, State = #s{chan_id = Id,
peer = Peer,
socket = Sock,
seq = Seq}) ->
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
IsNewChannel = ChanMode == closed,
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
@ -203,12 +204,12 @@ do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
% did get through and how it was interpreted.
do_handle_ack_cancel(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
io:format("Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
% Reliable stream operations need to be ordered first.
NewState = do_buffer_reliable(State, Op, Seq, Rest),
do_execute_buffer(NewState);
do_handle_datagram(State, Unexpected) ->
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->

View File

@ -9,7 +9,7 @@
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([add_route/4, create_channel/3, dispatch_fallback/5]).
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
%% Worker interface
-export([enroll/0]).
%% gen_server
@ -31,6 +31,7 @@
-record(route_info,
{router :: pid(),
socket :: gen_udp:socket(),
side :: 0 | 1,
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
@ -41,14 +42,37 @@
%%% Service Interface
add_route(Sock, TheirIP, TheirPort, Side) ->
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
ok ->
transfer_socket(Sock, TheirIP, TheirPort);
{error, Reason} ->
{error, Reason}
end.
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
% A pretty complex function for an interface, but we need to transfer ownership
% of the socket.
add_route(Sock, TheirIP, TheirPort, Side) ->
% Transfer the socket to the manager. We will then transfer it on to the
% router, once it is spawned.
transfer_socket(Sock, TheirIP, TheirPort) ->
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
ok ->
gen_server:cast(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side});
transfer_socket2(Sock, {TheirIP, TheirPort});
{error, Reason} ->
{error, Reason}
end.
transfer_socket2(Sock, Peer) ->
% Since it was transferred, any backlog of datagrams will now be in the
% channel manager's inbox. Turn it off from active, so that the manager
% knows that it will be fully caught up *before* the socket_transferred
% notification arrives.
case inet:setopts(Sock, [{active, false}]) of
ok ->
% Now the socket is all cleaned up, let the manager know.
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
{error, Reason} ->
{error, Reason}
end.
@ -59,9 +83,8 @@ create_channel(OurPort, TheirIP, TheirPort) ->
% Deliver messages on behalf of msp_router, if it doesn't know where to send
% something.
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
dispatch_fallback(Socket, Peer, ID, Packet) ->
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
%%% Worker Interface
@ -89,6 +112,12 @@ init(none) ->
{ok, State}.
handle_call({add_route, Sock, Route, Side}, _, State) ->
{Result, NewState} = do_add_route(Sock, Route, Side, State),
{reply, Result, NewState};
handle_call({socket_transferred, Sock, Peer}, _, State) ->
Result = do_socket_transferred(Sock, Peer, State),
{reply, Result, State};
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
@ -97,11 +126,8 @@ handle_call(Unexpected, From, State) ->
{noreply, State}.
handle_cast({add_route, Sock, Route, Side}, State) ->
NewState = do_add_route(Sock, Route, Side, State),
{noreply, NewState};
handle_cast({dispatch, Route, ID, Packet}, State) ->
NewState = do_dispatch(Route, ID, Packet, State),
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
@ -114,10 +140,23 @@ handle_cast(Unexpected, State) ->
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(Socket, Peer, [ID | PacketList], State) ->
Packet = list_to_binary(PacketList),
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(_, _, <<>>, State) ->
% Empty datagram... Ignore.
State;
handle_udp(_, _, [], State) ->
State.
handle_down(Mon, PID, Reason, State) ->
% TODO
@ -161,17 +200,53 @@ do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route isn't registered already...
% TODO: Maybe detect this case before we transfer the socket? It's all
% annoying though.
false = maps:is_key(Route, Conns),
case maps:is_key(Route, Conns) of
true ->
Result = {error, already_added},
{Result, State};
false ->
do_add_route2(Sock, Route, Side, State)
end.
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
{ok, Router} = msp_router:start_link(),
ok = msp_router:begin_routing(Router, Sock, Peer, Side),
NewConn = #route_info{router = Router,
socket = Sock,
side = Side,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
State#s{connections = NewConns}.
NewState = State#s{connections = NewConns},
{ok, NewState}.
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route is actually registered.
case maps:find(Route, Conns) of
{ok, Conn} ->
do_socket_transferred2(Sock, Peer, Conn);
error ->
{error, unknown_connection}
end.
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
->
msp_router:begin_routing(Router, Sock, Peer),
channel_catchup(Router, maps:iterator(Channels));
do_socket_transferred2(_, _, _) ->
{error, wrong_socket}.
% If we have created channels without spawning the router process, then at some
% point we will need to tell the router process what channels currently exist.
channel_catchup(Router, Iter) ->
case maps:next(Iter) of
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
msp_router:add_channel(Router, ChanID, Chan),
channel_catchup(Router, NewIter);
none ->
ok
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
case maps:find(Route, Conns) of
@ -209,11 +284,18 @@ do_create_channel3(State, Route, Info, ChanID) ->
{Result, NewState}.
do_dispatch(Route, ID, Packet, State) ->
do_dispatch(Socket, Peer, ID, Packet, State) ->
{ok, OurPort} = inet:port(Socket),
Route = #route{from = OurPort, to = Peer},
do_dispatch2(Route, ID, Packet, State).
do_dispatch2(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
State
end.

View File

@ -9,7 +9,7 @@
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([begin_routing/4, add_channel/3]).
-export([begin_routing/3, add_channel/3]).
%% gen_server
-export([start_link/0]).
@ -28,7 +28,6 @@
-record(s,
{socket :: gen_udp:socket(),
peer :: endpoint(),
side :: 0 | 1,
connections = #{} :: #{integer() => channel()}}).
-type state() :: none | #s{}.
@ -38,13 +37,13 @@
% msp does not negotiate new connections, but once you have a
% socket, and a host that knows you will be talking MSP, then a
% new msp_router can be initialized.
begin_routing(Router, OurSock, {TheirIP, TheirPort}, OurSide) ->
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
% Transfer the socket to the gen_server. If it is
% active then a bunch of messages will be received
% at once, but that is fine.
case gen_udp:controlling_process(OurSock, Router) of
ok ->
gen_server:cast(Router, {begin_routing, OurSock, {TheirIP, TheirPort}, OurSide});
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
{error, Reason} ->
{error, Reason}
end.
@ -67,14 +66,17 @@ init(none) ->
{ok, none}.
handle_call({begin_routing, Sock, Peer}, _, none) ->
{Result, State} = do_begin_routing(Sock, Peer),
{reply, Result, State};
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
Result = {error, already_routing},
{reply, Result, State};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{reply, undef, State}.
handle_cast({begin_routing, Sock, Peer, Side}, none) ->
State = do_begin_routing(Sock, Peer, Side),
{noreply, State};
handle_cast({add_channel, ChanID, ChanPID}, State) ->
NewState = do_add_channel(ChanID, ChanPID, State),
{noreply, NewState};
@ -84,11 +86,11 @@ handle_cast(Unexpected, State) ->
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
io:format("Got packet: ~p~n", [Packet]),
log(debug, "Got packet: ~p~n", [Packet]),
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = io:format("Unexpected info: ~tp~n", [Unexpected]),
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
{noreply, State}.
@ -111,13 +113,14 @@ terminate(_, _) ->
%%% Doer Functions
do_begin_routing(Sock, Peer, Side) ->
do_begin_routing(Sock, Peer) ->
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock,
peer = Peer,
side = Side},
State.
State = #s{socket = Sock, peer = Peer},
{ok, State}.
do_add_channel(_, _, none) ->
% Do nothing... We will need to be told about these channels again later.
none;
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
NewConns = maps:put(ChanID, ChanPID, Conns),
State#s{connections = NewConns}.
@ -128,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet});
error ->
{ok, OurPort} = inet:port(Sock),
{TheirIP, TheirPort} = Peer,
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
end;
do_dispatch(_, <<>>) ->
% Empty datagram?

View File

@ -22,8 +22,8 @@ run_tests_protected() ->
run_tests() ->
%ok = send_test(),
ok = out_of_order_test(),
%ok = race_test(),
%ok = out_of_order_test(),
ok = race_test(),
ok.
make_connection(OurPort, TheirIP, TheirPort, Side) ->