Compare commits

...

12 Commits

Author SHA1 Message Date
Jarvis Carroll 39a75ca435 Receive data on the other end.
I added a bunch of receipt interfaces, but I only test the syncronous
message receipt at the moment, not fragments or async.

I'm not sure whether 'subscribe_message' even makes sense as a concept,
but there it is.
2025-10-31 11:35:05 +00:00
Jarvis Carroll 23276662bb simplify opcodes
OPEN is not really relevant. And a lot of the stream negotiation stuff isn't that useful either. Gone.
2025-10-30 05:52:33 +00:00
Jarvis Carroll 78ea3fc85e handle socket race conditions better
Now the channle manager takes full responsibility for dispatch as
soon as it is initialised, and only passes that responsibility on
to the router process once things have been cleaned up.

What a complex refactor just to pass a socket to a new process!
2025-10-29 11:56:44 +00:00
Jarvis Carroll e8ef0b4304 buffer out-of-order messages
This took a while to debug!! Turns out I don't actually
handle datagrams that have been received before the
workers are all initialized... oops!!
2025-10-27 07:31:27 +00:00
Jarvis Carroll b6c16967e7 notify router of new channels
This way the second message received on a channel should be
routed by the router without involving the fallback.

Lolspeed achieved.
2025-10-26 11:23:34 +00:00
Jarvis Carroll 47612c2775 have channel_man spawn its own router
Once I added dispatch_fallback to channel_man, it became clear that
the router is actually subordinate to the channel_man.
2025-10-26 11:14:09 +00:00
Jarvis Carroll cfd9eb748f rename Connection to Router
Connection is more of a supervision concept than a server concept.
I should have a SWP of SWPs too, but I can't be bothered just yet.
2025-10-26 10:46:27 +00:00
Jarvis Carroll 669b2e51e1 actually add msp_channel_sup.erl
oops
2025-10-26 10:39:08 +00:00
Jarvis Carroll 0ccd64e2f1 some kind of channel allocation
Now sending messages results in some basic channel state
tracking, but receiving doesn't do any of that yet.
2025-10-26 05:01:44 +00:00
Jarvis Carroll 7503463ff2 message dispatch stub
This is probably about as far as I can go without actual
manager processes... I need something that knows what
channels are in use!!
2025-10-26 00:25:01 +00:00
Jarvis Carroll 587fa1710c retab
Still setting this computer up...
2025-10-24 12:13:57 +00:00
Jarvis Carroll 4d7dcf160c UDP receipt 2025-10-24 12:06:29 +00:00
9 changed files with 1120 additions and 20 deletions
+12 -2
View File
@@ -9,8 +9,18 @@ start() -> application:start(hakuzaru).
stop() -> application:stop(hakuzaru).
start(normal, _Args) ->
msp_sup:start_link().
{ok, Sup} = msp_sup:start_link(),
case init:get_plain_arguments() of
[_] ->
io:format("MSP started as idle process.~n");
[_, "test"] ->
io:format("Running tests.~n", []),
msp_tests:spawn_tests();
[_ | CLArgs] ->
io:format("Unknown args ~p~n", [CLArgs])
end,
{ok, Sup}.
stop(_State) ->
ok.
ok.
+439
View File
@@ -0,0 +1,439 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Worker
%%% @end
-module(msp_channel).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([configure_channel/4]).
-export([send_and_close/2, send_message/2]).
% A lot of interfaces, but the last two are only really written for
% completeness. The distinctions here are blocking vs subscribing, and
% fragments vs messages.
-export([receive_fragment/1, receive_message/1,
subscribe_fragment/1, subscribe_message/1,
subscribe_fragment_once/1, subscribe_message_once/1]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Opcodes
% Fragment/message opcodes:
% _____________________________________________________________________________
% OP_FRAGMENT | Part of a message/stream
% OP_STATUS | Part of a message, but expects an immediate ACK, in case
% | the stream goes idle, or for any other
% | implementation-specific needs.
% OP_END_MESSAGE | Last/only fragment of a message/stream. Expects the other
% | side to reply on this channel.
% OP_CLOSE | Last/only fragment of a message/stream. Closes the channel.
%
% Transport Control Stuff
% -----------------------------------------------------------------------------
% OP_ACK | Acknowledge that instructions up to N have been received.
% OP_NACK | Instruction N has not been received.
% OP_CANCEL | Ignore any fragments that weren't already delivered, and
% | end the stream now. The channel stays open so that the
% | other side can communicate what they did/didn't | receive,
% | and what actions were taken, if the application requires.
%
% Lossy
% _____________________________________________________________________________
% OP_OPEN_DIRTY | Like OP_OPEN, but marks the channel as 'dirty', meaning
% | it is allowed to send lossy datagrams too, for voip, etc.
% OP_LOSSY | A lossy datagram, for the application to deal with.
% Numbers aren't final.
-define(OP_FRAGMENT, 0).
-define(OP_END_MESSAGE, 1).
-define(OP_CLOSE, 2).
-define(OP_STATUS, 3).
-define(OP_ACK, 4).
-define(OP_NACK, 5).
-define(OP_CANCEL, 6).
-define(OP_OPEN_DIRTY, 7).
-define(OP_LOSSY, 8).
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(instruction,
{op :: integer(),
seq :: integer(),
data :: binary()}).
-type request_type() :: fragment | message.
-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}.
-record(request,
{type :: request_type(),
recipient :: request_recipient()}).
-type request() :: #request{}.
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer(),
requests = {[], []} :: {[request()], [request()]},
subscriber = none :: none | request(),
instructions = [] :: [#instruction{}],
fragments = {[], []} :: {[binary()], [binary()]}}).
-type state() :: none | #s{}.
%%% Interface
configure_channel(Pid, Id, Socket, Peer) ->
gen_server:cast(Pid, {configure_channel, Id, Socket, Peer}).
send_and_close(Chan, Message) ->
gen_server:call(Chan, {send, Message, close}).
send_message(Chan, Message) ->
gen_server:call(Chan, {send, Message, end_message}).
receive_fragment(Chan) ->
gen_server:call(Chan, {add_sync_request, fragment}).
receive_message(Chan) ->
gen_server:call(Chan, {add_sync_request, message}).
subscribe_fragment(Chan) ->
Pid = self(),
gen_server:call(Chan, {subscribe, fragment, Pid}).
subscribe_message(Chan) ->
Pid = self(),
gen_server:call(Chan, {subscribe, message, Pid}).
subscribe_fragment_once(Chan) ->
Pid = self(),
gen_server:call(Chan, {add_async_request, fragment, Pid}).
subscribe_message_once(Chan) ->
Pid = self(),
gen_server:call(Chan, {add_async_request, message, Pid}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link(?MODULE, none, []).
init(none) ->
{ok, none}.
handle_call({send, Message, Mod}, _, State) ->
{Result, NewState} = do_send(Message, Mod, State),
{reply, Result, NewState};
handle_call({add_sync_request, Type}, From, State) ->
NewState = do_add_sync_request(Type, From, State),
{noreply, NewState};
handle_call({add_async_request, Type, Pid}, _, State) ->
{Result, NewState} = do_add_async_request(Type, Pid, State),
{reply, Result, NewState};
handle_call({subscribe, Type, Pid}, _, State) ->
{Result, NewState} = do_subscribe(Type, Pid, State),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
log(debug, "Opening channel ~p~n", [Id]),
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
{noreply, State};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({msp_fragment, Packet}, State) ->
NewState = do_handle_datagram(State, Packet),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
-spec code_change(OldVersion, State, Extra) -> {ok, NewState} | {error, Reason}
when OldVersion :: Version | {old, Version},
Version :: term(),
State :: state(),
Extra :: term(),
NewState :: term(),
Reason :: term().
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Send Doer Functions
do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
{{error, already_open}, State};
do_send(_, _, State = #s{state = receiving}) ->
{{error, not_sending}, State};
do_send(Message, Mod, State = #s{chan_id = Id,
peer = Peer,
socket = Sock,
seq = Seq}) ->
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
{Op, NewChanMode} = choose_opcode(Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
gen_udp:send(Sock, Peer, Datagram),
NewState = State#s{state = NewChanMode, seq = Seq + 1},
{ok, NewState}.
choose_opcode(none) -> {?OP_FRAGMENT, sending};
choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
choose_opcode(close) -> {?OP_CLOSE, closed};
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Subscription Doer Functions
% Adds a request to the queue, or responds to that request directly, if there
% is a subscriber already.
do_add_sync_request(Type, From, State = #s{subscriber = none}) ->
do_add_request(Type, {sync, From}, State);
do_add_sync_request(_, From, State) ->
Result = {error, already_subscribed},
gen_server:reply(From, Result),
State.
% Adds a request to the queue and returns ok, or returns an error if there is a
% subscriber already.
do_add_async_request(Type, Pid, State = #s{subscriber = none}) ->
NewState = do_add_request(Type, {async, Pid}, State),
{ok, NewState};
do_add_async_request(_, _, State) ->
Result = {error, already_subscribed},
{Result, State}.
do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) ->
Request = #request{type = Type, recipient = Recipient},
NewRequests = {[Request | In], Out},
StateWithRequest = State#s{requests = NewRequests},
try_fill_requests(StateWithRequest).
do_subscribe(Type, Pid, State = #s{subscriber = none}) ->
Request = #request{type = Type, recipient = {async, Pid}},
StateWithSub = State#s{subscriber = Request},
NewState = try_fill_requests(StateWithSub),
{ok, NewState};
do_subscribe(_, _, State) ->
Result = {error, already_subscribed},
{Result, State}.
%%% Receive Doer Functions
% First handle the messages that don't need to be reordered
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
do_handle_ack(State, Data);
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
do_handle_nack(State, Data);
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
do_handle_cancel(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
% Reliable stream operations need to be ordered first.
NewState = do_buffer_reliable(State, Op, Seq, Rest),
do_execute_buffer(NewState);
do_handle_datagram(State, Unexpected) ->
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_ack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_nack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
%%% Reliable message reordering
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
State#s{instructions = NewInstructions}.
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
% We need to go later than this. Move it to the 'earlier than us' pile.
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
% There is already an instruction with this sequence ID. Use that.
lists:reverse(Earlier, Is);
insert_instruction(Op, Seq, Data, Is, Earlier) ->
% If neither of the above conditions are met, we can add it here.
NewI = #instruction{op = Op, seq = Seq, data = Data},
lists:reverse(Earlier, [NewI | Is]).
%%% Reliable message execution
do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) ->
% If we are closed, then anything we receive opens the channel.
StateOpened = State#s{state = receiving},
do_execute_buffer(StateOpened);
do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]),
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
do_execute_buffer(NewState);
do_execute_buffer(State) ->
% We've finished consuming instructions; now see if we have any requests we
% can fill.
try_fill_requests(State).
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, continue, Seq, Payload);
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
do_handle_fragment(State, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
do_handle_fragment(State, dirty, Seq, Payload);
do_handle_reliable(State, Unknown, _, _) ->
io:format("Got unexpected opcode ~p~n", [Unknown]),
State.
do_handle_status(State, Seq, Payload) ->
send_ack(State, Seq),
do_handle_fragment(State, continue, Seq, Payload).
send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) ->
% TODO: backpressure
gen_udp:send(Sock, Peer, <<ID:8, ?OP_ACK:8, Seq:16>>).
do_handle_fragment(State, Mod, _Seq, Payload) ->
WithFragment = push_fragment(State, Payload),
update_chan_mode(WithFragment, Mod).
push_fragment(State = #s{fragments = {In, Out}}, Payload) ->
State#s{fragments = {[Payload | In], Out}}.
update_chan_mode(State, continue) ->
State#s{state = receiving};
update_chan_mode(State, end_message) ->
State#s{state = sending};
update_chan_mode(State, close_channel) ->
% TODO: Properly clean up subscriptions and requests and things, if the
% channel closes... Also if the channel switches to sending, I guess a
% fragment subscriber should be notified?
State#s{state = closed};
update_chan_mode(State, dirty) ->
% TODO: create and then quarantine dirty channels
State#s{state = receiving}.
try_fill_requests(State = #s{requests = Requests}) ->
case pop_queue(Requests) of
{ok, {Remaining, Request}} ->
try_fill_requests2(State, Remaining, Request);
error ->
% No requests, try subscriptions instead.
try_fill_subscription(State)
end.
try_fill_requests2(State, RequestsRemaining, Request) ->
Without = State#s{requests = RequestsRemaining},
case try_fill_request(Without, Request) of
{ok, NewState} ->
% Done successfully! Start the loop again with the new state.
try_fill_requests(NewState);
error ->
% The request didn't get filled... Just revert.
State
end.
try_fill_subscription(State = #s{subscriber = none}) ->
% No subscription either! Do nothing.
State;
try_fill_subscription(State = #s{subscriber = Sub}) ->
% A subscription! Try to fill it as many times as possible.
try_fill_subscription_loop(State, Sub).
try_fill_subscription_loop(State, Sub) ->
case try_fill_request(State, Sub) of
{ok, NewState} ->
try_fill_subscription_loop(NewState, Sub);
error ->
State
end.
try_fill_request(#s{state = receiving}, #request{type = message}) ->
% Message not finished yet.
error;
try_fill_request(#s{fragments = {[], []}}, #request{type = message}) ->
% No fragments have been received, not even empty fragments. This
% channel/message hasn't even been opened yet.
error;
try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) ->
Fragments = Out ++ lists:reverse(In),
Message = list_to_binary(Fragments),
send_result(Recip, Message),
NewState = State#s{fragments = {[], []}},
{ok, NewState};
try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) ->
case pop_queue(Fragments) of
{ok, {NewFragments, Fragment}} ->
log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]),
send_result(Recip, Fragment),
NewState = State#s{fragments = NewFragments},
{ok, NewState};
error ->
State
end.
send_result({async, Pid}, Result) ->
erlang:send(Pid, {msp_fragment, self(), Result});
send_result({sync, From}, Result) ->
gen_server:reply(From, {ok, Result}).
pop_queue({[], []}) ->
error;
pop_queue({In, [Next | Out]}) ->
NewFragments = {In, Out},
{ok, {NewFragments, Next}};
pop_queue({In, []}) ->
Out = lists:reverse(In),
pop_queue({[], Out}).
+314
View File
@@ -0,0 +1,314 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Worker Manager
%%% @end
-module(msp_channel_man).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
%% Worker interface
-export([enroll/0]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(route, {from :: inet:port_number(),
to :: endpoint()}).
-record(channel,
{chan_pid :: pid()}).
-record(route_info,
{router :: pid(),
socket :: gen_udp:socket(),
side :: 0 | 1,
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
-record(s,
{connections = #{} :: #{#route{} => #route_info{}}}).
-type state() :: #s{}.
%%% Service Interface
add_route(Sock, TheirIP, TheirPort, Side) ->
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
ok ->
transfer_socket(Sock, TheirIP, TheirPort);
{error, Reason} ->
{error, Reason}
end.
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
% A pretty complex function for an interface, but we need to transfer ownership
% of the socket.
transfer_socket(Sock, TheirIP, TheirPort) ->
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
ok ->
transfer_socket2(Sock, {TheirIP, TheirPort});
{error, Reason} ->
{error, Reason}
end.
transfer_socket2(Sock, Peer) ->
% Since it was transferred, any backlog of datagrams will now be in the
% channel manager's inbox. Turn it off from active, so that the manager
% knows that it will be fully caught up *before* the socket_transferred
% notification arrives.
case inet:setopts(Sock, [{active, false}]) of
ok ->
% Now the socket is all cleaned up, let the manager know.
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
{error, Reason} ->
{error, Reason}
end.
create_channel(OurPort, TheirIP, TheirPort) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:call(?MODULE, {create_channel, Route}).
% Deliver messages on behalf of msp_router, if it doesn't know where to send
% something.
dispatch_fallback(Socket, Peer, ID, Packet) ->
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
%%% Worker Interface
-spec enroll() -> ok.
%% @doc
%% Workers register here after they initialize.
enroll() ->
gen_server:cast(?MODULE, {enroll, self()}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
init(none) ->
State = #s{},
{ok, State}.
handle_call({add_route, Sock, Route, Side}, _, State) ->
{Result, NewState} = do_add_route(Sock, Route, Side, State),
{reply, Result, NewState};
handle_call({socket_transferred, Sock, Peer}, _, State) ->
Result = do_socket_transferred(Sock, Peer, State),
{reply, Result, State};
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(Socket, Peer, [ID | PacketList], State) ->
Packet = list_to_binary(PacketList),
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(_, _, <<>>, State) ->
% Empty datagram... Ignore.
State;
handle_udp(_, _, [], State) ->
State.
handle_down(Mon, PID, Reason, State) ->
% TODO
Unexpected = {'DOWN', Mon, process, PID, Reason},
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
State.
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Doer Functions
-spec do_enroll(PID, State) -> NewState
when PID :: pid(),
State :: state(),
NewState :: state().
do_enroll(_PID, State) ->
% TODO... ??
State.
do_enroll2(PID, State, Channels) ->
case lists:member(PID, Channels) of
false ->
Mon = monitor(process, PID),
ok = log(info, "Enroll: ~tp @ ~tp", [PID, Mon]),
%State#s{channels = [PID | Channels]};
State;
true ->
State
end.
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route isn't registered already...
case maps:is_key(Route, Conns) of
true ->
Result = {error, already_added},
{Result, State};
false ->
do_add_route2(Sock, Route, Side, State)
end.
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
{ok, Router} = msp_router:start_link(),
NewConn = #route_info{router = Router,
socket = Sock,
side = Side,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
NewState = State#s{connections = NewConns},
{ok, NewState}.
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route is actually registered.
case maps:find(Route, Conns) of
{ok, Conn} ->
do_socket_transferred2(Sock, Peer, Conn);
error ->
{error, unknown_connection}
end.
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
->
msp_router:begin_routing(Router, Sock, Peer),
channel_catchup(Router, maps:iterator(Channels));
do_socket_transferred2(_, _, _) ->
{error, wrong_socket}.
% If we have created channels without spawning the router process, then at some
% point we will need to tell the router process what channels currently exist.
channel_catchup(Router, Iter) ->
case maps:next(Iter) of
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
msp_router:add_channel(Router, ChanID, Chan),
channel_catchup(Router, NewIter);
none ->
ok
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
case maps:find(Route, Conns) of
{ok, Info} ->
do_create_channel2(State, Route, Info);
error ->
Result = {error, unknown_connection},
{Result, State}
end.
do_create_channel2(State, Route, Info) ->
case Info#route_info.unused_channels of
[] ->
{{error, all_in_use}, State};
[Next | _] ->
do_create_channel3(State, Route, Info, Next)
end.
do_create_channel3(State, Route, Info, ChanID) ->
{ok, Chan} = msp_channel_sup:start_channel(),
msp_channel:configure_channel(Chan, ChanID, Info#route_info.socket, Route#route.to),
Result = {ok, {Chan, ChanID}},
% Now update the channel list...
Remaining = lists:delete(ChanID, Info#route_info.unused_channels),
NewChan = #channel{chan_pid = Chan},
NewChannels = maps:put(ChanID, NewChan, Info#route_info.used_channels),
NewInfo = Info#route_info{unused_channels = Remaining,
used_channels = NewChannels},
NewConns = maps:put(Route, NewInfo, State#s.connections),
NewState = State#s{connections = NewConns},
% Also add it to the router!
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
{Result, NewState}.
do_dispatch(Socket, Peer, ID, Packet, State) ->
{ok, OurPort} = inet:port(Socket),
Route = #route{from = OurPort, to = Peer},
do_dispatch2(Route, ID, Packet, State).
do_dispatch2(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
State
end.
do_dispatch2(Route, ID, Packet, State, Info) ->
case maps:find(ID, Info#route_info.used_channels) of
{ok, #channel{chan_pid = Chan}} ->
do_dispatch3(Packet, State, Chan);
error ->
{{ok, {Chan, _}}, NewState} = do_create_channel3(State, Route, Info, ID),
do_dispatch3(Packet, NewState, Chan)
end.
do_dispatch3(Packet, State, Chan) ->
erlang:send(Chan, {msp_fragment, Packet}),
State.
+48
View File
@@ -0,0 +1,48 @@
%%% @doc
%%% Minimal Stream Protocol : Channel Worker Supervisor
%%% @end
-module(msp_channel_sup).
-vsn("0.1.0").
-behaviour(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_channel/0]).
-export([start_link/0]).
-export([init/1]).
-spec start_channel() -> Result
when Result :: {ok, pid()}
| {error, Reason},
Reason :: {already_started, pid()}
| {shutdown, term()}
| term().
start_channel() ->
supervisor:start_child(?MODULE, []).
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {simple_one_for_one, 1, 60},
Channel =
{msp_channel,
{msp_channel, start_link, []},
temporary,
brutal_kill,
worker,
[msp_channel]},
{ok, {RestartStrategy, [Channel]}}.
+50
View File
@@ -0,0 +1,50 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Service Supervisor
%%%
%%% This is the service-level supervisor of the system. It is the parent of both the
%%% client connection handlers and the client manager (which manages the client
%%% connection handlers). This is the child of msp_sup.
%%%
%%% See: http://erlang.org/doc/apps/kernel/application.html
%%% @end
-module(msp_channels).
-vsn("0.1.0").
-behavior(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_link/0]).
-export([init/1]).
-spec start_link() -> {ok, pid()}.
%% @private
%% This supervisor's own start function.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {rest_for_one, 1, 60},
ChannelMan =
{msp_channel_man,
{msp_channel_man, start_link, []},
permanent,
5000,
worker,
[msp_channel_man]},
ChannelSup =
{msp_channel_sup,
{msp_channel_sup, start_link, []},
permanent,
5000,
supervisor,
[msp_channel_sup]},
Children = [ChannelSup, ChannelMan],
{ok, {RestartStrategy, Children}}.
+9 -9
View File
@@ -5,28 +5,28 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-record(s,
{}).
{}).
-type state() :: #s{}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
-spec init(none) -> {ok, state()}.
init(none) ->
ok = io:format("msp_man starting.~n", []),
State = #s{},
{ok, State}.
ok = io:format("msp_man starting.~n", []),
State = #s{},
{ok, State}.
handle_call(_, _, State) ->
{reply, ok, State}.
{reply, ok, State}.
handle_cast(_, State) ->
{noreply, State}.
{noreply, State}.
handle_info(Unexpected, State) ->
ok = io:format("Warning: Unexpected info ~p~n", [Unexpected]),
{noreply, State}.
ok = io:format("Warning: Unexpected info ~p~n", [Unexpected]),
{noreply, State}.
+139
View File
@@ -0,0 +1,139 @@
%%% @doc
%%% Minimal Stream Protocol: Connection Worker
%%% @end
-module(msp_router).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([begin_routing/3, add_channel/3]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
% msp_channel server.
-type channel() :: pid().
-record(s,
{socket :: gen_udp:socket(),
peer :: endpoint(),
connections = #{} :: #{integer() => channel()}}).
-type state() :: none | #s{}.
%%% Interface
% msp does not negotiate new connections, but once you have a
% socket, and a host that knows you will be talking MSP, then a
% new msp_router can be initialized.
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
% Transfer the socket to the gen_server. If it is
% active then a bunch of messages will be received
% at once, but that is fine.
case gen_udp:controlling_process(OurSock, Router) of
ok ->
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
{error, Reason} ->
{error, Reason}
end.
add_channel(Router, ChanID, ChanPID) ->
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link(?MODULE, none, []).
% TODO: SWP? SWP of SWPs?
init(none) ->
{ok, none}.
handle_call({begin_routing, Sock, Peer}, _, none) ->
{Result, State} = do_begin_routing(Sock, Peer),
{reply, Result, State};
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
Result = {error, already_routing},
{reply, Result, State};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{reply, undef, State}.
handle_cast({add_channel, ChanID, ChanPID}, State) ->
NewState = do_add_channel(ChanID, ChanPID, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
log(debug, "Got packet: ~p~n", [Packet]),
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
{noreply, State}.
-spec code_change(OldVersion, State, Extra) -> {ok, NewState} | {error, Reason}
when OldVersion :: Version | {old, Version},
Version :: term(),
State :: state(),
Extra :: term(),
NewState :: term(),
Reason :: term().
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Doer Functions
do_begin_routing(Sock, Peer) ->
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock, peer = Peer},
{ok, State}.
do_add_channel(_, _, none) ->
% Do nothing... We will need to be told about these channels again later.
none;
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
NewConns = maps:put(ChanID, ChanPID, Conns),
State#s{connections = NewConns}.
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
case maps:find(ID, Conns) of
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet});
error ->
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
end;
do_dispatch(_, <<>>) ->
% Empty datagram?
ok.
+15 -9
View File
@@ -5,14 +5,20 @@
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
RestartStrategy = {one_for_one, 0, 60},
Children = [{msp_man,
{msp_man, start_link, []},
permanent,
5000,
worker,
[msp_man]}],
{ok, {RestartStrategy, Children}}.
RestartStrategy = {one_for_one, 0, 60},
Children = [{msp_man,
{msp_man, start_link, []},
permanent,
5000,
worker,
[msp_man]},
{msp_channels,
{msp_channels, start_link, []},
permanent,
5000,
worker,
[msp_channels]}],
{ok, {RestartStrategy, Children}}.
+94
View File
@@ -0,0 +1,94 @@
-module(msp_tests).
-export([spawn_tests/0, run_tests_and_halt/0, run_tests_protected/0, run_tests/0]).
spawn_tests() ->
spawn(?MODULE, run_tests_and_halt, []),
ok.
run_tests_and_halt() ->
% Add a handler to print all log messages to the screen, so we can see why
% tests are failing, if they are.
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
Result = run_tests_protected(),
io:format("Tests returned ~p~n", [Result]),
halt().
run_tests_protected() ->
try
run_tests()
catch
_:Reason:Stack -> {error, Reason, Stack}
end.
run_tests() ->
ok = send_test(),
ok = out_of_order_test(),
ok = race_test(),
ok.
make_connection(OurPort, TheirIP, TheirPort, Side) ->
{ok, Sock} = gen_udp:open(OurPort),
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5511,
PortB = 5512,
ok = make_connection(PortA, IP, PortB, 0),
ok = make_connection(PortB, IP, PortA, 1),
{ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB),
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
MessageA = <<"message sent from A to B">>,
msp_channel:send_and_close(ChanA, MessageA),
io:format("A has tried to send a message to B~n"),
{ok, MessageA} = msp_channel:receive_message(ChanB),
io:format("received successfully~n"),
MessageB = <<"message sent from B to A">>,
msp_channel:send_and_close(ChanB, MessageB),
io:format("B has tried to send a message to A~n"),
{ok, MessageB} = msp_channel:receive_message(ChanA),
io:format("received successfully~n"),
ok.
out_of_order_test() ->
IP = {127, 0, 0, 1},
PortA = 5521,
PortB = 5522,
{ok, SockA} = gen_udp:open(PortA),
ok = make_connection(PortB, IP, PortA, 0),
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>),
{ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB),
io:format("Received out of order fragments successfully.~n"),
ok.
race_test() ->
IP = {127, 0, 0, 1},
PortA = 5531,
PortB = 5532,
{ok, SockA} = gen_udp:open(PortA),
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 0:16, "a message">>),
timer:sleep(10),
% We should already have a {udp, SockB, IP, PortA, _} in our message queue,
% which needs to be properly detected and handled by the socket transfer
% functions.
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
% TODO We could test that the datagrams were dispatched properly by
% receiving on the channel, but for now we just manually check the
% logs/console output for msp_channel_man errors.
timer:sleep(10),
ok.