Compare commits

..

2 Commits

Author SHA1 Message Date
Jarvis Carroll
0ccd64e2f1 some kind of channel allocation
Now sending messages results in some basic channel state
tracking, but receiving doesn't do any of that yet.
2025-10-26 05:01:44 +00:00
Jarvis Carroll
7503463ff2 message dispatch stub
This is probably about as far as I can go without actual
manager processes... I need something that knows what
channels are in use!!
2025-10-26 00:25:01 +00:00
6 changed files with 558 additions and 14 deletions

253
src/msp_channel.erl Normal file
View File

@ -0,0 +1,253 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Worker
%%% @end
-module(msp_channel).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([configure_channel/4]).
-export([send_and_close/2]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Opcodes
% Fragment/message opcodes:
% _____________________________________________________________________________
% OP_FRAGMENT | Part of a message/stream
% OP_OPEN | Open a new channel, and send the first fragment of the
% | first message in it.
% OP_END_MESSAGE | End of a message/stream, expects a response
% OP_OPEN_END | Open a new channel, and send an entire message
% OP_CLOSE | End of a message/stream, do not respond, close the channel
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
% | channel, all in one datagram. Sort of like
% | gen_server:cast/2, or if UDP were reliable.
%
% Transport Control Stuff
% -----------------------------------------------------------------------------
% OP_ACK | Acknowledge that instructions up to N have been received.
% OP_STATUS | Request an ACK for this exact command. Has many uses -
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
% | to stop the receiver from coalescing ACKs too much.
% OP_NACK | Instruction N has not been received.
% OP_CANCEL | Sender is hoping that messages M through N weren't
% | delivered, and can be cancelled.
% OP_ACK_CANCEL | Respond to OP_CANCEL.
%
% Stream Negotiation
% _____________________________________________________________________________
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
% | should be moved to a different ID.
%
% Lossy
% _____________________________________________________________________________
% OP_OPEN_DIRTY | Like OP_OPEN, but marks the channel as 'dirty', meaning
% | it is allowed to send lossy datagrams too, for voip, etc.
% OP_LOSSY | A lossy datagram, for the application to deal with.
% Numbers aren't final.
-define(OP_FRAGMENT, 0).
-define(OP_OPEN, 1).
-define(OP_END_MESSAGE, 2).
-define(OP_OPEN_END, 3).
-define(OP_CLOSE, 4).
-define(OP_OPEN_CLOSE, 5).
-define(OP_ACK, 6).
-define(OP_STATUS, 7).
-define(OP_NACK, 8).
-define(OP_CANCEL, 9).
-define(OP_ACK_CANCEL, 10).
-define(OP_REQ_MOVE, 11).
-define(OP_OPEN_DIRTY, 12).
-define(OP_LOSSY, 13).
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer()}).
-type state() :: none | #s{}.
%%% Interface
configure_channel(Pid, Id, Socket, Peer) ->
gen_server:cast(Pid, {configure_channel, Id, Socket, Peer}).
send_and_close(Chan, Message) ->
gen_server:call(Chan, {send, Message, close}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link(?MODULE, none, []).
init(none) ->
{ok, none}.
handle_call({send, Message, Mod}, _, State) ->
{Result, NewState} = do_send(Message, Mod, State),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
{noreply, State};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({msp_fragment, Packet}, State) ->
NewState = do_handle_datagram(State, Packet),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
-spec code_change(OldVersion, State, Extra) -> {ok, NewState} | {error, Reason}
when OldVersion :: Version | {old, Version},
Version :: term(),
State :: state(),
Extra :: term(),
NewState :: term(),
Reason :: term().
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Send Doer Functions
do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
{{error, already_open}, State};
do_send(_, _, State = #s{state = receiving}) ->
{{error, not_sending}, State};
do_send(Message, Mod, State = #s{chan_id = Id,
state = ChanMode,
peer = Peer,
socket = Sock,
seq = Seq}) ->
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
IsNewChannel = ChanMode == closed,
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
gen_udp:send(Sock, Peer, Datagram),
NewState = State#s{state = NewChanMode, seq = Seq + 1},
{ok, NewState}.
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
choose_opcode(true, none) -> {?OP_OPEN, sending};
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Receive Doer Functions
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
do_handle_reliable(State, Op, Seq, Rest);
do_handle_datagram(State, Unexpected) ->
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, false, continue, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
do_handle_fragment(State, true, continue, Seq, Payload);
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
do_handle_fragment(State, false, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
do_handle_fragment(State, true, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, false, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
do_handle_fragment(State, true, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
do_handle_ack(State, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
do_handle_nack(State, Seq, Payload);
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
do_handle_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
do_handle_ack_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
do_handle_req_move(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
do_handle_fragment(State, true, dirty, Seq, Payload);
do_handle_reliable(State, Unknown, _, _) ->
io:format("Got unexpected opcode ~p~n", [Unknown]),
State.
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.

219
src/msp_channel_man.erl Normal file
View File

@ -0,0 +1,219 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Worker Manager
%%% @end
-module(msp_channel_man).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
%% Worker interface
-export([enroll/0]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(route, {from :: inet:port_number(),
to :: endpoint()}).
-record(channel,
{chan_pid :: pid()}).
-record(route_info,
{conn_pid :: pid(),
socket :: gen_udp:socket(),
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
-record(s,
{connections = #{} :: #{#route{} => #route_info{}}}).
-type state() :: #s{}.
%%% Service Interface
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
create_channel(OurPort, TheirIP, TheirPort) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:call(?MODULE, {create_channel, Route}).
% Deliver messages on behalf of msp_connection, if it doesn't know where to
% send something.
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
%%% Worker Interface
-spec enroll() -> ok.
%% @doc
%% Workers register here after they initialize.
enroll() ->
gen_server:cast(?MODULE, {enroll, self()}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
init(none) ->
State = #s{},
{ok, State}.
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({update_router, Route, Conn, Sock}, State) ->
NewState = do_update_router(Route, Conn, Sock, State),
{noreply, NewState};
handle_cast({dispatch, Route, ID, Packet}, State) ->
NewState = do_dispatch(Route, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_down(Mon, PID, Reason, State) ->
% TODO
Unexpected = {'DOWN', Mon, process, PID, Reason},
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
State.
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Doer Functions
-spec do_enroll(PID, State) -> NewState
when PID :: pid(),
State :: state(),
NewState :: state().
do_enroll(_PID, State) ->
% TODO... ??
State.
do_enroll2(PID, State, Channels) ->
case lists:member(PID, Channels) of
false ->
Mon = monitor(process, PID),
ok = log(info, "Enroll: ~tp @ ~tp", [PID, Mon]),
%State#s{channels = [PID | Channels]};
State;
true ->
State
end.
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
case maps:find(Route, Conns) of
{ok, _Info} ->
io:format("Updating routes not yet implemented.~n", []),
init:stop();
error ->
NewConn = #route_info{conn_pid = Conn,
socket = Sock,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
State#s{connections = NewConns}
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
case maps:find(Route, Conns) of
{ok, Info} ->
do_create_channel2(State, Route, Info);
error ->
Result = {error, unknown_connection},
{Result, State}
end.
do_create_channel2(State, Route, Info) ->
case Info#route_info.unused_channels of
[] ->
{{error, all_in_use}, State};
[Next | _] ->
do_create_channel3(State, Route, Info, Next)
end.
do_create_channel3(State, Route, Info, ChanID) ->
{ok, Chan} = msp_channel_sup:start_channel(),
msp_channel:configure_channel(Chan, ChanID, Info#route_info.socket, Route#route.to),
Result = {ok, {Chan, ChanID}},
% Now update the channel list...
Remaining = lists:delete(ChanID, Info#route_info.unused_channels),
NewChan = #channel{chan_pid = Chan},
NewChannels = maps:put(ChanID, NewChan, Info#route_info.used_channels),
NewInfo = Info#route_info{unused_channels = Remaining,
used_channels = NewChannels},
NewConns = maps:put(Route, NewInfo, State#s.connections),
NewState = State#s{connections = NewConns},
{Result, NewState}.
do_dispatch(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
State
end.
do_dispatch2(Route, ID, Packet, State, Info) ->
case maps:find(ID, Info#route_info.used_channels) of
{ok, #channel{chan_pid = Chan}} ->
do_dispatch3(Packet, State, Chan);
error ->
{{ok, {Chan, _}}, NewState} = do_create_channel3(State, Route, Info, ID),
do_dispatch3(Packet, NewState, Chan)
end.
do_dispatch3(Packet, State, Chan) ->
erlang:send(Chan, {msp_fragment, Packet}),
State.

50
src/msp_channels.erl Normal file
View File

@ -0,0 +1,50 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Service Supervisor
%%%
%%% This is the service-level supervisor of the system. It is the parent of both the
%%% client connection handlers and the client manager (which manages the client
%%% connection handlers). This is the child of msp_sup.
%%%
%%% See: http://erlang.org/doc/apps/kernel/application.html
%%% @end
-module(msp_channels).
-vsn("0.1.0").
-behavior(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_link/0]).
-export([init/1]).
-spec start_link() -> {ok, pid()}.
%% @private
%% This supervisor's own start function.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {rest_for_one, 1, 60},
ChannelMan =
{msp_channel_man,
{msp_channel_man, start_link, []},
permanent,
5000,
worker,
[msp_channel_man]},
ChannelSup =
{msp_channel_sup,
{msp_channel_sup, start_link, []},
permanent,
5000,
supervisor,
[msp_channel_sup]},
Children = [ChannelSup, ChannelMan],
{ok, {RestartStrategy, Children}}.

View File

@ -78,8 +78,8 @@ handle_cast(Unexpected, State) ->
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
NewState = do_dispatch(State, Packet),
{noreply, NewState};
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
@ -105,14 +105,23 @@ terminate(_, _) ->
%%% Doer Functions
do_begin_msp(Sock, Peer, Side) ->
ok = inet:setopts(Sock, [{active, once}]),
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock,
peer = Peer,
side = Side},
State.
do_dispatch(State = #s{socket = Sock}, Packet) ->
io:format("Got data: ~p~n", [Packet]),
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
State.
case maps:find(ID, Conns) of
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet});
error ->
{ok, OurPort} = inet:port(Sock),
{TheirIP, TheirPort} = Peer,
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
end;
do_dispatch(_, <<>>) ->
% Empty datagram?
ok.

View File

@ -10,9 +10,15 @@ start_link() ->
init([]) ->
RestartStrategy = {one_for_one, 0, 60},
Children = [{msp_man,
{msp_man, start_link, []},
permanent,
5000,
worker,
[msp_man]}],
{msp_man, start_link, []},
permanent,
5000,
worker,
[msp_man]},
{msp_channels,
{msp_channels, start_link, []},
permanent,
5000,
worker,
[msp_channels]}],
{ok, {RestartStrategy, Children}}.

View File

@ -25,17 +25,24 @@ make_connection(OurPort, TheirIP, TheirPort, Side) ->
{ok, Sock} = gen_udp:open(OurPort),
{ok, Pid} = msp_connection:start_link(),
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
{Pid, Sock}.
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5555,
PortB = 6666,
{A, SockA} = make_connection(PortA, IP, PortB, 0),
{B, SockB} = make_connection(PortB, IP, PortA, 1),
gen_udp:send(SockA, {IP, PortB}, <<"message sent from A to B">>),
gen_udp:send(SockB, {IP, PortA}, <<"message sent from B to A">>),
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
timer:sleep(10),
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
timer:sleep(10),
ok.