Compare commits
6 Commits
0ccd64e2f1
...
78ea3fc85e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78ea3fc85e | ||
|
|
e8ef0b4304 | ||
|
|
b6c16967e7 | ||
|
|
47612c2775 | ||
|
|
cfd9eb748f | ||
|
|
669b2e51e1 |
@ -75,12 +75,19 @@
|
||||
|
||||
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
|
||||
|
||||
-record(instruction,
|
||||
{op :: integer(),
|
||||
seq :: integer(),
|
||||
data :: binary()}).
|
||||
|
||||
-record(s,
|
||||
{chan_id :: integer(),
|
||||
socket :: gen_udp:socket(),
|
||||
peer :: endpoint(), % TODO configure the socket in advance?
|
||||
state = closed :: closed | sending | receiving,
|
||||
seq = 0 :: integer()}).
|
||||
seq = 0 :: integer(),
|
||||
instructions = [] :: [#instruction{}],
|
||||
fragments = [] :: [binary()]}).
|
||||
|
||||
|
||||
-type state() :: none | #s{}.
|
||||
@ -118,6 +125,7 @@ handle_call(Unexpected, From, State) ->
|
||||
|
||||
|
||||
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
|
||||
log(debug, "Opening channel ~p~n", [Id]),
|
||||
State = #s{chan_id = Id,
|
||||
socket = Socket,
|
||||
peer = Peer},
|
||||
@ -163,7 +171,7 @@ do_send(Message, Mod, State = #s{chan_id = Id,
|
||||
peer = Peer,
|
||||
socket = Sock,
|
||||
seq = Seq}) ->
|
||||
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
||||
IsNewChannel = ChanMode == closed,
|
||||
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
|
||||
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
||||
@ -181,18 +189,76 @@ choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
||||
|
||||
%%% Receive Doer Functions
|
||||
|
||||
% First handle the messages that don't need to be reordered
|
||||
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
|
||||
do_handle_lossy(State, Data);
|
||||
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
|
||||
do_handle_ack(State, Data);
|
||||
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
|
||||
do_handle_nack(State, Data);
|
||||
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
|
||||
do_handle_cancel(State, Data);
|
||||
do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
|
||||
% TODO: do we need ack cancel? There isn't much you can do with the
|
||||
% sequence information, you really just need the server to tell you what
|
||||
% did get through and how it was interpreted.
|
||||
do_handle_ack_cancel(State, Data);
|
||||
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
||||
do_handle_reliable(State, Op, Seq, Rest);
|
||||
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
|
||||
% Reliable stream operations need to be ordered first.
|
||||
NewState = do_buffer_reliable(State, Op, Seq, Rest),
|
||||
do_execute_buffer(NewState);
|
||||
do_handle_datagram(State, Unexpected) ->
|
||||
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
|
||||
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
|
||||
State.
|
||||
|
||||
do_handle_lossy(State, Data) ->
|
||||
io:format("Got lossy datagram: ~p~n", [Data]),
|
||||
State.
|
||||
|
||||
do_handle_ack(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
do_handle_nack(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
do_handle_cancel(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
do_handle_ack_cancel(State, Payload) ->
|
||||
io:format("Unimplemented command. Data ~p~n", [Payload]),
|
||||
State.
|
||||
|
||||
%%% Reliable message reordering
|
||||
|
||||
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
|
||||
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
|
||||
State#s{instructions = NewInstructions}.
|
||||
|
||||
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
|
||||
% We need to go later than this. Move it to the 'earlier than us' pile.
|
||||
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
|
||||
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
|
||||
% There is already an instruction with this sequence ID. Use that.
|
||||
lists:reverse(Earlier, Is);
|
||||
insert_instruction(Op, Seq, Data, Is, Earlier) ->
|
||||
% If neither of the above conditions are met, we can add it here.
|
||||
NewI = #instruction{op = Op, seq = Seq, data = Data},
|
||||
lists:reverse(Earlier, [NewI | Is]).
|
||||
|
||||
%%% Reliable message execution
|
||||
|
||||
do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
|
||||
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
|
||||
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
|
||||
do_execute_buffer(NewState);
|
||||
do_execute_buffer(State) ->
|
||||
% Empty instruction list, or the next instruction hasn't arrived yet. Done.
|
||||
State.
|
||||
|
||||
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, continue, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
|
||||
@ -205,16 +271,8 @@ do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
||||
do_handle_fragment(State, false, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
|
||||
do_handle_fragment(State, true, close_channel, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
|
||||
do_handle_ack(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
||||
do_handle_status(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
|
||||
do_handle_nack(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
|
||||
do_handle_cancel(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
|
||||
do_handle_ack_cancel(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
|
||||
do_handle_req_move(State, Seq, Payload);
|
||||
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
||||
@ -227,26 +285,10 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
|
||||
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
|
||||
State.
|
||||
|
||||
do_handle_ack(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_status(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_nack(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_cancel(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_ack_cancel(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
do_handle_req_move(State, Seq, Payload) ->
|
||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
||||
State.
|
||||
|
||||
@ -9,7 +9,7 @@
|
||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-license("MIT").
|
||||
|
||||
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
|
||||
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
|
||||
%% Worker interface
|
||||
-export([enroll/0]).
|
||||
%% gen_server
|
||||
@ -29,8 +29,9 @@
|
||||
{chan_pid :: pid()}).
|
||||
|
||||
-record(route_info,
|
||||
{conn_pid :: pid(),
|
||||
{router :: pid(),
|
||||
socket :: gen_udp:socket(),
|
||||
side :: 0 | 1,
|
||||
unused_channels :: [integer()],
|
||||
used_channels :: #{integer() => #channel{}}}).
|
||||
|
||||
@ -41,19 +42,49 @@
|
||||
|
||||
%%% Service Interface
|
||||
|
||||
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
|
||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
|
||||
add_route(Sock, TheirIP, TheirPort, Side) ->
|
||||
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
|
||||
ok ->
|
||||
transfer_socket(Sock, TheirIP, TheirPort);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
|
||||
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
|
||||
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
|
||||
|
||||
% A pretty complex function for an interface, but we need to transfer ownership
|
||||
% of the socket.
|
||||
transfer_socket(Sock, TheirIP, TheirPort) ->
|
||||
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
|
||||
ok ->
|
||||
transfer_socket2(Sock, {TheirIP, TheirPort});
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
transfer_socket2(Sock, Peer) ->
|
||||
% Since it was transferred, any backlog of datagrams will now be in the
|
||||
% channel manager's inbox. Turn it off from active, so that the manager
|
||||
% knows that it will be fully caught up *before* the socket_transferred
|
||||
% notification arrives.
|
||||
case inet:setopts(Sock, [{active, false}]) of
|
||||
ok ->
|
||||
% Now the socket is all cleaned up, let the manager know.
|
||||
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
create_channel(OurPort, TheirIP, TheirPort) ->
|
||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||
gen_server:call(?MODULE, {create_channel, Route}).
|
||||
|
||||
% Deliver messages on behalf of msp_connection, if it doesn't know where to
|
||||
% send something.
|
||||
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
|
||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
||||
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
|
||||
% Deliver messages on behalf of msp_router, if it doesn't know where to send
|
||||
% something.
|
||||
dispatch_fallback(Socket, Peer, ID, Packet) ->
|
||||
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
|
||||
|
||||
%%% Worker Interface
|
||||
|
||||
@ -81,6 +112,12 @@ init(none) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
handle_call({add_route, Sock, Route, Side}, _, State) ->
|
||||
{Result, NewState} = do_add_route(Sock, Route, Side, State),
|
||||
{reply, Result, NewState};
|
||||
handle_call({socket_transferred, Sock, Peer}, _, State) ->
|
||||
Result = do_socket_transferred(Sock, Peer, State),
|
||||
{reply, Result, State};
|
||||
handle_call({create_channel, Route}, _, State) ->
|
||||
{Result, NewState} = do_create_channel(State, Route),
|
||||
{reply, Result, NewState};
|
||||
@ -89,11 +126,8 @@ handle_call(Unexpected, From, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_cast({update_router, Route, Conn, Sock}, State) ->
|
||||
NewState = do_update_router(Route, Conn, Sock, State),
|
||||
{noreply, NewState};
|
||||
handle_cast({dispatch, Route, ID, Packet}, State) ->
|
||||
NewState = do_dispatch(Route, ID, Packet, State),
|
||||
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
|
||||
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
|
||||
{noreply, NewState};
|
||||
handle_cast({enroll, PID}, State) ->
|
||||
NewState = do_enroll(PID, State),
|
||||
@ -106,10 +140,23 @@ handle_cast(Unexpected, State) ->
|
||||
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
|
||||
NewState = handle_down(Mon, PID, Reason, State),
|
||||
{noreply, NewState};
|
||||
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
|
||||
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
|
||||
{noreply, NewState};
|
||||
handle_info(Unexpected, State) ->
|
||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
|
||||
do_dispatch(Socket, Peer, ID, Packet, State);
|
||||
handle_udp(Socket, Peer, [ID | PacketList], State) ->
|
||||
Packet = list_to_binary(PacketList),
|
||||
do_dispatch(Socket, Peer, ID, Packet, State);
|
||||
handle_udp(_, _, <<>>, State) ->
|
||||
% Empty datagram... Ignore.
|
||||
State;
|
||||
handle_udp(_, _, [], State) ->
|
||||
State.
|
||||
|
||||
handle_down(Mon, PID, Reason, State) ->
|
||||
% TODO
|
||||
@ -149,18 +196,56 @@ do_enroll2(PID, State, Channels) ->
|
||||
State
|
||||
end.
|
||||
|
||||
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
|
||||
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
|
||||
{ok, OurPort} = inet:port(Sock),
|
||||
Route = #route{from = OurPort, to = Peer},
|
||||
% Check that this route isn't registered already...
|
||||
case maps:is_key(Route, Conns) of
|
||||
true ->
|
||||
Result = {error, already_added},
|
||||
{Result, State};
|
||||
false ->
|
||||
do_add_route2(Sock, Route, Side, State)
|
||||
end.
|
||||
|
||||
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
|
||||
{ok, Router} = msp_router:start_link(),
|
||||
NewConn = #route_info{router = Router,
|
||||
socket = Sock,
|
||||
side = Side,
|
||||
unused_channels = lists:seq(0, 255),
|
||||
used_channels = #{}},
|
||||
NewConns = maps:put(Route, NewConn, Conns),
|
||||
NewState = State#s{connections = NewConns},
|
||||
{ok, NewState}.
|
||||
|
||||
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
|
||||
{ok, OurPort} = inet:port(Sock),
|
||||
Route = #route{from = OurPort, to = Peer},
|
||||
% Check that this route is actually registered.
|
||||
case maps:find(Route, Conns) of
|
||||
{ok, _Info} ->
|
||||
io:format("Updating routes not yet implemented.~n", []),
|
||||
init:stop();
|
||||
{ok, Conn} ->
|
||||
do_socket_transferred2(Sock, Peer, Conn);
|
||||
error ->
|
||||
NewConn = #route_info{conn_pid = Conn,
|
||||
socket = Sock,
|
||||
unused_channels = lists:seq(0, 255),
|
||||
used_channels = #{}},
|
||||
NewConns = maps:put(Route, NewConn, Conns),
|
||||
State#s{connections = NewConns}
|
||||
{error, unknown_connection}
|
||||
end.
|
||||
|
||||
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
|
||||
->
|
||||
msp_router:begin_routing(Router, Sock, Peer),
|
||||
channel_catchup(Router, maps:iterator(Channels));
|
||||
do_socket_transferred2(_, _, _) ->
|
||||
{error, wrong_socket}.
|
||||
|
||||
% If we have created channels without spawning the router process, then at some
|
||||
% point we will need to tell the router process what channels currently exist.
|
||||
channel_catchup(Router, Iter) ->
|
||||
case maps:next(Iter) of
|
||||
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
|
||||
msp_router:add_channel(Router, ChanID, Chan),
|
||||
channel_catchup(Router, NewIter);
|
||||
none ->
|
||||
ok
|
||||
end.
|
||||
|
||||
do_create_channel(State = #s{connections = Conns}, Route) ->
|
||||
@ -194,13 +279,23 @@ do_create_channel3(State, Route, Info, ChanID) ->
|
||||
NewConns = maps:put(Route, NewInfo, State#s.connections),
|
||||
NewState = State#s{connections = NewConns},
|
||||
|
||||
% Also add it to the router!
|
||||
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
|
||||
|
||||
{Result, NewState}.
|
||||
|
||||
do_dispatch(Route, ID, Packet, State) ->
|
||||
do_dispatch(Socket, Peer, ID, Packet, State) ->
|
||||
{ok, OurPort} = inet:port(Socket),
|
||||
Route = #route{from = OurPort, to = Peer},
|
||||
do_dispatch2(Route, ID, Packet, State).
|
||||
|
||||
do_dispatch2(Route, ID, Packet, State) ->
|
||||
case maps:find(Route, State#s.connections) of
|
||||
{ok, Info} ->
|
||||
do_dispatch2(Route, ID, Packet, State, Info);
|
||||
error ->
|
||||
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
|
||||
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
|
||||
State
|
||||
end.
|
||||
|
||||
|
||||
48
src/msp_channel_sup.erl
Normal file
48
src/msp_channel_sup.erl
Normal file
@ -0,0 +1,48 @@
|
||||
%%% @doc
|
||||
%%% Minimal Stream Protocol : Channel Worker Supervisor
|
||||
%%% @end
|
||||
|
||||
-module(msp_channel_sup).
|
||||
-vsn("0.1.0").
|
||||
-behaviour(supervisor).
|
||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-license("MIT").
|
||||
|
||||
|
||||
-export([start_channel/0]).
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
|
||||
-spec start_channel() -> Result
|
||||
when Result :: {ok, pid()}
|
||||
| {error, Reason},
|
||||
Reason :: {already_started, pid()}
|
||||
| {shutdown, term()}
|
||||
| term().
|
||||
|
||||
start_channel() ->
|
||||
supervisor:start_child(?MODULE, []).
|
||||
|
||||
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
|
||||
|
||||
|
||||
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
|
||||
%% @private
|
||||
%% The OTP init/1 function.
|
||||
|
||||
init(none) ->
|
||||
RestartStrategy = {simple_one_for_one, 1, 60},
|
||||
Channel =
|
||||
{msp_channel,
|
||||
{msp_channel, start_link, []},
|
||||
temporary,
|
||||
brutal_kill,
|
||||
worker,
|
||||
[msp_channel]},
|
||||
{ok, {RestartStrategy, [Channel]}}.
|
||||
@ -2,14 +2,14 @@
|
||||
%%% Minimal Stream Protocol: Connection Worker
|
||||
%%% @end
|
||||
|
||||
-module(msp_connection).
|
||||
-module(msp_router).
|
||||
-vsn("0.1.0").
|
||||
-behavior(gen_server).
|
||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
||||
-license("MIT").
|
||||
|
||||
-export([begin_msp/5]).
|
||||
-export([begin_routing/3, add_channel/3]).
|
||||
|
||||
%% gen_server
|
||||
-export([start_link/0]).
|
||||
@ -28,7 +28,6 @@
|
||||
-record(s,
|
||||
{socket :: gen_udp:socket(),
|
||||
peer :: endpoint(),
|
||||
side :: 0 | 1,
|
||||
connections = #{} :: #{integer() => channel()}}).
|
||||
|
||||
-type state() :: none | #s{}.
|
||||
@ -37,18 +36,21 @@
|
||||
|
||||
% msp does not negotiate new connections, but once you have a
|
||||
% socket, and a host that knows you will be talking MSP, then a
|
||||
% new msp_connection can be initialized.
|
||||
begin_msp(Connection, OurSock, TheirIP, TheirPort, OurSide) ->
|
||||
% new msp_router can be initialized.
|
||||
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
|
||||
% Transfer the socket to the gen_server. If it is
|
||||
% active then a bunch of messages will be received
|
||||
% at once, but that is fine.
|
||||
case gen_udp:controlling_process(OurSock, Connection) of
|
||||
case gen_udp:controlling_process(OurSock, Router) of
|
||||
ok ->
|
||||
gen_server:cast(Connection, {begin_msp, OurSock, {TheirIP, TheirPort}, OurSide});
|
||||
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
add_channel(Router, ChanID, ChanPID) ->
|
||||
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
|
||||
|
||||
%%% gen_server
|
||||
|
||||
-spec start_link() -> Result
|
||||
@ -59,29 +61,36 @@ start_link() ->
|
||||
gen_server:start_link(?MODULE, none, []).
|
||||
|
||||
|
||||
% TODO: Ask msp_connection_man for the socket and endpoint
|
||||
% TODO: SWP? SWP of SWPs?
|
||||
init(none) ->
|
||||
{ok, none}.
|
||||
|
||||
|
||||
handle_call({begin_routing, Sock, Peer}, _, none) ->
|
||||
{Result, State} = do_begin_routing(Sock, Peer),
|
||||
{reply, Result, State};
|
||||
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
|
||||
Result = {error, already_routing},
|
||||
{reply, Result, State};
|
||||
handle_call(Unexpected, From, State) ->
|
||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
||||
{reply, undef, State}.
|
||||
|
||||
|
||||
handle_cast({begin_msp, Sock, Peer, Side}, none) ->
|
||||
State = do_begin_msp(Sock, Peer, Side),
|
||||
{noreply, State};
|
||||
handle_cast({add_channel, ChanID, ChanPID}, State) ->
|
||||
NewState = do_add_channel(ChanID, ChanPID, State),
|
||||
{noreply, NewState};
|
||||
handle_cast(Unexpected, State) ->
|
||||
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
|
||||
log(debug, "Got packet: ~p~n", [Packet]),
|
||||
do_dispatch(State, Packet),
|
||||
{noreply, State};
|
||||
handle_info(Unexpected, State) ->
|
||||
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
@ -104,12 +113,17 @@ terminate(_, _) ->
|
||||
|
||||
%%% Doer Functions
|
||||
|
||||
do_begin_msp(Sock, Peer, Side) ->
|
||||
do_begin_routing(Sock, Peer) ->
|
||||
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
|
||||
State = #s{socket = Sock,
|
||||
peer = Peer,
|
||||
side = Side},
|
||||
State.
|
||||
State = #s{socket = Sock, peer = Peer},
|
||||
{ok, State}.
|
||||
|
||||
do_add_channel(_, _, none) ->
|
||||
% Do nothing... We will need to be told about these channels again later.
|
||||
none;
|
||||
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
|
||||
NewConns = maps:put(ChanID, ChanPID, Conns),
|
||||
State#s{connections = NewConns}.
|
||||
|
||||
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
|
||||
ok = inet:setopts(Sock, [{active, once}]),
|
||||
@ -117,9 +131,7 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
|
||||
{ok, Conn} ->
|
||||
erlang:send(Conn, {msp_fragment, Packet});
|
||||
error ->
|
||||
{ok, OurPort} = inet:port(Sock),
|
||||
{TheirIP, TheirPort} = Peer,
|
||||
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
|
||||
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
|
||||
end;
|
||||
do_dispatch(_, <<>>) ->
|
||||
% Empty datagram?
|
||||
@ -6,6 +6,9 @@ spawn_tests() ->
|
||||
ok.
|
||||
|
||||
run_tests_and_halt() ->
|
||||
% Add a handler to print all log messages to the screen, so we can see why
|
||||
% tests are failing, if they are.
|
||||
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
|
||||
Result = run_tests_protected(),
|
||||
io:format("Tests returned ~p~n", [Result]),
|
||||
halt().
|
||||
@ -18,22 +21,21 @@ run_tests_protected() ->
|
||||
end.
|
||||
|
||||
run_tests() ->
|
||||
ok = send_test(),
|
||||
%ok = send_test(),
|
||||
%ok = out_of_order_test(),
|
||||
ok = race_test(),
|
||||
ok.
|
||||
|
||||
make_connection(OurPort, TheirIP, TheirPort, Side) ->
|
||||
{ok, Sock} = gen_udp:open(OurPort),
|
||||
{ok, Pid} = msp_connection:start_link(),
|
||||
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
|
||||
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
|
||||
{Pid, Sock}.
|
||||
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
|
||||
|
||||
send_test() ->
|
||||
IP = {127, 0, 0, 1},
|
||||
PortA = 5555,
|
||||
PortB = 6666,
|
||||
{A, SockA} = make_connection(PortA, IP, PortB, 0),
|
||||
{B, SockB} = make_connection(PortB, IP, PortA, 1),
|
||||
PortA = 5511,
|
||||
PortB = 5512,
|
||||
ok = make_connection(PortA, IP, PortB, 0),
|
||||
ok = make_connection(PortB, IP, PortA, 1),
|
||||
|
||||
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
||||
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
|
||||
@ -46,4 +48,36 @@ send_test() ->
|
||||
timer:sleep(10),
|
||||
ok.
|
||||
|
||||
out_of_order_test() ->
|
||||
IP = {127, 0, 0, 1},
|
||||
PortA = 5521,
|
||||
PortB = 5522,
|
||||
{ok, SockA} = gen_udp:open(PortA),
|
||||
ok = make_connection(PortB, IP, PortA, 0),
|
||||
|
||||
% TODO: fix race_test and remove this sleep
|
||||
timer:sleep(10),
|
||||
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>),
|
||||
|
||||
timer:sleep(10),
|
||||
|
||||
ok.
|
||||
|
||||
race_test() ->
|
||||
IP = {127, 0, 0, 1},
|
||||
PortA = 5531,
|
||||
PortB = 5532,
|
||||
{ok, SockA} = gen_udp:open(PortA),
|
||||
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
|
||||
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>),
|
||||
|
||||
timer:sleep(10),
|
||||
|
||||
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
|
||||
|
||||
timer:sleep(10),
|
||||
|
||||
ok.
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user