Compare commits
8 Commits
0ccd64e2f1
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 39a75ca435 | |||
| 23276662bb | |||
| 78ea3fc85e | |||
| e8ef0b4304 | |||
| b6c16967e7 | |||
| 47612c2775 | |||
| cfd9eb748f | |||
| 669b2e51e1 |
+276
-90
@@ -10,7 +10,13 @@
|
|||||||
-license("MIT").
|
-license("MIT").
|
||||||
|
|
||||||
-export([configure_channel/4]).
|
-export([configure_channel/4]).
|
||||||
-export([send_and_close/2]).
|
-export([send_and_close/2, send_message/2]).
|
||||||
|
% A lot of interfaces, but the last two are only really written for
|
||||||
|
% completeness. The distinctions here are blocking vs subscribing, and
|
||||||
|
% fragments vs messages.
|
||||||
|
-export([receive_fragment/1, receive_message/1,
|
||||||
|
subscribe_fragment/1, subscribe_message/1,
|
||||||
|
subscribe_fragment_once/1, subscribe_message_once/1]).
|
||||||
|
|
||||||
%% gen_server
|
%% gen_server
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
@@ -24,30 +30,21 @@
|
|||||||
% Fragment/message opcodes:
|
% Fragment/message opcodes:
|
||||||
% _____________________________________________________________________________
|
% _____________________________________________________________________________
|
||||||
% OP_FRAGMENT | Part of a message/stream
|
% OP_FRAGMENT | Part of a message/stream
|
||||||
% OP_OPEN | Open a new channel, and send the first fragment of the
|
% OP_STATUS | Part of a message, but expects an immediate ACK, in case
|
||||||
% | first message in it.
|
% | the stream goes idle, or for any other
|
||||||
% OP_END_MESSAGE | End of a message/stream, expects a response
|
% | implementation-specific needs.
|
||||||
% OP_OPEN_END | Open a new channel, and send an entire message
|
% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other
|
||||||
% OP_CLOSE | End of a message/stream, do not respond, close the channel
|
% | side to reply on this channel.
|
||||||
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
|
% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel.
|
||||||
% | channel, all in one datagram. Sort of like
|
|
||||||
% | gen_server:cast/2, or if UDP were reliable.
|
|
||||||
%
|
%
|
||||||
% Transport Control Stuff
|
% Transport Control Stuff
|
||||||
% -----------------------------------------------------------------------------
|
% -----------------------------------------------------------------------------
|
||||||
% OP_ACK | Acknowledge that instructions up to N have been received.
|
% OP_ACK | Acknowledge that instructions up to N have been received.
|
||||||
% OP_STATUS | Request an ACK for this exact command. Has many uses -
|
|
||||||
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
|
|
||||||
% | to stop the receiver from coalescing ACKs too much.
|
|
||||||
% OP_NACK | Instruction N has not been received.
|
% OP_NACK | Instruction N has not been received.
|
||||||
% OP_CANCEL | Sender is hoping that messages M through N weren't
|
% OP_CANCEL | Ignore any fragments that weren't already delivered, and
|
||||||
% | delivered, and can be cancelled.
|
% | end the stream now. The channel stays open so that the
|
||||||
% OP_ACK_CANCEL | Respond to OP_CANCEL.
|
% | other side can communicate what they did/didn't | receive,
|
||||||
%
|
% | and what actions were taken, if the application requires.
|
||||||
% Stream Negotiation
|
|
||||||
% _____________________________________________________________________________
|
|
||||||
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
|
|
||||||
% | should be moved to a different ID.
|
|
||||||
%
|
%
|
||||||
% Lossy
|
% Lossy
|
||||||
% _____________________________________________________________________________
|
% _____________________________________________________________________________
|
||||||
@@ -56,31 +53,43 @@
|
|||||||
% OP_LOSSY | A lossy datagram, for the application to deal with.
|
% OP_LOSSY | A lossy datagram, for the application to deal with.
|
||||||
|
|
||||||
% Numbers aren't final.
|
% Numbers aren't final.
|
||||||
-define(OP_FRAGMENT, 0).
|
-define(OP_FRAGMENT, 0).
|
||||||
-define(OP_OPEN, 1).
|
-define(OP_END_MESSAGE, 1).
|
||||||
-define(OP_END_MESSAGE, 2).
|
-define(OP_CLOSE, 2).
|
||||||
-define(OP_OPEN_END, 3).
|
-define(OP_STATUS, 3).
|
||||||
-define(OP_CLOSE, 4).
|
-define(OP_ACK, 4).
|
||||||
-define(OP_OPEN_CLOSE, 5).
|
-define(OP_NACK, 5).
|
||||||
-define(OP_ACK, 6).
|
-define(OP_CANCEL, 6).
|
||||||
-define(OP_STATUS, 7).
|
-define(OP_OPEN_DIRTY, 7).
|
||||||
-define(OP_NACK, 8).
|
-define(OP_LOSSY, 8).
|
||||||
-define(OP_CANCEL, 9).
|
|
||||||
-define(OP_ACK_CANCEL, 10).
|
|
||||||
-define(OP_REQ_MOVE, 11).
|
|
||||||
-define(OP_OPEN_DIRTY, 12).
|
|
||||||
-define(OP_LOSSY, 13).
|
|
||||||
|
|
||||||
%%% Type and Record Definitions
|
%%% Type and Record Definitions
|
||||||
|
|
||||||
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
|
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
|
||||||
|
|
||||||
|
-record(instruction,
|
||||||
|
{op :: integer(),
|
||||||
|
seq :: integer(),
|
||||||
|
data :: binary()}).
|
||||||
|
|
||||||
|
|
||||||
|
-type request_type() :: fragment | message.
|
||||||
|
-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}.
|
||||||
|
-record(request,
|
||||||
|
{type :: request_type(),
|
||||||
|
recipient :: request_recipient()}).
|
||||||
|
-type request() :: #request{}.
|
||||||
|
|
||||||
-record(s,
|
-record(s,
|
||||||
{chan_id :: integer(),
|
{chan_id :: integer(),
|
||||||
socket :: gen_udp:socket(),
|
socket :: gen_udp:socket(),
|
||||||
peer :: endpoint(), % TODO configure the socket in advance?
|
peer :: endpoint(), % TODO configure the socket in advance?
|
||||||
state = closed :: closed | sending | receiving,
|
state = closed :: closed | sending | receiving,
|
||||||
seq = 0 :: integer()}).
|
seq = 0 :: integer(),
|
||||||
|
requests = {[], []} :: {[request()], [request()]},
|
||||||
|
subscriber = none :: none | request(),
|
||||||
|
instructions = [] :: [#instruction{}],
|
||||||
|
fragments = {[], []} :: {[binary()], [binary()]}}).
|
||||||
|
|
||||||
|
|
||||||
-type state() :: none | #s{}.
|
-type state() :: none | #s{}.
|
||||||
@@ -95,6 +104,31 @@ configure_channel(Pid, Id, Socket, Peer) ->
|
|||||||
send_and_close(Chan, Message) ->
|
send_and_close(Chan, Message) ->
|
||||||
gen_server:call(Chan, {send, Message, close}).
|
gen_server:call(Chan, {send, Message, close}).
|
||||||
|
|
||||||
|
send_message(Chan, Message) ->
|
||||||
|
gen_server:call(Chan, {send, Message, end_message}).
|
||||||
|
|
||||||
|
receive_fragment(Chan) ->
|
||||||
|
gen_server:call(Chan, {add_sync_request, fragment}).
|
||||||
|
|
||||||
|
receive_message(Chan) ->
|
||||||
|
gen_server:call(Chan, {add_sync_request, message}).
|
||||||
|
|
||||||
|
subscribe_fragment(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {subscribe, fragment, Pid}).
|
||||||
|
|
||||||
|
subscribe_message(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {subscribe, message, Pid}).
|
||||||
|
|
||||||
|
subscribe_fragment_once(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {add_async_request, fragment, Pid}).
|
||||||
|
|
||||||
|
subscribe_message_once(Chan) ->
|
||||||
|
Pid = self(),
|
||||||
|
gen_server:call(Chan, {add_async_request, message, Pid}).
|
||||||
|
|
||||||
%%% gen_server
|
%%% gen_server
|
||||||
|
|
||||||
-spec start_link() -> Result
|
-spec start_link() -> Result
|
||||||
@@ -112,12 +146,22 @@ init(none) ->
|
|||||||
handle_call({send, Message, Mod}, _, State) ->
|
handle_call({send, Message, Mod}, _, State) ->
|
||||||
{Result, NewState} = do_send(Message, Mod, State),
|
{Result, NewState} = do_send(Message, Mod, State),
|
||||||
{reply, Result, NewState};
|
{reply, Result, NewState};
|
||||||
|
handle_call({add_sync_request, Type}, From, State) ->
|
||||||
|
NewState = do_add_sync_request(Type, From, State),
|
||||||
|
{noreply, NewState};
|
||||||
|
handle_call({add_async_request, Type, Pid}, _, State) ->
|
||||||
|
{Result, NewState} = do_add_async_request(Type, Pid, State),
|
||||||
|
{reply, Result, NewState};
|
||||||
|
handle_call({subscribe, Type, Pid}, _, State) ->
|
||||||
|
{Result, NewState} = do_subscribe(Type, Pid, State),
|
||||||
|
{reply, Result, NewState};
|
||||||
handle_call(Unexpected, From, State) ->
|
handle_call(Unexpected, From, State) ->
|
||||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
|
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
|
||||||
|
log(debug, "Opening channel ~p~n", [Id]),
|
||||||
State = #s{chan_id = Id,
|
State = #s{chan_id = Id,
|
||||||
socket = Socket,
|
socket = Socket,
|
||||||
peer = Peer},
|
peer = Peer},
|
||||||
@@ -159,95 +203,237 @@ do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
|
|||||||
do_send(_, _, State = #s{state = receiving}) ->
|
do_send(_, _, State = #s{state = receiving}) ->
|
||||||
{{error, not_sending}, State};
|
{{error, not_sending}, State};
|
||||||
do_send(Message, Mod, State = #s{chan_id = Id,
|
do_send(Message, Mod, State = #s{chan_id = Id,
|
||||||
state = ChanMode,
|
|
||||||
peer = Peer,
|
peer = Peer,
|
||||||
socket = Sock,
|
socket = Sock,
|
||||||
seq = Seq}) ->
|
seq = Seq}) ->
|
||||||
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||||
IsNewChannel = ChanMode == closed,
|
{Op, NewChanMode} = choose_opcode(Mod),
|
||||||
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
|
|
||||||
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
||||||
gen_udp:send(Sock, Peer, Datagram),
|
gen_udp:send(Sock, Peer, Datagram),
|
||||||
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
||||||
{ok, NewState}.
|
{ok, NewState}.
|
||||||
|
|
||||||
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
|
choose_opcode(none) -> {?OP_FRAGMENT, sending};
|
||||||
choose_opcode(true, none) -> {?OP_OPEN, sending};
|
choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
|
||||||
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
|
choose_opcode(close) -> {?OP_CLOSE, closed};
|
||||||
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
|
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||||
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
|
|
||||||
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
|
%%% Subscription Doer Functions
|
||||||
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
|
||||||
|
% Adds a request to the queue, or responds to that request directly, if there
|
||||||
|
% is a subscriber already.
|
||||||
|
do_add_sync_request(Type, From, State = #s{subscriber = none}) ->
|
||||||
|
do_add_request(Type, {sync, From}, State);
|
||||||
|
do_add_sync_request(_, From, State) ->
|
||||||
|
Result = {error, already_subscribed},
|
||||||
|
gen_server:reply(From, Result),
|
||||||
|
State.
|
||||||
|
|
||||||
|
% Adds a request to the queue and returns ok, or returns an error if there is a
|
||||||
|
% subscriber already.
|
||||||
|
do_add_async_request(Type, Pid, State = #s{subscriber = none}) ->
|
||||||
|
NewState = do_add_request(Type, {async, Pid}, State),
|
||||||
|
{ok, NewState};
|
||||||
|
do_add_async_request(_, _, State) ->
|
||||||
|
Result = {error, already_subscribed},
|
||||||
|
{Result, State}.
|
||||||
|
|
||||||
|
do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) ->
|
||||||
|
Request = #request{type = Type, recipient = Recipient},
|
||||||
|
NewRequests = {[Request | In], Out},
|
||||||
|
StateWithRequest = State#s{requests = NewRequests},
|
||||||
|
try_fill_requests(StateWithRequest).
|
||||||
|
|
||||||
|
do_subscribe(Type, Pid, State = #s{subscriber = none}) ->
|
||||||
|
Request = #request{type = Type, recipient = {async, Pid}},
|
||||||
|
StateWithSub = State#s{subscriber = Request},
|
||||||
|
NewState = try_fill_requests(StateWithSub),
|
||||||
|
{ok, NewState};
|
||||||
|
do_subscribe(_, _, State) ->
|
||||||
|
Result = {error, already_subscribed},
|
||||||
|
{Result, State}.
|
||||||
|
|
||||||
%%% Receive Doer Functions
|
%%% Receive Doer Functions
|
||||||
|
|
||||||
|
% First handle the messages that don't need to be reordered
|
||||||
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
|
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
|
||||||
do_handle_lossy(State, Data);
|
do_handle_lossy(State, Data);
|
||||||
|
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
|
||||||
|
do_handle_ack(State, Data);
|
||||||
|
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
|
||||||
|
do_handle_nack(State, Data);
|
||||||
|
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
|
||||||
|
do_handle_cancel(State, Data);
|
||||||
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
||||||
do_handle_reliable(State, Op, Seq, Rest);
|
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
|
||||||
|
% Reliable stream operations need to be ordered first.
|
||||||
|
NewState = do_buffer_reliable(State, Op, Seq, Rest),
|
||||||
|
do_execute_buffer(NewState);
|
||||||
do_handle_datagram(State, Unexpected) ->
|
do_handle_datagram(State, Unexpected) ->
|
||||||
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
|
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
do_handle_lossy(State, Data) ->
|
do_handle_lossy(State, Data) ->
|
||||||
io:format("Got lossy datagram: ~p~n", [Data]),
|
io:format("Got lossy datagram: ~p~n", [Data]),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
do_handle_ack(State, Payload) ->
|
||||||
|
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||||
|
State.
|
||||||
|
|
||||||
|
do_handle_nack(State, Payload) ->
|
||||||
|
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||||
|
State.
|
||||||
|
|
||||||
|
do_handle_cancel(State, Payload) ->
|
||||||
|
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||||
|
State.
|
||||||
|
|
||||||
|
%%% Reliable message reordering
|
||||||
|
|
||||||
|
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
|
||||||
|
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
|
||||||
|
State#s{instructions = NewInstructions}.
|
||||||
|
|
||||||
|
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
|
||||||
|
% We need to go later than this. Move it to the 'earlier than us' pile.
|
||||||
|
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
|
||||||
|
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
|
||||||
|
% There is already an instruction with this sequence ID. Use that.
|
||||||
|
lists:reverse(Earlier, Is);
|
||||||
|
insert_instruction(Op, Seq, Data, Is, Earlier) ->
|
||||||
|
% If neither of the above conditions are met, we can add it here.
|
||||||
|
NewI = #instruction{op = Op, seq = Seq, data = Data},
|
||||||
|
lists:reverse(Earlier, [NewI | Is]).
|
||||||
|
|
||||||
|
%%% Reliable message execution
|
||||||
|
|
||||||
|
do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) ->
|
||||||
|
% If we are closed, then anything we receive opens the channel.
|
||||||
|
StateOpened = State#s{state = receiving},
|
||||||
|
do_execute_buffer(StateOpened);
|
||||||
|
do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
|
||||||
|
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
|
||||||
|
log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]),
|
||||||
|
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
|
||||||
|
do_execute_buffer(NewState);
|
||||||
|
do_execute_buffer(State) ->
|
||||||
|
% We've finished consuming instructions; now see if we have any requests we
|
||||||
|
% can fill.
|
||||||
|
try_fill_requests(State).
|
||||||
|
|
||||||
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
||||||
do_handle_fragment(State, false, continue, Seq, Payload);
|
do_handle_fragment(State, continue, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, continue, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
||||||
do_handle_fragment(State, false, end_message, Seq, Payload);
|
do_handle_fragment(State, end_message, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, end_message, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
||||||
do_handle_fragment(State, false, close_channel, Seq, Payload);
|
do_handle_fragment(State, close_channel, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, close_channel, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
|
|
||||||
do_handle_ack(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
||||||
do_handle_status(State, Seq, Payload);
|
do_handle_status(State, Seq, Payload);
|
||||||
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
|
|
||||||
do_handle_nack(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
|
|
||||||
do_handle_cancel(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
|
|
||||||
do_handle_ack_cancel(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
|
|
||||||
do_handle_req_move(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
||||||
do_handle_fragment(State, true, dirty, Seq, Payload);
|
do_handle_fragment(State, dirty, Seq, Payload);
|
||||||
do_handle_reliable(State, Unknown, _, _) ->
|
do_handle_reliable(State, Unknown, _, _) ->
|
||||||
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
|
|
||||||
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_ack(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_status(State, Seq, Payload) ->
|
do_handle_status(State, Seq, Payload) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
send_ack(State, Seq),
|
||||||
State.
|
do_handle_fragment(State, continue, Seq, Payload).
|
||||||
|
|
||||||
do_handle_nack(State, Seq, Payload) ->
|
send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
% TODO: backpressure
|
||||||
State.
|
gen_udp:send(Sock, Peer, <<ID:8, ?OP_ACK:8, Seq:16>>).
|
||||||
|
|
||||||
do_handle_cancel(State, Seq, Payload) ->
|
do_handle_fragment(State, Mod, _Seq, Payload) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
WithFragment = push_fragment(State, Payload),
|
||||||
State.
|
update_chan_mode(WithFragment, Mod).
|
||||||
|
|
||||||
do_handle_ack_cancel(State, Seq, Payload) ->
|
push_fragment(State = #s{fragments = {In, Out}}, Payload) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
State#s{fragments = {[Payload | In], Out}}.
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_req_move(State, Seq, Payload) ->
|
update_chan_mode(State, continue) ->
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
State#s{state = receiving};
|
||||||
State.
|
update_chan_mode(State, end_message) ->
|
||||||
|
State#s{state = sending};
|
||||||
|
update_chan_mode(State, close_channel) ->
|
||||||
|
% TODO: Properly clean up subscriptions and requests and things, if the
|
||||||
|
% channel closes... Also if the channel switches to sending, I guess a
|
||||||
|
% fragment subscriber should be notified?
|
||||||
|
State#s{state = closed};
|
||||||
|
update_chan_mode(State, dirty) ->
|
||||||
|
% TODO: create and then quarantine dirty channels
|
||||||
|
State#s{state = receiving}.
|
||||||
|
|
||||||
|
try_fill_requests(State = #s{requests = Requests}) ->
|
||||||
|
case pop_queue(Requests) of
|
||||||
|
{ok, {Remaining, Request}} ->
|
||||||
|
try_fill_requests2(State, Remaining, Request);
|
||||||
|
error ->
|
||||||
|
% No requests, try subscriptions instead.
|
||||||
|
try_fill_subscription(State)
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_fill_requests2(State, RequestsRemaining, Request) ->
|
||||||
|
Without = State#s{requests = RequestsRemaining},
|
||||||
|
case try_fill_request(Without, Request) of
|
||||||
|
{ok, NewState} ->
|
||||||
|
% Done successfully! Start the loop again with the new state.
|
||||||
|
try_fill_requests(NewState);
|
||||||
|
error ->
|
||||||
|
% The request didn't get filled... Just revert.
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_fill_subscription(State = #s{subscriber = none}) ->
|
||||||
|
% No subscription either! Do nothing.
|
||||||
|
State;
|
||||||
|
try_fill_subscription(State = #s{subscriber = Sub}) ->
|
||||||
|
% A subscription! Try to fill it as many times as possible.
|
||||||
|
try_fill_subscription_loop(State, Sub).
|
||||||
|
|
||||||
|
try_fill_subscription_loop(State, Sub) ->
|
||||||
|
case try_fill_request(State, Sub) of
|
||||||
|
{ok, NewState} ->
|
||||||
|
try_fill_subscription_loop(NewState, Sub);
|
||||||
|
error ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_fill_request(#s{state = receiving}, #request{type = message}) ->
|
||||||
|
% Message not finished yet.
|
||||||
|
error;
|
||||||
|
try_fill_request(#s{fragments = {[], []}}, #request{type = message}) ->
|
||||||
|
% No fragments have been received, not even empty fragments. This
|
||||||
|
% channel/message hasn't even been opened yet.
|
||||||
|
error;
|
||||||
|
try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) ->
|
||||||
|
Fragments = Out ++ lists:reverse(In),
|
||||||
|
Message = list_to_binary(Fragments),
|
||||||
|
send_result(Recip, Message),
|
||||||
|
NewState = State#s{fragments = {[], []}},
|
||||||
|
{ok, NewState};
|
||||||
|
try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) ->
|
||||||
|
case pop_queue(Fragments) of
|
||||||
|
{ok, {NewFragments, Fragment}} ->
|
||||||
|
log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]),
|
||||||
|
send_result(Recip, Fragment),
|
||||||
|
NewState = State#s{fragments = NewFragments},
|
||||||
|
{ok, NewState};
|
||||||
|
error ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
send_result({async, Pid}, Result) ->
|
||||||
|
erlang:send(Pid, {msp_fragment, self(), Result});
|
||||||
|
send_result({sync, From}, Result) ->
|
||||||
|
gen_server:reply(From, {ok, Result}).
|
||||||
|
|
||||||
|
pop_queue({[], []}) ->
|
||||||
|
error;
|
||||||
|
pop_queue({In, [Next | Out]}) ->
|
||||||
|
NewFragments = {In, Out},
|
||||||
|
{ok, {NewFragments, Next}};
|
||||||
|
pop_queue({In, []}) ->
|
||||||
|
Out = lists:reverse(In),
|
||||||
|
pop_queue({[], Out}).
|
||||||
|
|
||||||
|
|||||||
+121
-26
@@ -9,7 +9,7 @@
|
|||||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||||
-license("MIT").
|
-license("MIT").
|
||||||
|
|
||||||
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
|
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
|
||||||
%% Worker interface
|
%% Worker interface
|
||||||
-export([enroll/0]).
|
-export([enroll/0]).
|
||||||
%% gen_server
|
%% gen_server
|
||||||
@@ -29,8 +29,9 @@
|
|||||||
{chan_pid :: pid()}).
|
{chan_pid :: pid()}).
|
||||||
|
|
||||||
-record(route_info,
|
-record(route_info,
|
||||||
{conn_pid :: pid(),
|
{router :: pid(),
|
||||||
socket :: gen_udp:socket(),
|
socket :: gen_udp:socket(),
|
||||||
|
side :: 0 | 1,
|
||||||
unused_channels :: [integer()],
|
unused_channels :: [integer()],
|
||||||
used_channels :: #{integer() => #channel{}}}).
|
used_channels :: #{integer() => #channel{}}}).
|
||||||
|
|
||||||
@@ -41,19 +42,49 @@
|
|||||||
|
|
||||||
%%% Service Interface
|
%%% Service Interface
|
||||||
|
|
||||||
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
|
add_route(Sock, TheirIP, TheirPort, Side) ->
|
||||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
|
||||||
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
|
ok ->
|
||||||
|
transfer_socket(Sock, TheirIP, TheirPort);
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
|
||||||
|
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
|
||||||
|
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
|
||||||
|
|
||||||
|
% A pretty complex function for an interface, but we need to transfer ownership
|
||||||
|
% of the socket.
|
||||||
|
transfer_socket(Sock, TheirIP, TheirPort) ->
|
||||||
|
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
|
||||||
|
ok ->
|
||||||
|
transfer_socket2(Sock, {TheirIP, TheirPort});
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
transfer_socket2(Sock, Peer) ->
|
||||||
|
% Since it was transferred, any backlog of datagrams will now be in the
|
||||||
|
% channel manager's inbox. Turn it off from active, so that the manager
|
||||||
|
% knows that it will be fully caught up *before* the socket_transferred
|
||||||
|
% notification arrives.
|
||||||
|
case inet:setopts(Sock, [{active, false}]) of
|
||||||
|
ok ->
|
||||||
|
% Now the socket is all cleaned up, let the manager know.
|
||||||
|
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
create_channel(OurPort, TheirIP, TheirPort) ->
|
create_channel(OurPort, TheirIP, TheirPort) ->
|
||||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||||
gen_server:call(?MODULE, {create_channel, Route}).
|
gen_server:call(?MODULE, {create_channel, Route}).
|
||||||
|
|
||||||
% Deliver messages on behalf of msp_connection, if it doesn't know where to
|
% Deliver messages on behalf of msp_router, if it doesn't know where to send
|
||||||
% send something.
|
% something.
|
||||||
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
|
dispatch_fallback(Socket, Peer, ID, Packet) ->
|
||||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
|
||||||
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
|
|
||||||
|
|
||||||
%%% Worker Interface
|
%%% Worker Interface
|
||||||
|
|
||||||
@@ -81,6 +112,12 @@ init(none) ->
|
|||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
|
||||||
|
handle_call({add_route, Sock, Route, Side}, _, State) ->
|
||||||
|
{Result, NewState} = do_add_route(Sock, Route, Side, State),
|
||||||
|
{reply, Result, NewState};
|
||||||
|
handle_call({socket_transferred, Sock, Peer}, _, State) ->
|
||||||
|
Result = do_socket_transferred(Sock, Peer, State),
|
||||||
|
{reply, Result, State};
|
||||||
handle_call({create_channel, Route}, _, State) ->
|
handle_call({create_channel, Route}, _, State) ->
|
||||||
{Result, NewState} = do_create_channel(State, Route),
|
{Result, NewState} = do_create_channel(State, Route),
|
||||||
{reply, Result, NewState};
|
{reply, Result, NewState};
|
||||||
@@ -89,11 +126,8 @@ handle_call(Unexpected, From, State) ->
|
|||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
handle_cast({update_router, Route, Conn, Sock}, State) ->
|
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
|
||||||
NewState = do_update_router(Route, Conn, Sock, State),
|
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
|
||||||
{noreply, NewState};
|
|
||||||
handle_cast({dispatch, Route, ID, Packet}, State) ->
|
|
||||||
NewState = do_dispatch(Route, ID, Packet, State),
|
|
||||||
{noreply, NewState};
|
{noreply, NewState};
|
||||||
handle_cast({enroll, PID}, State) ->
|
handle_cast({enroll, PID}, State) ->
|
||||||
NewState = do_enroll(PID, State),
|
NewState = do_enroll(PID, State),
|
||||||
@@ -106,10 +140,23 @@ handle_cast(Unexpected, State) ->
|
|||||||
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
|
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
|
||||||
NewState = handle_down(Mon, PID, Reason, State),
|
NewState = handle_down(Mon, PID, Reason, State),
|
||||||
{noreply, NewState};
|
{noreply, NewState};
|
||||||
|
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
|
||||||
|
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
|
||||||
|
{noreply, NewState};
|
||||||
handle_info(Unexpected, State) ->
|
handle_info(Unexpected, State) ->
|
||||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
|
||||||
|
do_dispatch(Socket, Peer, ID, Packet, State);
|
||||||
|
handle_udp(Socket, Peer, [ID | PacketList], State) ->
|
||||||
|
Packet = list_to_binary(PacketList),
|
||||||
|
do_dispatch(Socket, Peer, ID, Packet, State);
|
||||||
|
handle_udp(_, _, <<>>, State) ->
|
||||||
|
% Empty datagram... Ignore.
|
||||||
|
State;
|
||||||
|
handle_udp(_, _, [], State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
handle_down(Mon, PID, Reason, State) ->
|
handle_down(Mon, PID, Reason, State) ->
|
||||||
% TODO
|
% TODO
|
||||||
@@ -149,18 +196,56 @@ do_enroll2(PID, State, Channels) ->
|
|||||||
State
|
State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
|
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
|
||||||
|
{ok, OurPort} = inet:port(Sock),
|
||||||
|
Route = #route{from = OurPort, to = Peer},
|
||||||
|
% Check that this route isn't registered already...
|
||||||
|
case maps:is_key(Route, Conns) of
|
||||||
|
true ->
|
||||||
|
Result = {error, already_added},
|
||||||
|
{Result, State};
|
||||||
|
false ->
|
||||||
|
do_add_route2(Sock, Route, Side, State)
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
|
||||||
|
{ok, Router} = msp_router:start_link(),
|
||||||
|
NewConn = #route_info{router = Router,
|
||||||
|
socket = Sock,
|
||||||
|
side = Side,
|
||||||
|
unused_channels = lists:seq(0, 255),
|
||||||
|
used_channels = #{}},
|
||||||
|
NewConns = maps:put(Route, NewConn, Conns),
|
||||||
|
NewState = State#s{connections = NewConns},
|
||||||
|
{ok, NewState}.
|
||||||
|
|
||||||
|
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
|
||||||
|
{ok, OurPort} = inet:port(Sock),
|
||||||
|
Route = #route{from = OurPort, to = Peer},
|
||||||
|
% Check that this route is actually registered.
|
||||||
case maps:find(Route, Conns) of
|
case maps:find(Route, Conns) of
|
||||||
{ok, _Info} ->
|
{ok, Conn} ->
|
||||||
io:format("Updating routes not yet implemented.~n", []),
|
do_socket_transferred2(Sock, Peer, Conn);
|
||||||
init:stop();
|
|
||||||
error ->
|
error ->
|
||||||
NewConn = #route_info{conn_pid = Conn,
|
{error, unknown_connection}
|
||||||
socket = Sock,
|
end.
|
||||||
unused_channels = lists:seq(0, 255),
|
|
||||||
used_channels = #{}},
|
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
|
||||||
NewConns = maps:put(Route, NewConn, Conns),
|
->
|
||||||
State#s{connections = NewConns}
|
msp_router:begin_routing(Router, Sock, Peer),
|
||||||
|
channel_catchup(Router, maps:iterator(Channels));
|
||||||
|
do_socket_transferred2(_, _, _) ->
|
||||||
|
{error, wrong_socket}.
|
||||||
|
|
||||||
|
% If we have created channels without spawning the router process, then at some
|
||||||
|
% point we will need to tell the router process what channels currently exist.
|
||||||
|
channel_catchup(Router, Iter) ->
|
||||||
|
case maps:next(Iter) of
|
||||||
|
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
|
||||||
|
msp_router:add_channel(Router, ChanID, Chan),
|
||||||
|
channel_catchup(Router, NewIter);
|
||||||
|
none ->
|
||||||
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_create_channel(State = #s{connections = Conns}, Route) ->
|
do_create_channel(State = #s{connections = Conns}, Route) ->
|
||||||
@@ -194,13 +279,23 @@ do_create_channel3(State, Route, Info, ChanID) ->
|
|||||||
NewConns = maps:put(Route, NewInfo, State#s.connections),
|
NewConns = maps:put(Route, NewInfo, State#s.connections),
|
||||||
NewState = State#s{connections = NewConns},
|
NewState = State#s{connections = NewConns},
|
||||||
|
|
||||||
|
% Also add it to the router!
|
||||||
|
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
|
||||||
|
|
||||||
{Result, NewState}.
|
{Result, NewState}.
|
||||||
|
|
||||||
do_dispatch(Route, ID, Packet, State) ->
|
do_dispatch(Socket, Peer, ID, Packet, State) ->
|
||||||
|
{ok, OurPort} = inet:port(Socket),
|
||||||
|
Route = #route{from = OurPort, to = Peer},
|
||||||
|
do_dispatch2(Route, ID, Packet, State).
|
||||||
|
|
||||||
|
do_dispatch2(Route, ID, Packet, State) ->
|
||||||
case maps:find(Route, State#s.connections) of
|
case maps:find(Route, State#s.connections) of
|
||||||
{ok, Info} ->
|
{ok, Info} ->
|
||||||
do_dispatch2(Route, ID, Packet, State, Info);
|
do_dispatch2(Route, ID, Packet, State, Info);
|
||||||
error ->
|
error ->
|
||||||
|
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
|
||||||
|
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
|
||||||
State
|
State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
%%% @doc
|
||||||
|
%%% Minimal Stream Protocol : Channel Worker Supervisor
|
||||||
|
%%% @end
|
||||||
|
|
||||||
|
-module(msp_channel_sup).
|
||||||
|
-vsn("0.1.0").
|
||||||
|
-behaviour(supervisor).
|
||||||
|
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
||||||
|
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||||
|
-license("MIT").
|
||||||
|
|
||||||
|
|
||||||
|
-export([start_channel/0]).
|
||||||
|
-export([start_link/0]).
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
|
||||||
|
-spec start_channel() -> Result
|
||||||
|
when Result :: {ok, pid()}
|
||||||
|
| {error, Reason},
|
||||||
|
Reason :: {already_started, pid()}
|
||||||
|
| {shutdown, term()}
|
||||||
|
| term().
|
||||||
|
|
||||||
|
start_channel() ->
|
||||||
|
supervisor:start_child(?MODULE, []).
|
||||||
|
|
||||||
|
|
||||||
|
-spec start_link() -> {ok, pid()}.
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
|
||||||
|
|
||||||
|
|
||||||
|
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
|
||||||
|
%% @private
|
||||||
|
%% The OTP init/1 function.
|
||||||
|
|
||||||
|
init(none) ->
|
||||||
|
RestartStrategy = {simple_one_for_one, 1, 60},
|
||||||
|
Channel =
|
||||||
|
{msp_channel,
|
||||||
|
{msp_channel, start_link, []},
|
||||||
|
temporary,
|
||||||
|
brutal_kill,
|
||||||
|
worker,
|
||||||
|
[msp_channel]},
|
||||||
|
{ok, {RestartStrategy, [Channel]}}.
|
||||||
@@ -2,14 +2,14 @@
|
|||||||
%%% Minimal Stream Protocol: Connection Worker
|
%%% Minimal Stream Protocol: Connection Worker
|
||||||
%%% @end
|
%%% @end
|
||||||
|
|
||||||
-module(msp_connection).
|
-module(msp_router).
|
||||||
-vsn("0.1.0").
|
-vsn("0.1.0").
|
||||||
-behavior(gen_server).
|
-behavior(gen_server).
|
||||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
||||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||||
-license("MIT").
|
-license("MIT").
|
||||||
|
|
||||||
-export([begin_msp/5]).
|
-export([begin_routing/3, add_channel/3]).
|
||||||
|
|
||||||
%% gen_server
|
%% gen_server
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
@@ -28,7 +28,6 @@
|
|||||||
-record(s,
|
-record(s,
|
||||||
{socket :: gen_udp:socket(),
|
{socket :: gen_udp:socket(),
|
||||||
peer :: endpoint(),
|
peer :: endpoint(),
|
||||||
side :: 0 | 1,
|
|
||||||
connections = #{} :: #{integer() => channel()}}).
|
connections = #{} :: #{integer() => channel()}}).
|
||||||
|
|
||||||
-type state() :: none | #s{}.
|
-type state() :: none | #s{}.
|
||||||
@@ -37,18 +36,21 @@
|
|||||||
|
|
||||||
% msp does not negotiate new connections, but once you have a
|
% msp does not negotiate new connections, but once you have a
|
||||||
% socket, and a host that knows you will be talking MSP, then a
|
% socket, and a host that knows you will be talking MSP, then a
|
||||||
% new msp_connection can be initialized.
|
% new msp_router can be initialized.
|
||||||
begin_msp(Connection, OurSock, TheirIP, TheirPort, OurSide) ->
|
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
|
||||||
% Transfer the socket to the gen_server. If it is
|
% Transfer the socket to the gen_server. If it is
|
||||||
% active then a bunch of messages will be received
|
% active then a bunch of messages will be received
|
||||||
% at once, but that is fine.
|
% at once, but that is fine.
|
||||||
case gen_udp:controlling_process(OurSock, Connection) of
|
case gen_udp:controlling_process(OurSock, Router) of
|
||||||
ok ->
|
ok ->
|
||||||
gen_server:cast(Connection, {begin_msp, OurSock, {TheirIP, TheirPort}, OurSide});
|
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
add_channel(Router, ChanID, ChanPID) ->
|
||||||
|
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
|
||||||
|
|
||||||
%%% gen_server
|
%%% gen_server
|
||||||
|
|
||||||
-spec start_link() -> Result
|
-spec start_link() -> Result
|
||||||
@@ -59,29 +61,36 @@ start_link() ->
|
|||||||
gen_server:start_link(?MODULE, none, []).
|
gen_server:start_link(?MODULE, none, []).
|
||||||
|
|
||||||
|
|
||||||
% TODO: Ask msp_connection_man for the socket and endpoint
|
% TODO: SWP? SWP of SWPs?
|
||||||
init(none) ->
|
init(none) ->
|
||||||
{ok, none}.
|
{ok, none}.
|
||||||
|
|
||||||
|
|
||||||
|
handle_call({begin_routing, Sock, Peer}, _, none) ->
|
||||||
|
{Result, State} = do_begin_routing(Sock, Peer),
|
||||||
|
{reply, Result, State};
|
||||||
|
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
|
||||||
|
Result = {error, already_routing},
|
||||||
|
{reply, Result, State};
|
||||||
handle_call(Unexpected, From, State) ->
|
handle_call(Unexpected, From, State) ->
|
||||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
||||||
{reply, undef, State}.
|
{reply, undef, State}.
|
||||||
|
|
||||||
|
|
||||||
handle_cast({begin_msp, Sock, Peer, Side}, none) ->
|
handle_cast({add_channel, ChanID, ChanPID}, State) ->
|
||||||
State = do_begin_msp(Sock, Peer, Side),
|
NewState = do_add_channel(ChanID, ChanPID, State),
|
||||||
{noreply, State};
|
{noreply, NewState};
|
||||||
handle_cast(Unexpected, State) ->
|
handle_cast(Unexpected, State) ->
|
||||||
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
|
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
|
||||||
|
log(debug, "Got packet: ~p~n", [Packet]),
|
||||||
do_dispatch(State, Packet),
|
do_dispatch(State, Packet),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info(Unexpected, State) ->
|
handle_info(Unexpected, State) ->
|
||||||
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
|
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
@@ -104,12 +113,17 @@ terminate(_, _) ->
|
|||||||
|
|
||||||
%%% Doer Functions
|
%%% Doer Functions
|
||||||
|
|
||||||
do_begin_msp(Sock, Peer, Side) ->
|
do_begin_routing(Sock, Peer) ->
|
||||||
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
|
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
|
||||||
State = #s{socket = Sock,
|
State = #s{socket = Sock, peer = Peer},
|
||||||
peer = Peer,
|
{ok, State}.
|
||||||
side = Side},
|
|
||||||
State.
|
do_add_channel(_, _, none) ->
|
||||||
|
% Do nothing... We will need to be told about these channels again later.
|
||||||
|
none;
|
||||||
|
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
|
||||||
|
NewConns = maps:put(ChanID, ChanPID, Conns),
|
||||||
|
State#s{connections = NewConns}.
|
||||||
|
|
||||||
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
|
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
|
||||||
ok = inet:setopts(Sock, [{active, once}]),
|
ok = inet:setopts(Sock, [{active, once}]),
|
||||||
@@ -117,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
|
|||||||
{ok, Conn} ->
|
{ok, Conn} ->
|
||||||
erlang:send(Conn, {msp_fragment, Packet});
|
erlang:send(Conn, {msp_fragment, Packet});
|
||||||
error ->
|
error ->
|
||||||
{ok, OurPort} = inet:port(Sock),
|
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
|
||||||
{TheirIP, TheirPort} = Peer,
|
|
||||||
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
|
|
||||||
end;
|
end;
|
||||||
do_dispatch(_, <<>>) ->
|
do_dispatch(_, <<>>) ->
|
||||||
% Empty datagram?
|
% Empty datagram?
|
||||||
+61
-16
@@ -6,6 +6,9 @@ spawn_tests() ->
|
|||||||
ok.
|
ok.
|
||||||
|
|
||||||
run_tests_and_halt() ->
|
run_tests_and_halt() ->
|
||||||
|
% Add a handler to print all log messages to the screen, so we can see why
|
||||||
|
% tests are failing, if they are.
|
||||||
|
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
|
||||||
Result = run_tests_protected(),
|
Result = run_tests_protected(),
|
||||||
io:format("Tests returned ~p~n", [Result]),
|
io:format("Tests returned ~p~n", [Result]),
|
||||||
halt().
|
halt().
|
||||||
@@ -19,31 +22,73 @@ run_tests_protected() ->
|
|||||||
|
|
||||||
run_tests() ->
|
run_tests() ->
|
||||||
ok = send_test(),
|
ok = send_test(),
|
||||||
|
ok = out_of_order_test(),
|
||||||
|
ok = race_test(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
make_connection(OurPort, TheirIP, TheirPort, Side) ->
|
make_connection(OurPort, TheirIP, TheirPort, Side) ->
|
||||||
{ok, Sock} = gen_udp:open(OurPort),
|
{ok, Sock} = gen_udp:open(OurPort),
|
||||||
{ok, Pid} = msp_connection:start_link(),
|
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
|
||||||
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
|
|
||||||
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
|
|
||||||
{Pid, Sock}.
|
|
||||||
|
|
||||||
send_test() ->
|
send_test() ->
|
||||||
IP = {127, 0, 0, 1},
|
IP = {127, 0, 0, 1},
|
||||||
PortA = 5555,
|
PortA = 5511,
|
||||||
PortB = 6666,
|
PortB = 5512,
|
||||||
{A, SockA} = make_connection(PortA, IP, PortB, 0),
|
ok = make_connection(PortA, IP, PortB, 0),
|
||||||
{B, SockB} = make_connection(PortB, IP, PortA, 1),
|
ok = make_connection(PortB, IP, PortA, 1),
|
||||||
|
|
||||||
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
{ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
||||||
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
|
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||||
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
|
|
||||||
timer:sleep(10),
|
MessageA = <<"message sent from A to B">>,
|
||||||
|
msp_channel:send_and_close(ChanA, MessageA),
|
||||||
|
io:format("A has tried to send a message to B~n"),
|
||||||
|
{ok, MessageA} = msp_channel:receive_message(ChanB),
|
||||||
|
io:format("received successfully~n"),
|
||||||
|
|
||||||
|
MessageB = <<"message sent from B to A">>,
|
||||||
|
msp_channel:send_and_close(ChanB, MessageB),
|
||||||
|
io:format("B has tried to send a message to A~n"),
|
||||||
|
{ok, MessageB} = msp_channel:receive_message(ChanA),
|
||||||
|
io:format("received successfully~n"),
|
||||||
|
|
||||||
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
|
||||||
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
|
|
||||||
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
|
|
||||||
timer:sleep(10),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
out_of_order_test() ->
|
||||||
|
IP = {127, 0, 0, 1},
|
||||||
|
PortA = 5521,
|
||||||
|
PortB = 5522,
|
||||||
|
{ok, SockA} = gen_udp:open(PortA),
|
||||||
|
ok = make_connection(PortB, IP, PortA, 0),
|
||||||
|
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
||||||
|
|
||||||
|
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>),
|
||||||
|
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>),
|
||||||
|
|
||||||
|
{ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB),
|
||||||
|
io:format("Received out of order fragments successfully.~n"),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
race_test() ->
|
||||||
|
IP = {127, 0, 0, 1},
|
||||||
|
PortA = 5531,
|
||||||
|
PortB = 5532,
|
||||||
|
{ok, SockA} = gen_udp:open(PortA),
|
||||||
|
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
|
||||||
|
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>),
|
||||||
|
|
||||||
|
timer:sleep(10),
|
||||||
|
|
||||||
|
% We should already have a {udp, SockB, IP, PortA, _} in our message queue,
|
||||||
|
% which needs to be properly detected and handled by the socket transfer
|
||||||
|
% functions.
|
||||||
|
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
|
||||||
|
|
||||||
|
% TODO We could test that the datagrams were dispatched properly by
|
||||||
|
% receiving on the channel, but for now we just manually check the
|
||||||
|
% logs/console output for msp_channel_man errors.
|
||||||
|
timer:sleep(10),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user