diff --git a/src/msp_channel.erl b/src/msp_channel.erl index ce79d3a..db1219b 100644 --- a/src/msp_channel.erl +++ b/src/msp_channel.erl @@ -10,7 +10,13 @@ -license("MIT"). -export([configure_channel/4]). --export([send_and_close/2]). +-export([send_and_close/2, send_message/2]). +% A lot of interfaces, but the last two are only really written for +% completeness. The distinctions here are blocking vs subscribing, and +% fragments vs messages. +-export([receive_fragment/1, receive_message/1, + subscribe_fragment/1, subscribe_message/1, + subscribe_fragment_once/1, subscribe_message_once/1]). %% gen_server -export([start_link/0]). @@ -66,14 +72,24 @@ seq :: integer(), data :: binary()}). + +-type request_type() :: fragment | message. +-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}. +-record(request, + {type :: request_type(), + recipient :: request_recipient()}). +-type request() :: #request{}. + -record(s, {chan_id :: integer(), socket :: gen_udp:socket(), peer :: endpoint(), % TODO configure the socket in advance? state = closed :: closed | sending | receiving, seq = 0 :: integer(), + requests = {[], []} :: {[request()], [request()]}, + subscriber = none :: none | request(), instructions = [] :: [#instruction{}], - fragments = [] :: [binary()]}). + fragments = {[], []} :: {[binary()], [binary()]}}). -type state() :: none | #s{}. @@ -88,6 +104,31 @@ configure_channel(Pid, Id, Socket, Peer) -> send_and_close(Chan, Message) -> gen_server:call(Chan, {send, Message, close}). +send_message(Chan, Message) -> + gen_server:call(Chan, {send, Message, end_message}). + +receive_fragment(Chan) -> + gen_server:call(Chan, {add_sync_request, fragment}). + +receive_message(Chan) -> + gen_server:call(Chan, {add_sync_request, message}). + +subscribe_fragment(Chan) -> + Pid = self(), + gen_server:call(Chan, {subscribe, fragment, Pid}). + +subscribe_message(Chan) -> + Pid = self(), + gen_server:call(Chan, {subscribe, message, Pid}). + +subscribe_fragment_once(Chan) -> + Pid = self(), + gen_server:call(Chan, {add_async_request, fragment, Pid}). + +subscribe_message_once(Chan) -> + Pid = self(), + gen_server:call(Chan, {add_async_request, message, Pid}). + %%% gen_server -spec start_link() -> Result @@ -105,6 +146,15 @@ init(none) -> handle_call({send, Message, Mod}, _, State) -> {Result, NewState} = do_send(Message, Mod, State), {reply, Result, NewState}; +handle_call({add_sync_request, Type}, From, State) -> + NewState = do_add_sync_request(Type, From, State), + {noreply, NewState}; +handle_call({add_async_request, Type, Pid}, _, State) -> + {Result, NewState} = do_add_async_request(Type, Pid, State), + {reply, Result, NewState}; +handle_call({subscribe, Type, Pid}, _, State) -> + {Result, NewState} = do_subscribe(Type, Pid, State), + {reply, Result, NewState}; handle_call(Unexpected, From, State) -> ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]), {noreply, State}. @@ -168,6 +218,41 @@ choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving}; choose_opcode(close) -> {?OP_CLOSE, closed}; choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}. +%%% Subscription Doer Functions + +% Adds a request to the queue, or responds to that request directly, if there +% is a subscriber already. +do_add_sync_request(Type, From, State = #s{subscriber = none}) -> + do_add_request(Type, {sync, From}, State); +do_add_sync_request(_, From, State) -> + Result = {error, already_subscribed}, + gen_server:reply(From, Result), + State. + +% Adds a request to the queue and returns ok, or returns an error if there is a +% subscriber already. +do_add_async_request(Type, Pid, State = #s{subscriber = none}) -> + NewState = do_add_request(Type, {async, Pid}, State), + {ok, NewState}; +do_add_async_request(_, _, State) -> + Result = {error, already_subscribed}, + {Result, State}. + +do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) -> + Request = #request{type = Type, recipient = Recipient}, + NewRequests = {[Request | In], Out}, + StateWithRequest = State#s{requests = NewRequests}, + try_fill_requests(StateWithRequest). + +do_subscribe(Type, Pid, State = #s{subscriber = none}) -> + Request = #request{type = Type, recipient = {async, Pid}}, + StateWithSub = State#s{subscriber = Request}, + NewState = try_fill_requests(StateWithSub), + {ok, NewState}; +do_subscribe(_, _, State) -> + Result = {error, already_subscribed}, + {Result, State}. + %%% Receive Doer Functions % First handle the messages that don't need to be reordered @@ -223,13 +308,19 @@ insert_instruction(Op, Seq, Data, Is, Earlier) -> %%% Reliable message execution -do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) -> +do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) -> + % If we are closed, then anything we receive opens the channel. + StateOpened = State#s{state = receiving}, + do_execute_buffer(StateOpened); +do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) -> PoppedState = State#s{instructions = Rest, seq = Seq + 1}, + log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]), NewState = do_handle_reliable(PoppedState, Op, Seq, Data), do_execute_buffer(NewState); do_execute_buffer(State) -> - % Empty instruction list, or the next instruction hasn't arrived yet. Done. - State. + % We've finished consuming instructions; now see if we have any requests we + % can fill. + try_fill_requests(State). do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) -> do_handle_fragment(State, continue, Seq, Payload); @@ -245,11 +336,104 @@ do_handle_reliable(State, Unknown, _, _) -> io:format("Got unexpected opcode ~p~n", [Unknown]), State. -do_handle_fragment(State, Mod, Seq, Payload) -> - io:format("Got fragment ~p with index ~p, and modifier ~p~n", [Payload, Seq, Mod]), - State. - do_handle_status(State, Seq, Payload) -> - io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]), - State. + send_ack(State, Seq), + do_handle_fragment(State, continue, Seq, Payload). + +send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) -> + % TODO: backpressure + gen_udp:send(Sock, Peer, <>). + +do_handle_fragment(State, Mod, _Seq, Payload) -> + WithFragment = push_fragment(State, Payload), + update_chan_mode(WithFragment, Mod). + +push_fragment(State = #s{fragments = {In, Out}}, Payload) -> + State#s{fragments = {[Payload | In], Out}}. + +update_chan_mode(State, continue) -> + State#s{state = receiving}; +update_chan_mode(State, end_message) -> + State#s{state = sending}; +update_chan_mode(State, close_channel) -> + % TODO: Properly clean up subscriptions and requests and things, if the + % channel closes... Also if the channel switches to sending, I guess a + % fragment subscriber should be notified? + State#s{state = closed}; +update_chan_mode(State, dirty) -> + % TODO: create and then quarantine dirty channels + State#s{state = receiving}. + +try_fill_requests(State = #s{requests = Requests}) -> + case pop_queue(Requests) of + {ok, {Remaining, Request}} -> + try_fill_requests2(State, Remaining, Request); + error -> + % No requests, try subscriptions instead. + try_fill_subscription(State) + end. + +try_fill_requests2(State, RequestsRemaining, Request) -> + Without = State#s{requests = RequestsRemaining}, + case try_fill_request(Without, Request) of + {ok, NewState} -> + % Done successfully! Start the loop again with the new state. + try_fill_requests(NewState); + error -> + % The request didn't get filled... Just revert. + State + end. + +try_fill_subscription(State = #s{subscriber = none}) -> + % No subscription either! Do nothing. + State; +try_fill_subscription(State = #s{subscriber = Sub}) -> + % A subscription! Try to fill it as many times as possible. + try_fill_subscription_loop(State, Sub). + +try_fill_subscription_loop(State, Sub) -> + case try_fill_request(State, Sub) of + {ok, NewState} -> + try_fill_subscription_loop(NewState, Sub); + error -> + State + end. + +try_fill_request(#s{state = receiving}, #request{type = message}) -> + % Message not finished yet. + error; +try_fill_request(#s{fragments = {[], []}}, #request{type = message}) -> + % No fragments have been received, not even empty fragments. This + % channel/message hasn't even been opened yet. + error; +try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) -> + Fragments = Out ++ lists:reverse(In), + Message = list_to_binary(Fragments), + send_result(Recip, Message), + NewState = State#s{fragments = {[], []}}, + {ok, NewState}; +try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) -> + case pop_queue(Fragments) of + {ok, {NewFragments, Fragment}} -> + log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]), + send_result(Recip, Fragment), + NewState = State#s{fragments = NewFragments}, + {ok, NewState}; + error -> + State + end. + +send_result({async, Pid}, Result) -> + erlang:send(Pid, {msp_fragment, self(), Result}); +send_result({sync, From}, Result) -> + gen_server:reply(From, {ok, Result}). + +pop_queue({[], []}) -> + error; +pop_queue({In, [Next | Out]}) -> + NewFragments = {In, Out}, + {ok, {NewFragments, Next}}; +pop_queue({In, []}) -> + Out = lists:reverse(In), + pop_queue({[], Out}). diff --git a/src/msp_tests.erl b/src/msp_tests.erl index 81bcc2f..914a597 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -37,15 +37,21 @@ send_test() -> ok = make_connection(PortA, IP, PortB, 0), ok = make_connection(PortB, IP, PortA, 1), - {ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB), - msp_channel:send_and_close(ChanA, <<"message sent from A to B">>), - io:format("A has tried to send a message on channel ~p~n", [IndexA]), - timer:sleep(10), + {ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB), + {ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA), + + MessageA = <<"message sent from A to B">>, + msp_channel:send_and_close(ChanA, MessageA), + io:format("A has tried to send a message to B~n"), + {ok, MessageA} = msp_channel:receive_message(ChanB), + io:format("received successfully~n"), + + MessageB = <<"message sent from B to A">>, + msp_channel:send_and_close(ChanB, MessageB), + io:format("B has tried to send a message to A~n"), + {ok, MessageB} = msp_channel:receive_message(ChanA), + io:format("received successfully~n"), - {ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA), - msp_channel:send_and_close(ChanB, <<"message sent from B to A">>), - io:format("B has tried to send a message on channel ~p~n", [IndexB]), - timer:sleep(10), ok. out_of_order_test() -> @@ -54,11 +60,13 @@ out_of_order_test() -> PortB = 5522, {ok, SockA} = gen_udp:open(PortA), ok = make_connection(PortB, IP, PortA, 0), + {ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA), - ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second message">>), - ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first message">>), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>), + ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>), - timer:sleep(10), + {ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB), + io:format("Received out of order fragments successfully.~n"), ok. @@ -72,8 +80,14 @@ race_test() -> timer:sleep(10), + % We should already have a {udp, SockB, IP, PortA, _} in our message queue, + % which needs to be properly detected and handled by the socket transfer + % functions. ok = msp_channel_man:add_route(SockB, IP, PortA, 0), + % TODO We could test that the datagrams were dispatched properly by + % receiving on the channel, but for now we just manually check the + % logs/console output for msp_channel_man errors. timer:sleep(10), ok.