some kind of channel allocation

Now sending messages results in some basic channel state
tracking, but receiving doesn't do any of that yet.
This commit is contained in:
Jarvis Carroll 2025-10-26 05:01:44 +00:00
parent 7503463ff2
commit 0ccd64e2f1
6 changed files with 356 additions and 42 deletions

View File

@ -9,7 +9,8 @@
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([new_connection/2]).
-export([configure_channel/4]).
-export([send_and_close/2]).
%% gen_server
-export([start_link/0]).
@ -72,8 +73,14 @@
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(s,
{}).
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer()}).
-type state() :: none | #s{}.
@ -82,16 +89,11 @@
%%% Interface
new_connection(Peer, Packet) ->
{ok, Pid} = start_link(),
configure_channel(Pid, Peer),
handle_datagram(Pid, Packet).
configure_channel(Pid, Id, Socket, Peer) ->
gen_server:cast(Pid, {configure_channel, Id, Socket, Peer}).
configure_channel(Id, Peer) ->
gen_server:cast(Id, {configure_channel, Peer}).
handle_datagram(Id, Packet) ->
gen_server:cast(Id, {handle_datagram, Packet}).
send_and_close(Chan, Message) ->
gen_server:call(Chan, {send, Message, close}).
%%% gen_server
@ -107,22 +109,27 @@ init(none) ->
{ok, none}.
handle_call({send, Message, Mod}, _, State) ->
{Result, NewState} = do_send(Message, Mod, State),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({configure_channel, Peer}, none) ->
State = #s{},
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
{noreply, State};
handle_cast({handle_datagram, Packet}, State) ->
NewState = do_handle_datagram(State, Packet),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({msp_fragment, Packet}, State) ->
NewState = do_handle_datagram(State, Packet),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
@ -145,7 +152,34 @@ terminate(_, _) ->
%%% Doer Functions
%%% Send Doer Functions
do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
{{error, already_open}, State};
do_send(_, _, State = #s{state = receiving}) ->
{{error, not_sending}, State};
do_send(Message, Mod, State = #s{chan_id = Id,
state = ChanMode,
peer = Peer,
socket = Sock,
seq = Seq}) ->
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
IsNewChannel = ChanMode == closed,
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
gen_udp:send(Sock, Peer, Datagram),
NewState = State#s{state = NewChanMode, seq = Seq + 1},
{ok, NewState}.
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
choose_opcode(true, none) -> {?OP_OPEN, sending};
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Receive Doer Functions
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
@ -194,26 +228,26 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.

219
src/msp_channel_man.erl Normal file
View File

@ -0,0 +1,219 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Worker Manager
%%% @end
-module(msp_channel_man).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
%% Worker interface
-export([enroll/0]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Type and Record Definitions
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(route, {from :: inet:port_number(),
to :: endpoint()}).
-record(channel,
{chan_pid :: pid()}).
-record(route_info,
{conn_pid :: pid(),
socket :: gen_udp:socket(),
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
-record(s,
{connections = #{} :: #{#route{} => #route_info{}}}).
-type state() :: #s{}.
%%% Service Interface
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
create_channel(OurPort, TheirIP, TheirPort) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:call(?MODULE, {create_channel, Route}).
% Deliver messages on behalf of msp_connection, if it doesn't know where to
% send something.
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
%%% Worker Interface
-spec enroll() -> ok.
%% @doc
%% Workers register here after they initialize.
enroll() ->
gen_server:cast(?MODULE, {enroll, self()}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
init(none) ->
State = #s{},
{ok, State}.
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({update_router, Route, Conn, Sock}, State) ->
NewState = do_update_router(Route, Conn, Sock, State),
{noreply, NewState};
handle_cast({dispatch, Route, ID, Packet}, State) ->
NewState = do_dispatch(Route, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_down(Mon, PID, Reason, State) ->
% TODO
Unexpected = {'DOWN', Mon, process, PID, Reason},
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
State.
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Doer Functions
-spec do_enroll(PID, State) -> NewState
when PID :: pid(),
State :: state(),
NewState :: state().
do_enroll(_PID, State) ->
% TODO... ??
State.
do_enroll2(PID, State, Channels) ->
case lists:member(PID, Channels) of
false ->
Mon = monitor(process, PID),
ok = log(info, "Enroll: ~tp @ ~tp", [PID, Mon]),
%State#s{channels = [PID | Channels]};
State;
true ->
State
end.
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
case maps:find(Route, Conns) of
{ok, _Info} ->
io:format("Updating routes not yet implemented.~n", []),
init:stop();
error ->
NewConn = #route_info{conn_pid = Conn,
socket = Sock,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
State#s{connections = NewConns}
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
case maps:find(Route, Conns) of
{ok, Info} ->
do_create_channel2(State, Route, Info);
error ->
Result = {error, unknown_connection},
{Result, State}
end.
do_create_channel2(State, Route, Info) ->
case Info#route_info.unused_channels of
[] ->
{{error, all_in_use}, State};
[Next | _] ->
do_create_channel3(State, Route, Info, Next)
end.
do_create_channel3(State, Route, Info, ChanID) ->
{ok, Chan} = msp_channel_sup:start_channel(),
msp_channel:configure_channel(Chan, ChanID, Info#route_info.socket, Route#route.to),
Result = {ok, {Chan, ChanID}},
% Now update the channel list...
Remaining = lists:delete(ChanID, Info#route_info.unused_channels),
NewChan = #channel{chan_pid = Chan},
NewChannels = maps:put(ChanID, NewChan, Info#route_info.used_channels),
NewInfo = Info#route_info{unused_channels = Remaining,
used_channels = NewChannels},
NewConns = maps:put(Route, NewInfo, State#s.connections),
NewState = State#s{connections = NewConns},
{Result, NewState}.
do_dispatch(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
State
end.
do_dispatch2(Route, ID, Packet, State, Info) ->
case maps:find(ID, Info#route_info.used_channels) of
{ok, #channel{chan_pid = Chan}} ->
do_dispatch3(Packet, State, Chan);
error ->
{{ok, {Chan, _}}, NewState} = do_create_channel3(State, Route, Info, ID),
do_dispatch3(Packet, NewState, Chan)
end.
do_dispatch3(Packet, State, Chan) ->
erlang:send(Chan, {msp_fragment, Packet}),
State.

50
src/msp_channels.erl Normal file
View File

@ -0,0 +1,50 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Service Supervisor
%%%
%%% This is the service-level supervisor of the system. It is the parent of both the
%%% client connection handlers and the client manager (which manages the client
%%% connection handlers). This is the child of msp_sup.
%%%
%%% See: http://erlang.org/doc/apps/kernel/application.html
%%% @end
-module(msp_channels).
-vsn("0.1.0").
-behavior(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_link/0]).
-export([init/1]).
-spec start_link() -> {ok, pid()}.
%% @private
%% This supervisor's own start function.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {rest_for_one, 1, 60},
ChannelMan =
{msp_channel_man,
{msp_channel_man, start_link, []},
permanent,
5000,
worker,
[msp_channel_man]},
ChannelSup =
{msp_channel_sup,
{msp_channel_sup, start_link, []},
permanent,
5000,
supervisor,
[msp_channel_sup]},
Children = [ChannelSup, ChannelMan],
{ok, {RestartStrategy, Children}}.

View File

@ -78,8 +78,8 @@ handle_cast(Unexpected, State) ->
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
NewState = do_dispatch(State, Packet),
{noreply, NewState};
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
@ -111,19 +111,17 @@ do_begin_msp(Sock, Peer, Side) ->
side = Side},
State.
do_dispatch(State = #s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
case maps:find(ID, Conns) of
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet}),
State;
erlang:send(Conn, {msp_fragment, Packet});
error ->
io:format("Opening connection ~p~n", [ID]),
Conn = spawn_link(msp_channel, new_connection, [Peer, Packet]),
NewConns = maps:put(Peer, Conn, Conns),
State#s{connections = NewConns}
{ok, OurPort} = inet:port(Sock),
{TheirIP, TheirPort} = Peer,
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
end;
do_dispatch(State, <<>>) ->
do_dispatch(_, <<>>) ->
% Empty datagram?
State.
ok.

View File

@ -10,9 +10,15 @@ start_link() ->
init([]) ->
RestartStrategy = {one_for_one, 0, 60},
Children = [{msp_man,
{msp_man, start_link, []},
permanent,
5000,
worker,
[msp_man]}],
{msp_man, start_link, []},
permanent,
5000,
worker,
[msp_man]},
{msp_channels,
{msp_channels, start_link, []},
permanent,
5000,
worker,
[msp_channels]}],
{ok, {RestartStrategy, Children}}.

View File

@ -25,17 +25,24 @@ make_connection(OurPort, TheirIP, TheirPort, Side) ->
{ok, Sock} = gen_udp:open(OurPort),
{ok, Pid} = msp_connection:start_link(),
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
{Pid, Sock}.
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5555,
PortB = 6666,
{A, SockA} = make_connection(PortA, IP, PortB, 0),
{B, SockB} = make_connection(PortB, IP, PortA, 1),
gen_udp:send(SockA, {IP, PortB}, << 0:8, 5:8, 0:16, "message sent from A to B">>),
gen_udp:send(SockB, {IP, PortA}, <<128:8, 5:8, 0:16, "message sent from B to A">>),
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
timer:sleep(10),
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
timer:sleep(10),
ok.