Compare commits

...

6 Commits

Author SHA1 Message Date
Jarvis Carroll
78ea3fc85e handle socket race conditions better
Now the channle manager takes full responsibility for dispatch as
soon as it is initialised, and only passes that responsibility on
to the router process once things have been cleaned up.

What a complex refactor just to pass a socket to a new process!
2025-10-29 11:56:44 +00:00
Jarvis Carroll
e8ef0b4304 buffer out-of-order messages
This took a while to debug!! Turns out I don't actually
handle datagrams that have been received before the
workers are all initialized... oops!!
2025-10-27 07:31:27 +00:00
Jarvis Carroll
b6c16967e7 notify router of new channels
This way the second message received on a channel should be
routed by the router without involving the fallback.

Lolspeed achieved.
2025-10-26 11:23:34 +00:00
Jarvis Carroll
47612c2775 have channel_man spawn its own router
Once I added dispatch_fallback to channel_man, it became clear that
the router is actually subordinate to the channel_man.
2025-10-26 11:14:09 +00:00
Jarvis Carroll
cfd9eb748f rename Connection to Router
Connection is more of a supervision concept than a server concept.
I should have a SWP of SWPs too, but I can't be bothered just yet.
2025-10-26 10:46:27 +00:00
Jarvis Carroll
669b2e51e1 actually add msp_channel_sup.erl
oops
2025-10-26 10:39:08 +00:00
5 changed files with 314 additions and 83 deletions

View File

@ -75,12 +75,19 @@
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(instruction,
{op :: integer(),
seq :: integer(),
data :: binary()}).
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer()}).
seq = 0 :: integer(),
instructions = [] :: [#instruction{}],
fragments = [] :: [binary()]}).
-type state() :: none | #s{}.
@ -118,6 +125,7 @@ handle_call(Unexpected, From, State) ->
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
log(debug, "Opening channel ~p~n", [Id]),
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
@ -163,7 +171,7 @@ do_send(Message, Mod, State = #s{chan_id = Id,
peer = Peer,
socket = Sock,
seq = Seq}) ->
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
IsNewChannel = ChanMode == closed,
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
@ -181,18 +189,76 @@ choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Receive Doer Functions
% First handle the messages that don't need to be reordered
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
do_handle_ack(State, Data);
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
do_handle_nack(State, Data);
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
do_handle_cancel(State, Data);
do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
% TODO: do we need ack cancel? There isn't much you can do with the
% sequence information, you really just need the server to tell you what
% did get through and how it was interpreted.
do_handle_ack_cancel(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
do_handle_reliable(State, Op, Seq, Rest);
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
% Reliable stream operations need to be ordered first.
NewState = do_buffer_reliable(State, Op, Seq, Rest),
do_execute_buffer(NewState);
do_handle_datagram(State, Unexpected) ->
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_ack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_nack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_ack_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
%%% Reliable message reordering
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
State#s{instructions = NewInstructions}.
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
% We need to go later than this. Move it to the 'earlier than us' pile.
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
% There is already an instruction with this sequence ID. Use that.
lists:reverse(Earlier, Is);
insert_instruction(Op, Seq, Data, Is, Earlier) ->
% If neither of the above conditions are met, we can add it here.
NewI = #instruction{op = Op, seq = Seq, data = Data},
lists:reverse(Earlier, [NewI | Is]).
%%% Reliable message execution
do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
do_execute_buffer(NewState);
do_execute_buffer(State) ->
% Empty instruction list, or the next instruction hasn't arrived yet. Done.
State.
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, false, continue, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
@ -205,16 +271,8 @@ do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, false, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
do_handle_fragment(State, true, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
do_handle_ack(State, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
do_handle_nack(State, Seq, Payload);
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
do_handle_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
do_handle_ack_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
do_handle_req_move(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
@ -227,26 +285,10 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.

View File

@ -9,7 +9,7 @@
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
%% Worker interface
-export([enroll/0]).
%% gen_server
@ -29,8 +29,9 @@
{chan_pid :: pid()}).
-record(route_info,
{conn_pid :: pid(),
{router :: pid(),
socket :: gen_udp:socket(),
side :: 0 | 1,
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
@ -41,19 +42,49 @@
%%% Service Interface
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
add_route(Sock, TheirIP, TheirPort, Side) ->
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
ok ->
transfer_socket(Sock, TheirIP, TheirPort);
{error, Reason} ->
{error, Reason}
end.
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
% A pretty complex function for an interface, but we need to transfer ownership
% of the socket.
transfer_socket(Sock, TheirIP, TheirPort) ->
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
ok ->
transfer_socket2(Sock, {TheirIP, TheirPort});
{error, Reason} ->
{error, Reason}
end.
transfer_socket2(Sock, Peer) ->
% Since it was transferred, any backlog of datagrams will now be in the
% channel manager's inbox. Turn it off from active, so that the manager
% knows that it will be fully caught up *before* the socket_transferred
% notification arrives.
case inet:setopts(Sock, [{active, false}]) of
ok ->
% Now the socket is all cleaned up, let the manager know.
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
{error, Reason} ->
{error, Reason}
end.
create_channel(OurPort, TheirIP, TheirPort) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:call(?MODULE, {create_channel, Route}).
% Deliver messages on behalf of msp_connection, if it doesn't know where to
% send something.
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
% Deliver messages on behalf of msp_router, if it doesn't know where to send
% something.
dispatch_fallback(Socket, Peer, ID, Packet) ->
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
%%% Worker Interface
@ -81,6 +112,12 @@ init(none) ->
{ok, State}.
handle_call({add_route, Sock, Route, Side}, _, State) ->
{Result, NewState} = do_add_route(Sock, Route, Side, State),
{reply, Result, NewState};
handle_call({socket_transferred, Sock, Peer}, _, State) ->
Result = do_socket_transferred(Sock, Peer, State),
{reply, Result, State};
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
@ -89,11 +126,8 @@ handle_call(Unexpected, From, State) ->
{noreply, State}.
handle_cast({update_router, Route, Conn, Sock}, State) ->
NewState = do_update_router(Route, Conn, Sock, State),
{noreply, NewState};
handle_cast({dispatch, Route, ID, Packet}, State) ->
NewState = do_dispatch(Route, ID, Packet, State),
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
@ -106,10 +140,23 @@ handle_cast(Unexpected, State) ->
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(Socket, Peer, [ID | PacketList], State) ->
Packet = list_to_binary(PacketList),
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(_, _, <<>>, State) ->
% Empty datagram... Ignore.
State;
handle_udp(_, _, [], State) ->
State.
handle_down(Mon, PID, Reason, State) ->
% TODO
@ -149,18 +196,56 @@ do_enroll2(PID, State, Channels) ->
State
end.
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route isn't registered already...
case maps:is_key(Route, Conns) of
true ->
Result = {error, already_added},
{Result, State};
false ->
do_add_route2(Sock, Route, Side, State)
end.
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
{ok, Router} = msp_router:start_link(),
NewConn = #route_info{router = Router,
socket = Sock,
side = Side,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
NewState = State#s{connections = NewConns},
{ok, NewState}.
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route is actually registered.
case maps:find(Route, Conns) of
{ok, _Info} ->
io:format("Updating routes not yet implemented.~n", []),
init:stop();
{ok, Conn} ->
do_socket_transferred2(Sock, Peer, Conn);
error ->
NewConn = #route_info{conn_pid = Conn,
socket = Sock,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
State#s{connections = NewConns}
{error, unknown_connection}
end.
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
->
msp_router:begin_routing(Router, Sock, Peer),
channel_catchup(Router, maps:iterator(Channels));
do_socket_transferred2(_, _, _) ->
{error, wrong_socket}.
% If we have created channels without spawning the router process, then at some
% point we will need to tell the router process what channels currently exist.
channel_catchup(Router, Iter) ->
case maps:next(Iter) of
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
msp_router:add_channel(Router, ChanID, Chan),
channel_catchup(Router, NewIter);
none ->
ok
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
@ -194,13 +279,23 @@ do_create_channel3(State, Route, Info, ChanID) ->
NewConns = maps:put(Route, NewInfo, State#s.connections),
NewState = State#s{connections = NewConns},
% Also add it to the router!
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
{Result, NewState}.
do_dispatch(Route, ID, Packet, State) ->
do_dispatch(Socket, Peer, ID, Packet, State) ->
{ok, OurPort} = inet:port(Socket),
Route = #route{from = OurPort, to = Peer},
do_dispatch2(Route, ID, Packet, State).
do_dispatch2(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
State
end.

48
src/msp_channel_sup.erl Normal file
View File

@ -0,0 +1,48 @@
%%% @doc
%%% Minimal Stream Protocol : Channel Worker Supervisor
%%% @end
-module(msp_channel_sup).
-vsn("0.1.0").
-behaviour(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_channel/0]).
-export([start_link/0]).
-export([init/1]).
-spec start_channel() -> Result
when Result :: {ok, pid()}
| {error, Reason},
Reason :: {already_started, pid()}
| {shutdown, term()}
| term().
start_channel() ->
supervisor:start_child(?MODULE, []).
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {simple_one_for_one, 1, 60},
Channel =
{msp_channel,
{msp_channel, start_link, []},
temporary,
brutal_kill,
worker,
[msp_channel]},
{ok, {RestartStrategy, [Channel]}}.

View File

@ -2,14 +2,14 @@
%%% Minimal Stream Protocol: Connection Worker
%%% @end
-module(msp_connection).
-module(msp_router).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([begin_msp/5]).
-export([begin_routing/3, add_channel/3]).
%% gen_server
-export([start_link/0]).
@ -28,7 +28,6 @@
-record(s,
{socket :: gen_udp:socket(),
peer :: endpoint(),
side :: 0 | 1,
connections = #{} :: #{integer() => channel()}}).
-type state() :: none | #s{}.
@ -37,18 +36,21 @@
% msp does not negotiate new connections, but once you have a
% socket, and a host that knows you will be talking MSP, then a
% new msp_connection can be initialized.
begin_msp(Connection, OurSock, TheirIP, TheirPort, OurSide) ->
% new msp_router can be initialized.
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
% Transfer the socket to the gen_server. If it is
% active then a bunch of messages will be received
% at once, but that is fine.
case gen_udp:controlling_process(OurSock, Connection) of
case gen_udp:controlling_process(OurSock, Router) of
ok ->
gen_server:cast(Connection, {begin_msp, OurSock, {TheirIP, TheirPort}, OurSide});
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
{error, Reason} ->
{error, Reason}
end.
add_channel(Router, ChanID, ChanPID) ->
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
%%% gen_server
-spec start_link() -> Result
@ -59,29 +61,36 @@ start_link() ->
gen_server:start_link(?MODULE, none, []).
% TODO: Ask msp_connection_man for the socket and endpoint
% TODO: SWP? SWP of SWPs?
init(none) ->
{ok, none}.
handle_call({begin_routing, Sock, Peer}, _, none) ->
{Result, State} = do_begin_routing(Sock, Peer),
{reply, Result, State};
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
Result = {error, already_routing},
{reply, Result, State};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{reply, undef, State}.
handle_cast({begin_msp, Sock, Peer, Side}, none) ->
State = do_begin_msp(Sock, Peer, Side),
{noreply, State};
handle_cast({add_channel, ChanID, ChanPID}, State) ->
NewState = do_add_channel(ChanID, ChanPID, State),
{noreply, NewState};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
log(debug, "Got packet: ~p~n", [Packet]),
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
{noreply, State}.
@ -104,12 +113,17 @@ terminate(_, _) ->
%%% Doer Functions
do_begin_msp(Sock, Peer, Side) ->
do_begin_routing(Sock, Peer) ->
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock,
peer = Peer,
side = Side},
State.
State = #s{socket = Sock, peer = Peer},
{ok, State}.
do_add_channel(_, _, none) ->
% Do nothing... We will need to be told about these channels again later.
none;
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
NewConns = maps:put(ChanID, ChanPID, Conns),
State#s{connections = NewConns}.
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
@ -117,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet});
error ->
{ok, OurPort} = inet:port(Sock),
{TheirIP, TheirPort} = Peer,
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
end;
do_dispatch(_, <<>>) ->
% Empty datagram?

View File

@ -6,6 +6,9 @@ spawn_tests() ->
ok.
run_tests_and_halt() ->
% Add a handler to print all log messages to the screen, so we can see why
% tests are failing, if they are.
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
Result = run_tests_protected(),
io:format("Tests returned ~p~n", [Result]),
halt().
@ -18,22 +21,21 @@ run_tests_protected() ->
end.
run_tests() ->
ok = send_test(),
%ok = send_test(),
%ok = out_of_order_test(),
ok = race_test(),
ok.
make_connection(OurPort, TheirIP, TheirPort, Side) ->
{ok, Sock} = gen_udp:open(OurPort),
{ok, Pid} = msp_connection:start_link(),
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
{Pid, Sock}.
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5555,
PortB = 6666,
{A, SockA} = make_connection(PortA, IP, PortB, 0),
{B, SockB} = make_connection(PortB, IP, PortA, 1),
PortA = 5511,
PortB = 5512,
ok = make_connection(PortA, IP, PortB, 0),
ok = make_connection(PortB, IP, PortA, 1),
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
@ -46,4 +48,36 @@ send_test() ->
timer:sleep(10),
ok.
out_of_order_test() ->
IP = {127, 0, 0, 1},
PortA = 5521,
PortB = 5522,
{ok, SockA} = gen_udp:open(PortA),
ok = make_connection(PortB, IP, PortA, 0),
% TODO: fix race_test and remove this sleep
timer:sleep(10),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>),
timer:sleep(10),
ok.
race_test() ->
IP = {127, 0, 0, 1},
PortA = 5531,
PortB = 5532,
{ok, SockA} = gen_udp:open(PortA),
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>),
timer:sleep(10),
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
timer:sleep(10),
ok.