Receive data on the other end.

I added a bunch of receipt interfaces, but I only test the syncronous
message receipt at the moment, not fragments or async.

I'm not sure whether 'subscribe_message' even makes sense as a concept,
but there it is.
This commit is contained in:
Jarvis Carroll 2025-10-31 11:35:05 +00:00
parent 23276662bb
commit 39a75ca435
2 changed files with 220 additions and 22 deletions

View File

@ -10,7 +10,13 @@
-license("MIT").
-export([configure_channel/4]).
-export([send_and_close/2]).
-export([send_and_close/2, send_message/2]).
% A lot of interfaces, but the last two are only really written for
% completeness. The distinctions here are blocking vs subscribing, and
% fragments vs messages.
-export([receive_fragment/1, receive_message/1,
subscribe_fragment/1, subscribe_message/1,
subscribe_fragment_once/1, subscribe_message_once/1]).
%% gen_server
-export([start_link/0]).
@ -66,14 +72,24 @@
seq :: integer(),
data :: binary()}).
-type request_type() :: fragment | message.
-type request_recipient() :: {async, pid()} | {sync, gen_server:from()}.
-record(request,
{type :: request_type(),
recipient :: request_recipient()}).
-type request() :: #request{}.
-record(s,
{chan_id :: integer(),
socket :: gen_udp:socket(),
peer :: endpoint(), % TODO configure the socket in advance?
state = closed :: closed | sending | receiving,
seq = 0 :: integer(),
requests = {[], []} :: {[request()], [request()]},
subscriber = none :: none | request(),
instructions = [] :: [#instruction{}],
fragments = [] :: [binary()]}).
fragments = {[], []} :: {[binary()], [binary()]}}).
-type state() :: none | #s{}.
@ -88,6 +104,31 @@ configure_channel(Pid, Id, Socket, Peer) ->
send_and_close(Chan, Message) ->
gen_server:call(Chan, {send, Message, close}).
send_message(Chan, Message) ->
gen_server:call(Chan, {send, Message, end_message}).
receive_fragment(Chan) ->
gen_server:call(Chan, {add_sync_request, fragment}).
receive_message(Chan) ->
gen_server:call(Chan, {add_sync_request, message}).
subscribe_fragment(Chan) ->
Pid = self(),
gen_server:call(Chan, {subscribe, fragment, Pid}).
subscribe_message(Chan) ->
Pid = self(),
gen_server:call(Chan, {subscribe, message, Pid}).
subscribe_fragment_once(Chan) ->
Pid = self(),
gen_server:call(Chan, {add_async_request, fragment, Pid}).
subscribe_message_once(Chan) ->
Pid = self(),
gen_server:call(Chan, {add_async_request, message, Pid}).
%%% gen_server
-spec start_link() -> Result
@ -105,6 +146,15 @@ init(none) ->
handle_call({send, Message, Mod}, _, State) ->
{Result, NewState} = do_send(Message, Mod, State),
{reply, Result, NewState};
handle_call({add_sync_request, Type}, From, State) ->
NewState = do_add_sync_request(Type, From, State),
{noreply, NewState};
handle_call({add_async_request, Type, Pid}, _, State) ->
{Result, NewState} = do_add_async_request(Type, Pid, State),
{reply, Result, NewState};
handle_call({subscribe, Type, Pid}, _, State) ->
{Result, NewState} = do_subscribe(Type, Pid, State),
{reply, Result, NewState};
handle_call(Unexpected, From, State) ->
ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]),
{noreply, State}.
@ -168,6 +218,41 @@ choose_opcode(end_message) -> {?OP_END_MESSAGE, receiving};
choose_opcode(close) -> {?OP_CLOSE, closed};
choose_opcode(open_dirty) -> {?OP_OPEN_DIRTY, sending}.
%%% Subscription Doer Functions
% Adds a request to the queue, or responds to that request directly, if there
% is a subscriber already.
do_add_sync_request(Type, From, State = #s{subscriber = none}) ->
do_add_request(Type, {sync, From}, State);
do_add_sync_request(_, From, State) ->
Result = {error, already_subscribed},
gen_server:reply(From, Result),
State.
% Adds a request to the queue and returns ok, or returns an error if there is a
% subscriber already.
do_add_async_request(Type, Pid, State = #s{subscriber = none}) ->
NewState = do_add_request(Type, {async, Pid}, State),
{ok, NewState};
do_add_async_request(_, _, State) ->
Result = {error, already_subscribed},
{Result, State}.
do_add_request(Type, Recipient, State = #s{requests = {In, Out}}) ->
Request = #request{type = Type, recipient = Recipient},
NewRequests = {[Request | In], Out},
StateWithRequest = State#s{requests = NewRequests},
try_fill_requests(StateWithRequest).
do_subscribe(Type, Pid, State = #s{subscriber = none}) ->
Request = #request{type = Type, recipient = {async, Pid}},
StateWithSub = State#s{subscriber = Request},
NewState = try_fill_requests(StateWithSub),
{ok, NewState};
do_subscribe(_, _, State) ->
Result = {error, already_subscribed},
{Result, State}.
%%% Receive Doer Functions
% First handle the messages that don't need to be reordered
@ -223,13 +308,19 @@ insert_instruction(Op, Seq, Data, Is, Earlier) ->
%%% Reliable message execution
do_execute_buffer(State = #s{seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
do_execute_buffer(State = #s{state = closed, seq = Seq, instructions = [#instruction{seq = Seq} | _]}) ->
% If we are closed, then anything we receive opens the channel.
StateOpened = State#s{state = receiving},
do_execute_buffer(StateOpened);
do_execute_buffer(State = #s{state = receiving, seq = Seq, instructions = [#instruction{op = Op, seq = Seq, data = Data} | Rest]}) ->
PoppedState = State#s{instructions = Rest, seq = Seq + 1},
log(debug, "Executing instruction. Op: ~p, Seq: ~p.~n", [Op, Seq]),
NewState = do_handle_reliable(PoppedState, Op, Seq, Data),
do_execute_buffer(NewState);
do_execute_buffer(State) ->
% Empty instruction list, or the next instruction hasn't arrived yet. Done.
State.
% We've finished consuming instructions; now see if we have any requests we
% can fill.
try_fill_requests(State).
do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) ->
do_handle_fragment(State, continue, Seq, Payload);
@ -245,11 +336,104 @@ do_handle_reliable(State, Unknown, _, _) ->
io:format("Got unexpected opcode ~p~n", [Unknown]),
State.
do_handle_fragment(State, Mod, Seq, Payload) ->
io:format("Got fragment ~p with index ~p, and modifier ~p~n", [Payload, Seq, Mod]),
State.
do_handle_status(State, Seq, Payload) ->
io:format("Unimplemented command. Index ~p, payload ~p~n", [Seq, Payload]),
State.
send_ack(State, Seq),
do_handle_fragment(State, continue, Seq, Payload).
send_ack(#s{chan_id = ID, socket = Sock, peer = Peer}, Seq) ->
% TODO: backpressure
gen_udp:send(Sock, Peer, <<ID:8, ?OP_ACK:8, Seq:16>>).
do_handle_fragment(State, Mod, _Seq, Payload) ->
WithFragment = push_fragment(State, Payload),
update_chan_mode(WithFragment, Mod).
push_fragment(State = #s{fragments = {In, Out}}, Payload) ->
State#s{fragments = {[Payload | In], Out}}.
update_chan_mode(State, continue) ->
State#s{state = receiving};
update_chan_mode(State, end_message) ->
State#s{state = sending};
update_chan_mode(State, close_channel) ->
% TODO: Properly clean up subscriptions and requests and things, if the
% channel closes... Also if the channel switches to sending, I guess a
% fragment subscriber should be notified?
State#s{state = closed};
update_chan_mode(State, dirty) ->
% TODO: create and then quarantine dirty channels
State#s{state = receiving}.
try_fill_requests(State = #s{requests = Requests}) ->
case pop_queue(Requests) of
{ok, {Remaining, Request}} ->
try_fill_requests2(State, Remaining, Request);
error ->
% No requests, try subscriptions instead.
try_fill_subscription(State)
end.
try_fill_requests2(State, RequestsRemaining, Request) ->
Without = State#s{requests = RequestsRemaining},
case try_fill_request(Without, Request) of
{ok, NewState} ->
% Done successfully! Start the loop again with the new state.
try_fill_requests(NewState);
error ->
% The request didn't get filled... Just revert.
State
end.
try_fill_subscription(State = #s{subscriber = none}) ->
% No subscription either! Do nothing.
State;
try_fill_subscription(State = #s{subscriber = Sub}) ->
% A subscription! Try to fill it as many times as possible.
try_fill_subscription_loop(State, Sub).
try_fill_subscription_loop(State, Sub) ->
case try_fill_request(State, Sub) of
{ok, NewState} ->
try_fill_subscription_loop(NewState, Sub);
error ->
State
end.
try_fill_request(#s{state = receiving}, #request{type = message}) ->
% Message not finished yet.
error;
try_fill_request(#s{fragments = {[], []}}, #request{type = message}) ->
% No fragments have been received, not even empty fragments. This
% channel/message hasn't even been opened yet.
error;
try_fill_request(State = #s{fragments = {In, Out}}, #request{type = message, recipient = Recip}) ->
Fragments = Out ++ lists:reverse(In),
Message = list_to_binary(Fragments),
send_result(Recip, Message),
NewState = State#s{fragments = {[], []}},
{ok, NewState};
try_fill_request(State = #s{fragments = Fragments}, #request{type = fragment, recipient = Recip}) ->
case pop_queue(Fragments) of
{ok, {NewFragments, Fragment}} ->
log(debug, "Replying to fragment request. Fragment: ~p~n", [Fragment]),
send_result(Recip, Fragment),
NewState = State#s{fragments = NewFragments},
{ok, NewState};
error ->
State
end.
send_result({async, Pid}, Result) ->
erlang:send(Pid, {msp_fragment, self(), Result});
send_result({sync, From}, Result) ->
gen_server:reply(From, {ok, Result}).
pop_queue({[], []}) ->
error;
pop_queue({In, [Next | Out]}) ->
NewFragments = {In, Out},
{ok, {NewFragments, Next}};
pop_queue({In, []}) ->
Out = lists:reverse(In),
pop_queue({[], Out}).

View File

@ -37,15 +37,21 @@ send_test() ->
ok = make_connection(PortA, IP, PortB, 0),
ok = make_connection(PortB, IP, PortA, 1),
{ok, {ChanA, IndexA}} = msp_channel_man:create_channel(PortA, IP, PortB),
msp_channel:send_and_close(ChanA, <<"message sent from A to B">>),
io:format("A has tried to send a message on channel ~p~n", [IndexA]),
timer:sleep(10),
{ok, {ChanA, 0}} = msp_channel_man:create_channel(PortA, IP, PortB),
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
MessageA = <<"message sent from A to B">>,
msp_channel:send_and_close(ChanA, MessageA),
io:format("A has tried to send a message to B~n"),
{ok, MessageA} = msp_channel:receive_message(ChanB),
io:format("received successfully~n"),
MessageB = <<"message sent from B to A">>,
msp_channel:send_and_close(ChanB, MessageB),
io:format("B has tried to send a message to A~n"),
{ok, MessageB} = msp_channel:receive_message(ChanA),
io:format("received successfully~n"),
{ok, {ChanB, IndexB}} = msp_channel_man:create_channel(PortB, IP, PortA),
msp_channel:send_and_close(ChanB, <<"message sent from B to A">>),
io:format("B has tried to send a message on channel ~p~n", [IndexB]),
timer:sleep(10),
ok.
out_of_order_test() ->
@ -54,11 +60,13 @@ out_of_order_test() ->
PortB = 5522,
{ok, SockA} = gen_udp:open(PortA),
ok = make_connection(PortB, IP, PortA, 0),
{ok, {ChanB, 0}} = msp_channel_man:create_channel(PortB, IP, PortA),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second message">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first message">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 2:8, 1:16, "second fragment;">>),
ok = gen_udp:send(SockA, {IP, PortB}, <<0:8, 0:8, 0:16, "first fragment;">>),
timer:sleep(10),
{ok, <<"first fragment;second fragment;">>} = msp_channel:receive_message(ChanB),
io:format("Received out of order fragments successfully.~n"),
ok.
@ -72,8 +80,14 @@ race_test() ->
timer:sleep(10),
% We should already have a {udp, SockB, IP, PortA, _} in our message queue,
% which needs to be properly detected and handled by the socket transfer
% functions.
ok = msp_channel_man:add_route(SockB, IP, PortA, 0),
% TODO We could test that the datagrams were dispatched properly by
% receiving on the channel, but for now we just manually check the
% logs/console output for msp_channel_man errors.
timer:sleep(10),
ok.