message dispatch stub

This is probably about as far as I can go without actual
manager processes... I need something that knows what
channels are in use!!
This commit is contained in:
Jarvis Carroll 2025-10-26 00:25:01 +00:00
parent 587fa1710c
commit 7503463ff2
3 changed files with 235 additions and 5 deletions

219
src/msp_channel.erl Normal file
View File

@ -0,0 +1,219 @@
%%% @doc
%%% Minimal Stream Protocol: Channel Worker
%%% @end
-module(msp_channel).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([new_connection/2]).
%% gen_server
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-include("$zx_include/zx_logger.hrl").
%%% Opcodes
% Fragment/message opcodes:
% _____________________________________________________________________________
% OP_FRAGMENT | Part of a message/stream
% OP_OPEN | Open a new channel, and send the first fragment of the
% | first message in it.
% OP_END_MESSAGE | End of a message/stream, expects a response
% OP_OPEN_END | Open a new channel, and send an entire message
% OP_CLOSE | End of a message/stream, do not respond, close the channel
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
% | channel, all in one datagram. Sort of like
% | gen_server:cast/2, or if UDP were reliable.
%
% Transport Control Stuff
% -----------------------------------------------------------------------------
% OP_ACK | Acknowledge that instructions up to N have been received.
% OP_STATUS | Request an ACK for this exact command. Has many uses -
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
% | to stop the receiver from coalescing ACKs too much.
% OP_NACK | Instruction N has not been received.
% OP_CANCEL | Sender is hoping that messages M through N weren't
% | delivered, and can be cancelled.
% OP_ACK_CANCEL | Respond to OP_CANCEL.
%
% Stream Negotiation
% _____________________________________________________________________________
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
% | should be moved to a different ID.
%
% Lossy
% _____________________________________________________________________________
% OP_OPEN_DIRTY | Like OP_OPEN, but marks the channel as 'dirty', meaning
% | it is allowed to send lossy datagrams too, for voip, etc.
% OP_LOSSY | A lossy datagram, for the application to deal with.
% Numbers aren't final.
-define(OP_FRAGMENT, 0).
-define(OP_OPEN, 1).
-define(OP_END_MESSAGE, 2).
-define(OP_OPEN_END, 3).
-define(OP_CLOSE, 4).
-define(OP_OPEN_CLOSE, 5).
-define(OP_ACK, 6).
-define(OP_STATUS, 7).
-define(OP_NACK, 8).
-define(OP_CANCEL, 9).
-define(OP_ACK_CANCEL, 10).
-define(OP_REQ_MOVE, 11).
-define(OP_OPEN_DIRTY, 12).
-define(OP_LOSSY, 13).
%%% Type and Record Definitions
-record(s,
{}).
-type state() :: none | #s{}.
%%% Interface
new_connection(Peer, Packet) ->
{ok, Pid} = start_link(),
configure_channel(Pid, Peer),
handle_datagram(Pid, Packet).
configure_channel(Id, Peer) ->
gen_server:cast(Id, {configure_channel, Peer}).
handle_datagram(Id, Packet) ->
gen_server:cast(Id, {handle_datagram, Packet}).
%%% gen_server
-spec start_link() -> Result
when Result :: {ok, pid()}
| {error, Reason :: term()}.
start_link() ->
gen_server:start_link(?MODULE, none, []).
init(none) ->
{ok, none}.
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
handle_cast({configure_channel, Peer}, none) ->
State = #s{},
{noreply, State};
handle_cast({handle_datagram, Packet}, State) ->
NewState = do_handle_datagram(State, Packet),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
-spec code_change(OldVersion, State, Extra) -> {ok, NewState} | {error, Reason}
when OldVersion :: Version | {old, Version},
Version :: term(),
State :: state(),
Extra :: term(),
NewState :: term(),
Reason :: term().
code_change(_, State, _) ->
{ok, State}.
terminate(_, _) ->
ok.
%%% Doer Functions
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
do_handle_reliable(State, Op, Seq, Rest);
do_handle_datagram(State, Unexpected) ->
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, false, continue, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
do_handle_fragment(State, true, continue, Seq, Payload);
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
do_handle_fragment(State, false, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
do_handle_fragment(State, true, end_message, Seq, Payload);
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, false, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
do_handle_fragment(State, true, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
do_handle_ack(State, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
do_handle_nack(State, Seq, Payload);
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
do_handle_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
do_handle_ack_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
do_handle_req_move(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
do_handle_fragment(State, true, dirty, Seq, Payload);
do_handle_reliable(State, Unknown, _, _) ->
io:format("Got unexpected opcode ~p~n", [Unknown]),
State.
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
State.
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
State.
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
State.
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
State.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n"),
State.

View File

@ -105,14 +105,25 @@ terminate(_, _) ->
%%% Doer Functions
do_begin_msp(Sock, Peer, Side) ->
ok = inet:setopts(Sock, [{active, once}]),
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock,
peer = Peer,
side = Side},
State.
do_dispatch(State = #s{socket = Sock}, Packet) ->
io:format("Got data: ~p~n", [Packet]),
do_dispatch(State = #s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
case maps:find(ID, Conns) of
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet}),
State;
error ->
io:format("Opening connection ~p~n", [ID]),
Conn = spawn_link(msp_channel, new_connection, [Peer, Packet]),
NewConns = maps:put(Peer, Conn, Conns),
State#s{connections = NewConns}
end;
do_dispatch(State, <<>>) ->
% Empty datagram?
State.

View File

@ -34,8 +34,8 @@ send_test() ->
PortB = 6666,
{A, SockA} = make_connection(PortA, IP, PortB, 0),
{B, SockB} = make_connection(PortB, IP, PortA, 1),
gen_udp:send(SockA, {IP, PortB}, <<"message sent from A to B">>),
gen_udp:send(SockB, {IP, PortA}, <<"message sent from B to A">>),
gen_udp:send(SockA, {IP, PortB}, << 0:8, 5:8, 0:16, "message sent from A to B">>),
gen_udp:send(SockB, {IP, PortA}, <<128:8, 5:8, 0:16, "message sent from B to A">>),
timer:sleep(10),
ok.