gmhive_client/src/gmhc_events.erl
2025-10-15 12:41:00 +02:00

170 lines
4.4 KiB
Erlang

%% Event publish/subscribe helpers built on gproc_ps, plus simple console
%% reporters (debug/0, progress/0) that print the events they receive.
-module(gmhc_events).
-vsn("0.8.3").
%% Publish/subscribe API.
-export([subscribe/1,
ensure_subscribed/1,
unsubscribe/1,
ensure_unsubscribed/1,
publish/2]).
%% Console reporter control.
-export([debug/0,
progress/0,
stop/0]).
%% Formatting callbacks handed to the reporter loop by debug/0 and progress/0.
-export([rpt_debug/2,
rpt_progress/2]).
%% internal: exported only because proc_lib:start_link/3 invokes it by
%% module/function/args in spawn_reporter/1.
-export([init_reporter/2]).
-include_lib("kernel/include/logger.hrl").
-export_type([event/0]).
%% Event keys accepted by publish/2 and the (un)subscribe functions.
%%
%% The `info' payload shapes matched by rpt_progress/2 in this module are:
%%   puzzle            - {Data, Target, Nonce, Config}
%%   result            - {ok, Cycles} | {error, no_solution}
%%   pool_notification - a map with a `msg' key
%% Other event keys are not inspected here; their payloads are defined by
%% whoever publishes them.
-type event() :: pool_notification
| {pool_notification, atom()}
| error
| puzzle
| result
| connected
| disconnected.
%% @doc Publishes `Event' through gproc_ps using scope `l'.
%% The payload is a map carrying the publishing process (`sender'), the
%% publication time from os:timestamp/0 (`time') and the caller-supplied
%% `Info' (`info'). The return value of gproc_ps:publish/3 is discarded.
-spec publish(event(), any()) -> ok.
publish(Event, Info) ->
    Now = os:timestamp(),
    Payload = #{sender => self(), time => Now, info => Info},
    _ = gproc_ps:publish(l, Event, Payload),
    ok.
%% @doc Subscribes the calling process to `Event' through gproc_ps (scope `l').
%% Use ensure_subscribed/1 when the caller may already be subscribed.
-spec subscribe(event()) -> true.
subscribe(Event) ->
    Scope = l,
    gproc_ps:subscribe(Scope, Event).
%% @doc Subscribes to Event. Will not crash if called more than once.
-spec ensure_subscribed(event()) -> true.
ensure_subscribed(Event) ->
    try
        subscribe(Event)
    catch
        error:badarg ->
            %% badarg is expected when we are already subscribed. Reading our
            %% own subscription property doubles as an assertion: it should
            %% only fail here if gproc is not running, in which case the
            %% resulting badarg is allowed to propagate.
            SubKey = {p, l, {gproc_ps_event, Event}},
            _ = gproc:get_value(SubKey, self()),
            true
    end.
%% @doc Removes the calling process' gproc_ps subscription (scope `l') to
%% `Event'. Use ensure_unsubscribed/1 when the subscription may not exist.
-spec unsubscribe(event()) -> true.
unsubscribe(Event) ->
    Scope = l,
    gproc_ps:unsubscribe(Scope, Event).
%% @doc Unsubscribes the calling process from `Event' only if it is currently
%% listed among the subscribers; otherwise does nothing and returns `true'.
-spec ensure_unsubscribed(event()) -> true.
ensure_unsubscribed(Event) ->
    Subscribers = gproc_ps:list_subs(l, Event),
    NotSubscribed = not lists:member(self(), Subscribers),
    %% Short-circuit: only call unsubscribe/1 when we are actually subscribed.
    NotSubscribed orelse unsubscribe(Event).
%% @doc Starts the debug reporter.
%% Makes sure the gproc application is started, then spawns a reporter process
%% that subscribes to the events listed in sub/0, calls
%% gmhive_worker:subscribe_returns/0, and prints every received event with
%% rpt_debug/2, without a timestamp prefix.
%% Returns whatever spawn_reporter/1 returns.
debug() ->
    ok = application:ensure_started(gproc),
    ReporterBody = fun() ->
        sub(),
        gmhive_worker:subscribe_returns(),
        loop(fun rpt_debug/2, false)
    end,
    spawn_reporter(ReporterBody).
%% @doc Starts the progress reporter.
%% Makes sure the gproc application is started, then spawns a reporter process
%% that subscribes to the events listed in sub/0 and prints the messages
%% produced by rpt_progress/2, each prefixed with a timestamp.
%% Returns whatever spawn_reporter/1 returns.
progress() ->
    ok = application:ensure_started(gproc),
    ReporterBody = fun() ->
        sub(),
        loop(fun rpt_progress/2, true)
    end,
    spawn_reporter(ReporterBody).
%% Starts a linked reporter process via proc_lib, running init_reporter/2
%% with the reporter body `F' and the calling process as parent. Returns the
%% result of proc_lib:start_link/3.
spawn_reporter(F) ->
    InitArgs = [F, self()],
    proc_lib:start_link(?MODULE, init_reporter, InitArgs).
%% Entry point of the reporter process (invoked through proc_lib).
%% Registers the process name first, so a failure to register happens before
%% the parent is acknowledged; then acks the parent with this process' pid
%% and runs the reporter body `F'.
init_reporter(F, Parent) ->
    try_register_reporter(),
    Reporter = self(),
    proc_lib:init_ack(Parent, Reporter),
    F().
%% @doc Stops the running reporter, if any.
%% Returns `not_running' when no process is registered as `gmhc_reporter';
%% otherwise sends that process a `kill' exit signal and returns `true'.
stop() ->
    case whereis(gmhc_reporter) of
        undefined ->
            not_running;
        Pid ->
            %% The reporter is started with proc_lib:start_link/3, so the
            %% process that started it is linked to it. Drop any link from the
            %% caller first; otherwise killing the reporter would send a
            %% `killed' exit signal back over the link and take the caller
            %% (e.g. the shell evaluator) down with it. unlink/1 is harmless
            %% when the caller is not linked to Pid.
            true = unlink(Pid),
            exit(Pid, kill)
    end.
%% Subscribes the calling process to every event the reporters listen to,
%% in the order listed. Returns `true', like the subscribe/1 calls it makes.
sub() ->
    Events = [pool_notification,
              {pool_notification, new_generation},
              connected,
              puzzle,
              result,
              error,
              disconnected],
    lists:foreach(fun subscribe/1, Events),
    true.
%% Reporter receive loop.
%% `F' is a fun(Event, Data) returning the chardata to print (or `[]' to print
%% nothing); `Ts' tells maybe_print/2 whether to prefix a timestamp.
%% The loop ends, returning `ok', when the atom `stop' is received.
loop(F, Ts) ->
    receive
        stop ->
            ok;
        {gproc_ps_event, E, Data} ->
            maybe_print(F(E, Data), Ts),
            loop(F, Ts);
        Other ->
            %% Drain anything we do not understand. Without this clause,
            %% unexpected messages would stay in the mailbox forever and be
            %% rescanned on every receive.
            ?LOG_DEBUG("Reporter ignoring unexpected message: ~p", [Other]),
            loop(F, Ts)
    end.
%% Registers the calling process as `gmhc_reporter'.
%% Any error raised by register/2 is treated as "a reporter is already
%% running": it is logged and re-raised as error `already_running'.
try_register_reporter() ->
    try register(gmhc_reporter, self()) of
        Registered ->
            Registered
    catch
        error:_ ->
            ?LOG_ERROR("Reporter already running. Try gmhc_events:stop().", []),
            error(already_running)
    end.
%% Prints `String' followed by a newline on standard output.
%% An empty list means "nothing to report" and prints nothing. When the second
%% argument is `true' the line is prefixed with ts/0 and a space; when `false'
%% it is printed as is.
maybe_print([], _) ->
    ok;
maybe_print(String, true) ->
    io:put_chars([ts(), " ", String, "\n"]);
maybe_print(String, false) ->
    io:put_chars([String, "\n"]).
%% Returns the current system time as an RFC 3339 string with millisecond
%% precision and a literal "Z" offset.
ts() ->
    NowMs = erlang:system_time(millisecond),
    Opts = [{unit, millisecond}, {offset, "Z"}],
    calendar:system_time_to_rfc3339(NowMs, Opts).
%% @doc Debug formatter: renders any event and its payload as
%% "EVENT Event: Data" chardata, both pretty-printed with ~p.
rpt_debug(E, Data) ->
    Args = [E, Data],
    io_lib:format("EVENT ~p: ~p", Args).
%% @doc Progress formatter: turns selected events into a short human-readable
%% line (chardata), or `[]' when there is nothing worth printing.
%% Events that are not handled explicitly, or whose payload does not have the
%% expected shape, yield `[]'.
rpt_progress(puzzle, #{info := {_Data, _Target, Nonce, _Config}}) ->
    w("Trying nonce: ~p", [Nonce]);
rpt_progress(result, #{info := Info}) ->
    progress_result(Info);
rpt_progress(pool_notification, #{info := #{msg := Msg}}) ->
    progress_pool_msg(Msg);
rpt_progress(connected, _) ->
    w("Connected!", []);
rpt_progress(disconnected, _) ->
    w("Disconnected!", []);
rpt_progress(_, _) ->
    [].

%% Formats the payload of a `result' event. A missing solution is silent.
progress_result({error, no_solution}) ->
    [];
progress_result({ok, Cycles}) ->
    w("Found! Reporting ~w cycles to leader.", [length(Cycles)]);
progress_result(Other) ->
    w("Unexpected 'result': ~tp", [Other]).

%% Formats the `msg' of a `pool_notification' event. Clause order matters:
%% an accepted solution is reported, `new_generation' and `candidate'
%% messages are silent, anything else is flagged as unexpected.
progress_pool_msg(#{solution_accepted := #{seq := Seq}}) ->
    w("The hive has produced a solution! Sequence: ~w", [Seq]);
progress_pool_msg(#{new_generation := _}) ->
    [];
progress_pool_msg(#{candidate := _}) ->
    [];
progress_pool_msg(Other) ->
    w("Unexpected 'pool_notification': ~tp", [Other]).
%% Formats `Args' according to `Fmt' and returns the resulting chardata
%% (not flattened).
w(Fmt, Args) ->
    io_lib:format(Fmt, Args).