From 0ccd64e2f1ce6cd977f96e9764d4968aef5cc622 Mon Sep 17 00:00:00 2001 From: Jarvis Carroll Date: Sun, 26 Oct 2025 05:01:44 +0000 Subject: [PATCH] some kind of channel allocation Now sending messages results in some basic channel state tracking, but receiving doesn't do any of that yet. --- src/msp_channel.erl | 80 ++++++++++----- src/msp_channel_man.erl | 219 ++++++++++++++++++++++++++++++++++++++++ src/msp_channels.erl | 50 +++++++++ src/msp_connection.erl | 20 ++-- src/msp_sup.erl | 16 ++- src/msp_tests.erl | 13 ++- 6 files changed, 356 insertions(+), 42 deletions(-) create mode 100644 src/msp_channel_man.erl create mode 100644 src/msp_channels.erl diff --git a/src/msp_channel.erl b/src/msp_channel.erl index 54c38fc..38266dd 100644 --- a/src/msp_channel.erl +++ b/src/msp_channel.erl @@ -9,7 +9,8 @@ -copyright("Jarvis Carroll "). -license("MIT"). --export([new_connection/2]). +-export([configure_channel/4]). +-export([send_and_close/2]). %% gen_server -export([start_link/0]). @@ -72,8 +73,14 @@ %%% Type and Record Definitions +-type endpoint() :: {inet:ip_address(), inet:port_number()}. + -record(s, - {}). + {chan_id :: integer(), + socket :: gen_udp:socket(), + peer :: endpoint(), % TODO configure the socket in advance? + state = closed :: closed | sending | receiving, + seq = 0 :: integer()}). -type state() :: none | #s{}. @@ -82,16 +89,11 @@ %%% Interface -new_connection(Peer, Packet) -> - {ok, Pid} = start_link(), - configure_channel(Pid, Peer), - handle_datagram(Pid, Packet). +configure_channel(Pid, Id, Socket, Peer) -> + gen_server:cast(Pid, {configure_channel, Id, Socket, Peer}). -configure_channel(Id, Peer) -> - gen_server:cast(Id, {configure_channel, Peer}). - -handle_datagram(Id, Packet) -> - gen_server:cast(Id, {handle_datagram, Packet}). +send_and_close(Chan, Message) -> + gen_server:call(Chan, {send, Message, close}). %%% gen_server @@ -107,22 +109,27 @@ init(none) -> {ok, none}. +handle_call({send, Message, Mod}, _, State) -> + {Result, NewState} = do_send(Message, Mod, State), + {reply, Result, NewState}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]), {noreply, State}. -handle_cast({configure_channel, Peer}, none) -> - State = #s{}, +handle_cast({configure_channel, Id, Socket, Peer}, none) -> + State = #s{chan_id = Id, + socket = Socket, + peer = Peer}, {noreply, State}; -handle_cast({handle_datagram, Packet}, State) -> - NewState = do_handle_datagram(State, Packet), - {noreply, NewState}; handle_cast(Unexpected, State) -> ok = log(warning, "Unexpected cast: ~tp", [Unexpected]), {noreply, State}. +handle_info({msp_fragment, Packet}, State) -> + NewState = do_handle_datagram(State, Packet), + {noreply, NewState}; handle_info(Unexpected, State) -> ok = log(warning, "Unexpected info: ~tp", [Unexpected]), {noreply, State}. @@ -145,7 +152,34 @@ terminate(_, _) -> -%%% Doer Functions +%%% Send Doer Functions + +do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed -> + {{error, already_open}, State}; +do_send(_, _, State = #s{state = receiving}) -> + {{error, not_sending}, State}; +do_send(Message, Mod, State = #s{chan_id = Id, + state = ChanMode, + peer = Peer, + socket = Sock, + seq = Seq}) -> + io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]), + IsNewChannel = ChanMode == closed, + {Op, NewChanMode} = choose_opcode(IsNewChannel, Mod), + Datagram = <>, + gen_udp:send(Sock, Peer, Datagram), + NewState = State#s{state = NewChanMode, seq = Seq + 1}, + {ok, NewState}. + +choose_opcode(false, none) -> {?OP_FRAGMENT, sending}; +choose_opcode(true, none) -> {?OP_OPEN, sending}; +choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving}; +choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving}; +choose_opcode(false, close) -> {?OP_OPEN_END, closed}; +choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed}; +choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}. + +%%% Receive Doer Functions do_handle_datagram(State, <>) -> do_handle_lossy(State, Data); @@ -194,26 +228,26 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) -> State. do_handle_ack(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n"), + io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. do_handle_status(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n"), + io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. do_handle_nack(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n"), + io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. do_handle_cancel(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n"), + io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. do_handle_ack_cancel(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n"), + io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. do_handle_req_move(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n"), + io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), State. diff --git a/src/msp_channel_man.erl b/src/msp_channel_man.erl new file mode 100644 index 0000000..5c3b80e --- /dev/null +++ b/src/msp_channel_man.erl @@ -0,0 +1,219 @@ +%%% @doc +%%% Minimal Stream Protocol: Channel Worker Manager +%%% @end + +-module(msp_channel_man). +-vsn("0.1.0"). +-behavior(gen_server). +-author("Jarvis Carroll "). +-copyright("Jarvis Carroll "). +-license("MIT"). + +-export([update_router/5, create_channel/3, dispatch_fallback/5]). +%% Worker interface +-export([enroll/0]). +%% gen_server +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). +-include("$zx_include/zx_logger.hrl"). + + +%%% Type and Record Definitions + +-type endpoint() :: {inet:ip_address(), inet:port_number()}. +-record(route, {from :: inet:port_number(), + to :: endpoint()}). + +-record(channel, + {chan_pid :: pid()}). + +-record(route_info, + {conn_pid :: pid(), + socket :: gen_udp:socket(), + unused_channels :: [integer()], + used_channels :: #{integer() => #channel{}}}). + +-record(s, + {connections = #{} :: #{#route{} => #route_info{}}}). + +-type state() :: #s{}. + +%%% Service Interface + +update_router(OurPort, TheirIP, TheirPort, Conn, Sock) -> + Route = #route{from = OurPort, to = {TheirIP, TheirPort}}, + gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}). + +create_channel(OurPort, TheirIP, TheirPort) -> + Route = #route{from = OurPort, to = {TheirIP, TheirPort}}, + gen_server:call(?MODULE, {create_channel, Route}). + +% Deliver messages on behalf of msp_connection, if it doesn't know where to +% send something. +dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) -> + Route = #route{from = OurPort, to = {TheirIP, TheirPort}}, + gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}). + +%%% Worker Interface + +-spec enroll() -> ok. +%% @doc +%% Workers register here after they initialize. + +enroll() -> + gen_server:cast(?MODULE, {enroll, self()}). + + + +%%% gen_server + +-spec start_link() -> Result + when Result :: {ok, pid()} + | {error, Reason :: term()}. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, none, []). + + +init(none) -> + State = #s{}, + {ok, State}. + + +handle_call({create_channel, Route}, _, State) -> + {Result, NewState} = do_create_channel(State, Route), + {reply, Result, NewState}; +handle_call(Unexpected, From, State) -> + ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]), + {noreply, State}. + + +handle_cast({update_router, Route, Conn, Sock}, State) -> + NewState = do_update_router(Route, Conn, Sock, State), + {noreply, NewState}; +handle_cast({dispatch, Route, ID, Packet}, State) -> + NewState = do_dispatch(Route, ID, Packet, State), + {noreply, NewState}; +handle_cast({enroll, PID}, State) -> + NewState = do_enroll(PID, State), + {noreply, NewState}; +handle_cast(Unexpected, State) -> + ok = log(warning, "Unexpected cast: ~tp", [Unexpected]), + {noreply, State}. + + +handle_info({'DOWN', Mon, process, PID, Reason}, State) -> + NewState = handle_down(Mon, PID, Reason, State), + {noreply, NewState}; +handle_info(Unexpected, State) -> + ok = log(warning, "Unexpected info: ~tp", [Unexpected]), + {noreply, State}. + + +handle_down(Mon, PID, Reason, State) -> + % TODO + Unexpected = {'DOWN', Mon, process, PID, Reason}, + ok = log(warning, "Unexpected info: ~tp", [Unexpected]), + State. + + +code_change(_, State, _) -> + {ok, State}. + + +terminate(_, _) -> + ok. + + + +%%% Doer Functions + +-spec do_enroll(PID, State) -> NewState + when PID :: pid(), + State :: state(), + NewState :: state(). + +do_enroll(_PID, State) -> + % TODO... ?? + State. + +do_enroll2(PID, State, Channels) -> + case lists:member(PID, Channels) of + false -> + Mon = monitor(process, PID), + ok = log(info, "Enroll: ~tp @ ~tp", [PID, Mon]), + %State#s{channels = [PID | Channels]}; + State; + true -> + State + end. + +do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) -> + case maps:find(Route, Conns) of + {ok, _Info} -> + io:format("Updating routes not yet implemented.~n", []), + init:stop(); + error -> + NewConn = #route_info{conn_pid = Conn, + socket = Sock, + unused_channels = lists:seq(0, 255), + used_channels = #{}}, + NewConns = maps:put(Route, NewConn, Conns), + State#s{connections = NewConns} + end. + +do_create_channel(State = #s{connections = Conns}, Route) -> + case maps:find(Route, Conns) of + {ok, Info} -> + do_create_channel2(State, Route, Info); + error -> + Result = {error, unknown_connection}, + {Result, State} + end. + +do_create_channel2(State, Route, Info) -> + case Info#route_info.unused_channels of + [] -> + {{error, all_in_use}, State}; + [Next | _] -> + do_create_channel3(State, Route, Info, Next) + end. + +do_create_channel3(State, Route, Info, ChanID) -> + {ok, Chan} = msp_channel_sup:start_channel(), + msp_channel:configure_channel(Chan, ChanID, Info#route_info.socket, Route#route.to), + Result = {ok, {Chan, ChanID}}, + + % Now update the channel list... + Remaining = lists:delete(ChanID, Info#route_info.unused_channels), + NewChan = #channel{chan_pid = Chan}, + NewChannels = maps:put(ChanID, NewChan, Info#route_info.used_channels), + NewInfo = Info#route_info{unused_channels = Remaining, + used_channels = NewChannels}, + NewConns = maps:put(Route, NewInfo, State#s.connections), + NewState = State#s{connections = NewConns}, + + {Result, NewState}. + +do_dispatch(Route, ID, Packet, State) -> + case maps:find(Route, State#s.connections) of + {ok, Info} -> + do_dispatch2(Route, ID, Packet, State, Info); + error -> + State + end. + +do_dispatch2(Route, ID, Packet, State, Info) -> + case maps:find(ID, Info#route_info.used_channels) of + {ok, #channel{chan_pid = Chan}} -> + do_dispatch3(Packet, State, Chan); + error -> + {{ok, {Chan, _}}, NewState} = do_create_channel3(State, Route, Info, ID), + do_dispatch3(Packet, NewState, Chan) + end. + +do_dispatch3(Packet, State, Chan) -> + erlang:send(Chan, {msp_fragment, Packet}), + State. + diff --git a/src/msp_channels.erl b/src/msp_channels.erl new file mode 100644 index 0000000..4b28677 --- /dev/null +++ b/src/msp_channels.erl @@ -0,0 +1,50 @@ +%%% @doc +%%% Minimal Stream Protocol: Channel Service Supervisor +%%% +%%% This is the service-level supervisor of the system. It is the parent of both the +%%% client connection handlers and the client manager (which manages the client +%%% connection handlers). This is the child of msp_sup. +%%% +%%% See: http://erlang.org/doc/apps/kernel/application.html +%%% @end + +-module(msp_channels). +-vsn("0.1.0"). +-behavior(supervisor). +-author("Jarvis Carroll "). +-copyright("Jarvis Carroll "). +-license("MIT"). + +-export([start_link/0]). +-export([init/1]). + + +-spec start_link() -> {ok, pid()}. +%% @private +%% This supervisor's own start function. + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, none). + +-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +%% @private +%% The OTP init/1 function. + +init(none) -> + RestartStrategy = {rest_for_one, 1, 60}, + ChannelMan = + {msp_channel_man, + {msp_channel_man, start_link, []}, + permanent, + 5000, + worker, + [msp_channel_man]}, + ChannelSup = + {msp_channel_sup, + {msp_channel_sup, start_link, []}, + permanent, + 5000, + supervisor, + [msp_channel_sup]}, + Children = [ChannelSup, ChannelMan], + {ok, {RestartStrategy, Children}}. diff --git a/src/msp_connection.erl b/src/msp_connection.erl index b727e1f..725d4eb 100644 --- a/src/msp_connection.erl +++ b/src/msp_connection.erl @@ -78,8 +78,8 @@ handle_cast(Unexpected, State) -> handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) -> - NewState = do_dispatch(State, Packet), - {noreply, NewState}; + do_dispatch(State, Packet), + {noreply, State}; handle_info(Unexpected, State) -> ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]), {noreply, State}. @@ -111,19 +111,17 @@ do_begin_msp(Sock, Peer, Side) -> side = Side}, State. -do_dispatch(State = #s{socket = Sock, peer = Peer, connections = Conns}, <>) -> +do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <>) -> ok = inet:setopts(Sock, [{active, once}]), case maps:find(ID, Conns) of {ok, Conn} -> - erlang:send(Conn, {msp_fragment, Packet}), - State; + erlang:send(Conn, {msp_fragment, Packet}); error -> - io:format("Opening connection ~p~n", [ID]), - Conn = spawn_link(msp_channel, new_connection, [Peer, Packet]), - NewConns = maps:put(Peer, Conn, Conns), - State#s{connections = NewConns} + {ok, OurPort} = inet:port(Sock), + {TheirIP, TheirPort} = Peer, + msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) end; -do_dispatch(State, <<>>) -> +do_dispatch(_, <<>>) -> % Empty datagram? - State. + ok. diff --git a/src/msp_sup.erl b/src/msp_sup.erl index 66d9eeb..1bc64fb 100644 --- a/src/msp_sup.erl +++ b/src/msp_sup.erl @@ -10,9 +10,15 @@ start_link() -> init([]) -> RestartStrategy = {one_for_one, 0, 60}, Children = [{msp_man, - {msp_man, start_link, []}, - permanent, - 5000, - worker, - [msp_man]}], + {msp_man, start_link, []}, + permanent, + 5000, + worker, + [msp_man]}, + {msp_channels, + {msp_channels, start_link, []}, + permanent, + 5000, + worker, + [msp_channels]}], {ok, {RestartStrategy, Children}}. diff --git a/src/msp_tests.erl b/src/msp_tests.erl index bcb090e..ded193b 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -25,17 +25,24 @@ make_connection(OurPort, TheirIP, TheirPort, Side) -> {ok, Sock} = gen_udp:open(OurPort), {ok, Pid} = msp_connection:start_link(), ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side), + ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock), {Pid, Sock}. - send_test() -> IP = {127, 0, 0, 1}, PortA = 5555, PortB = 6666, {A, SockA} = make_connection(PortA, IP, PortB, 0), {B, SockB} = make_connection(PortB, IP, PortA, 1), - gen_udp:send(SockA, {IP, PortB}, << 0:8, 5:8, 0:16, "message sent from A to B">>), - gen_udp:send(SockB, {IP, PortA}, <<128:8, 5:8, 0:16, "message sent from B to A">>), + + {ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB), + msp_channel:send_and_close(ChanA, <<"message sent from A to B">>), + io:format("A has tried to send a message on channel ~p~n", [IndexA]), + timer:sleep(10), + + {ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA), + msp_channel:send_and_close(ChanB, <<"message sent from B to A">>), + io:format("B has tried to send a message on channel ~p~n", [IndexB]), timer:sleep(10), ok.