-module(gmhc_events). -vsn("0.8.3"). -export([subscribe/1, ensure_subscribed/1, unsubscribe/1, ensure_unsubscribed/1, publish/2]). -export([debug/0, progress/0, stop/0]). -export([rpt_debug/2, rpt_progress/2]). %% internal -export([init_reporter/2]). -include_lib("kernel/include/logger.hrl"). -export_type([event/0]). -type event() :: pool_notification | {pool_notification, atom()} | error | puzzle | result | connected | disconnected. -spec publish(event(), any()) -> ok. publish(Event, Info) -> Data = #{sender => self(), time => os:timestamp(), info => Info}, _ = gproc_ps:publish(l, Event, Data), ok. -spec subscribe(event()) -> true. subscribe(Event) -> gproc_ps:subscribe(l, Event). -spec ensure_subscribed(event()) -> true. %% @doc Subscribes to Event. Will not crash if called more than once. ensure_subscribed(Event) -> try subscribe(Event) catch error:badarg -> %% This assertion should not be needed, since there is %% no other scenario that would cause subscribe/1 to fail, %% other than gproc not running (would also cause a badarg) _ = gproc:get_value({p,l,{gproc_ps_event, Event}}, self()), true end. -spec unsubscribe(event()) -> true. unsubscribe(Event) -> gproc_ps:unsubscribe(l, Event). -spec ensure_unsubscribed(event()) -> true. ensure_unsubscribed(Event) -> case lists:member(self(), gproc_ps:list_subs(l,Event)) of true -> unsubscribe(Event); false -> true end. debug() -> ok = application:ensure_started(gproc), spawn_reporter(fun() -> sub(), gmhive_worker:subscribe_returns(), loop(fun rpt_debug/2, false) end). progress() -> ok = application:ensure_started(gproc), spawn_reporter(fun() -> sub(), loop(fun rpt_progress/2, true) end). spawn_reporter(F) -> Parent = self(), proc_lib:start_link(?MODULE, init_reporter, [F, Parent]). init_reporter(F, Parent) -> try_register_reporter(), proc_lib:init_ack(Parent, self()), F(). stop() -> case whereis(gmhc_reporter) of undefined -> not_running; Pid -> exit(Pid, kill) end. sub() -> subscribe(pool_notification), subscribe({pool_notification, new_generation}), subscribe(connected), subscribe(puzzle), subscribe(result), subscribe(error), subscribe(disconnected). loop(F, Ts) -> receive stop -> ok; {gproc_ps_event, E, Data} -> maybe_print(F(E, Data), Ts), loop(F, Ts) end. try_register_reporter() -> try register(gmhc_reporter, self()) catch error:_ -> ?LOG_ERROR("Reporter already running. Try gmhc_events:stop().", []), error(already_running) end. maybe_print([], _) -> ok; maybe_print(String, Ts) when is_boolean(Ts) -> TSstr = [[ts(), " "] || Ts], io:put_chars([TSstr, String, "\n"]). ts() -> calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}, {offset, "Z"}]). rpt_debug(E, Data) -> io_lib:fwrite("EVENT ~p: ~p", [E, Data]). rpt_progress(puzzle, #{info := {_Data, _Target, Nonce, _Config}}) -> w("Trying nonce: ~p", [Nonce]); rpt_progress(result, #{info := Info}) -> case Info of {error, no_solution} -> []; {ok, Cycles} -> w("Found! Reporting ~w cycles to leader.", [length(Cycles)]); Other -> w("Unexpected 'result': ~tp", [Other]) end; rpt_progress(pool_notification, #{info := #{msg := Msg}}) -> case Msg of #{solution_accepted := #{seq := Seq}} -> w("The hive has produced a solution! Sequence: ~w", [Seq]); #{new_generation := _} -> []; #{candidate := _} -> []; Other -> w("Unexpected 'pool_notification': ~tp", [Other]) end; rpt_progress(connected, _) -> w("Connected!", []); rpt_progress(disconnected, _) -> w("Disconnected!", []); rpt_progress(_, _) -> []. w(Fmt, Args) -> io_lib:fwrite(Fmt, Args).