diff --git a/src/msp_channel.erl b/src/msp_channel.erl index 7bedf94..ce79d3a 100644 --- a/src/msp_channel.erl +++ b/src/msp_channel.erl @@ -24,30 +24,21 @@ % Fragment/message opcodes: % _____________________________________________________________________________ % OP_FRAGMENT | Part of a message/stream -% OP_OPEN | Open a new channel, and send the first fragment of the -% | first message in it. -% OP_END_MESSAGE | End of a message/stream, expects a response -% OP_OPEN_END | Open a new channel, and send an entire message -% OP_CLOSE | End of a message/stream, do not respond, close the channel -% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the -% | channel, all in one datagram. Sort of like -% | gen_server:cast/2, or if UDP were reliable. +% OP_STATUS | Part of a message, but expects an immediate ACK, in case +% | the stream goes idle, or for any other +% | implementation-specific needs. +% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other +% | side to reply on this channel. +% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel. % % Transport Control Stuff % ----------------------------------------------------------------------------- % OP_ACK | Acknowledge that instructions up to N have been received. -% OP_STATUS | Request an ACK for this exact command. Has many uses - -% | diagnostics, benchmarking, keepalive, dropped ACKs, or just -% | to stop the receiver from coalescing ACKs too much. % OP_NACK | Instruction N has not been received. -% OP_CANCEL | Sender is hoping that messages M through N weren't -% | delivered, and can be cancelled. -% OP_ACK_CANCEL | Respond to OP_CANCEL. -% -% Stream Negotiation -% _____________________________________________________________________________ -% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and -% | should be moved to a different ID. +% OP_CANCEL | Ignore any fragments that weren't already delivered, and +% | end the stream now. The channel stays open so that the +% | other side can communicate what they did/didn't | receive, +% | and what actions were taken, if the application requires. % % Lossy % _____________________________________________________________________________ @@ -56,20 +47,15 @@ % OP_LOSSY | A lossy datagram, for the application to deal with. % Numbers aren't final. --define(OP_FRAGMENT, 0). --define(OP_OPEN, 1). --define(OP_END_MESSAGE, 2). --define(OP_OPEN_END, 3). --define(OP_CLOSE, 4). --define(OP_OPEN_CLOSE, 5). --define(OP_ACK, 6). --define(OP_STATUS, 7). --define(OP_NACK, 8). --define(OP_CANCEL, 9). --define(OP_ACK_CANCEL, 10). --define(OP_REQ_MOVE, 11). --define(OP_OPEN_DIRTY, 12). --define(OP_LOSSY, 13). +-define(OP_FRAGMENT, 0). +-define(OP_END_MESSAGE, 1). +-define(OP_CLOSE, 2). +-define(OP_STATUS, 3). +-define(OP_ACK, 4). +-define(OP_NACK, 5). +-define(OP_CANCEL, 6). +-define(OP_OPEN_DIRTY, 7). +-define(OP_LOSSY, 8). %%% Type and Record Definitions @@ -167,25 +153,20 @@ do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed -> do_send(_, _, State = #s{state = receiving}) -> {{error, not_sending}, State}; do_send(Message, Mod, State = #s{chan_id = Id, - state = ChanMode, peer = Peer, socket = Sock, seq = Seq}) -> log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]), - IsNewChannel = ChanMode == closed, - {Op, NewChanMode} = choose_opcode(IsNewChannel, Mod), + {Op, NewChanMode} = choose_opcode(Mod), Datagram = <>, gen_udp:send(Sock, Peer, Datagram), NewState = State#s{state = NewChanMode, seq = Seq + 1}, {ok, NewState}. -choose_opcode(false, none) -> {?OP_FRAGMENT, sending}; -choose_opcode(true, none) -> {?OP_OPEN, sending}; -choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving}; -choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving}; -choose_opcode(false, close) -> {?OP_OPEN_END, closed}; -choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed}; -choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}. +choose_opcode(none) -> {?OP_FRAGMENT, sending}; +choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving}; +choose_opcode(close) -> {?OP_CLOSE, closed}; +choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}. %%% Receive Doer Functions @@ -198,11 +179,6 @@ do_handle_datagram(State, <>) -> do_handle_nack(State, Data); do_handle_datagram(State, <>) -> do_handle_cancel(State, Data); -do_handle_datagram(State, <>) -> - % TODO: do we need ack cancel? There isn't much you can do with the - % sequence information, you really just need the server to tell you what - % did get through and how it was interpreted. - do_handle_ack_cancel(State, Data); do_handle_datagram(State, <>) -> log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]), % Reliable stream operations need to be ordered first. @@ -228,10 +204,6 @@ do_handle_cancel(State, Payload) -> io:format("Unimplemented command. Data ~p~n", [Payload]), State. -do_handle_ack_cancel(State, Payload) -> - io:format("Unimplemented command. Data ~p~n", [Payload]), - State. - %%% Reliable message reordering do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) -> @@ -260,36 +232,24 @@ do_execute_buffer(State) -> State. do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) -> - do_handle_fragment(State, false, continue, Seq, Payload); -do_handle_reliable(State, ?OP_OPEN, Seq, Payload) -> - do_handle_fragment(State, true, continue, Seq, Payload); + do_handle_fragment(State, continue, Seq, Payload); do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) -> - do_handle_fragment(State, false, end_message, Seq, Payload); -do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) -> - do_handle_fragment(State, true, end_message, Seq, Payload); + do_handle_fragment(State, end_message, Seq, Payload); do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) -> - do_handle_fragment(State, false, close_channel, Seq, Payload); -do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) -> - do_handle_fragment(State, true, close_channel, Seq, Payload); + do_handle_fragment(State, close_channel, Seq, Payload); do_handle_reliable(State, ?OP_STATUS, Seq, Payload) -> do_handle_status(State, Seq, Payload); -do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) -> - do_handle_req_move(State, Seq, Payload); do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) -> - do_handle_fragment(State, true, dirty, Seq, Payload); + do_handle_fragment(State, dirty, Seq, Payload); do_handle_reliable(State, Unknown, _, _) -> io:format("Got unexpected opcode ~p~n", [Unknown]), State. -do_handle_fragment(State, NewChannel, Mod, Seq, Payload) -> - io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]), +do_handle_fragment(State, Mod, Seq, Payload) -> + io:format("Got fragment ~p with index ~p, and modifier ~p~n", [Payload, Seq, Mod]), State. do_handle_status(State, Seq, Payload) -> io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. -do_handle_req_move(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), - State. - diff --git a/src/msp_tests.erl b/src/msp_tests.erl index e8e455d..81bcc2f 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -21,8 +21,8 @@ run_tests_protected() -> end. run_tests() -> - %ok = send_test(), - %ok = out_of_order_test(), + ok = send_test(), + ok = out_of_order_test(), ok = race_test(), ok. @@ -55,11 +55,8 @@ out_of_order_test() -> {ok, SockA} = gen_udp:open(PortA), ok = make_connection(PortB, IP, PortA, 0), - % TODO: fix race_test and remove this sleep - timer:sleep(10), - - ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>), - ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second message">>), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first message">>), timer:sleep(10), @@ -71,7 +68,7 @@ race_test() -> PortB = 5532, {ok, SockA} = gen_udp:open(PortA), {ok, SockB} = gen_udp:open(PortB, [{active, true}]), - ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>), timer:sleep(10),