buffer out-of-order messages

This took a while to debug!! Turns out I don't actually
handle datagrams that have been received before the
workers are all initialized... oops!!
This commit is contained in:
Jarvis Carroll 2025-10-27 07:31:27 +00:00
parent b6c16967e7
commit e8ef0b4304
3 changed files with 109 additions and 30 deletions

View File

@ -75,12 +75,19 @@
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(instruction,
{op :: integer(),
seq :: integer(),
data :: binary()}).
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer()}).
seq = 0 :: integer(),
instructions = [] :: [#instruction{}],
fragments = [] :: [binary()]}).
-type state() :: none | #s{}.
@ -181,10 +188,25 @@ choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Receive Doer Functions
% First handle the messages that don't need to be reordered
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
do_handle_ack(State, Data);
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
do_handle_nack(State, Data);
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
do_handle_cancel(State, Data);
do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
% TODO: do we need ack cancel? There isn't much you can do with the
% sequence information, you really just need the server to tell you what
% did get through and how it was interpreted.
do_handle_ack_cancel(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
do_handle_reliable(State, Op, Seq, Rest);
io:format("Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
% Reliable stream operations need to be ordered first.
NewState = do_buffer_reliable(State, Op, Seq, Rest),
do_execute_buffer(NewState);
do_handle_datagram(State, Unexpected) ->
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
State.
@ -193,6 +215,49 @@ do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_ack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_nack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_ack_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
%%% Reliable message reordering
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
State#s{instructions = NewInstructions}.
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
% We need to go later than this. Move it to the 'earlier than us' pile.
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
% There is already an instruction with this sequence ID. Use that.
lists:reverse(Earlier, Is);
insert_instruction(Op, Seq, Data, Is, Earlier) ->
% If neither of the above conditions are met, we can add it here.
NewI = #instruction{op = Op, seq = Seq, data = Data},
lists:reverse(Earlier, [NewI | Is]).
%%% Reliable message execution
do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
do_execute_buffer(NewState);
do_execute_buffer(State) ->
% Empty instruction list, or the next instruction hasn't arrived yet. Done.
State.
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, false, continue, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
@ -205,16 +270,8 @@ do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, false, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
do_handle_fragment(State, true, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
do_handle_ack(State, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
do_handle_nack(State, Seq, Payload);
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
do_handle_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
do_handle_ack_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
do_handle_req_move(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
@ -227,26 +284,10 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.

View File

@ -84,10 +84,11 @@ handle_cast(Unexpected, State) ->
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
io:format("Got packet: ~p~n", [Packet]),
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
ok = io:format("Unexpected info: ~tp~n", [Unexpected]),
{noreply, State}.

View File

@ -6,6 +6,9 @@ spawn_tests() ->
ok.
run_tests_and_halt() ->
% Add a handler to print all log messages to the screen, so we can see why
% tests are failing, if they are.
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
Result = run_tests_protected(),
io:format("Tests returned ~p~n", [Result]),
halt().
@ -18,7 +21,9 @@ run_tests_protected() ->
end.
run_tests() ->
ok = send_test(),
%ok = send_test(),
ok = out_of_order_test(),
%ok = race_test(),
ok.
make_connection(OurPort, TheirIP, TheirPort, Side) ->
@ -27,8 +32,8 @@ make_connection(OurPort, TheirIP, TheirPort, Side) ->
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5555,
PortB = 6666,
PortA = 5511,
PortB = 5512,
ok = make_connection(PortA, IP, PortB, 0),
ok = make_connection(PortB, IP, PortA, 1),
@ -43,4 +48,36 @@ send_test() ->
timer:sleep(10),
ok.
out_of_order_test() ->
IP = {127, 0, 0, 1},
PortA = 5521,
PortB = 5522,
{ok, SockA} = gen_udp:open(PortA),
ok = make_connection(PortB, IP, PortA, 0),
% TODO: fix race_test and remove this sleep
timer:sleep(10),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>),
timer:sleep(10),
ok.
race_test() ->
IP = {127, 0, 0, 1},
PortA = 5531,
PortB = 5532,
{ok, SockA} = gen_udp:open(PortA),
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>),
timer:sleep(10),
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
timer:sleep(10),
ok.