Compare commits
2 Commits
78ea3fc85e
...
39a75ca435
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
39a75ca435 | ||
|
|
23276662bb |
@ -10,7 +10,13 @@
|
|||||||
-license("MIT").
|
-license("MIT").
|
||||||
|
|
||||||
-export([configure_channel/4]).
|
-export([configure_channel/4]).
|
||||||
-export([send_and_close/2]).
|
-export([send_and_close/2, send_message/2]).
|
||||||
|
% A lot of interfaces, but the last two are only really written for
|
||||||
|
% completeness. The distinctions here are blocking vs subscribing, and
|
||||||
|
% fragments vs messages.
|
||||||
|
-export([receive_fragment/1, receive_message/1,
|
||||||
|
subscribe_fragment/1, subscribe_message/1,
|
||||||
|
subscribe_fragment_once/1, subscribe_message_once/1]).
|
||||||
|
|
||||||
%% gen_server
|
%% gen_server
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
@ -24,30 +30,21 @@
|
|||||||
% Fragment/message opcodes:
|
% Fragment/message opcodes:
|
||||||
% _____________________________________________________________________________
|
% _____________________________________________________________________________
|
||||||
% OP_FRAGMENT | Part of a message/stream
|
% OP_FRAGMENT | Part of a message/stream
|
||||||
% OP_OPEN | Open a new channel, and send the first fragment of the
|
% OP_STATUS | Part of a message, but expects an immediate ACK, in case
|
||||||
% | first message in it.
|
% | the stream goes idle, or for any other
|
||||||
% OP_END_MESSAGE | End of a message/stream, expects a response
|
% | implementation-specific needs.
|
||||||
% OP_OPEN_END | Open a new channel, and send an entire message
|
% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other
|
||||||
% OP_CLOSE | End of a message/stream, do not respond, close the channel
|
% | side to reply on this channel.
|
||||||
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
|
% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel.
|
||||||
% | channel, all in one datagram. Sort of like
|
|
||||||
% | gen_server:cast/2, or if UDP were reliable.
|
|
||||||
%
|
%
|
||||||
% Transport Control Stuff
|
% Transport Control Stuff
|
||||||
% -----------------------------------------------------------------------------
|
% -----------------------------------------------------------------------------
|
||||||
% OP_ACK | Acknowledge that instructions up to N have been received.
|
% OP_ACK | Acknowledge that instructions up to N have been received.
|
||||||
% OP_STATUS | Request an ACK for this exact command. Has many uses -
|
|
||||||
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
|
|
||||||
% | to stop the receiver from coalescing ACKs too much.
|
|
||||||
% OP_NACK | Instruction N has not been received.
|
% OP_NACK | Instruction N has not been received.
|
||||||
% OP_CANCEL | Sender is hoping that messages M through N weren't
|
% OP_CANCEL | Ignore any fragments that weren't already delivered, and
|
||||||
% | delivered, and can be cancelled.
|
% | end the stream now. The channel stays open so that the
|
||||||
% OP_ACK_CANCEL | Respond to OP_CANCEL.
|
% | other side can communicate what they did/didn't | receive,
|
||||||
%
|
% | and what actions were taken, if the application requires.
|
||||||
% Stream Negotiation
|
|
||||||
% _____________________________________________________________________________
|
|
||||||
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
|
|
||||||
% | should be moved to a different ID.
|
|
||||||
%
|
%
|
||||||
% Lossy
|
% Lossy
|
||||||
% _____________________________________________________________________________
|
% _____________________________________________________________________________
|
||||||
@ -57,19 +54,14 @@
|
|||||||
|
|
||||||
% Numbers aren't final.
|
% Numbers aren't final.
|
||||||
-define(OP_FRAGMENT, 0).
|
-define(OP_FRAGMENT, 0).
|
||||||
-define(OP_OPEN, 1).
|
-define(OP_END_MESSAGE, 1).
|
||||||
-define(OP_END_MESSAGE, 2).
|
-define(OP_CLOSE, 2).
|
||||||
-define(OP_OPEN_END, 3).
|
-define(OP_STATUS, 3).
|
||||||
-define(OP_CLOSE, 4).
|
-define(OP_ACK, 4).
|
||||||
-define(OP_OPEN_CLOSE, 5).
|
-define(OP_NACK, 5).
|
||||||
-define(OP_ACK, 6).
|
-define(OP_CANCEL, 6).
|
||||||
-define(OP_STATUS, 7).
|
-define(OP_OPEN_DIRTY, 7).
|
||||||
-define(OP_NACK, 8).
|
-define(OP_LOSSY, 8).
|
||||||
-define(OP_CANCEL, 9).
|
|
||||||
-define(OP_ACK_CANCEL, 10).
|
|
||||||
-define(OP_REQ_MOVE, 11).
|
|
||||||
-define(OP_OPEN_DIRTY, 12).
|
|
||||||
-define(OP_LOSSY, 13).
|
|
||||||
|
|
||||||
%%% Type and Record Definitions
|
%%% Type and Record Definitions
|
||||||
|
|
||||||
@ -80,14 +72,24 @@
|
|||||||
seq :: integer(),
|
seq :: integer(),
|
||||||
data :: binary()}).
|
data :: binary()}).
|
||||||
|
|
||||||
|
|
||||||
|
-type request_type() :: fragment | message.
|
||||||
|
-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}.
|
||||||
|
-record(request,
|
||||||
|
{type :: request_type(),
|
||||||
|
recipient :: request_recipient()}).
|
||||||
|
-type request() :: #request{}.
|
||||||
|
|
||||||
-record(s,
|
-record(s,
|
||||||
{chan_id :: integer(),
|
{chan_id :: integer(),
|
||||||
socket :: gen_udp:socket(),
|
socket :: gen_udp:socket(),
|
||||||
peer :: endpoint(), % TODO configure the socket in advance?
|
peer :: endpoint(), % TODO configure the socket in advance?
|
||||||
state = closed :: closed | sending | receiving,
|
state = closed :: closed | sending | receiving,
|
||||||
seq = 0 :: integer(),
|
seq = 0 :: integer(),
|
||||||
|
requests = {[], []} :: {[request()], [request()]},
|
||||||
|
subscriber = none :: none | request(),
|
||||||
instructions = [] :: [#instruction{}],
|
instructions = [] :: [#instruction{}],
|
||||||
fragments = [] :: [binary()]}).
|
fragments = {[], []} :: {[binary()], [binary()]}}).
|
||||||
|
|
||||||
|
|
||||||
-type state() :: none | #s{}.
|
-type state() :: none | #s{}.
|
||||||
@ -102,6 +104,31 @@ configure_channel(Pid, Id, Socket, Peer) ->
|
|||||||
send_and_close(Chan, Message) ->
|
send_and_close(Chan, Message) ->
|
||||||
gen_server:call(Chan, {send, Message, close}).
|
gen_server:call(Chan, {send, Message, close}).
|
||||||
|
|
||||||
|
send_message(Chan, Message) ->
|
||||||
|
gen_server:call(Chan, {send, Message, end_message}).
|
||||||
|
|
||||||
|
receive_fragment(Chan) ->
|
||||||
|
gen_server:call(Chan, {add_sync_request, fragment}).
|
||||||
|
|
||||||
|
receive_message(Chan) ->
|
||||||
|
gen_server:call(Chan, {add_sync_request, message}).
|
||||||
|
|
||||||
|
subscribe_fragment(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {subscribe, fragment, Pid}).
|
||||||
|
|
||||||
|
subscribe_message(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {subscribe, message, Pid}).
|
||||||
|
|
||||||
|
subscribe_fragment_once(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {add_async_request, fragment, Pid}).
|
||||||
|
|
||||||
|
subscribe_message_once(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {add_async_request, message, Pid}).
|
||||||
|
|
||||||
%%% gen_server
|
%%% gen_server
|
||||||
|
|
||||||
-spec start_link() -> Result
|
-spec start_link() -> Result
|
||||||
@ -119,6 +146,15 @@ init(none) ->
|
|||||||
handle_call({send, Message, Mod}, _, State) ->
|
handle_call({send, Message, Mod}, _, State) ->
|
||||||
{Result, NewState} = do_send(Message, Mod, State),
|
{Result, NewState} = do_send(Message, Mod, State),
|
||||||
{reply, Result, NewState};
|
{reply, Result, NewState};
|
||||||
|
handle_call({add_sync_request, Type}, From, State) ->
|
||||||
|
NewState = do_add_sync_request(Type, From, State),
|
||||||
|
{noreply, NewState};
|
||||||
|
handle_call({add_async_request, Type, Pid}, _, State) ->
|
||||||
|
{Result, NewState} = do_add_async_request(Type, Pid, State),
|
||||||
|
{reply, Result, NewState};
|
||||||
|
handle_call({subscribe, Type, Pid}, _, State) ->
|
||||||
|
{Result, NewState} = do_subscribe(Type, Pid, State),
|
||||||
|
{reply, Result, NewState};
|
||||||
handle_call(Unexpected, From, State) ->
|
handle_call(Unexpected, From, State) ->
|
||||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
@ -167,25 +203,55 @@ do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
|
|||||||
do_send(_, _, State = #s{state = receiving}) ->
|
do_send(_, _, State = #s{state = receiving}) ->
|
||||||
{{error, not_sending}, State};
|
{{error, not_sending}, State};
|
||||||
do_send(Message, Mod, State = #s{chan_id = Id,
|
do_send(Message, Mod, State = #s{chan_id = Id,
|
||||||
state = ChanMode,
|
|
||||||
peer = Peer,
|
peer = Peer,
|
||||||
socket = Sock,
|
socket = Sock,
|
||||||
seq = Seq}) ->
|
seq = Seq}) ->
|
||||||
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||||
IsNewChannel = ChanMode == closed,
|
{Op, NewChanMode} = choose_opcode(Mod),
|
||||||
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
|
|
||||||
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
||||||
gen_udp:send(Sock, Peer, Datagram),
|
gen_udp:send(Sock, Peer, Datagram),
|
||||||
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
||||||
{ok, NewState}.
|
{ok, NewState}.
|
||||||
|
|
||||||
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
|
choose_opcode(none) -> {?OP_FRAGMENT, sending};
|
||||||
choose_opcode(true, none) -> {?OP_OPEN, sending};
|
choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
|
||||||
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
|
choose_opcode(close) -> {?OP_CLOSE, closed};
|
||||||
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
|
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||||
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
|
|
||||||
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
|
%%% Subscription Doer Functions
|
||||||
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
|
||||||
|
% Adds a request to the queue, or responds to that request directly, if there
|
||||||
|
% is a subscriber already.
|
||||||
|
do_add_sync_request(Type, From, State = #s{subscriber = none}) ->
|
||||||
|
do_add_request(Type, {sync, From}, State);
|
||||||
|
do_add_sync_request(_, From, State) ->
|
||||||
|
Result = {error, already_subscribed},
|
||||||
|
gen_server:reply(From, Result),
|
||||||
|
State.
|
||||||
|
|
||||||
|
% Adds a request to the queue and returns ok, or returns an error if there is a
|
||||||
|
% subscriber already.
|
||||||
|
do_add_async_request(Type, Pid, State = #s{subscriber = none}) ->
|
||||||
|
NewState = do_add_request(Type, {async, Pid}, State),
|
||||||
|
{ok, NewState};
|
||||||
|
do_add_async_request(_, _, State) ->
|
||||||
|
Result = {error, already_subscribed},
|
||||||
|
{Result, State}.
|
||||||
|
|
||||||
|
do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) ->
|
||||||
|
Request = #request{type = Type, recipient = Recipient},
|
||||||
|
NewRequests = {[Request | In], Out},
|
||||||
|
StateWithRequest = State#s{requests = NewRequests},
|
||||||
|
try_fill_requests(StateWithRequest).
|
||||||
|
|
||||||
|
do_subscribe(Type, Pid, State = #s{subscriber = none}) ->
|
||||||
|
Request = #request{type = Type, recipient = {async, Pid}},
|
||||||
|
StateWithSub = State#s{subscriber = Request},
|
||||||
|
NewState = try_fill_requests(StateWithSub),
|
||||||
|
{ok, NewState};
|
||||||
|
do_subscribe(_, _, State) ->
|
||||||
|
Result = {error, already_subscribed},
|
||||||
|
{Result, State}.
|
||||||
|
|
||||||
%%% Receive Doer Functions
|
%%% Receive Doer Functions
|
||||||
|
|
||||||
@ -198,11 +264,6 @@ do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
|
|||||||
do_handle_nack(State, Data);
|
do_handle_nack(State, Data);
|
||||||
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
|
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
|
||||||
do_handle_cancel(State, Data);
|
do_handle_cancel(State, Data);
|
||||||
do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
|
|
||||||
% TODO: do we need ack cancel? There isn't much you can do with the
|
|
||||||
% sequence information, you really just need the server to tell you what
|
|
||||||
% did get through and how it was interpreted.
|
|
||||||
do_handle_ack_cancel(State, Data);
|
|
||||||
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
||||||
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
|
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
|
||||||
% Reliable stream operations need to be ordered first.
|
% Reliable stream operations need to be ordered first.
|
||||||
@ -228,10 +289,6 @@ do_handle_cancel(State, Payload) ->
|
|||||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
do_handle_ack_cancel(State, Payload) ->
|
|
||||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
%%% Reliable message reordering
|
%%% Reliable message reordering
|
||||||
|
|
||||||
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
|
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
|
||||||
@ -251,45 +308,132 @@ insert_instruction(Op, Seq, Data, Is, Earlier) ->
|
|||||||
|
|
||||||
%%% Reliable message execution
|
%%% Reliable message execution
|
||||||
|
|
||||||
do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
|
do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) ->
|
||||||
|
% If we are closed, then anything we receive opens the channel.
|
||||||
|
StateOpened = State#s{state = receiving},
|
||||||
|
do_execute_buffer(StateOpened);
|
||||||
|
do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
|
||||||
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
|
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
|
||||||
|
log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]),
|
||||||
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
|
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
|
||||||
do_execute_buffer(NewState);
|
do_execute_buffer(NewState);
|
||||||
do_execute_buffer(State) ->
|
do_execute_buffer(State) ->
|
||||||
% Empty instruction list, or the next instruction hasn't arrived yet. Done.
|
% We've finished consuming instructions; now see if we have any requests we
|
||||||
State.
|
% can fill.
|
||||||
|
try_fill_requests(State).
|
||||||
|
|
||||||
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
||||||
do_handle_fragment(State, false, continue, Seq, Payload);
|
do_handle_fragment(State, continue, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, continue, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
||||||
do_handle_fragment(State, false, end_message, Seq, Payload);
|
do_handle_fragment(State, end_message, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, end_message, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
||||||
do_handle_fragment(State, false, close_channel, Seq, Payload);
|
do_handle_fragment(State, close_channel, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, close_channel, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
||||||
do_handle_status(State, Seq, Payload);
|
do_handle_status(State, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
|
|
||||||
do_handle_req_move(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
||||||
do_handle_fragment(State, true, dirty, Seq, Payload);
|
do_handle_fragment(State, dirty, Seq, Payload);
|
||||||
do_handle_reliable(State, Unknown, _, _) ->
|
do_handle_reliable(State, Unknown, _, _) ->
|
||||||
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
|
|
||||||
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_status(State, Seq, Payload) ->
|
do_handle_status(State, Seq, Payload) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
send_ack(State, Seq),
|
||||||
State.
|
do_handle_fragment(State, continue, Seq, Payload).
|
||||||
|
|
||||||
do_handle_req_move(State, Seq, Payload) ->
|
send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
% TODO: backpressure
|
||||||
State.
|
gen_udp:send(Sock, Peer, <<ID:8, ?OP_ACK:8, Seq:16>>).
|
||||||
|
|
||||||
|
do_handle_fragment(State, Mod, _Seq, Payload) ->
|
||||||
|
WithFragment = push_fragment(State, Payload),
|
||||||
|
update_chan_mode(WithFragment, Mod).
|
||||||
|
|
||||||
|
push_fragment(State = #s{fragments = {In, Out}}, Payload) ->
|
||||||
|
State#s{fragments = {[Payload | In], Out}}.
|
||||||
|
|
||||||
|
update_chan_mode(State, continue) ->
|
||||||
|
State#s{state = receiving};
|
||||||
|
update_chan_mode(State, end_message) ->
|
||||||
|
State#s{state = sending};
|
||||||
|
update_chan_mode(State, close_channel) ->
|
||||||
|
% TODO: Properly clean up subscriptions and requests and things, if the
|
||||||
|
% channel closes... Also if the channel switches to sending, I guess a
|
||||||
|
% fragment subscriber should be notified?
|
||||||
|
State#s{state = closed};
|
||||||
|
update_chan_mode(State, dirty) ->
|
||||||
|
% TODO: create and then quarantine dirty channels
|
||||||
|
State#s{state = receiving}.
|
||||||
|
|
||||||
|
try_fill_requests(State = #s{requests = Requests}) ->
|
||||||
|
case pop_queue(Requests) of
|
||||||
|
{ok, {Remaining, Request}} ->
|
||||||
|
try_fill_requests2(State, Remaining, Request);
|
||||||
|
error ->
|
||||||
|
% No requests, try subscriptions instead.
|
||||||
|
try_fill_subscription(State)
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_fill_requests2(State, RequestsRemaining, Request) ->
|
||||||
|
Without = State#s{requests = RequestsRemaining},
|
||||||
|
case try_fill_request(Without, Request) of
|
||||||
|
{ok, NewState} ->
|
||||||
|
% Done successfully! Start the loop again with the new state.
|
||||||
|
try_fill_requests(NewState);
|
||||||
|
error ->
|
||||||
|
% The request didn't get filled... Just revert.
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_fill_subscription(State = #s{subscriber = none}) ->
|
||||||
|
% No subscription either! Do nothing.
|
||||||
|
State;
|
||||||
|
try_fill_subscription(State = #s{subscriber = Sub}) ->
|
||||||
|
% A subscription! Try to fill it as many times as possible.
|
||||||
|
try_fill_subscription_loop(State, Sub).
|
||||||
|
|
||||||
|
try_fill_subscription_loop(State, Sub) ->
|
||||||
|
case try_fill_request(State, Sub) of
|
||||||
|
{ok, NewState} ->
|
||||||
|
try_fill_subscription_loop(NewState, Sub);
|
||||||
|
error ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_fill_request(#s{state = receiving}, #request{type = message}) ->
|
||||||
|
% Message not finished yet.
|
||||||
|
error;
|
||||||
|
try_fill_request(#s{fragments = {[], []}}, #request{type = message}) ->
|
||||||
|
% No fragments have been received, not even empty fragments. This
|
||||||
|
% channel/message hasn't even been opened yet.
|
||||||
|
error;
|
||||||
|
try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) ->
|
||||||
|
Fragments = Out ++ lists:reverse(In),
|
||||||
|
Message = list_to_binary(Fragments),
|
||||||
|
send_result(Recip, Message),
|
||||||
|
NewState = State#s{fragments = {[], []}},
|
||||||
|
{ok, NewState};
|
||||||
|
try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) ->
|
||||||
|
case pop_queue(Fragments) of
|
||||||
|
{ok, {NewFragments, Fragment}} ->
|
||||||
|
log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]),
|
||||||
|
send_result(Recip, Fragment),
|
||||||
|
NewState = State#s{fragments = NewFragments},
|
||||||
|
{ok, NewState};
|
||||||
|
error ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
send_result({async, Pid}, Result) ->
|
||||||
|
erlang:send(Pid, {msp_fragment, self(), Result});
|
||||||
|
send_result({sync, From}, Result) ->
|
||||||
|
gen_server:reply(From, {ok, Result}).
|
||||||
|
|
||||||
|
pop_queue({[], []}) ->
|
||||||
|
error;
|
||||||
|
pop_queue({In, [Next | Out]}) ->
|
||||||
|
NewFragments = {In, Out},
|
||||||
|
{ok, {NewFragments, Next}};
|
||||||
|
pop_queue({In, []}) ->
|
||||||
|
Out = lists:reverse(In),
|
||||||
|
pop_queue({[], Out}).
|
||||||
|
|
||||||
|
|||||||
@ -21,8 +21,8 @@ run_tests_protected() ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
run_tests() ->
|
run_tests() ->
|
||||||
%ok = send_test(),
|
ok = send_test(),
|
||||||
%ok = out_of_order_test(),
|
ok = out_of_order_test(),
|
||||||
ok = race_test(),
|
ok = race_test(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
@ -37,15 +37,21 @@ send_test() ->
|
|||||||
ok = make_connection(PortA, IP, PortB, 0),
|
ok = make_connection(PortA, IP, PortB, 0),
|
||||||
ok = make_connection(PortB, IP, PortA, 1),
|
ok = make_connection(PortB, IP, PortA, 1),
|
||||||
|
|
||||||
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
{ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
||||||
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
|
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||||
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
|
|
||||||
timer:sleep(10),
|
MessageA = <<"message sent from A to B">>,
|
||||||
|
msp_channel:send_and_close(ChanA, MessageA),
|
||||||
|
io:format("A has tried to send a message to B~n"),
|
||||||
|
{ok, MessageA} = msp_channel:receive_message(ChanB),
|
||||||
|
io:format("received successfully~n"),
|
||||||
|
|
||||||
|
MessageB = <<"message sent from B to A">>,
|
||||||
|
msp_channel:send_and_close(ChanB, MessageB),
|
||||||
|
io:format("B has tried to send a message to A~n"),
|
||||||
|
{ok, MessageB} = msp_channel:receive_message(ChanA),
|
||||||
|
io:format("received successfully~n"),
|
||||||
|
|
||||||
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
|
||||||
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
|
|
||||||
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
|
|
||||||
timer:sleep(10),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
out_of_order_test() ->
|
out_of_order_test() ->
|
||||||
@ -54,14 +60,13 @@ out_of_order_test() ->
|
|||||||
PortB = 5522,
|
PortB = 5522,
|
||||||
{ok, SockA} = gen_udp:open(PortA),
|
{ok, SockA} = gen_udp:open(PortA),
|
||||||
ok = make_connection(PortB, IP, PortA, 0),
|
ok = make_connection(PortB, IP, PortA, 0),
|
||||||
|
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||||
|
|
||||||
% TODO: fix race_test and remove this sleep
|
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>),
|
||||||
timer:sleep(10),
|
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>),
|
||||||
|
|
||||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>),
|
{ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB),
|
||||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>),
|
io:format("Received out of order fragments successfully.~n"),
|
||||||
|
|
||||||
timer:sleep(10),
|
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
@ -71,12 +76,18 @@ race_test() ->
|
|||||||
PortB = 5532,
|
PortB = 5532,
|
||||||
{ok, SockA} = gen_udp:open(PortA),
|
{ok, SockA} = gen_udp:open(PortA),
|
||||||
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
|
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
|
||||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>),
|
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>),
|
||||||
|
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
|
|
||||||
|
% We should already have a {udp, SockB, IP, PortA, _} in our message queue,
|
||||||
|
% which needs to be properly detected and handled by the socket transfer
|
||||||
|
% functions.
|
||||||
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
|
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
|
||||||
|
|
||||||
|
% TODO We could test that the datagrams were dispatched properly by
|
||||||
|
% receiving on the channel, but for now we just manually check the
|
||||||
|
% logs/console output for msp_channel_man errors.
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user