From e8ef0b4304b928ae03c06be5bbb3655b745a6258 Mon Sep 17 00:00:00 2001 From: Jarvis Carroll Date: Mon, 27 Oct 2025 07:31:27 +0000 Subject: [PATCH] buffer out-of-order messages This took a while to debug!! Turns out I don't actually handle datagrams that have been received before the workers are all initialized... oops!! --- src/msp_channel.erl | 93 ++++++++++++++++++++++++++++++++------------- src/msp_router.erl | 3 +- src/msp_tests.erl | 43 +++++++++++++++++++-- 3 files changed, 109 insertions(+), 30 deletions(-) diff --git a/src/msp_channel.erl b/src/msp_channel.erl index 38266dd..d340503 100644 --- a/src/msp_channel.erl +++ b/src/msp_channel.erl @@ -75,12 +75,19 @@ -type endpoint() :: {inet:ip_address(), inet:port_number()}. +-record(instruction, + {op :: integer(), + seq :: integer(), + data :: binary()}). + -record(s, {chan_id :: integer(), socket :: gen_udp:socket(), peer :: endpoint(), % TODO configure the socket in advance? state = closed :: closed | sending | receiving, - seq = 0 :: integer()}). + seq = 0 :: integer(), + instructions = [] :: [#instruction{}], + fragments = [] :: [binary()]}). -type state() :: none | #s{}. @@ -181,10 +188,25 @@ choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}. %%% Receive Doer Functions +% First handle the messages that don't need to be reordered do_handle_datagram(State, <>) -> do_handle_lossy(State, Data); +do_handle_datagram(State, <>) -> + do_handle_ack(State, Data); +do_handle_datagram(State, <>) -> + do_handle_nack(State, Data); +do_handle_datagram(State, <>) -> + do_handle_cancel(State, Data); +do_handle_datagram(State, <>) -> + % TODO: do we need ack cancel? There isn't much you can do with the + % sequence information, you really just need the server to tell you what + % did get through and how it was interpreted. + do_handle_ack_cancel(State, Data); do_handle_datagram(State, <>) -> - do_handle_reliable(State, Op, Seq, Rest); + io:format("Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]), + % Reliable stream operations need to be ordered first. + NewState = do_buffer_reliable(State, Op, Seq, Rest), + do_execute_buffer(NewState); do_handle_datagram(State, Unexpected) -> io:format("Got datagram ~p which is too short.~n", [Unexpected]), State. @@ -193,6 +215,49 @@ do_handle_lossy(State, Data) -> io:format("Got lossy datagram: ~p~n", [Data]), State. +do_handle_ack(State, Payload) -> + io:format("Unimplemented command. Data ~p~n", [Payload]), + State. + +do_handle_nack(State, Payload) -> + io:format("Unimplemented command. Data ~p~n", [Payload]), + State. + +do_handle_cancel(State, Payload) -> + io:format("Unimplemented command. Data ~p~n", [Payload]), + State. + +do_handle_ack_cancel(State, Payload) -> + io:format("Unimplemented command. Data ~p~n", [Payload]), + State. + +%%% Reliable message reordering + +do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) -> + NewInstructions = insert_instruction(Op, Seq, Rest, Is, []), + State#s{instructions = NewInstructions}. + +insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq -> + % We need to go later than this. Move it to the 'earlier than us' pile. + insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]); +insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) -> + % There is already an instruction with this sequence ID. Use that. + lists:reverse(Earlier, Is); +insert_instruction(Op, Seq, Data, Is, Earlier) -> + % If neither of the above conditions are met, we can add it here. + NewI = #instruction{op = Op, seq = Seq, data = Data}, + lists:reverse(Earlier, [NewI | Is]). + +%%% Reliable message execution + +do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) -> + PoppedState = State#s{instructions = Rest, seq = Seq + 1}, + NewState = do_handle_reliable(PoppedState, Op, Seq, Data), + do_execute_buffer(NewState); +do_execute_buffer(State) -> + % Empty instruction list, or the next instruction hasn't arrived yet. Done. + State. + do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) -> do_handle_fragment(State, false, continue, Seq, Payload); do_handle_reliable(State, ?OP_OPEN, Seq, Payload) -> @@ -205,16 +270,8 @@ do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) -> do_handle_fragment(State, false, close_channel, Seq, Payload); do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) -> do_handle_fragment(State, true, close_channel, Seq, Payload); -do_handle_reliable(State, ?OP_ACK, Seq, Payload) -> - do_handle_ack(State, Seq, Payload); do_handle_reliable(State, ?OP_STATUS, Seq, Payload) -> do_handle_status(State, Seq, Payload); -do_handle_reliable(State, ?OP_NACK, Seq, Payload) -> - do_handle_nack(State, Seq, Payload); -do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) -> - do_handle_cancel(State, Seq, Payload); -do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) -> - do_handle_ack_cancel(State, Seq, Payload); do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) -> do_handle_req_move(State, Seq, Payload); do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) -> @@ -227,26 +284,10 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) -> io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]), State. -do_handle_ack(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), - State. - do_handle_status(State, Seq, Payload) -> io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. -do_handle_nack(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), - State. - -do_handle_cancel(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), - State. - -do_handle_ack_cancel(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), - State. - do_handle_req_move(State, Seq, Payload) -> io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. diff --git a/src/msp_router.erl b/src/msp_router.erl index f67a4ad..6a9f4f5 100644 --- a/src/msp_router.erl +++ b/src/msp_router.erl @@ -84,10 +84,11 @@ handle_cast(Unexpected, State) -> handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) -> + io:format("Got packet: ~p~n", [Packet]), do_dispatch(State, Packet), {noreply, State}; handle_info(Unexpected, State) -> - ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]), + ok = io:format("Unexpected info: ~tp~n", [Unexpected]), {noreply, State}. diff --git a/src/msp_tests.erl b/src/msp_tests.erl index 17bc03d..f6f0855 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -6,6 +6,9 @@ spawn_tests() -> ok. run_tests_and_halt() -> + % Add a handler to print all log messages to the screen, so we can see why + % tests are failing, if they are. + logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}), Result = run_tests_protected(), io:format("Tests returned ~p~n", [Result]), halt(). @@ -18,7 +21,9 @@ run_tests_protected() -> end. run_tests() -> - ok = send_test(), + %ok = send_test(), + ok = out_of_order_test(), + %ok = race_test(), ok. make_connection(OurPort, TheirIP, TheirPort, Side) -> @@ -27,8 +32,8 @@ make_connection(OurPort, TheirIP, TheirPort, Side) -> send_test() -> IP = {127, 0, 0, 1}, - PortA = 5555, - PortB = 6666, + PortA = 5511, + PortB = 5512, ok = make_connection(PortA, IP, PortB, 0), ok = make_connection(PortB, IP, PortA, 1), @@ -43,4 +48,36 @@ send_test() -> timer:sleep(10), ok. +out_of_order_test() -> + IP = {127, 0, 0, 1}, + PortA = 5521, + PortB = 5522, + {ok, SockA} = gen_udp:open(PortA), + ok = make_connection(PortB, IP, PortA, 0), + + % TODO: fix race_test and remove this sleep + timer:sleep(10), + + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>), + + timer:sleep(10), + + ok. + +race_test() -> + IP = {127, 0, 0, 1}, + PortA = 5531, + PortB = 5532, + {ok, SockA} = gen_udp:open(PortA), + {ok, SockB} = gen_udp:open(PortB, [{active, true}]), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>), + + timer:sleep(10), + + ok = msp_channel_man:add_route(SockB, IP, PortA, 0), + + timer:sleep(10), + + ok.