Compare commits

..

No commits in common. "78ea3fc85e39b0fad51bfc42ab927f00b96fad94" and "0ccd64e2f1ce6cd977f96e9764d4968aef5cc622" have entirely different histories.

5 changed files with 83 additions and 314 deletions

View File

@ -75,19 +75,12 @@
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
-record(instruction,
{op :: integer(),
seq :: integer(),
data :: binary()}).
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer(),
instructions = [] :: [#instruction{}],
fragments = [] :: [binary()]}).
seq = 0 :: integer()}).
-type state() :: none | #s{}.
@ -125,7 +118,6 @@ handle_call(Unexpected, From, State) ->
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
log(debug, "Opening channel ~p~n", [Id]),
State = #s{chan_id = Id,
socket = Socket,
peer = Peer},
@ -171,7 +163,7 @@ do_send(Message, Mod, State = #s{chan_id = Id,
peer = Peer,
socket = Sock,
seq = Seq}) ->
log(debug, "~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
IsNewChannel = ChanMode == closed,
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
@ -189,76 +181,18 @@ choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Receive Doer Functions
% First handle the messages that don't need to be reordered
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
do_handle_lossy(State, Data);
do_handle_datagram(State, <<?OP_ACK:8, Data/binary>>) ->
do_handle_ack(State, Data);
do_handle_datagram(State, <<?OP_NACK:8, Data/binary>>) ->
do_handle_nack(State, Data);
do_handle_datagram(State, <<?OP_CANCEL:8, Data/binary>>) ->
do_handle_cancel(State, Data);
do_handle_datagram(State, <<?OP_ACK_CANCEL:8, Data/binary>>) ->
% TODO: do we need ack cancel? There isn't much you can do with the
% sequence information, you really just need the server to tell you what
% did get through and how it was interpreted.
do_handle_ack_cancel(State, Data);
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
log(debug, "Got opcode ~p, seq ~p. Adding to buffer.~n", [Op, Seq]),
% Reliable stream operations need to be ordered first.
NewState = do_buffer_reliable(State, Op, Seq, Rest),
do_execute_buffer(NewState);
do_handle_reliable(State, Op, Seq, Rest);
do_handle_datagram(State, Unexpected) ->
log(warning, "Got datagram ~p which is too short.~n", [Unexpected]),
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
State.
do_handle_lossy(State, Data) ->
io:format("Got lossy datagram: ~p~n", [Data]),
State.
do_handle_ack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_nack(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
do_handle_ack_cancel(State, Payload) ->
io:format("Unimplemented command. Data ~p~n", [Payload]),
State.
%%% Reliable message reordering
do_buffer_reliable(State = #s{instructions = Is}, Op, Seq, Rest) ->
NewInstructions = insert_instruction(Op, Seq, Rest, Is, []),
State#s{instructions = NewInstructions}.
insert_instruction(Op, Seq, Data, [Next = #instruction{seq = Seq2} | Rest], Earlier) when Seq2 < Seq ->
% We need to go later than this. Move it to the 'earlier than us' pile.
insert_instruction(Op, Seq, Data, Rest, [Next | Earlier]);
insert_instruction(_, Seq, _, Is = [#instruction{seq = Seq} | _], Earlier) ->
% There is already an instruction with this sequence ID. Use that.
lists:reverse(Earlier, Is);
insert_instruction(Op, Seq, Data, Is, Earlier) ->
% If neither of the above conditions are met, we can add it here.
NewI = #instruction{op = Op, seq = Seq, data = Data},
lists:reverse(Earlier, [NewI | Is]).
%%% Reliable message execution
do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
do_execute_buffer(NewState);
do_execute_buffer(State) ->
% Empty instruction list, or the next instruction hasn't arrived yet. Done.
State.
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, false, continue, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
@ -271,8 +205,16 @@ do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
do_handle_fragment(State, false, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
do_handle_fragment(State, true, close_channel, Seq, Payload);
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
do_handle_ack(State, Seq, Payload);
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
do_handle_status(State, Seq, Payload);
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
do_handle_nack(State, Seq, Payload);
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
do_handle_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
do_handle_ack_cancel(State, Seq, Payload);
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
do_handle_req_move(State, Seq, Payload);
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
@ -285,10 +227,26 @@ do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
State.
do_handle_ack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_nack(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_ack_cancel(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
do_handle_req_move(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.

View File

@ -9,7 +9,7 @@
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([add_route/4, create_channel/3, dispatch_fallback/4]).
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
%% Worker interface
-export([enroll/0]).
%% gen_server
@ -29,9 +29,8 @@
{chan_pid :: pid()}).
-record(route_info,
{router :: pid(),
{conn_pid :: pid(),
socket :: gen_udp:socket(),
side :: 0 | 1,
unused_channels :: [integer()],
used_channels :: #{integer() => #channel{}}}).
@ -42,49 +41,19 @@
%%% Service Interface
add_route(Sock, TheirIP, TheirPort, Side) ->
case add_passive_route(Sock, TheirIP, TheirPort, Side) of
ok ->
transfer_socket(Sock, TheirIP, TheirPort);
{error, Reason} ->
{error, Reason}
end.
add_passive_route(Sock, TheirIP, TheirPort, Side) ->
log(debug, "adding route for socket ~p to port ~p~n", [Sock, TheirPort]),
gen_server:call(?MODULE, {add_route, Sock, {TheirIP, TheirPort}, Side}).
% A pretty complex function for an interface, but we need to transfer ownership
% of the socket.
transfer_socket(Sock, TheirIP, TheirPort) ->
case gen_udp:controlling_process(Sock, whereis(?MODULE)) of
ok ->
transfer_socket2(Sock, {TheirIP, TheirPort});
{error, Reason} ->
{error, Reason}
end.
transfer_socket2(Sock, Peer) ->
% Since it was transferred, any backlog of datagrams will now be in the
% channel manager's inbox. Turn it off from active, so that the manager
% knows that it will be fully caught up *before* the socket_transferred
% notification arrives.
case inet:setopts(Sock, [{active, false}]) of
ok ->
% Now the socket is all cleaned up, let the manager know.
gen_server:call(?MODULE, {socket_transferred, Sock, Peer});
{error, Reason} ->
{error, Reason}
end.
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
create_channel(OurPort, TheirIP, TheirPort) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:call(?MODULE, {create_channel, Route}).
% Deliver messages on behalf of msp_router, if it doesn't know where to send
% something.
dispatch_fallback(Socket, Peer, ID, Packet) ->
gen_server:cast(?MODULE, {dispatch, Socket, Peer, ID, Packet}).
% Deliver messages on behalf of msp_connection, if it doesn't know where to
% send something.
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
%%% Worker Interface
@ -112,12 +81,6 @@ init(none) ->
{ok, State}.
handle_call({add_route, Sock, Route, Side}, _, State) ->
{Result, NewState} = do_add_route(Sock, Route, Side, State),
{reply, Result, NewState};
handle_call({socket_transferred, Sock, Peer}, _, State) ->
Result = do_socket_transferred(Sock, Peer, State),
{reply, Result, State};
handle_call({create_channel, Route}, _, State) ->
{Result, NewState} = do_create_channel(State, Route),
{reply, Result, NewState};
@ -126,8 +89,11 @@ handle_call(Unexpected, From, State) ->
{noreply, State}.
handle_cast({dispatch, Socket, Peer, ID, Packet}, State) ->
NewState = do_dispatch(Socket, Peer, ID, Packet, State),
handle_cast({update_router, Route, Conn, Sock}, State) ->
NewState = do_update_router(Route, Conn, Sock, State),
{noreply, NewState};
handle_cast({dispatch, Route, ID, Packet}, State) ->
NewState = do_dispatch(Route, ID, Packet, State),
{noreply, NewState};
handle_cast({enroll, PID}, State) ->
NewState = do_enroll(PID, State),
@ -140,23 +106,10 @@ handle_cast(Unexpected, State) ->
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
NewState = handle_down(Mon, PID, Reason, State),
{noreply, NewState};
handle_info({udp, Socket, TheirIP, TheirPort, Data}, State) ->
NewState = handle_udp(Socket, {TheirIP, TheirPort}, Data, State),
{noreply, NewState};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
handle_udp(Socket, Peer, <<ID:8, Packet/bytes>>, State) ->
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(Socket, Peer, [ID | PacketList], State) ->
Packet = list_to_binary(PacketList),
do_dispatch(Socket, Peer, ID, Packet, State);
handle_udp(_, _, <<>>, State) ->
% Empty datagram... Ignore.
State;
handle_udp(_, _, [], State) ->
State.
handle_down(Mon, PID, Reason, State) ->
% TODO
@ -196,56 +149,18 @@ do_enroll2(PID, State, Channels) ->
State
end.
do_add_route(Sock, Peer, Side, State = #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route isn't registered already...
case maps:is_key(Route, Conns) of
true ->
Result = {error, already_added},
{Result, State};
false ->
do_add_route2(Sock, Route, Side, State)
end.
do_add_route2(Sock, Route, Side, State = #s{connections = Conns}) ->
{ok, Router} = msp_router:start_link(),
NewConn = #route_info{router = Router,
socket = Sock,
side = Side,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
NewState = State#s{connections = NewConns},
{ok, NewState}.
do_socket_transferred(Sock, Peer, #s{connections = Conns}) ->
{ok, OurPort} = inet:port(Sock),
Route = #route{from = OurPort, to = Peer},
% Check that this route is actually registered.
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
case maps:find(Route, Conns) of
{ok, Conn} ->
do_socket_transferred2(Sock, Peer, Conn);
{ok, _Info} ->
io:format("Updating routes not yet implemented.~n", []),
init:stop();
error ->
{error, unknown_connection}
end.
do_socket_transferred2(Sock, Peer, #route_info{socket = Sock, router = Router, used_channels = Channels})
->
msp_router:begin_routing(Router, Sock, Peer),
channel_catchup(Router, maps:iterator(Channels));
do_socket_transferred2(_, _, _) ->
{error, wrong_socket}.
% If we have created channels without spawning the router process, then at some
% point we will need to tell the router process what channels currently exist.
channel_catchup(Router, Iter) ->
case maps:next(Iter) of
{ChanID, #channel{chan_pid = Chan}, NewIter} ->
msp_router:add_channel(Router, ChanID, Chan),
channel_catchup(Router, NewIter);
none ->
ok
NewConn = #route_info{conn_pid = Conn,
socket = Sock,
unused_channels = lists:seq(0, 255),
used_channels = #{}},
NewConns = maps:put(Route, NewConn, Conns),
State#s{connections = NewConns}
end.
do_create_channel(State = #s{connections = Conns}, Route) ->
@ -279,23 +194,13 @@ do_create_channel3(State, Route, Info, ChanID) ->
NewConns = maps:put(Route, NewInfo, State#s.connections),
NewState = State#s{connections = NewConns},
% Also add it to the router!
msp_router:add_channel(Info#route_info.router, ChanID, Chan),
{Result, NewState}.
do_dispatch(Socket, Peer, ID, Packet, State) ->
{ok, OurPort} = inet:port(Socket),
Route = #route{from = OurPort, to = Peer},
do_dispatch2(Route, ID, Packet, State).
do_dispatch2(Route, ID, Packet, State) ->
do_dispatch(Route, ID, Packet, State) ->
case maps:find(Route, State#s.connections) of
{ok, Info} ->
do_dispatch2(Route, ID, Packet, State, Info);
error ->
#route{from = From, to = {{X1, X2, X3, X4}, To}} = Route,
ok = log(warning, "Received packet from unknown host ~p.~p.~p.~p:~p on port ~p~n", [X1, X2, X3, X4, To, From]),
State
end.

View File

@ -1,48 +0,0 @@
%%% @doc
%%% Minimal Stream Protocol : Channel Worker Supervisor
%%% @end
-module(msp_channel_sup).
-vsn("0.1.0").
-behaviour(supervisor).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([start_channel/0]).
-export([start_link/0]).
-export([init/1]).
-spec start_channel() -> Result
when Result :: {ok, pid()}
| {error, Reason},
Reason :: {already_started, pid()}
| {shutdown, term()}
| term().
start_channel() ->
supervisor:start_child(?MODULE, []).
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
%% @private
%% The OTP init/1 function.
init(none) ->
RestartStrategy = {simple_one_for_one, 1, 60},
Channel =
{msp_channel,
{msp_channel, start_link, []},
temporary,
brutal_kill,
worker,
[msp_channel]},
{ok, {RestartStrategy, [Channel]}}.

View File

@ -2,14 +2,14 @@
%%% Minimal Stream Protocol: Connection Worker
%%% @end
-module(msp_router).
-module(msp_connection).
-vsn("0.1.0").
-behavior(gen_server).
-author("Jarvis Carroll <spiveehere@gmail.com>").
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
-license("MIT").
-export([begin_routing/3, add_channel/3]).
-export([begin_msp/5]).
%% gen_server
-export([start_link/0]).
@ -28,6 +28,7 @@
-record(s,
{socket :: gen_udp:socket(),
peer :: endpoint(),
side :: 0 | 1,
connections = #{} :: #{integer() => channel()}}).
-type state() :: none | #s{}.
@ -36,21 +37,18 @@
% msp does not negotiate new connections, but once you have a
% socket, and a host that knows you will be talking MSP, then a
% new msp_router can be initialized.
begin_routing(Router, OurSock, {TheirIP, TheirPort}) ->
% new msp_connection can be initialized.
begin_msp(Connection, OurSock, TheirIP, TheirPort, OurSide) ->
% Transfer the socket to the gen_server. If it is
% active then a bunch of messages will be received
% at once, but that is fine.
case gen_udp:controlling_process(OurSock, Router) of
case gen_udp:controlling_process(OurSock, Connection) of
ok ->
gen_server:call(Router, {begin_routing, OurSock, {TheirIP, TheirPort}});
gen_server:cast(Connection, {begin_msp, OurSock, {TheirIP, TheirPort}, OurSide});
{error, Reason} ->
{error, Reason}
end.
add_channel(Router, ChanID, ChanPID) ->
gen_server:cast(Router, {add_channel, ChanID, ChanPID}).
%%% gen_server
-spec start_link() -> Result
@ -61,36 +59,29 @@ start_link() ->
gen_server:start_link(?MODULE, none, []).
% TODO: SWP? SWP of SWPs?
% TODO: Ask msp_connection_man for the socket and endpoint
init(none) ->
{ok, none}.
handle_call({begin_routing, Sock, Peer}, _, none) ->
{Result, State} = do_begin_routing(Sock, Peer),
{reply, Result, State};
handle_call({begin_routing, _, _, _}, _, State = #s{}) ->
Result = {error, already_routing},
{reply, Result, State};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{reply, undef, State}.
handle_cast({add_channel, ChanID, ChanPID}, State) ->
NewState = do_add_channel(ChanID, ChanPID, State),
{noreply, NewState};
handle_cast({begin_msp, Sock, Peer, Side}, none) ->
State = do_begin_msp(Sock, Peer, Side),
{noreply, State};
handle_cast(Unexpected, State) ->
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
{noreply, State}.
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
log(debug, "Got packet: ~p~n", [Packet]),
do_dispatch(State, Packet),
{noreply, State};
handle_info(Unexpected, State) ->
ok = log(warning, "Unexpected info: ~tp~n", [Unexpected]),
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
{noreply, State}.
@ -113,17 +104,12 @@ terminate(_, _) ->
%%% Doer Functions
do_begin_routing(Sock, Peer) ->
do_begin_msp(Sock, Peer, Side) ->
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
State = #s{socket = Sock, peer = Peer},
{ok, State}.
do_add_channel(_, _, none) ->
% Do nothing... We will need to be told about these channels again later.
none;
do_add_channel(ChanID, ChanPID, State = #s{connections = Conns}) ->
NewConns = maps:put(ChanID, ChanPID, Conns),
State#s{connections = NewConns}.
State = #s{socket = Sock,
peer = Peer,
side = Side},
State.
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
ok = inet:setopts(Sock, [{active, once}]),
@ -131,7 +117,9 @@ do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/
{ok, Conn} ->
erlang:send(Conn, {msp_fragment, Packet});
error ->
msp_channel_man:dispatch_fallback(Sock, Peer, ID, Packet)
{ok, OurPort} = inet:port(Sock),
{TheirIP, TheirPort} = Peer,
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
end;
do_dispatch(_, <<>>) ->
% Empty datagram?

View File

@ -6,9 +6,6 @@ spawn_tests() ->
ok.
run_tests_and_halt() ->
% Add a handler to print all log messages to the screen, so we can see why
% tests are failing, if they are.
logger:add_handler(test, logger_std_h, #{config => #{type => standard_io}}),
Result = run_tests_protected(),
io:format("Tests returned ~p~n", [Result]),
halt().
@ -21,21 +18,22 @@ run_tests_protected() ->
end.
run_tests() ->
%ok = send_test(),
%ok = out_of_order_test(),
ok = race_test(),
ok = send_test(),
ok.
make_connection(OurPort, TheirIP, TheirPort, Side) ->
{ok, Sock} = gen_udp:open(OurPort),
ok = msp_channel_man:add_route(Sock, TheirIP, TheirPort, Side).
{ok, Pid} = msp_connection:start_link(),
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
{Pid, Sock}.
send_test() ->
IP = {127, 0, 0, 1},
PortA = 5511,
PortB = 5512,
ok = make_connection(PortA, IP, PortB, 0),
ok = make_connection(PortB, IP, PortA, 1),
PortA = 5555,
PortB = 6666,
{A, SockA} = make_connection(PortA, IP, PortB, 0),
{B, SockB} = make_connection(PortB, IP, PortA, 1),
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
@ -48,36 +46,4 @@ send_test() ->
timer:sleep(10),
ok.
out_of_order_test() ->
IP = {127, 0, 0, 1},
PortA = 5521,
PortB = 5522,
{ok, SockA} = gen_udp:open(PortA),
ok = make_connection(PortB, IP, PortA, 0),
% TODO: fix race_test and remove this sleep
timer:sleep(10),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 4:8, 1:16, "second message">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 1:8, 0:16, "first message">>),
timer:sleep(10),
ok.
race_test() ->
IP = {127, 0, 0, 1},
PortA = 5531,
PortB = 5532,
{ok, SockA} = gen_udp:open(PortA),
{ok, SockB} = gen_udp:open(PortB, [{active, true}]),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 5:8, 0:16, "a message">>),
timer:sleep(10),
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
timer:sleep(10),
ok.