Compare commits
8 Commits
0ccd64e2f1
..
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 39a75ca435 | |||
| 23276662bb | |||
| 78ea3fc85e | |||
| e8ef0b4304 | |||
| b6c16967e7 | |||
| 47612c2775 | |||
| cfd9eb748f | |||
| 669b2e51e1 |
+276
-90
@@ -10,7 +10,13 @@
|
||||
-license("MIT").
|
||||
|
||||
-export([configure_channel/4]).
|
||||
-export([send_and_close/2]).
|
||||
-export([send_and_close/2, send_message/2]).
|
||||
% A lot of interfaces, but the last two are only really written for
|
||||
% completeness. The distinctions here are blocking vs subscribing, and
|
||||
% fragments vs messages.
|
||||
-export([receive_fragment/1, receive_message/1,
|
||||
subscribe_fragment/1, subscribe_message/1,
|
||||
subscribe_fragment_once/1, subscribe_message_once/1]).
|
||||
|
||||
%% gen_server
|
||||
-export([start_link/0]).
|
||||
@@ -24,30 +30,21 @@
|
||||
% Fragment/message opcodes:
|
||||
% _____________________________________________________________________________
|
||||
% OP_FRAGMENT | Part of a message/stream
|
||||
% OP_OPEN | Open a new channel, and send the first fragment of the
|
||||
% | first message in it.
|
||||
% OP_END_MESSAGE | End of a message/stream, expects a response
|
||||
% OP_OPEN_END | Open a new channel, and send an entire message
|
||||
% OP_CLOSE | End of a message/stream, do not respond, close the channel
|
||||
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
|
||||
% | channel, all in one datagram. Sort of like
|
||||
% | gen_server:cast/2, or if UDP were reliable.
|
||||
% OP_STATUS | Part of a message, but expects an immediate ACK, in case
|
||||
% | the stream goes idle, or for any other
|
||||
% | implementation-specific needs.
|
||||
% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other
|
||||
% | side to reply on this channel.
|
||||
% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel.
|
||||
%
|
||||
% Transport Control Stuff
|
||||
% -----------------------------------------------------------------------------
|
||||
% OP_ACK | Acknowledge that instructions up to N have been received.
|
||||
% OP_STATUS | Request an ACK for this exact command. Has many uses -
|
||||
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
|
||||
% | to stop the receiver from coalescing ACKs too much.
|
||||
% OP_NACK | Instruction N has not been received.
|
||||
% OP_CANCEL | Sender is hoping that messages M through N weren't
|
||||
% | delivered, and can be cancelled.
|
||||
% OP_ACK_CANCEL | Respond to OP_CANCEL.
|
||||
%
|
||||
% Stream Negotiation
|
||||
% _____________________________________________________________________________
|
||||
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
|
||||
% | should be moved to a different ID.
|
||||
% OP_CANCEL | Ignore any fragments that weren't already delivered, and
|
||||
% | end the stream now. The channel stays open so that the
|
||||
% | other side can communicate what they did/didn't | receive,
|
||||
% | and what actions were taken, if the application requires.
|
||||
%
|
||||
% Lossy
|
||||
% _____________________________________________________________________________
|
||||
@@ -56,31 +53,43 @@
|
||||
% OP_LOSSY | A lossy datagram, for the application to deal with.
|
||||
|
||||
% Numbers aren't final.
|
||||
-define(OP_FRAGMENT, 0).
|
||||
-define(OP_OPEN, 1).
|
||||
-define(OP_END_MESSAGE, 2).
|
||||
-define(OP_OPEN_END, 3).
|
||||
-define(OP_CLOSE, 4).
|
||||
-define(OP_OPEN_CLOSE, 5).
|
||||
-define(OP_ACK, 6).
|
||||
-define(OP_STATUS, 7).
|
||||
-define(OP_NACK, 8).
|
||||
-define(OP_CANCEL, 9).
|
||||
-define(OP_ACK_CANCEL, 10).
|
||||
-define(OP_REQ_MOVE, 11).
|
||||
-define(OP_OPEN_DIRTY, 12).
|
||||
-define(OP_LOSSY, 13).
|
||||
-define(OP_FRAGMENT, 0).
|
||||
-define(OP_END_MESSAGE, 1).
|
||||
-define(OP_CLOSE, 2).
|
||||
-define(OP_STATUS, 3).
|
||||
-define(OP_ACK, 4).
|
||||
-define(OP_NACK, 5).
|
||||
-define(OP_CANCEL, 6).
|
||||
-define(OP_OPEN_DIRTY, 7).
|
||||
-define(OP_LOSSY, 8).
|
||||
|
||||
%%% Type and Record Definitions
|
||||
|
||||
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
|
||||
|
||||
-record(instruction,
|
||||
{op :: integer(),
|
||||
seq :: integer(),
|
||||
data :: binary()}).
|
||||
|
||||
|
||||
-type request_type() :: fragment | message.
|
||||
-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}.
|
||||
-record(request,
|
||||
{type :: request_type(),
|
||||
recipient :: request_recipient()}).
|
||||
-type request() :: #request{}.
|
||||
|
||||
-record(s,
|
||||
{chan_id :: integer(),
|
||||
socket :: gen_udp:socket(),
|
||||
peer :: endpoint(), % TODO configure the socket in advance?
|
||||
state = closed :: closed | sending | receiving,
|
||||
seq = 0 :: integer()}).
|
||||
seq = 0 :: integer(),
|
||||
requests = {[], []} :: {[request()], [request()]},
|
||||
subscriber = none :: none | request(),
|
||||
instructions = [] :: [#instruction{}],
|
||||
fragments = {[], []} :: {[binary()], [binary()]}}).
|
||||
|
||||
|
||||
-type state() :: none | #s{}.
|
||||
@@ -95,6 +104,31 @@ configure_channel(Pid, Id, Socket, Peer) ->
|
||||
send_and_close(Chan, Message) ->
|
||||
gen_server:call(Chan, {send, Message, close}).
|
||||
|
||||
send_message(Chan, Message) ->
|
||||
gen_server:call(Chan, {send, Message, end_message}).
|
||||
|
||||
receive_fragment(Chan) ->
|
||||
gen_server:call(Chan, {add_sync_request, fragment}).
|
||||
|
||||
receive_message(Chan) ->
|
||||
gen_server:call(Chan, {add_sync_request, message}).
|
||||
|
||||
subscribe_fragment(Chan) ->
|
||||
Pid = self(),
|
||||
gen_server:call(Chan, {subscribe, fragment, Pid}).
|
||||
|
||||
subscribe_message(Chan) ->
|
||||
Pid = self(),
|
||||
gen_server:call(Chan, {subscribe, message, Pid}).
|
||||
|
||||
subscribe_fragment_once(Chan) ->
|
||||
Pid = self(),
|
||||
gen_server:call(Chan, {add_async_request, fragment, Pid}).
|
||||
|
||||
subscribe_message_once(Chan) ->
|
||||
Pid = self(),
|
||||
gen_server:call(Chan, {add_async_request, message, Pid}).
|
||||
|
||||
%%% gen_server
|
||||
|
||||
-spec start_link() -> Result
|
||||
@@ -112,12 +146,22 @@ init(none) ->
|
||||
handle_call({send, Message, Mod}, _, State) ->
|
||||
{Result, NewState} = do_send(Message, Mod, State),
|
||||
{reply, Result, NewState};
|
||||
handle_call({add_sync_request, Type}, From, State) ->
|
||||
NewState = do_add_sync_request(Type, From, State),
|
||||
{noreply, NewState};
|
||||
handle_call({add_async_request, Type, Pid}, _, State) ->
|
||||
{Result, NewState} = do_add_async_request(Type, Pid, State),
|
||||
{reply, Result, NewState};
|
||||
handle_call({subscribe, Type, Pid}, _, State) ->
|
||||
{Result, NewState} = do_subscribe(Type, Pid, State),
|
||||
{reply, Result, NewState};
|
||||
handle_call(Unexpected, From, State) ->
|
||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
|
||||
log(debug, "Opening channel ~p~n", [Id]),
|
||||
State = #s{chan_id = Id,
|
||||
socket = Socket,
|
||||
peer = Peer},
|
||||
@@ -159,95 +203,237 @@ do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
|
||||
do_send(_, _, State = #s{state = receiving}) ->
|
||||
{{error, not_sending}, State};
|
||||
do_send(Message, Mod, State = #s{chan_id = Id,
|
||||
state = ChanMode,
|
||||
peer = Peer,
|
||||
socket = Sock,
|
||||
seq = Seq}) ->
|
||||
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||
IsNewChannel = ChanMode == closed,
|
||||
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
|
||||
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||
{Op, NewChanMode} = choose_opcode(Mod),
|
||||
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
||||
gen_udp:send(Sock, Peer, Datagram),
|
||||
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
||||
{ok, NewState}.
|
||||
|
||||
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
|
||||
choose_opcode(true, none) -> {?OP_OPEN, sending};
|
||||
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
|
||||
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
|
||||
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
|
||||
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
|
||||
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||
choose_opcode(none) -> {?OP_FRAGMENT, sending};
|
||||
choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
|
||||
choose_opcode(close) -> {?OP_CLOSE, closed};
|
||||
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||
|
||||
%%% Subscription Doer Functions
|
||||
|
||||
% Adds a request to the queue, or responds to that request directly, if there
|
||||
% is a subscriber already.
|
||||
do_add_sync_request(Type, From, State = #s{subscriber = none}) ->
|
||||
do_add_request(Type, {sync, From}, State);
|
||||
do_add_sync_request(_, From, State) ->
|
||||
Result = {error, already_subscribed},
|
||||
gen_server:reply(From, Result),
|
||||
State.
|
||||
|
||||
% Adds a request to the queue and returns ok, or returns an error if there is a
|
||||
% subscriber already.
|
||||
do_add_async_request(Type, Pid, State = #s{subscriber = none}) ->
|
||||
NewState = do_add_request(Type, {async, Pid}, State),
|
||||
{ok, NewState};
|
||||
do_add_async_request(_, _, State) ->
|
||||
Result = {error, already_subscribed},
|
||||
{Result, State}.
|
||||
|
||||
do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) ->
|
||||
Request = #request{type = Type, recipient = Recipient},
|
||||
NewRequests = {[Request | In], Out},
|
||||
StateWithRequest = State#s{requests = NewRequests},
|
||||
try_fill_requests(StateWithRequest).
|
||||
|
||||
do_subscribe(Type, Pid, State = #s{subscriber = none}) ->
|
||||
Request = #request{type = Type, recipient = {async, Pid}},
|
||||
StateWithSub = State#s{subscriber = Request},
|
||||
NewState = try_fill_requests(StateWithSub),
|
||||
{ok, NewState};
|
||||
do_subscribe(_, _, State) ->
|
||||
Result = {error, already_subscribed},
|
||||
{Result, State}.
|
||||
|
||||
%%% Receive Doer Functions
|
||||
|
||||
% First handle the messages that don't need to be reordered
|
||||
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
|
||||
do_handle_lossy(State, Data);
|
||||
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
|
||||
do_handle_ack(State, Data);
|
||||
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
|
||||
do_handle_nack(State, Data);
|
||||
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
|
||||
do_handle_cancel(State, Data);
|
||||
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
||||
do_handle_reliable(State, Op, Seq, Rest);
|
||||
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
|
||||
% Reliable stream operations need to be ordered first.
|
||||
NewState = do_buffer_reliable(State, Op, Seq, Rest),
|
||||
do_execute_buffer(NewState);
|
||||
do_handle_datagram(State, Unexpected) ->
|
||||
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
|
||||
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
|
||||
State.
|
||||
|
||||
do_handle_lossy(State, Data) ->
|
||||
io:format("Got lossy datagram: ~p~n", [Data]),
|
||||
State.
|
||||
|
||||
do_handle_ack(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
do_handle_nack(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
do_handle_cancel(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
%%% Reliable message reordering
|
||||
|
||||
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
|
||||
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
|
||||
State#s{instructions = NewInstructions}.
|
||||
|
||||
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
|
||||
% We need to go later than this. Move it to the 'earlier than us' pile.
|
||||
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
|
||||
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
|
||||
% There is already an instruction with this sequence ID. Use that.
|
||||
lists:reverse(Earlier, Is);
|
||||
insert_instruction(Op, Seq, Data, Is, Earlier) ->
|
||||
% If neither of the above conditions are met, we can add it here.
|
||||
NewI = #instruction{op = Op, seq = Seq, data = Data},
|
||||
lists:reverse(Earlier, [NewI | Is]).
|
||||
|
||||
%%% Reliable message execution
|
||||
|
||||
do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) ->
|
||||
% If we are closed, then anything we receive opens the channel.
|
||||
StateOpened = State#s{state = receiving},
|
||||
do_execute_buffer(StateOpened);
|
||||
do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
|
||||
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
|
||||
log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]),
|
||||
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
|
||||
do_execute_buffer(NewState);
|
||||
do_execute_buffer(State) ->
|
||||
% We've finished consuming instructions; now see if we have any requests we
|
||||
% can fill.
|
||||
try_fill_requests(State).
|
||||
|
||||
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, continue, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, continue, Seq, Payload);
|
||||
do_handle_fragment(State, continue, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, end_message, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, end_message, Seq, Payload);
|
||||
do_handle_fragment(State, end_message, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
|
||||
do_handle_ack(State, Seq, Payload);
|
||||
do_handle_fragment(State, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
||||
do_handle_status(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
|
||||
do_handle_nack(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
|
||||
do_handle_cancel(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
|
||||
do_handle_ack_cancel(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
|
||||
do_handle_req_move(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, dirty, Seq, Payload);
|
||||
do_handle_fragment(State, dirty, Seq, Payload);
|
||||
do_handle_reliable(State, Unknown, _, _) ->
|
||||
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
||||
State.
|
||||
|
||||
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
|
||||
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
|
||||
State.
|
||||
|
||||
do_handle_ack(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_status(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
send_ack(State, Seq),
|
||||
do_handle_fragment(State, continue, Seq, Payload).
|
||||
|
||||
do_handle_nack(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) ->
|
||||
% TODO: backpressure
|
||||
gen_udp:send(Sock, Peer, <<ID:8, ?OP_ACK:8, Seq:16>>).
|
||||
|
||||
do_handle_cancel(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
do_handle_fragment(State, Mod, _Seq, Payload) ->
|
||||
WithFragment = push_fragment(State, Payload),
|
||||
update_chan_mode(WithFragment, Mod).
|
||||
|
||||
do_handle_ack_cancel(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
push_fragment(State = #s{fragments = {In, Out}}, Payload) ->
|
||||
State#s{fragments = {[Payload | In], Out}}.
|
||||
|
||||
do_handle_req_move(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
update_chan_mode(State, continue) ->
|
||||
State#s{state = receiving};
|
||||
update_chan_mode(State, end_message) ->
|
||||
State#s{state = sending};
|
||||
update_chan_mode(State, close_channel) ->
|
||||
% TODO: Properly clean up subscriptions and requests and things, if the
|
||||
% channel closes... Also if the channel switches to sending, I guess a
|
||||
% fragment subscriber should be notified?
|
||||
State#s{state = closed};
|
||||
update_chan_mode(State, dirty) ->
|
||||
% TODO: create and then quarantine dirty channels
|
||||
State#s{state = receiving}.
|
||||
|
||||
try_fill_requests(State = #s{requests = Requests}) ->
|
||||
case pop_queue(Requests) of
|
||||
{ok, {Remaining, Request}} ->
|
||||
try_fill_requests2(State, Remaining, Request);
|
||||
error ->
|
||||
% No requests, try subscriptions instead.
|
||||
try_fill_subscription(State)
|
||||
end.
|
||||
|
||||
try_fill_requests2(State, RequestsRemaining, Request) ->
|
||||
Without = State#s{requests = RequestsRemaining},
|
||||
case try_fill_request(Without, Request) of
|
||||
{ok, NewState} ->
|
||||
% Done successfully! Start the loop again with the new state.
|
||||
try_fill_requests(NewState);
|
||||
error ->
|
||||
% The request didn't get filled... Just revert.
|
||||
State
|
||||
end.
|
||||
|
||||
try_fill_subscription(State = #s{subscriber = none}) ->
|
||||
% No subscription either! Do nothing.
|
||||
State;
|
||||
try_fill_subscription(State = #s{subscriber = Sub}) ->
|
||||
% A subscription! Try to fill it as many times as possible.
|
||||
try_fill_subscription_loop(State, Sub).
|
||||
|
||||
try_fill_subscription_loop(State, Sub) ->
|
||||
case try_fill_request(State, Sub) of
|
||||
{ok, NewState} ->
|
||||
try_fill_subscription_loop(NewState, Sub);
|
||||
error ->
|
||||
State
|
||||
end.
|
||||
|
||||
try_fill_request(#s{state = receiving}, #request{type = message}) ->
|
||||
% Message not finished yet.
|
||||
error;
|
||||
try_fill_request(#s{fragments = {[], []}}, #request{type = message}) ->
|
||||
% No fragments have been received, not even empty fragments. This
|
||||
% channel/message hasn't even been opened yet.
|
||||
error;
|
||||
try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) ->
|
||||
Fragments = Out ++ lists:reverse(In),
|
||||
Message = list_to_binary(Fragments),
|
||||
send_result(Recip, Message),
|
||||
NewState = State#s{fragments = {[], []}},
|
||||
{ok, NewState};
|
||||
try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) ->
|
||||
case pop_queue(Fragments) of
|
||||
{ok, {NewFragments, Fragment}} ->
|
||||
log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]),
|
||||
send_result(Recip, Fragment),
|
||||
NewState = State#s{fragments = NewFragments},
|
||||
{ok, NewState};
|
||||
error ->
|
||||
State
|
||||
end.
|
||||
|
||||
send_result({async, Pid}, Result) ->
|
||||
erlang:send(Pid, {msp_fragment, self(), Result});
|
||||
send_result({sync, From}, Result) ->
|
||||
gen_server:reply(From, {ok, Result}).
|
||||
|
||||
pop_queue({[], []}) ->
|
||||
error;
|
||||
pop_queue({In, [Next | Out]}) ->
|
||||
NewFragments = {In, Out},
|
||||
{ok, {NewFragments, Next}};
|
||||
pop_queue({In, []}) ->
|
||||
Out = lists:reverse(In),
|
||||
pop_queue({[], Out}).
|
||||
|
||||
|
||||
+121
-26
@@ -9,7 +9,7 @@
|
||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-license("MIT").
|
||||
|
||||
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
|
||||
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
|
||||
%% Worker interface
|
||||
-export([enroll/0]).
|
||||
%% gen_server
|
||||
@@ -29,8 +29,9 @@
|
||||
{chan_pid :: pid()}).
|
||||
|
||||
-record(route_info,
|
||||
{conn_pid :: pid(),
|
||||
{router :: pid(),
|
||||
socket :: gen_udp:socket(),
|
||||
side :: 0 | 1,
|
||||
unused_channels :: [integer()],
|
||||
used_channels :: #{integer() => #channel{}}}).
|
||||
|
||||
@@ -41,19 +42,49 @@
|
||||
|
||||
%%% Service Interface
|
||||
|
||||
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
|
||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
|
||||
add_route(Sock, TheirIP, TheirPort, Side) ->
|
||||
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
|
||||
ok ->
|
||||
transfer_socket(Sock, TheirIP, TheirPort);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
|
||||
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
|
||||
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
|
||||
|
||||
% A pretty complex function for an interface, but we need to transfer ownership
|
||||
% of the socket.
|
||||
transfer_socket(Sock, TheirIP, TheirPort) ->
|
||||
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
|
||||
ok ->
|
||||
transfer_socket2(Sock, {TheirIP, TheirPort});
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
transfer_socket2(Sock, Peer) ->
|
||||
% Since it was transferred, any backlog of datagrams will now be in the
|
||||
% channel manager's inbox. Turn it off from active, so that the manager
|
||||
% knows that it will be fully caught up *before* the socket_transferred
|
||||
% notification arrives.
|
||||
case inet:setopts(Sock, [{active, false}]) of
|
||||
ok ->
|
||||
% Now the socket is all cleaned up, let the manager know.
|
||||
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
create_channel(OurPort, TheirIP, TheirPort) ->
|
||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||
gen_server:call(?MODULE, {create_channel, Route}).
|
||||
|
||||
% Deliver messages on behalf of msp_connection, if it doesn't know where to
|
||||
% send something.
|
||||
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
|
||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
|
||||
% Deliver messages on behalf of msp_router, if it doesn't know where to send
|
||||
% something.
|
||||
dispatch_fallback(Socket, Peer, ID, Packet) ->
|
||||
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
|
||||
|
||||
%%% Worker Interface
|
||||
|
||||
@@ -81,6 +112,12 @@ init(none) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
handle_call({add_route, Sock, Route, Side}, _, State) ->
|
||||
{Result, NewState} = do_add_route(Sock, Route, Side, State),
|
||||
{reply, Result, NewState};
|
||||
handle_call({socket_transferred, Sock, Peer}, _, State) ->
|
||||
Result = do_socket_transferred(Sock, Peer, State),
|
||||
{reply, Result, State};
|
||||
handle_call({create_channel, Route}, _, State) ->
|
||||
{Result, NewState} = do_create_channel(State, Route),
|
||||
{reply, Result, NewState};
|
||||
@@ -89,11 +126,8 @@ handle_call(Unexpected, From, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_cast({update_router, Route, Conn, Sock}, State) ->
|
||||
NewState = do_update_router(Route, Conn, Sock, State),
|
||||
{noreply, NewState};
|
||||
handle_cast({dispatch, Route, ID, Packet}, State) ->
|
||||
NewState = do_dispatch(Route, ID, Packet, State),
|
||||
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
|
||||
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
|
||||
{noreply, NewState};
|
||||
handle_cast({enroll, PID}, State) ->
|
||||
NewState = do_enroll(PID, State),
|
||||
@@ -106,10 +140,23 @@ handle_cast(Unexpected, State) ->
|
||||
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
|
||||
NewState = handle_down(Mon, PID, Reason, State),
|
||||
{noreply, NewState};
|
||||
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
|
||||
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
|
||||
{noreply, NewState};
|
||||
handle_info(Unexpected, State) ->
|
||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
|
||||
do_dispatch(Socket, Peer, ID, Packet, State);
|
||||
handle_udp(Socket, Peer, [ID | PacketList], State) ->
|
||||
Packet = list_to_binary(PacketList),
|
||||
do_dispatch(Socket, Peer, ID, Packet, State);
|
||||
handle_udp(_, _, <<>>, State) ->
|
||||
% Empty datagram... Ignore.
|
||||
State;
|
||||
handle_udp(_, _, [], State) ->
|
||||
State.
|
||||
|
||||
handle_down(Mon, PID, Reason, State) ->
|
||||
% TODO
|
||||
@@ -149,18 +196,56 @@ do_enroll2(PID, State, Channels) ->
|
||||
State
|
||||
end.
|
||||
|
||||
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
|
||||
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
|
||||
{ok, OurPort} = inet:port(Sock),
|
||||
Route = #route{from = OurPort, to = Peer},
|
||||
% Check that this route isn't registered already...
|
||||
case maps:is_key(Route, Conns) of
|
||||
true ->
|
||||
Result = {error, already_added},
|
||||
{Result, State};
|
||||
false ->
|
||||
do_add_route2(Sock, Route, Side, State)
|
||||
end.
|
||||
|
||||
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
|
||||
{ok, Router} = msp_router:start_link(),
|
||||
NewConn = #route_info{router = Router,
|
||||
socket = Sock,
|
||||
side = Side,
|
||||
unused_channels = lists:seq(0, 255),
|
||||
used_channels = #{}},
|
||||
NewConns = maps:put(Route, NewConn, Conns),
|
||||
NewState = State#s{connections = NewConns},
|
||||
{ok, NewState}.
|
||||
|
||||
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
|
||||
{ok, OurPort} = inet:port(Sock),
|
||||
Route = #route{from = OurPort, to = Peer},
|
||||
% Check that this route is actually registered.
|
||||
case maps:find(Route, Conns) of
|
||||
{ok, _Info} ->
|
||||
io:format("Updating routes not yet implemented.~n", []),
|
||||
init:stop();
|
||||
{ok, Conn} ->
|
||||
do_socket_transferred2(Sock, Peer, Conn);
|
||||
error ->
|
||||
NewConn = #route_info{conn_pid = Conn,
|
||||
socket = Sock,
|
||||
unused_channels = lists:seq(0, 255),
|
||||
used_channels = #{}},
|
||||
NewConns = maps:put(Route, NewConn, Conns),
|
||||
State#s{connections = NewConns}
|
||||
{error, unknown_connection}
|
||||
end.
|
||||
|
||||
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
|
||||
->
|
||||
msp_router:begin_routing(Router, Sock, Peer),
|
||||
channel_catchup(Router, maps:iterator(Channels));
|
||||
do_socket_transferred2(_, _, _) ->
|
||||
{error, wrong_socket}.
|
||||
|
||||
% If we have created channels without spawning the router process, then at some
|
||||
% point we will need to tell the router process what channels currently exist.
|
||||
channel_catchup(Router, Iter) ->
|
||||
case maps:next(Iter) of
|
||||
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
|
||||
msp_router:add_channel(Router, ChanID, Chan),
|
||||
channel_catchup(Router, NewIter);
|
||||
none ->
|
||||
ok
|
||||
end.
|
||||
|
||||
do_create_channel(State = #s{connections = Conns}, Route) ->
|
||||
@@ -194,13 +279,23 @@ do_create_channel3(State, Route, Info, ChanID) ->
|
||||
NewConns = maps:put(Route, NewInfo, State#s.connections),
|
||||
NewState = State#s{connections = NewConns},
|
||||
|
||||
% Also add it to the router!
|
||||
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
|
||||
|
||||
{Result, NewState}.
|
||||
|
||||
do_dispatch(Route, ID, Packet, State) ->
|
||||
do_dispatch(Socket, Peer, ID, Packet, State) ->
|
||||
{ok, OurPort} = inet:port(Socket),
|
||||
Route = #route{from = OurPort, to = Peer},
|
||||
do_dispatch2(Route, ID, Packet, State).
|
||||
|
||||
do_dispatch2(Route, ID, Packet, State) ->
|
||||
case maps:find(Route, State#s.connections) of
|
||||
{ok, Info} ->
|
||||
do_dispatch2(Route, ID, Packet, State, Info);
|
||||
error ->
|
||||
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
|
||||
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
|
||||
State
|
||||
end.
|
||||
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
%%% @doc
|
||||
%%% Minimal Stream Protocol : Channel Worker Supervisor
|
||||
%%% @end
|
||||
|
||||
-module(msp_channel_sup).
|
||||
-vsn("0.1.0").
|
||||
-behaviour(supervisor).
|
||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-license("MIT").
|
||||
|
||||
|
||||
-export([start_channel/0]).
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
|
||||
-spec start_channel() -> Result
|
||||
when Result :: {ok, pid()}
|
||||
| {error, Reason},
|
||||
Reason :: {already_started, pid()}
|
||||
| {shutdown, term()}
|
||||
| term().
|
||||
|
||||
start_channel() ->
|
||||
supervisor:start_child(?MODULE, []).
|
||||
|
||||
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
|
||||
|
||||
|
||||
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
|
||||
%% @private
|
||||
%% The OTP init/1 function.
|
||||
|
||||
init(none) ->
|
||||
RestartStrategy = {simple_one_for_one, 1, 60},
|
||||
Channel =
|
||||
{msp_channel,
|
||||
{msp_channel, start_link, []},
|
||||
temporary,
|
||||
brutal_kill,
|
||||
worker,
|
||||
[msp_channel]},
|
||||
{ok, {RestartStrategy, [Channel]}}.
|
||||
@@ -2,14 +2,14 @@
|
||||
%%% Minimal Stream Protocol: Connection Worker
|
||||
%%% @end
|
||||
|
||||
-module(msp_connection).
|
||||
-module(msp_router).
|
||||
-vsn("0.1.0").
|
||||
-behavior(gen_server).
|
||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-license("MIT").
|
||||
|
||||
-export([begin_msp/5]).
|
||||
-export([begin_routing/3, add_channel/3]).
|
||||
|
||||
%% gen_server
|
||||
-export([start_link/0]).
|
||||
@@ -28,7 +28,6 @@
|
||||
-record(s,
|
||||
{socket :: gen_udp:socket(),
|
||||
peer :: endpoint(),
|
||||
side :: 0 | 1,
|
||||
connections = #{} :: #{integer() => channel()}}).
|
||||
|
||||
-type state() :: none | #s{}.
|
||||
@@ -37,18 +36,21 @@
|
||||
|
||||
% msp does not negotiate new connections, but once you have a
|
||||
% socket, and a host that knows you will be talking MSP, then a
|
||||
% new msp_connection can be initialized.
|
||||
begin_msp(Connection, OurSock, TheirIP, TheirPort, OurSide) ->
|
||||
% new msp_router can be initialized.
|
||||
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
|
||||
% Transfer the socket to the gen_server. If it is
|
||||
% active then a bunch of messages will be received
|
||||
% at once, but that is fine.
|
||||
case gen_udp:controlling_process(OurSock, Connection) of
|
||||
case gen_udp:controlling_process(OurSock, Router) of
|
||||
ok ->
|
||||
gen_server:cast(Connection, {begin_msp, OurSock, {TheirIP, TheirPort}, OurSide});
|
||||
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
add_channel(Router, ChanID, ChanPID) ->
|
||||
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
|
||||
|
||||
%%% gen_server
|
||||
|
||||
-spec start_link() -> Result
|
||||
@@ -59,29 +61,36 @@ start_link() ->
|
||||
gen_server:start_link(?MODULE, none, []).
|
||||
|
||||
|
||||
% TODO: Ask msp_connection_man for the socket and endpoint
|
||||
% TODO: SWP? SWP of SWPs?
|
||||
init(none) ->
|
||||
{ok, none}.
|
||||
|
||||
|
||||
handle_call({begin_routing, Sock, Peer}, _, none) ->
|
||||
{Result, State} = do_begin_routing(Sock, Peer),
|
||||
{reply, Result, State};
|
||||
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
|
||||
Result = {error, already_routing},
|
||||
{reply, Result, State};
|
||||
handle_call(Unexpected, From, State) ->
|
||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
||||
{reply, undef, State}.
|
||||
|
||||
|
||||
handle_cast({begin_msp, Sock, Peer, Side}, none) ->
|
||||
State = do_begin_msp(Sock, Peer, Side),
|
||||
{noreply, State};
|
||||
handle_cast({add_channel, ChanID, ChanPID}, State) ->
|
||||
NewState = do_add_channel(ChanID, ChanPID, State),
|
||||
{noreply, NewState};
|
||||
handle_cast(Unexpected, State) ->
|
||||
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
|
||||
log(debug, "Got packet: ~p~n", [Packet]),
|
||||
do_dispatch(State, Packet),
|
||||
{noreply, State};
|
||||
handle_info(Unexpected, State) ->
|
||||
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
@@ -104,12 +113,17 @@ terminate(_, _) ->
|
||||
|
||||
%%% Doer Functions
|
||||
|
||||
do_begin_msp(Sock, Peer, Side) ->
|
||||
do_begin_routing(Sock, Peer) ->
|
||||
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
|
||||
State = #s{socket = Sock,
|
||||
peer = Peer,
|
||||
side = Side},
|
||||
State.
|
||||
State = #s{socket = Sock, peer = Peer},
|
||||
{ok, State}.
|
||||
|
||||
do_add_channel(_, _, none) ->
|
||||
% Do nothing... We will need to be told about these channels again later.
|
||||
none;
|
||||
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
|
||||
NewConns = maps:put(ChanID, ChanPID, Conns),
|
||||
State#s{connections = NewConns}.
|
||||
|
||||
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
|
||||
ok = inet:setopts(Sock, [{active, once}]),
|
||||
@@ -117,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
|
||||
{ok, Conn} ->
|
||||
erlang:send(Conn, {msp_fragment, Packet});
|
||||
error ->
|
||||
{ok, OurPort} = inet:port(Sock),
|
||||
{TheirIP, TheirPort} = Peer,
|
||||
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
|
||||
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
|
||||
end;
|
||||
do_dispatch(_, <<>>) ->
|
||||
% Empty datagram?
|
||||
+61
-16
@@ -6,6 +6,9 @@ spawn_tests() ->
|
||||
ok.
|
||||
|
||||
run_tests_and_halt() ->
|
||||
% Add a handler to print all log messages to the screen, so we can see why
|
||||
% tests are failing, if they are.
|
||||
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
|
||||
Result = run_tests_protected(),
|
||||
io:format("Tests returned ~p~n", [Result]),
|
||||
halt().
|
||||
@@ -19,31 +22,73 @@ run_tests_protected() ->
|
||||
|
||||
run_tests() ->
|
||||
ok = send_test(),
|
||||
ok = out_of_order_test(),
|
||||
ok = race_test(),
|
||||
ok.
|
||||
|
||||
make_connection(OurPort, TheirIP, TheirPort, Side) ->
|
||||
{ok, Sock} = gen_udp:open(OurPort),
|
||||
{ok, Pid} = msp_connection:start_link(),
|
||||
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
|
||||
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
|
||||
{Pid, Sock}.
|
||||
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
|
||||
|
||||
send_test() ->
|
||||
IP = {127, 0, 0, 1},
|
||||
PortA = 5555,
|
||||
PortB = 6666,
|
||||
{A, SockA} = make_connection(PortA, IP, PortB, 0),
|
||||
{B, SockB} = make_connection(PortB, IP, PortA, 1),
|
||||
PortA = 5511,
|
||||
PortB = 5512,
|
||||
ok = make_connection(PortA, IP, PortB, 0),
|
||||
ok = make_connection(PortB, IP, PortA, 1),
|
||||
|
||||
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
||||
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
|
||||
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
|
||||
timer:sleep(10),
|
||||
{ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
||||
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||
|
||||
MessageA = <<"message sent from A to B">>,
|
||||
msp_channel:send_and_close(ChanA, MessageA),
|
||||
io:format("A has tried to send a message to B~n"),
|
||||
{ok, MessageA} = msp_channel:receive_message(ChanB),
|
||||
io:format("received successfully~n"),
|
||||
|
||||
MessageB = <<"message sent from B to A">>,
|
||||
msp_channel:send_and_close(ChanB, MessageB),
|
||||
io:format("B has tried to send a message to A~n"),
|
||||
{ok, MessageB} = msp_channel:receive_message(ChanA),
|
||||
io:format("received successfully~n"),
|
||||
|
||||
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
|
||||
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
|
||||
timer:sleep(10),
|
||||
ok.
|
||||
|
||||
out_of_order_test() ->
|
||||
IP = {127, 0, 0, 1},
|
||||
PortA = 5521,
|
||||
PortB = 5522,
|
||||
{ok, SockA} = gen_udp:open(PortA),
|
||||
ok = make_connection(PortB, IP, PortA, 0),
|
||||
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>),
|
||||
|
||||
{ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB),
|
||||
io:format("Received out of order fragments successfully.~n"),
|
||||
|
||||
ok.
|
||||
|
||||
race_test() ->
|
||||
IP = {127, 0, 0, 1},
|
||||
PortA = 5531,
|
||||
PortB = 5532,
|
||||
{ok, SockA} = gen_udp:open(PortA),
|
||||
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>),
|
||||
|
||||
timer:sleep(10),
|
||||
|
||||
% We should already have a {udp, SockB, IP, PortA, _} in our message queue,
|
||||
% which needs to be properly detected and handled by the socket transfer
|
||||
% functions.
|
||||
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
|
||||
|
||||
% TODO We could test that the datagrams were dispatched properly by
|
||||
% receiving on the channel, but for now we just manually check the
|
||||
% logs/console output for msp_channel_man errors.
|
||||
timer:sleep(10),
|
||||
|
||||
ok.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user