Compare commits
No commits in common. "0ccd64e2f1ce6cd977f96e9764d4968aef5cc622" and "587fa1710c19f5e97d57c595592fae0e59513bb3" have entirely different histories.
0ccd64e2f1
...
587fa1710c
@ -1,253 +0,0 @@
|
|||||||
%%% @doc
|
|
||||||
%%% Minimal Stream Protocol: Channel Worker
|
|
||||||
%%% @end
|
|
||||||
|
|
||||||
-module(msp_channel).
|
|
||||||
-vsn("0.1.0").
|
|
||||||
-behavior(gen_server).
|
|
||||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
|
||||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
|
||||||
-license("MIT").
|
|
||||||
|
|
||||||
-export([configure_channel/4]).
|
|
||||||
-export([send_and_close/2]).
|
|
||||||
|
|
||||||
%% gen_server
|
|
||||||
-export([start_link/0]).
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
||||||
code_change/3, terminate/2]).
|
|
||||||
-include("$zx_include/zx_logger.hrl").
|
|
||||||
|
|
||||||
|
|
||||||
%%% Opcodes
|
|
||||||
|
|
||||||
% Fragment/message opcodes:
|
|
||||||
% _____________________________________________________________________________
|
|
||||||
% OP_FRAGMENT | Part of a message/stream
|
|
||||||
% OP_OPEN | Open a new channel, and send the first fragment of the
|
|
||||||
% | first message in it.
|
|
||||||
% OP_END_MESSAGE | End of a message/stream, expects a response
|
|
||||||
% OP_OPEN_END | Open a new channel, and send an entire message
|
|
||||||
% OP_CLOSE | End of a message/stream, do not respond, close the channel
|
|
||||||
% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the
|
|
||||||
% | channel, all in one datagram. Sort of like
|
|
||||||
% | gen_server:cast/2, or if UDP were reliable.
|
|
||||||
%
|
|
||||||
% Transport Control Stuff
|
|
||||||
% -----------------------------------------------------------------------------
|
|
||||||
% OP_ACK | Acknowledge that instructions up to N have been received.
|
|
||||||
% OP_STATUS | Request an ACK for this exact command. Has many uses -
|
|
||||||
% | diagnostics, benchmarking, keepalive, dropped ACKs, or just
|
|
||||||
% | to stop the receiver from coalescing ACKs too much.
|
|
||||||
% OP_NACK | Instruction N has not been received.
|
|
||||||
% OP_CANCEL | Sender is hoping that messages M through N weren't
|
|
||||||
% | delivered, and can be cancelled.
|
|
||||||
% OP_ACK_CANCEL | Respond to OP_CANCEL.
|
|
||||||
%
|
|
||||||
% Stream Negotiation
|
|
||||||
% _____________________________________________________________________________
|
|
||||||
% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and
|
|
||||||
% | should be moved to a different ID.
|
|
||||||
%
|
|
||||||
% Lossy
|
|
||||||
% _____________________________________________________________________________
|
|
||||||
% OP_OPEN_DIRTY | Like OP_OPEN, but marks the channel as 'dirty', meaning
|
|
||||||
% | it is allowed to send lossy datagrams too, for voip, etc.
|
|
||||||
% OP_LOSSY | A lossy datagram, for the application to deal with.
|
|
||||||
|
|
||||||
% Numbers aren't final.
|
|
||||||
-define(OP_FRAGMENT, 0).
|
|
||||||
-define(OP_OPEN, 1).
|
|
||||||
-define(OP_END_MESSAGE, 2).
|
|
||||||
-define(OP_OPEN_END, 3).
|
|
||||||
-define(OP_CLOSE, 4).
|
|
||||||
-define(OP_OPEN_CLOSE, 5).
|
|
||||||
-define(OP_ACK, 6).
|
|
||||||
-define(OP_STATUS, 7).
|
|
||||||
-define(OP_NACK, 8).
|
|
||||||
-define(OP_CANCEL, 9).
|
|
||||||
-define(OP_ACK_CANCEL, 10).
|
|
||||||
-define(OP_REQ_MOVE, 11).
|
|
||||||
-define(OP_OPEN_DIRTY, 12).
|
|
||||||
-define(OP_LOSSY, 13).
|
|
||||||
|
|
||||||
%%% Type and Record Definitions
|
|
||||||
|
|
||||||
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
|
|
||||||
|
|
||||||
-record(s,
|
|
||||||
{chan_id :: integer(),
|
|
||||||
socket :: gen_udp:socket(),
|
|
||||||
peer :: endpoint(), % TODO configure the socket in advance?
|
|
||||||
state = closed :: closed | sending | receiving,
|
|
||||||
seq = 0 :: integer()}).
|
|
||||||
|
|
||||||
|
|
||||||
-type state() :: none | #s{}.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%% Interface
|
|
||||||
|
|
||||||
configure_channel(Pid, Id, Socket, Peer) ->
|
|
||||||
gen_server:cast(Pid, {configure_channel, Id, Socket, Peer}).
|
|
||||||
|
|
||||||
send_and_close(Chan, Message) ->
|
|
||||||
gen_server:call(Chan, {send, Message, close}).
|
|
||||||
|
|
||||||
%%% gen_server
|
|
||||||
|
|
||||||
-spec start_link() -> Result
|
|
||||||
when Result :: {ok, pid()}
|
|
||||||
| {error, Reason :: term()}.
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link(?MODULE, none, []).
|
|
||||||
|
|
||||||
|
|
||||||
init(none) ->
|
|
||||||
{ok, none}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_call({send, Message, Mod}, _, State) ->
|
|
||||||
{Result, NewState} = do_send(Message, Mod, State),
|
|
||||||
{reply, Result, NewState};
|
|
||||||
handle_call(Unexpected, From, State) ->
|
|
||||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_cast({configure_channel, Id, Socket, Peer}, none) ->
|
|
||||||
State = #s{chan_id = Id,
|
|
||||||
socket = Socket,
|
|
||||||
peer = Peer},
|
|
||||||
{noreply, State};
|
|
||||||
handle_cast(Unexpected, State) ->
|
|
||||||
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_info({msp_fragment, Packet}, State) ->
|
|
||||||
NewState = do_handle_datagram(State, Packet),
|
|
||||||
{noreply, NewState};
|
|
||||||
handle_info(Unexpected, State) ->
|
|
||||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
|
|
||||||
-spec code_change(OldVersion, State, Extra) -> {ok, NewState} | {error, Reason}
|
|
||||||
when OldVersion :: Version | {old, Version},
|
|
||||||
Version :: term(),
|
|
||||||
State :: state(),
|
|
||||||
Extra :: term(),
|
|
||||||
NewState :: term(),
|
|
||||||
Reason :: term().
|
|
||||||
|
|
||||||
code_change(_, State, _) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
|
|
||||||
terminate(_, _) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%% Send Doer Functions
|
|
||||||
|
|
||||||
do_send(_, open_dirty, State = #s{state = ChanMode}) when ChanMode /= closed ->
|
|
||||||
{{error, already_open}, State};
|
|
||||||
do_send(_, _, State = #s{state = receiving}) ->
|
|
||||||
{{error, not_sending}, State};
|
|
||||||
do_send(Message, Mod, State = #s{chan_id = Id,
|
|
||||||
state = ChanMode,
|
|
||||||
peer = Peer,
|
|
||||||
socket = Sock,
|
|
||||||
seq = Seq}) ->
|
|
||||||
io:format("~p is sending a message to ~p, seq ~p~n", [self(), Peer, Id]),
|
|
||||||
IsNewChannel = ChanMode == closed,
|
|
||||||
{Op, NewChanMode} = choose_opcode(IsNewChannel, Mod),
|
|
||||||
Datagram = <<Id:8, Op:8, Seq:16, Message/bytes>>,
|
|
||||||
gen_udp:send(Sock, Peer, Datagram),
|
|
||||||
NewState = State#s{state = NewChanMode, seq = Seq + 1},
|
|
||||||
{ok, NewState}.
|
|
||||||
|
|
||||||
choose_opcode(false, none) -> {?OP_FRAGMENT, sending};
|
|
||||||
choose_opcode(true, none) -> {?OP_OPEN, sending};
|
|
||||||
choose_opcode(false, end_message) -> {?OP_END_MESSAGE, receiving};
|
|
||||||
choose_opcode(true, end_message) -> {?OP_OPEN_END, receiving};
|
|
||||||
choose_opcode(false, close) -> {?OP_OPEN_END, closed};
|
|
||||||
choose_opcode(true, close) -> {?OP_OPEN_CLOSE, closed};
|
|
||||||
choose_opcode(true, open_dirty) -> {?OP_OPEN_DIRTY, sending}.
|
|
||||||
|
|
||||||
%%% Receive Doer Functions
|
|
||||||
|
|
||||||
do_handle_datagram(State, <<?OP_LOSSY:8, Data/binary>>) ->
|
|
||||||
do_handle_lossy(State, Data);
|
|
||||||
do_handle_datagram(State, <<Op:8, Seq:16, Rest/binary>>) ->
|
|
||||||
do_handle_reliable(State, Op, Seq, Rest);
|
|
||||||
do_handle_datagram(State, Unexpected) ->
|
|
||||||
io:format("Got datagram ~p which is too short.~n", [Unexpected]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_lossy(State, Data) ->
|
|
||||||
io:format("Got lossy datagram: ~p~n", [Data]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, false, continue, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_OPEN, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, continue, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, false, end_message, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, end_message, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, false, close_channel, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, close_channel, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_ACK, Seq, Payload) ->
|
|
||||||
do_handle_ack(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_STATUS, Seq, Payload) ->
|
|
||||||
do_handle_status(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_NACK, Seq, Payload) ->
|
|
||||||
do_handle_nack(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) ->
|
|
||||||
do_handle_cancel(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) ->
|
|
||||||
do_handle_ack_cancel(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) ->
|
|
||||||
do_handle_req_move(State, Seq, Payload);
|
|
||||||
do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) ->
|
|
||||||
do_handle_fragment(State, true, dirty, Seq, Payload);
|
|
||||||
do_handle_reliable(State, Unknown, _, _) ->
|
|
||||||
io:format("Got unexpected opcode ~p~n", [Unknown]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_fragment(State, NewChannel, Mod, Seq, Payload) ->
|
|
||||||
io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_ack(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_status(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_nack(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_cancel(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_ack_cancel(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_handle_req_move(State, Seq, Payload) ->
|
|
||||||
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
@ -1,219 +0,0 @@
|
|||||||
%%% @doc
|
|
||||||
%%% Minimal Stream Protocol: Channel Worker Manager
|
|
||||||
%%% @end
|
|
||||||
|
|
||||||
-module(msp_channel_man).
|
|
||||||
-vsn("0.1.0").
|
|
||||||
-behavior(gen_server).
|
|
||||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
|
||||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
|
||||||
-license("MIT").
|
|
||||||
|
|
||||||
-export([update_router/5, create_channel/3, dispatch_fallback/5]).
|
|
||||||
%% Worker interface
|
|
||||||
-export([enroll/0]).
|
|
||||||
%% gen_server
|
|
||||||
-export([start_link/0]).
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
||||||
code_change/3, terminate/2]).
|
|
||||||
-include("$zx_include/zx_logger.hrl").
|
|
||||||
|
|
||||||
|
|
||||||
%%% Type and Record Definitions
|
|
||||||
|
|
||||||
-type endpoint() :: {inet:ip_address(), inet:port_number()}.
|
|
||||||
-record(route, {from :: inet:port_number(),
|
|
||||||
to :: endpoint()}).
|
|
||||||
|
|
||||||
-record(channel,
|
|
||||||
{chan_pid :: pid()}).
|
|
||||||
|
|
||||||
-record(route_info,
|
|
||||||
{conn_pid :: pid(),
|
|
||||||
socket :: gen_udp:socket(),
|
|
||||||
unused_channels :: [integer()],
|
|
||||||
used_channels :: #{integer() => #channel{}}}).
|
|
||||||
|
|
||||||
-record(s,
|
|
||||||
{connections = #{} :: #{#route{} => #route_info{}}}).
|
|
||||||
|
|
||||||
-type state() :: #s{}.
|
|
||||||
|
|
||||||
%%% Service Interface
|
|
||||||
|
|
||||||
update_router(OurPort, TheirIP, TheirPort, Conn, Sock) ->
|
|
||||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
|
||||||
gen_server:cast(?MODULE, {update_router, Route, Conn, Sock}).
|
|
||||||
|
|
||||||
create_channel(OurPort, TheirIP, TheirPort) ->
|
|
||||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
|
||||||
gen_server:call(?MODULE, {create_channel, Route}).
|
|
||||||
|
|
||||||
% Deliver messages on behalf of msp_connection, if it doesn't know where to
|
|
||||||
% send something.
|
|
||||||
dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet) ->
|
|
||||||
Route = #route{from = OurPort, to = {TheirIP, TheirPort}},
|
|
||||||
gen_server:cast(?MODULE, {dispatch, Route, ID, Packet}).
|
|
||||||
|
|
||||||
%%% Worker Interface
|
|
||||||
|
|
||||||
-spec enroll() -> ok.
|
|
||||||
%% @doc
|
|
||||||
%% Workers register here after they initialize.
|
|
||||||
|
|
||||||
enroll() ->
|
|
||||||
gen_server:cast(?MODULE, {enroll, self()}).
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%% gen_server
|
|
||||||
|
|
||||||
-spec start_link() -> Result
|
|
||||||
when Result :: {ok, pid()}
|
|
||||||
| {error, Reason :: term()}.
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
|
|
||||||
|
|
||||||
|
|
||||||
init(none) ->
|
|
||||||
State = #s{},
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_call({create_channel, Route}, _, State) ->
|
|
||||||
{Result, NewState} = do_create_channel(State, Route),
|
|
||||||
{reply, Result, NewState};
|
|
||||||
handle_call(Unexpected, From, State) ->
|
|
||||||
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_cast({update_router, Route, Conn, Sock}, State) ->
|
|
||||||
NewState = do_update_router(Route, Conn, Sock, State),
|
|
||||||
{noreply, NewState};
|
|
||||||
handle_cast({dispatch, Route, ID, Packet}, State) ->
|
|
||||||
NewState = do_dispatch(Route, ID, Packet, State),
|
|
||||||
{noreply, NewState};
|
|
||||||
handle_cast({enroll, PID}, State) ->
|
|
||||||
NewState = do_enroll(PID, State),
|
|
||||||
{noreply, NewState};
|
|
||||||
handle_cast(Unexpected, State) ->
|
|
||||||
ok = log(warning, "Unexpected cast: ~tp", [Unexpected]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_info({'DOWN', Mon, process, PID, Reason}, State) ->
|
|
||||||
NewState = handle_down(Mon, PID, Reason, State),
|
|
||||||
{noreply, NewState};
|
|
||||||
handle_info(Unexpected, State) ->
|
|
||||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_down(Mon, PID, Reason, State) ->
|
|
||||||
% TODO
|
|
||||||
Unexpected = {'DOWN', Mon, process, PID, Reason},
|
|
||||||
ok = log(warning, "Unexpected info: ~tp", [Unexpected]),
|
|
||||||
State.
|
|
||||||
|
|
||||||
|
|
||||||
code_change(_, State, _) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
|
|
||||||
terminate(_, _) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%% Doer Functions
|
|
||||||
|
|
||||||
-spec do_enroll(PID, State) -> NewState
|
|
||||||
when PID :: pid(),
|
|
||||||
State :: state(),
|
|
||||||
NewState :: state().
|
|
||||||
|
|
||||||
do_enroll(_PID, State) ->
|
|
||||||
% TODO... ??
|
|
||||||
State.
|
|
||||||
|
|
||||||
do_enroll2(PID, State, Channels) ->
|
|
||||||
case lists:member(PID, Channels) of
|
|
||||||
false ->
|
|
||||||
Mon = monitor(process, PID),
|
|
||||||
ok = log(info, "Enroll: ~tp @ ~tp", [PID, Mon]),
|
|
||||||
%State#s{channels = [PID | Channels]};
|
|
||||||
State;
|
|
||||||
true ->
|
|
||||||
State
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_update_router(Route, Conn, Sock, State = #s{connections = Conns}) ->
|
|
||||||
case maps:find(Route, Conns) of
|
|
||||||
{ok, _Info} ->
|
|
||||||
io:format("Updating routes not yet implemented.~n", []),
|
|
||||||
init:stop();
|
|
||||||
error ->
|
|
||||||
NewConn = #route_info{conn_pid = Conn,
|
|
||||||
socket = Sock,
|
|
||||||
unused_channels = lists:seq(0, 255),
|
|
||||||
used_channels = #{}},
|
|
||||||
NewConns = maps:put(Route, NewConn, Conns),
|
|
||||||
State#s{connections = NewConns}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_create_channel(State = #s{connections = Conns}, Route) ->
|
|
||||||
case maps:find(Route, Conns) of
|
|
||||||
{ok, Info} ->
|
|
||||||
do_create_channel2(State, Route, Info);
|
|
||||||
error ->
|
|
||||||
Result = {error, unknown_connection},
|
|
||||||
{Result, State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_create_channel2(State, Route, Info) ->
|
|
||||||
case Info#route_info.unused_channels of
|
|
||||||
[] ->
|
|
||||||
{{error, all_in_use}, State};
|
|
||||||
[Next | _] ->
|
|
||||||
do_create_channel3(State, Route, Info, Next)
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_create_channel3(State, Route, Info, ChanID) ->
|
|
||||||
{ok, Chan} = msp_channel_sup:start_channel(),
|
|
||||||
msp_channel:configure_channel(Chan, ChanID, Info#route_info.socket, Route#route.to),
|
|
||||||
Result = {ok, {Chan, ChanID}},
|
|
||||||
|
|
||||||
% Now update the channel list...
|
|
||||||
Remaining = lists:delete(ChanID, Info#route_info.unused_channels),
|
|
||||||
NewChan = #channel{chan_pid = Chan},
|
|
||||||
NewChannels = maps:put(ChanID, NewChan, Info#route_info.used_channels),
|
|
||||||
NewInfo = Info#route_info{unused_channels = Remaining,
|
|
||||||
used_channels = NewChannels},
|
|
||||||
NewConns = maps:put(Route, NewInfo, State#s.connections),
|
|
||||||
NewState = State#s{connections = NewConns},
|
|
||||||
|
|
||||||
{Result, NewState}.
|
|
||||||
|
|
||||||
do_dispatch(Route, ID, Packet, State) ->
|
|
||||||
case maps:find(Route, State#s.connections) of
|
|
||||||
{ok, Info} ->
|
|
||||||
do_dispatch2(Route, ID, Packet, State, Info);
|
|
||||||
error ->
|
|
||||||
State
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_dispatch2(Route, ID, Packet, State, Info) ->
|
|
||||||
case maps:find(ID, Info#route_info.used_channels) of
|
|
||||||
{ok, #channel{chan_pid = Chan}} ->
|
|
||||||
do_dispatch3(Packet, State, Chan);
|
|
||||||
error ->
|
|
||||||
{{ok, {Chan, _}}, NewState} = do_create_channel3(State, Route, Info, ID),
|
|
||||||
do_dispatch3(Packet, NewState, Chan)
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_dispatch3(Packet, State, Chan) ->
|
|
||||||
erlang:send(Chan, {msp_fragment, Packet}),
|
|
||||||
State.
|
|
||||||
|
|
||||||
@ -1,50 +0,0 @@
|
|||||||
%%% @doc
|
|
||||||
%%% Minimal Stream Protocol: Channel Service Supervisor
|
|
||||||
%%%
|
|
||||||
%%% This is the service-level supervisor of the system. It is the parent of both the
|
|
||||||
%%% client connection handlers and the client manager (which manages the client
|
|
||||||
%%% connection handlers). This is the child of msp_sup.
|
|
||||||
%%%
|
|
||||||
%%% See: http://erlang.org/doc/apps/kernel/application.html
|
|
||||||
%%% @end
|
|
||||||
|
|
||||||
-module(msp_channels).
|
|
||||||
-vsn("0.1.0").
|
|
||||||
-behavior(supervisor).
|
|
||||||
-author("Jarvis Carroll <spiveehere@gmail.com>").
|
|
||||||
-copyright("Jarvis Carroll <spiveehere@gmail.com>").
|
|
||||||
-license("MIT").
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
|
|
||||||
-spec start_link() -> {ok, pid()}.
|
|
||||||
%% @private
|
|
||||||
%% This supervisor's own start function.
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
|
|
||||||
|
|
||||||
-spec init(none) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
|
|
||||||
%% @private
|
|
||||||
%% The OTP init/1 function.
|
|
||||||
|
|
||||||
init(none) ->
|
|
||||||
RestartStrategy = {rest_for_one, 1, 60},
|
|
||||||
ChannelMan =
|
|
||||||
{msp_channel_man,
|
|
||||||
{msp_channel_man, start_link, []},
|
|
||||||
permanent,
|
|
||||||
5000,
|
|
||||||
worker,
|
|
||||||
[msp_channel_man]},
|
|
||||||
ChannelSup =
|
|
||||||
{msp_channel_sup,
|
|
||||||
{msp_channel_sup, start_link, []},
|
|
||||||
permanent,
|
|
||||||
5000,
|
|
||||||
supervisor,
|
|
||||||
[msp_channel_sup]},
|
|
||||||
Children = [ChannelSup, ChannelMan],
|
|
||||||
{ok, {RestartStrategy, Children}}.
|
|
||||||
@ -78,8 +78,8 @@ handle_cast(Unexpected, State) ->
|
|||||||
|
|
||||||
|
|
||||||
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
|
handle_info({udp, Sock, IP, Port, Packet}, State = #s{socket = Sock, peer = {IP, Port}}) ->
|
||||||
do_dispatch(State, Packet),
|
NewState = do_dispatch(State, Packet),
|
||||||
{noreply, State};
|
{noreply, NewState};
|
||||||
handle_info(Unexpected, State) ->
|
handle_info(Unexpected, State) ->
|
||||||
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
|
ok = io:format(warning, "Unexpected info: ~tp", [Unexpected]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
@ -105,23 +105,14 @@ terminate(_, _) ->
|
|||||||
%%% Doer Functions
|
%%% Doer Functions
|
||||||
|
|
||||||
do_begin_msp(Sock, Peer, Side) ->
|
do_begin_msp(Sock, Peer, Side) ->
|
||||||
ok = inet:setopts(Sock, [{active, once}, {mode, binary}]),
|
ok = inet:setopts(Sock, [{active, once}]),
|
||||||
State = #s{socket = Sock,
|
State = #s{socket = Sock,
|
||||||
peer = Peer,
|
peer = Peer,
|
||||||
side = Side},
|
side = Side},
|
||||||
State.
|
State.
|
||||||
|
|
||||||
do_dispatch(#s{socket = Sock, peer = Peer, connections = Conns}, <<ID:8, Packet/bytes>>) ->
|
do_dispatch(State = #s{socket = Sock}, Packet) ->
|
||||||
|
io:format("Got data: ~p~n", [Packet]),
|
||||||
ok = inet:setopts(Sock, [{active, once}]),
|
ok = inet:setopts(Sock, [{active, once}]),
|
||||||
case maps:find(ID, Conns) of
|
State.
|
||||||
{ok, Conn} ->
|
|
||||||
erlang:send(Conn, {msp_fragment, Packet});
|
|
||||||
error ->
|
|
||||||
{ok, OurPort} = inet:port(Sock),
|
|
||||||
{TheirIP, TheirPort} = Peer,
|
|
||||||
msp_channel_man:dispatch_fallback(OurPort, TheirIP, TheirPort, ID, Packet)
|
|
||||||
end;
|
|
||||||
do_dispatch(_, <<>>) ->
|
|
||||||
% Empty datagram?
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|||||||
@ -10,15 +10,9 @@ start_link() ->
|
|||||||
init([]) ->
|
init([]) ->
|
||||||
RestartStrategy = {one_for_one, 0, 60},
|
RestartStrategy = {one_for_one, 0, 60},
|
||||||
Children = [{msp_man,
|
Children = [{msp_man,
|
||||||
{msp_man, start_link, []},
|
{msp_man, start_link, []},
|
||||||
permanent,
|
permanent,
|
||||||
5000,
|
5000,
|
||||||
worker,
|
worker,
|
||||||
[msp_man]},
|
[msp_man]}],
|
||||||
{msp_channels,
|
|
||||||
{msp_channels, start_link, []},
|
|
||||||
permanent,
|
|
||||||
5000,
|
|
||||||
worker,
|
|
||||||
[msp_channels]}],
|
|
||||||
{ok, {RestartStrategy, Children}}.
|
{ok, {RestartStrategy, Children}}.
|
||||||
|
|||||||
@ -25,24 +25,17 @@ make_connection(OurPort, TheirIP, TheirPort, Side) ->
|
|||||||
{ok, Sock} = gen_udp:open(OurPort),
|
{ok, Sock} = gen_udp:open(OurPort),
|
||||||
{ok, Pid} = msp_connection:start_link(),
|
{ok, Pid} = msp_connection:start_link(),
|
||||||
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
|
ok = msp_connection:begin_msp(Pid, Sock, TheirIP, TheirPort, Side),
|
||||||
ok = msp_channel_man:update_router(OurPort, TheirIP, TheirPort, Pid, Sock),
|
|
||||||
{Pid, Sock}.
|
{Pid, Sock}.
|
||||||
|
|
||||||
|
|
||||||
send_test() ->
|
send_test() ->
|
||||||
IP = {127, 0, 0, 1},
|
IP = {127, 0, 0, 1},
|
||||||
PortA = 5555,
|
PortA = 5555,
|
||||||
PortB = 6666,
|
PortB = 6666,
|
||||||
{A, SockA} = make_connection(PortA, IP, PortB, 0),
|
{A, SockA} = make_connection(PortA, IP, PortB, 0),
|
||||||
{B, SockB} = make_connection(PortB, IP, PortA, 1),
|
{B, SockB} = make_connection(PortB, IP, PortA, 1),
|
||||||
|
gen_udp:send(SockA, {IP, PortB}, <<"message sent from A to B">>),
|
||||||
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
|
gen_udp:send(SockB, {IP, PortA}, <<"message sent from B to A">>),
|
||||||
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
|
|
||||||
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
|
|
||||||
timer:sleep(10),
|
|
||||||
|
|
||||||
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
|
|
||||||
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
|
|
||||||
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
|
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user