simplify opcodes
OPEN is not really relevant. And a lot of the stream negotiation stuff isn't that useful either. Gone.
This commit is contained in:
parent
78ea3fc85e
commit
23276662bb
@ -24,30 +24,21 @@
|
||||
% Fragment/message opcodes:
|
||||
% _____________________________________________________________________________
|
||||
% OP_FRAGMENT | Part of a message/stream
|
||||
% OP_OPEN | Open a new channel, and send the first fragment of the
|
||||
% | first message in it.
|
||||
% OP_END_MESSAGE | End of a message/stream, expects a response
|
||||
% OP_OPEN_END | Open a new channel, and send an entire message
|
||||
% OP_CLOSE | End of a message/stream, do not respond, close the channel
|
||||
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
|
||||
% | channel, all in one datagram. Sort of like
|
||||
% | gen_server:cast/2, or if UDP were reliable.
|
||||
% OP_STATUS | Part of a message, but expects an immediate ACK, in case
|
||||
% | the stream goes idle, or for any other
|
||||
% | implementation-specific needs.
|
||||
% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other
|
||||
% | side to reply on this channel.
|
||||
% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel.
|
||||
%
|
||||
% Transport Control Stuff
|
||||
% -----------------------------------------------------------------------------
|
||||
% OP_ACK | Acknowledge that instructions up to N have been received.
|
||||
% OP_STATUS | Request an ACK for this exact command. Has many uses -
|
||||
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
|
||||
% | to stop the receiver from coalescing ACKs too much.
|
||||
% OP_NACK | Instruction N has not been received.
|
||||
% OP_CANCEL | Sender is hoping that messages M through N weren't
|
||||
% | delivered, and can be cancelled.
|
||||
% OP_ACK_CANCEL | Respond to OP_CANCEL.
|
||||
%
|
||||
% Stream Negotiation
|
||||
% _____________________________________________________________________________
|
||||
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
|
||||
% | should be moved to a different ID.
|
||||
% OP_CANCEL | Ignore any fragments that weren't already delivered, and
|
||||
% | end the stream now. The channel stays open so that the
|
||||
% | other side can communicate what they did/didn't | receive,
|
||||
% | and what actions were taken, if the application requires.
|
||||
%
|
||||
% Lossy
|
||||
% _____________________________________________________________________________
|
||||
@ -56,20 +47,15 @@
|
||||
% OP_LOSSY | A lossy datagram, for the application to deal with.
|
||||
|
||||
% Numbers aren't final.
|
||||
-define(OP_FRAGMENT, 0).
|
||||
-define(OP_OPEN, 1).
|
||||
-define(OP_END_MESSAGE, 2).
|
||||
-define(OP_OPEN_END, 3).
|
||||
-define(OP_CLOSE, 4).
|
||||
-define(OP_OPEN_CLOSE, 5).
|
||||
-define(OP_ACK, 6).
|
||||
-define(OP_STATUS, 7).
|
||||
-define(OP_NACK, 8).
|
||||
-define(OP_CANCEL, 9).
|
||||
-define(OP_ACK_CANCEL, 10).
|
||||
-define(OP_REQ_MOVE, 11).
|
||||
-define(OP_OPEN_DIRTY, 12).
|
||||
-define(OP_LOSSY, 13).
|
||||
-define(OP_FRAGMENT, 0).
|
||||
-define(OP_END_MESSAGE, 1).
|
||||
-define(OP_CLOSE, 2).
|
||||
-define(OP_STATUS, 3).
|
||||
-define(OP_ACK, 4).
|
||||
-define(OP_NACK, 5).
|
||||
-define(OP_CANCEL, 6).
|
||||
-define(OP_OPEN_DIRTY, 7).
|
||||
-define(OP_LOSSY, 8).
|
||||
|
||||
%%% Type and Record Definitions
|
||||
|
||||
@ -167,25 +153,20 @@ do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
|
||||
do_send(_, _, State = #s{state = receiving}) ->
|
||||
{{error, not_sending}, State};
|
||||
do_send(Message, Mod, State = #s{chan_id = Id,
|
||||
state = ChanMode,
|
||||
peer = Peer,
|
||||
socket = Sock,
|
||||
seq = Seq}) ->
|
||||
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||
IsNewChannel = ChanMode == closed,
|
||||
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
|
||||
{Op, NewChanMode} = choose_opcode(Mod),
|
||||
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
||||
gen_udp:send(Sock, Peer, Datagram),
|
||||
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
||||
{ok, NewState}.
|
||||
|
||||
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
|
||||
choose_opcode(true, none) -> {?OP_OPEN, sending};
|
||||
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
|
||||
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
|
||||
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
|
||||
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
|
||||
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||
choose_opcode(none) -> {?OP_FRAGMENT, sending};
|
||||
choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
|
||||
choose_opcode(close) -> {?OP_CLOSE, closed};
|
||||
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||
|
||||
%%% Receive Doer Functions
|
||||
|
||||
@ -198,11 +179,6 @@ do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
|
||||
do_handle_nack(State, Data);
|
||||
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
|
||||
do_handle_cancel(State, Data);
|
||||
do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
|
||||
% TODO: do we need ack cancel? There isn't much you can do with the
|
||||
% sequence information, you really just need the server to tell you what
|
||||
% did get through and how it was interpreted.
|
||||
do_handle_ack_cancel(State, Data);
|
||||
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
||||
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
|
||||
% Reliable stream operations need to be ordered first.
|
||||
@ -228,10 +204,6 @@ do_handle_cancel(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
do_handle_ack_cancel(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
%%% Reliable message reordering
|
||||
|
||||
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
|
||||
@ -260,36 +232,24 @@ do_execute_buffer(State) ->
|
||||
State.
|
||||
|
||||
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, continue, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, continue, Seq, Payload);
|
||||
do_handle_fragment(State, continue, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, end_message, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, end_message, Seq, Payload);
|
||||
do_handle_fragment(State, end_message, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, close_channel, Seq, Payload);
|
||||
do_handle_fragment(State, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
||||
do_handle_status(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
|
||||
do_handle_req_move(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, dirty, Seq, Payload);
|
||||
do_handle_fragment(State, dirty, Seq, Payload);
|
||||
do_handle_reliable(State, Unknown, _, _) ->
|
||||
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
||||
State.
|
||||
|
||||
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
|
||||
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
|
||||
do_handle_fragment(State, Mod, Seq, Payload) ->
|
||||
io:format("Got fragment ~p with index ~p, and modifier ~p~n", [Payload, Seq, Mod]),
|
||||
State.
|
||||
|
||||
do_handle_status(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_req_move(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
|
||||
@ -21,8 +21,8 @@ run_tests_protected() ->
|
||||
end.
|
||||
|
||||
run_tests() ->
|
||||
%ok = send_test(),
|
||||
%ok = out_of_order_test(),
|
||||
ok = send_test(),
|
||||
ok = out_of_order_test(),
|
||||
ok = race_test(),
|
||||
ok.
|
||||
|
||||
@ -55,11 +55,8 @@ out_of_order_test() ->
|
||||
{ok, SockA} = gen_udp:open(PortA),
|
||||
ok = make_connection(PortB, IP, PortA, 0),
|
||||
|
||||
% TODO: fix race_test and remove this sleep
|
||||
timer:sleep(10),
|
||||
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second message">>),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first message">>),
|
||||
|
||||
timer:sleep(10),
|
||||
|
||||
@ -71,7 +68,7 @@ race_test() ->
|
||||
PortB = 5532,
|
||||
{ok, SockA} = gen_udp:open(PortA),
|
||||
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>),
|
||||
|
||||
timer:sleep(10),
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user