Compare commits

..

8 Commits

Author SHA1 Message Date
Jarvis Carroll 39a75ca435 Receive data on the other end.
I added a bunch of receipt interfaces, but I only test the syncronous
message receipt at the moment, not fragments or async.

I'm not sure whether 'subscribe_message' even makes sense as a concept,
but there it is.
2025-10-31 11:35:05 +00:00
Jarvis Carroll 23276662bb simplify opcodes
OPEN is not really relevant. And a lot of the stream negotiation stuff isn't that useful either. Gone.
2025-10-30 05:52:33 +00:00
Jarvis Carroll 78ea3fc85e handle socket race conditions better
Now the channle manager takes full responsibility for dispatch as
soon as it is initialised, and only passes that responsibility on
to the router process once things have been cleaned up.

What a complex refactor just to pass a socket to a new process!
2025-10-29 11:56:44 +00:00
Jarvis Carroll e8ef0b4304 buffer out-of-order messages
This took a while to debug!! Turns out I don't actually
handle datagrams that have been received before the
workers are all initialized... oops!!
2025-10-27 07:31:27 +00:00
Jarvis Carroll b6c16967e7 notify router of new channels
This way the second message received on a channel should be
routed by the router without involving the fallback.

Lolspeed achieved.
2025-10-26 11:23:34 +00:00
Jarvis Carroll 47612c2775 have channel_man spawn its own router
Once I added dispatch_fallback to channel_man, it became clear that
the router is actually subordinate to the channel_man.
2025-10-26 11:14:09 +00:00
Jarvis Carroll cfd9eb748f rename Connection to Router
Connection is more of a supervision concept than a server concept.
I should have a SWP of SWPs too, but I can't be bothered just yet.
2025-10-26 10:46:27 +00:00
Jarvis Carroll 669b2e51e1 actually add msp_channel_sup.erl
oops
2025-10-26 10:39:08 +00:00
5 changed files with 538 additions and 152 deletions
+276 -90
View File
@@ -10,7 +10,13 @@
-license("MIT").
-export([configure_channel/4]).
-export([send_and_close/2]).
-export([send_and_close/2, send_message/2]).
% A lot of interfaces, but the last two are only really written for
% completeness. The distinctions here are blocking vs subscribing, and
% fragments vs messages.
-export([receive_fragment/1, receive_message/1,
subscribe_fragment/1, subscribe_message/1,
subscribe_fragment_once/1, subscribe_message_once/1]).
%% gen_server
-export([start_link/0]).
@@ -24,30 +30,21 @@
% Fragment/message opcodes:
% _____________________________________________________________________________
% OP_FRAGMENT | Part of a message/stream
% OP_OPEN | Open a new channel, and send the first fragment of the
% | first message in it.
% OP_END_MESSAGE | End of a message/stream, expects a response
% OP_OPEN_END | Open a new channel, and send an entire message
% OP_CLOSE | End of a message/stream, do not respond, close the channel
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
% | channel, all in one datagram. Sort of like
% | gen_server:cast/2, or if UDP were reliable.
% OP_STATUS | Part of a message, but expects an immediate ACK, in case
% | the stream goes idle, or for any other
% | implementation-specific needs.
% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other
% | side to reply on this channel.
% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel.
%
% Transport Control Stuff
% -----------------------------------------------------------------------------
% OP_ACK | Acknowledge that instructions up to N have been received.
% OP_STATUS | Request an ACK for this exact command. Has many uses -
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
% | to stop the receiver from coalescing ACKs too much.
% OP_NACK | Instruction N has not been received.
% OP_CANCEL | Sender is hoping that messages M through N weren't
% | delivered, and can be cancelled.
% OP_ACK_CANCEL | Respond to OP_CANCEL.
%
% Stream Negotiation
% _____________________________________________________________________________
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
% | should be moved to a different ID.
% OP_CANCEL | Ignore any fragments that weren't already delivered, and
% | end the stream now. The channel stays open so that the
% | other side can communicate what they did/didn't | receive,
% | and what actions were taken, if the application requires.
%
% Lossy
% _____________________________________________________________________________
@@ -56,31 +53,43 @@
% OP_LOSSY | A lossy datagram, for the application to deal with.
% Numbers aren't final.
-define(OP_FRAGMENT, 0).
-define(OP_OPEN, 1).
-define(OP_END_MESSAGE, 2).
-define(OP_OPEN_END, 3).
-define(OP_CLOSE, 4).
-define(OP_OPEN_CLOSE, 5).
-define(OP_ACK, 6).
-define(OP_STATUS, 7).
-define(OP_NACK, 8).
-define(OP_CANCEL, 9).
-define(OP_ACK_CANCEL, 10).
-define(OP_REQ_MOVE, 11).
-define(OP_OPEN_DIRTY, 12).
-define(OP_LOSSY, 13).
-define(OP_FRAGMENT, 0).
-define(OP_END_MESSAGE, 1).
-define(OP_CLOSE, 2).
-define(OP_STATUS, 3).
-define(OP_ACK, 4).
-define(OP_NACK, 5).
-define(OP_CANCEL, 6).
-define(OP_OPEN_DIRTY, 7).
-define(OP_LOSSY, 8).
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(instruction,
{op :: integer(),
seq :: integer(),
data :: binary()}).
-type request_type() :: fragment | message.
-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}.
-record(request,
{type :: request_type(),
recipient :: request_recipient()}).
-type request() :: #request{}.
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer()}).
seq = 0 :: integer(),
requests = {[], []} :: {[request()], [request()]},
subscriber = none :: none | request(),
instructions = [] :: [#instruction{}],
fragments = {[], []} :: {[binary()], [binary()]}}).
-type state() :: none | #s{}.
@@ -95,6 +104,31 @@ configure_channel(Pid, Id, Socket, Peer) ->
send_and_close(Chan, Message) ->
gen_server:call(Chan, {send, Message, close}).
send_message(Chan, Message) ->
gen_server:call(Chan, {send, Message, end_message}).
receive_fragment(Chan) ->
gen_server:call(Chan, {add_sync_request, fragment}).
receive_message(Chan) ->
gen_server:call(Chan, {add_sync_request, message}).
subscribe_fragment(Chan) ->
Pid = self(),
gen_server:call(Chan, {subscribe, fragment, Pid}).
subscribe_message(Chan) ->
Pid = self(),
gen_server:call(Chan, {subscribe, message, Pid}).
subscribe_fragment_once(Chan) ->
Pid = self(),
gen_server:call(Chan, {add_async_request, fragment, Pid}).
subscribe_message_once(Chan) ->
Pid = self(),
gen_server:call(Chan, {add_async_request, message, Pid}).
%%% gen_server
-spec start_link() -> Result
@@ -112,12 +146,22 @@ init(none) ->
handle_call({send, Message, Mod}, _, State) ->
{Result, NewState} = do_send(Message, Mod, State),
{reply, Result, NewState};
handle_call({add_sync_request, Type}, From, State) ->
NewState = do_add_sync_request(Type, From, State),
{noreply, NewState};
handle_call({add_async_request, Type, Pid}, _, State) ->
{Result, NewState} = do_add_async_request(Type, Pid, State),
{reply, Result, NewState};
handle_call({subscribe, Type, Pid}, _, State) ->
{Result, NewState} = do_subscribe(Type, Pid, State),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
log(debug, "Opening channel ~p~n", [Id]),
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
@@ -159,95 +203,237 @@ do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
do_send(_, _, State = #s{state = receiving}) ->
{{error, not_sending}, State};
do_send(Message, Mod, State = #s{chan_id = Id,
state = ChanMode,
peer = Peer,
socket = Sock,
seq = Seq}) ->
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
IsNewChannel = ChanMode == closed,
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
{Op, NewChanMode} = choose_opcode(Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
gen_udp:send(Sock, Peer, Datagram),
NewState = State#s{state = NewChanMode, seq = Seq + 1},
{ok, NewState}.
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
choose_opcode(true, none) -> {?OP_OPEN, sending};
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
choose_opcode(none) -> {?OP_FRAGMENT, sending};
choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
choose_opcode(close) -> {?OP_CLOSE, closed};
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Subscription Doer Functions
% Adds a request to the queue, or responds to that request directly, if there
% is a subscriber already.
do_add_sync_request(Type, From, State = #s{subscriber = none}) ->
do_add_request(Type, {sync, From}, State);
do_add_sync_request(_, From, State) ->
Result = {error, already_subscribed},
gen_server:reply(From, Result),
State.
% Adds a request to the queue and returns ok, or returns an error if there is a
% subscriber already.
do_add_async_request(Type, Pid, State = #s{subscriber = none}) ->
NewState = do_add_request(Type, {async, Pid}, State),
{ok, NewState};
do_add_async_request(_, _, State) ->
Result = {error, already_subscribed},
{Result, State}.
do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) ->
Request = #request{type = Type, recipient = Recipient},
NewRequests = {[Request | In], Out},
StateWithRequest = State#s{requests = NewRequests},
try_fill_requests(StateWithRequest).
do_subscribe(Type, Pid, State = #s{subscriber = none}) ->
Request = #request{type = Type, recipient = {async, Pid}},
StateWithSub = State#s{subscriber = Request},
NewState = try_fill_requests(StateWithSub),
{ok, NewState};
do_subscribe(_, _, State) ->
Result = {error, already_subscribed},
{Result, State}.
%%% Receive Doer Functions
% First handle the messages that don't need to be reordered
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
do_handle_ack(State, Data);
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
do_handle_nack(State, Data);
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
do_handle_cancel(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
do_handle_reliable(State, Op, Seq, Rest);
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
% Reliable stream operations need to be ordered first.
NewState = do_buffer_reliable(State, Op, Seq, Rest),
do_execute_buffer(NewState);
do_handle_datagram(State, Unexpected) ->
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_ack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_nack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
%%% Reliable message reordering
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
State#s{instructions = NewInstructions}.
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
% We need to go later than this. Move it to the 'earlier than us' pile.
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
% There is already an instruction with this sequence ID. Use that.
lists:reverse(Earlier, Is);
insert_instruction(Op, Seq, Data, Is, Earlier) ->
% If neither of the above conditions are met, we can add it here.
NewI = #instruction{op = Op, seq = Seq, data = Data},
lists:reverse(Earlier, [NewI | Is]).
%%% Reliable message execution
do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) ->
% If we are closed, then anything we receive opens the channel.
StateOpened = State#s{state = receiving},
do_execute_buffer(StateOpened);
do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]),
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
do_execute_buffer(NewState);
do_execute_buffer(State) ->
% We've finished consuming instructions; now see if we have any requests we
% can fill.
try_fill_requests(State).
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, false, continue, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
do_handle_fragment(State, true, continue, Seq, Payload);
do_handle_fragment(State, continue, Seq, Payload);
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
do_handle_fragment(State, false, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
do_handle_fragment(State, true, end_message, Seq, Payload);
do_handle_fragment(State, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, false, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
do_handle_fragment(State, true, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
do_handle_ack(State, Seq, Payload);
do_handle_fragment(State, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
do_handle_nack(State, Seq, Payload);
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
do_handle_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
do_handle_ack_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
do_handle_req_move(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
do_handle_fragment(State, true, dirty, Seq, Payload);
do_handle_fragment(State, dirty, Seq, Payload);
do_handle_reliable(State, Unknown, _, _) ->
io:format("Got unexpected opcode ~p~n", [Unknown]),
State.
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
send_ack(State, Seq),
do_handle_fragment(State, continue, Seq, Payload).
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) ->
% TODO: backpressure
gen_udp:send(Sock, Peer, <<ID:8, ?OP_ACK:8, Seq:16>>).
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_fragment(State, Mod, _Seq, Payload) ->
WithFragment = push_fragment(State, Payload),
update_chan_mode(WithFragment, Mod).
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
push_fragment(State = #s{fragments = {In, Out}}, Payload) ->
State#s{fragments = {[Payload | In], Out}}.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
update_chan_mode(State, continue) ->
State#s{state = receiving};
update_chan_mode(State, end_message) ->
State#s{state = sending};
update_chan_mode(State, close_channel) ->
% TODO: Properly clean up subscriptions and requests and things, if the
% channel closes... Also if the channel switches to sending, I guess a
% fragment subscriber should be notified?
State#s{state = closed};
update_chan_mode(State, dirty) ->
% TODO: create and then quarantine dirty channels
State#s{state = receiving}.
try_fill_requests(State = #s{requests = Requests}) ->
case pop_queue(Requests) of
{ok, {Remaining, Request}} ->
try_fill_requests2(State, Remaining, Request);
error ->
% No requests, try subscriptions instead.
try_fill_subscription(State)
end.
try_fill_requests2(State, RequestsRemaining, Request) ->
Without = State#s{requests = RequestsRemaining},
case try_fill_request(Without, Request) of
{ok, NewState} ->
% Done successfully! Start the loop again with the new state.
try_fill_requests(NewState);
error ->
% The request didn't get filled... Just revert.
State
end.
try_fill_subscription(State = #s{subscriber = none}) ->
% No subscription either! Do nothing.
State;
try_fill_subscription(State = #s{subscriber = Sub}) ->
% A subscription! Try to fill it as many times as possible.
try_fill_subscription_loop(State, Sub).
try_fill_subscription_loop(State, Sub) ->
case try_fill_request(State, Sub) of
{ok, NewState} ->
try_fill_subscription_loop(NewState, Sub);
error ->
State
end.
try_fill_request(#s{state = receiving}, #request{type = message}) ->
% Message not finished yet.
error;
try_fill_request(#s{fragments = {[], []}}, #request{type = message}) ->
% No fragments have been received, not even empty fragments. This
% channel/message hasn't even been opened yet.
error;
try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) ->
Fragments = Out ++ lists:reverse(In),
Message = list_to_binary(Fragments),
send_result(Recip, Message),
NewState = State#s{fragments = {[], []}},
{ok, NewState};
try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) ->
case pop_queue(Fragments) of
{ok, {NewFragments, Fragment}} ->
log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]),
send_result(Recip, Fragment),
NewState = State#s{fragments = NewFragments},
{ok, NewState};
error ->
State
end.
send_result({async, Pid}, Result) ->
erlang:send(Pid, {msp_fragment, self(), Result});
send_result({sync, From}, Result) ->
gen_server:reply(From, {ok, Result}).
pop_queue({[], []}) ->
error;
pop_queue({In, [Next | Out]}) ->
NewFragments = {In, Out},
{ok, {NewFragments, Next}};
pop_queue({In, []}) ->
Out = lists:reverse(In),
pop_queue({[], Out}).
+121 -26
View File
@@ -9,7 +9,7 @@
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
%% Worker interface
-export([enroll/0]).
%% gen_server
@@ -29,8 +29,9 @@
{chan_pid :: pid()}).
-record(route_info,
{conn_pid :: pid(),
{router :: pid(),
socket :: gen_udp:socket(),
side :: 0 | 1,
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
@@ -41,19 +42,49 @@
%%% Service Interface
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
add_route(Sock, TheirIP, TheirPort, Side) ->
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
ok ->
transfer_socket(Sock, TheirIP, TheirPort);
{error, Reason} ->
{error, Reason}
end.
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
% A pretty complex function for an interface, but we need to transfer ownership
% of the socket.
transfer_socket(Sock, TheirIP, TheirPort) ->
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
ok ->
transfer_socket2(Sock, {TheirIP, TheirPort});
{error, Reason} ->
{error, Reason}
end.
transfer_socket2(Sock, Peer) ->
% Since it was transferred, any backlog of datagrams will now be in the
% channel manager's inbox. Turn it off from active, so that the manager
% knows that it will be fully caught up *before* the socket_transferred
% notification arrives.
case inet:setopts(Sock, [{active, false}]) of
ok ->
% Now the socket is all cleaned up, let the manager know.
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
{error, Reason} ->
{error, Reason}
end.
create_channel(OurPort, TheirIP, TheirPort) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:call(?MODULE, {create_channel, Route}).
% Deliver messages on behalf of msp_connection, if it doesn't know where to
% send something.
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
% Deliver messages on behalf of msp_router, if it doesn't know where to send
% something.
dispatch_fallback(Socket, Peer, ID, Packet) ->
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
%%% Worker Interface
@@ -81,6 +112,12 @@ init(none) ->
{ok, State}.
handle_call({add_route, Sock, Route, Side}, _, State) ->
{Result, NewState} = do_add_route(Sock, Route, Side, State),
{reply, Result, NewState};
handle_call({socket_transferred, Sock, Peer}, _, State) ->
Result = do_socket_transferred(Sock, Peer, State),
{reply, Result, State};
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
@@ -89,11 +126,8 @@ handle_call(Unexpected, From, State) ->
{noreply, State}.
handle_cast({update_router, Route, Conn, Sock}, State) ->
NewState = do_update_router(Route, Conn, Sock, State),
{noreply, NewState};
handle_cast({dispatch, Route, ID, Packet}, State) ->
NewState = do_dispatch(Route, ID, Packet, State),
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
@@ -106,10 +140,23 @@ handle_cast(Unexpected, State) ->
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(Socket, Peer, [ID | PacketList], State) ->
Packet = list_to_binary(PacketList),
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(_, _, <<>>, State) ->
% Empty datagram... Ignore.
State;
handle_udp(_, _, [], State) ->
State.
handle_down(Mon, PID, Reason, State) ->
% TODO
@@ -149,18 +196,56 @@ do_enroll2(PID, State, Channels) ->
State
end.
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route isn't registered already...
case maps:is_key(Route, Conns) of
true ->
Result = {error, already_added},
{Result, State};
false ->
do_add_route2(Sock, Route, Side, State)
end.
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
{ok, Router} = msp_router:start_link(),
NewConn = #route_info{router = Router,
socket = Sock,
side = Side,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
NewState = State#s{connections = NewConns},
{ok, NewState}.
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route is actually registered.
case maps:find(Route, Conns) of
{ok, _Info} ->
io:format("Updating routes not yet implemented.~n", []),
init:stop();
{ok, Conn} ->
do_socket_transferred2(Sock, Peer, Conn);
error ->
NewConn = #route_info{conn_pid = Conn,
socket = Sock,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
State#s{connections = NewConns}
{error, unknown_connection}
end.
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
->
msp_router:begin_routing(Router, Sock, Peer),
channel_catchup(Router, maps:iterator(Channels));
do_socket_transferred2(_, _, _) ->
{error, wrong_socket}.
% If we have created channels without spawning the router process, then at some
% point we will need to tell the router process what channels currently exist.
channel_catchup(Router, Iter) ->
case maps:next(Iter) of
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
msp_router:add_channel(Router, ChanID, Chan),
channel_catchup(Router, NewIter);
none ->
ok
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
@@ -194,13 +279,23 @@ do_create_channel3(State, Route, Info, ChanID) ->
NewConns = maps:put(Route, NewInfo, State#s.connections),
NewState = State#s{connections = NewConns},
% Also add it to the router!
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
{Result, NewState}.
do_dispatch(Route, ID, Packet, State) ->
do_dispatch(Socket, Peer, ID, Packet, State) ->
{ok, OurPort} = inet:port(Socket),
Route = #route{from = OurPort, to = Peer},
do_dispatch2(Route, ID, Packet, State).
do_dispatch2(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
State
end.
+48
View File
@@ -0,0 +1,48 @@
%%% @doc
%%% Minimal Stream Protocol : Channel Worker Supervisor
%%% @end
-module(msp_channel_sup).
-vsn("0.1.0").
-behaviour(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_channel/0]).
-export([start_link/0]).
-export([init/1]).
-spec start_channel() -> Result
when Result :: {ok, pid()}
| {error, Reason},
Reason :: {already_started, pid()}
| {shutdown, term()}
| term().
start_channel() ->
supervisor:start_child(?MODULE, []).
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {simple_one_for_one, 1, 60},
Channel =
{msp_channel,
{msp_channel, start_link, []},
temporary,
brutal_kill,
worker,
[msp_channel]},
{ok, {RestartStrategy, [Channel]}}.
+32 -20
View File
@@ -2,14 +2,14 @@
%%% Minimal Stream Protocol: Connection Worker
%%% @end
-module(msp_connection).
-module(msp_router).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([begin_msp/5]).
-export([begin_routing/3, add_channel/3]).
%% gen_server
-export([start_link/0]).
@@ -28,7 +28,6 @@
-record(s,
{socket :: gen_udp:socket(),
peer :: endpoint(),
side :: 0 | 1,
connections = #{} :: #{integer() => channel()}}).
-type state() :: none | #s{}.
@@ -37,18 +36,21 @@
% msp does not negotiate new connections, but once you have a
% socket, and a host that knows you will be talking MSP, then a
% new msp_connection can be initialized.
begin_msp(Connection, OurSock, TheirIP, TheirPort, OurSide) ->
% new msp_router can be initialized.
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
% Transfer the socket to the gen_server. If it is
% active then a bunch of messages will be received
% at once, but that is fine.
case gen_udp:controlling_process(OurSock, Connection) of
case gen_udp:controlling_process(OurSock, Router) of
ok ->
gen_server:cast(Connection, {begin_msp, OurSock, {TheirIP, TheirPort}, OurSide});
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
{error, Reason} ->
{error, Reason}
end.
add_channel(Router, ChanID, ChanPID) ->
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
%%% gen_server
-spec start_link() -> Result
@@ -59,29 +61,36 @@ start_link() ->
gen_server:start_link(?MODULE, none, []).
% TODO: Ask msp_connection_man for the socket and endpoint
% TODO: SWP? SWP of SWPs?
init(none) ->
{ok, none}.
handle_call({begin_routing, Sock, Peer}, _, none) ->
{Result, State} = do_begin_routing(Sock, Peer),
{reply, Result, State};
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
Result = {error, already_routing},
{reply, Result, State};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{reply, undef, State}.
handle_cast({begin_msp, Sock, Peer, Side}, none) ->
State = do_begin_msp(Sock, Peer, Side),
{noreply, State};
handle_cast({add_channel, ChanID, ChanPID}, State) ->
NewState = do_add_channel(ChanID, ChanPID, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
log(debug, "Got packet: ~p~n", [Packet]),
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
{noreply, State}.
@@ -104,12 +113,17 @@ terminate(_, _) ->
%%% Doer Functions
do_begin_msp(Sock, Peer, Side) ->
do_begin_routing(Sock, Peer) ->
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock,
peer = Peer,
side = Side},
State.
State = #s{socket = Sock, peer = Peer},
{ok, State}.
do_add_channel(_, _, none) ->
% Do nothing... We will need to be told about these channels again later.
none;
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
NewConns = maps:put(ChanID, ChanPID, Conns),
State#s{connections = NewConns}.
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
@@ -117,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet});
error ->
{ok, OurPort} = inet:port(Sock),
{TheirIP, TheirPort} = Peer,
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
end;
do_dispatch(_, <<>>) ->
% Empty datagram?
+61 -16
View File
@@ -6,6 +6,9 @@ spawn_tests() ->
ok.
run_tests_and_halt() ->
% Add a handler to print all log messages to the screen, so we can see why
% tests are failing, if they are.
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
Result = run_tests_protected(),
io:format("Tests returned ~p~n", [Result]),
halt().
@@ -19,31 +22,73 @@ run_tests_protected() ->
run_tests() ->
ok = send_test(),
ok = out_of_order_test(),
ok = race_test(),
ok.
make_connection(OurPort, TheirIP, TheirPort, Side) ->
{ok, Sock} = gen_udp:open(OurPort),
{ok, Pid} = msp_connection:start_link(),
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
{Pid, Sock}.
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5555,
PortB = 6666,
{A, SockA} = make_connection(PortA, IP, PortB, 0),
{B, SockB} = make_connection(PortB, IP, PortA, 1),
PortA = 5511,
PortB = 5512,
ok = make_connection(PortA, IP, PortB, 0),
ok = make_connection(PortB, IP, PortA, 1),
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
timer:sleep(10),
{ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB),
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
MessageA = <<"message sent from A to B">>,
msp_channel:send_and_close(ChanA, MessageA),
io:format("A has tried to send a message to B~n"),
{ok, MessageA} = msp_channel:receive_message(ChanB),
io:format("received successfully~n"),
MessageB = <<"message sent from B to A">>,
msp_channel:send_and_close(ChanB, MessageB),
io:format("B has tried to send a message to A~n"),
{ok, MessageB} = msp_channel:receive_message(ChanA),
io:format("received successfully~n"),
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
timer:sleep(10),
ok.
out_of_order_test() ->
IP = {127, 0, 0, 1},
PortA = 5521,
PortB = 5522,
{ok, SockA} = gen_udp:open(PortA),
ok = make_connection(PortB, IP, PortA, 0),
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>),
{ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB),
io:format("Received out of order fragments successfully.~n"),
ok.
race_test() ->
IP = {127, 0, 0, 1},
PortA = 5531,
PortB = 5532,
{ok, SockA} = gen_udp:open(PortA),
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>),
timer:sleep(10),
% We should already have a {udp, SockB, IP, PortA, _} in our message queue,
% which needs to be properly detected and handled by the socket transfer
% functions.
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
% TODO We could test that the datagrams were dispatched properly by
% receiving on the channel, but for now we just manually check the
% logs/console output for msp_channel_man errors.
timer:sleep(10),
ok.