gmhive_client/src/gmhc_server.erl
2025-10-23 22:22:59 +02:00

449 lines
16 KiB
Erlang

-module(gmhc_server).
-vsn("0.8.3").
-behaviour(gen_server).
-export([ connected/2
, disconnected/1
, from_pool/1
, new_candidate/1
]).
-export([ total_nonces/0 ]).
-export([
start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-include_lib("kernel/include/logger.hrl").
-include("gmhc_events.hrl").
%% Bookkeeping for one mining worker. init/1 creates one record per entry
%% returned by gmhc_workers:get_worker_configs().
-record(worker, { config
%% Number of nonces one run of this worker consumes (see calc_nonces/1).
, nonces = 0
%% Position in the config list, counted from 1; used as the key when the
%% worker list is updated with lists:keyreplace/4.
, index
%% Pid of the running worker process, or undefined when idle.
, pid
%% Monitor reference created for pid in spawn_worker/2.
, mref
%% Candidate map the worker was last started with.
, cand
%% First nonce assigned to the current (or next) run.
, nonce
%% Error counter; spawn_worker/2 refuses to start the worker once this
%% reaches ?MAX_ERRORS.
, errors = 0}).
-type worker() :: #worker{}.
%% Kind of connection registered through connected/2.
-type type() :: monitor | worker.
%% gen_server state.
-record(st, {
%% Registered connections: Id => {Pid of the registering process, Type}.
connected = #{} :: #{non_neg_integer() => {pid(), type()}}
%% Set to true when a `worker' connection registers, false for `monitor'.
, working = false :: boolean()
%% Most recent candidate received from the pool, including its `nonces'.
, candidate :: map() | 'undefined'
%% Sum of #worker.nonces over all workers; sent as `n' in get_nonces requests.
, nonces = 1 :: pos_integer()
, workers = [] :: [worker()]
}).
%% True when at least one connection is registered. Usable in guards.
-define(CONNECTED(S), map_size(S#st.connected) > 0).
%% Error count at which a worker is no longer started.
-define(MAX_ERRORS, 50).
%% Register the calling process as connection Id of the given Type
%% (monitor | worker). Synchronous call to the locally registered server.
connected(Id, Type) ->
    Request = {connected, Id, Type},
    gen_server:call(?MODULE, Request).
%% Tell the server that connection Id is gone. Asynchronous.
disconnected(Id) ->
    Notice = {disconnected, Id},
    gen_server:cast(?MODULE, Notice).
%% Forward a message received from the pool to the server. Asynchronous.
from_pool(Msg) ->
    gen_server:cast(?MODULE, {from_pool, Msg}).
%% Hand a new candidate to the server. Asynchronous.
new_candidate(Cand) ->
    Notice = {new_candidate, Cand},
    gen_server:cast(?MODULE, Notice).
%% Return the total number of nonces the configured workers consume per round.
total_nonces() ->
    Request = total_nonces,
    gen_server:call(?MODULE, Request).
%% Start the server, linked to the caller and registered locally as ?MODULE.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% gen_server init callback.
%% Builds one idle #worker{} per configured worker (indexed from 1), sums up
%% how many nonces they consume together, and enables exit trapping so that
%% linked worker crashes arrive as 'EXIT' messages.
init([]) ->
    Configs = gmhc_workers:get_worker_configs(),
    ?LOG_DEBUG("WorkerConfigs = ~p", [Configs]),
    Indexed = lists:zip(lists:seq(1, length(Configs)), Configs),
    Workers = [#worker{index = Ix, config = Cfg, nonces = calc_nonces(Cfg)}
               || {Ix, Cfg} <- Indexed],
    Total = lists:sum([Count || #worker{nonces = Count} <- Workers]),
    process_flag(trap_exit, true),
    {ok, #st{workers = Workers, nonces = Total}}.
%% gen_server call callback.
%%   total_nonces          -> replies with the summed nonce count.
%%   {connected, Id, Type} -> monitors the caller and records the connection.
%%                            A `monitor' connection stops and drops all
%%                            workers; a `worker' connection arms the watchdog
%%                            and marks the server as working.
%%   anything else         -> replies `unknown_call'.
handle_call(total_nonces, _From, #st{nonces = Total} = State) ->
    {reply, Total, State};
handle_call({connected, Id, Type}, {Caller, _Tag}, #st{connected = Conns} = State0) ->
    ?LOG_DEBUG("connected: ~p, ~p", [Id, Type]),
    erlang:monitor(process, Caller),
    State1 = State0#st{connected = maps:put(Id, {Caller, Type}, Conns)},
    case Type of
        monitor ->
            %% There should not be any running workers at this point.
            stop_workers(State1#st.workers),
            {reply, ok, State1#st{workers = [], working = false}};
        worker ->
            %% 5 minutes, one-shot
            gmhc_watchdog:watch(5*60_000, 1),
            {reply, ok, State1#st{working = true}}
    end;
handle_call(_Req, _From, State) ->
    {reply, unknown_call, State}.
%% gen_server cast callback.
%%
%% {from_pool, #{via, notification := #{candidate}}}:
%%   A new candidate arrived from the pool. Its hash is decoded, the connector
%%   is stored under `via', nonces are requested if the candidate has none,
%%   nonces are assigned to the workers and idle workers are started.
%%   Any exception raised while doing so is logged and the state is left
%%   unchanged.
%%
%% {disconnected, Id}:
%%   Removes the connection, releases the watchdog, and stops all workers
%%   when no connection remains.
%%
%% Any other cast is ignored.
handle_cast({from_pool, #{via := Connector,
notification :=
#{candidate := Cand0}}},
#st{workers = Workers} = S) ->
%% Note: decoding happens outside the try below, so a bad hash crashes the server.
Cand = maps:put(via, Connector, decode_candidate_hash(Cand0)),
?LOG_DEBUG("Got new candidate; will mine it: ~p", [Cand]),
%%
%% We could check whether we have already received the candidate ...
%% For now, stop all workers, restart with new candidate
try
%% Most of the time we don't want to stop the worker. If we do, though, then
%% we need to do it more carefully than this, or memory usage will triple.
%% Workers1 = stop_workers(Workers),
%%
%% Nonces may be [], in which case we need to request new nonces first.
#st{candidate = Cand1} = S1 = maybe_request_nonces(S#st{candidate = Cand}),
{Workers2, Cand2} = assign_nonces(Workers, Cand1),
%% Assigning may have used up the range; ask for more before starting workers.
#st{candidate = Cand3} = S2 = maybe_request_nonces(S1#st{candidate = Cand2}),
%% spawn_worker/2 leaves workers that already have a pid untouched.
NewWorkers = [spawn_worker(W, Cand3) || W <- Workers2],
{noreply, S2#st{workers = NewWorkers}}
catch
Cat:Err:St ->
?LOG_ERROR("CAUGHT ~p:~p / ~p", [Cat, Err, St]),
{noreply, S}
end;
handle_cast({disconnected, Id}, #st{connected = Conn} = S) ->
?LOG_DEBUG("disconnected: ~p", [Id]),
Conn1 = maps:remove(Id, Conn),
gmhc_watchdog:unwatch(),
%% Workers are only stopped once the last connection is gone.
S1 = if map_size(Conn1) == 0 ->
Ws = stop_workers(S#st.workers),
S#st{connected = Conn1, workers = Ws};
true -> S#st{connected = Conn1}
end,
{noreply, S1};
handle_cast(_Msg, S) ->
{noreply, S}.
%% gen_server info callback.
%%
%% {'DOWN', MRef, process, Pid, Reason}:
%%   If Pid is a tracked worker, its exit reason carries the mining result and
%%   is passed to handle_worker_result/3. Otherwise Pid is taken to be a
%%   monitored connection process and is removed from `connected'.
%%
%% {'EXIT', Pid, Reason} (only while connected and working):
%%   A linked worker died without unlinking first, i.e. abnormally. The error
%%   is published, the worker's error count is bumped and a `check_workers'
%%   timer is started to restart it shortly.
%%
%% {timeout, _, check_workers}:
%%   Requests nonces if needed and (re)starts workers that are idle.
%%   Workers that still have a live pid are left alone: restarting them would
%%   drop their pid/mref from the state and start a second process for the
%%   same worker while the first one keeps running untracked.
%%
%% Anything else is logged and ignored.
handle_info({'DOWN', MRef, process, Pid, Reason}, #st{ workers = Workers
                                                     , connected = Connected} = S) ->
    %% ?LOG_DEBUG("DOWN from ~p: ~p", [Pid, Reason]),
    case lists:keyfind(Pid, #worker.pid, Workers) of
        #worker{mref = MRef} = W ->
            S1 = handle_worker_result(Reason, W, S),
            {noreply, S1};
        false ->
            Conn1 = maps:filter(fun(_, {P,_}) -> P =/= Pid end, Connected),
            {noreply, S#st{connected = Conn1}}
    end;
handle_info({'EXIT', Pid, Reason}, #st{ workers = Workers
                                      , working = Working} = S)
  when ?CONNECTED(S), Working ->
    case lists:keyfind(Pid, #worker.pid, Workers) of
        #worker{} = W ->
            %% ?LOG_DEBUG("EXIT from worker ~p: ~p", [W#worker.index, Reason]),
            gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
                                                  data => Reason})),
            Ws1 = incr_worker_error(W, Workers),
            erlang:start_timer(100, self(), check_workers),
            {noreply, S#st{workers = Ws1}};
        false ->
            %% ?LOG_DEBUG("EXIT apparently not from worker?? (~p)", [Pid]),
            {noreply, S}
    end;
handle_info({timeout, _, check_workers}, #st{} = S) ->
    S1 = maybe_request_nonces(S),
    %% Iterate over the worker list as it is *after* the nonce request, since
    %% a failed request may have stopped workers.
    S2 = lists:foldl(fun restart_if_idle/2, S1, S1#st.workers),
    {noreply, S2};
handle_info(Msg, St) ->
    ?LOG_DEBUG("Unknown msg: ~p", [Msg]),
    {noreply, St}.

%% Restart a worker only when it has no running process.
restart_if_idle(#worker{pid = Pid}, S) when is_pid(Pid) ->
    S;
restart_if_idle(#worker{} = W, S) ->
    maybe_restart_worker(W, S).
%% gen_server terminate callback; nothing to clean up here.
terminate(_Why, _State) ->
    ok.
%% gen_server code_change callback; the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% Report the outcome of a worker run that produced solutions.
%% Every nonce the worker covered that is not among Solutions is reported
%% asynchronously as `no_solution'; the solutions themselves are then sent
%% with a synchronous gmhc_handler:call/1, whose result is returned.
%% Only defined while at least one connection is registered.
report_solutions(Solutions, W, #st{} = S) when ?CONNECTED(S) ->
    #{via := Via, seq := Seq} = W#worker.cand,
    Misses = [N || N <- all_nonces(W), not lists:keymember(N, 1, Solutions)],
    lists:foreach(fun(N) -> report_no_solution_(Via, Seq, N) end, Misses),
    Found = [#{nonce => Nonce, evidence => Evd} || {Nonce, Evd} <- Solutions],
    gmhc_handler:call(#{via => Via,
                        solutions => #{seq => Seq, found => Found}}).
%% Report `no_solution' for every nonce the worker covered in its run.
%% The first argument is ignored; the nonces are derived from the worker.
%% Only defined while at least one connection is registered. Returns ok.
report_no_solution(_Nonce, W, #st{} = S) when ?CONNECTED(S) ->
    #{via := Via, seq := Seq} = W#worker.cand,
    lists:foreach(fun(N) -> report_no_solution_(Via, Seq, N) end,
                  all_nonces(W)),
    ok.
%% Send a single asynchronous `no_solution' report for Nonce.
report_no_solution_(Via, Seq, Nonce) ->
    Report = #{seq => Seq, nonce => Nonce},
    gmhc_handler:async_call(#{via => Via, no_solution => Report}).
%% List the nonces covered by one run of the worker: the assigned nonce,
%% followed by the consecutive ones when the config asks for repeats.
all_nonces(#worker{nonce = First, config = Cfg}) ->
    nonce_run(First, gmhw_pow_cuckoo:repeats(Cfg)).

nonce_run(First, 1) ->
    [First];
nonce_run(First, Repeats) ->
    lists:seq(First, First + Repeats - 1).
%% Ask the pool for a fresh nonce range when the current candidate has run
%% out of nonces and we are connected. In every other situation the state is
%% returned untouched.
maybe_request_nonces(#st{ candidate = #{via := Via, seq := Seq, nonces := []}
                        , nonces = Count} = State) when ?CONNECTED(State) ->
    Request = #{via => Via,
                get_nonces => #{seq => Seq, n => Count}},
    nonces_result(gmhc_handler:call(Request), Seq, State);
maybe_request_nonces(State) ->
    State.
%% Process the reply to a get_nonces request made for sequence Seq0.
%%   * Reply for the same sequence: ping the watchdog and store the nonces
%%     in the candidate.
%%   * Reply for another sequence: log it and keep the state.
%%   * {error, Reason}: stop the workers mining Seq0; on a timeout error,
%%     schedule a `check_workers' retry after a randomised delay.
nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, State)
  when Seq == Seq0 ->
    wd_ping(),
    Cand = State#st.candidate,
    State#st{candidate = Cand#{nonces => Nonces}};
nonces_result(#{nonces := #{seq := Seq, nonces := _}}, Seq0, State) ->
    ?LOG_DEBUG("Seq mismatch - wtf?? ~p - ~p", [Seq, Seq0]),
    State;
nonces_result({error, Reason}, Seq0, State) ->
    ?LOG_DEBUG("Got error on nonce request: ~p", [Reason]),
    Stopped = stop_workers_for_seq(Seq0, State#st.workers),
    case Reason of
        {timeout, _} ->
            erlang:start_timer(retry_timeout(1000, 3000), self(), check_workers);
        _ ->
            ok
    end,
    State#st{workers = Stopped}.
%% Retry delay: Floor plus a value drawn from gmhc_lib:rand(Range).
retry_timeout(Floor, Range) ->
    Jitter = gmhc_lib:rand(Range),
    Floor + Jitter.
%% Act on the exit reason of a finished worker process.
%%
%% A reason of the form {worker_result, Result} is a regular completion:
%%   {solutions, L} / {solution, Nonce, Sol} - report them, then restart the
%%       worker unless reporting failed;
%%   {no_solution, Nonce} - report it and restart the worker;
%%   {error, _}           - publish an error event and bump the error count.
%% Any other reason is an abnormal exit: it is published as a worker error
%% and the worker's error count is bumped.
%%
%% Returns the updated state.
handle_worker_result({worker_result, Result}, W, S) ->
    %% ?LOG_DEBUG("worker result: ~p", [Result]),
    wd_ping(),
    case Result of
        {solutions, Solutions} ->
            {Cont, S1} = report_solutions_(Solutions, W, S),
            maybe_continue(Cont, reset_errors(W), S1);
        {solution, Nonce, Solution} ->
            {Cont, S1} = report_solutions_([{Nonce, Solution}], W, S),
            maybe_continue(Cont, reset_errors(W), S1);
        {no_solution, Nonce} ->
            report_no_solution(Nonce, W, S),
            maybe_restart_worker(reset_errors(W), S);
        %% The reason must not be matched against S: S is already bound to
        %% the state, so `{error, S}' would never match a real error and the
        %% case would fail with case_clause.
        {error, _Reason} ->
            ?LOG_DEBUG("Worker ~p reported error as normal", [W#worker.index]),
            gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
                                                  data => Result})),
            Ws = incr_worker_error(W, S#st.workers),
            S#st{workers = Ws}
    end;
handle_worker_result(Error, W, S) ->
    ?LOG_DEBUG("Got worker error from ~p: ~p", [W#worker.index, Error]),
    gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
                                          data => Error})),
    Ws = incr_worker_error(W, S#st.workers),
    S#st{workers = Ws}.
%% Report solutions and translate the handler's answer into a decision for
%% maybe_continue/3: `ok' and `continue' both mean keep going, an error
%% tuple means do not restart the worker. The state is passed through as is.
report_solutions_(Solutions, W, S) ->
    case report_solutions(Solutions, W, S) of
        {error, _} ->
            {error, S};
        Reply when Reply =:= ok; Reply =:= continue ->
            {continue, S}
    end.
%% Clear the worker's error counter.
reset_errors(#worker{} = Worker) ->
    Worker#worker{errors = 0}.
%% Mark the worker idle and store it back into the worker list, keyed on
%% its index.
reset_worker(#worker{index = Idx} = Worker, Workers) ->
    Idle = reset_worker_(Worker),
    lists:keyreplace(Idx, #worker.index, Workers, Idle).
%% Return the worker with all per-run fields cleared (process, monitor,
%% nonce and candidate). Config, index, nonce count and errors are kept.
reset_worker_(Worker) ->
    Worker#worker{ pid   = undefined
                 , mref  = undefined
                 , nonce = undefined
                 , cand  = undefined }.
%% Bump the worker's error count, mark it idle and store it back into the
%% worker list, keyed on its index.
incr_worker_error(#worker{errors = Count, index = Idx} = Worker, Workers) ->
    Failed = reset_worker_(Worker#worker{errors = Count + 1}),
    lists:keyreplace(Idx, #worker.index, Workers, Failed).
%% maybe_continue(stopped, _, S) ->
%% S;
%% Decide what to do with a worker after its solutions were reported:
%% `continue' restarts it on the next nonce, `error' leaves it idle.
maybe_continue(continue, Worker, State) ->
    maybe_restart_worker(Worker, State);
maybe_continue(error, Worker, State) ->
    ?LOG_INFO("Won't restart worker ~p due to error", [Worker#worker.index]),
    State#st{workers = reset_worker(Worker, State#st.workers)}.
%% Start the worker on the next nonce of the current candidate.
%% When the candidate has no nonces left the worker is just marked idle.
%% Otherwise a nonce is taken from the range, the worker is reset and
%% spawned with it, and more nonces are requested if that used up the range.
maybe_restart_worker(#worker{index = Idx} = Worker,
                     #st{candidate = Cand, workers = Workers} = State) ->
    case maps:get(nonces, Cand) of
        [] ->
            %% Waiting for nonces
            State#st{workers = reset_worker(Worker, Workers)};
        Range ->
            {Nonce, Rest} = pick_nonce(Worker#worker.nonces, Range),
            Fresh = (reset_worker_(Worker))#worker{nonce = Nonce},
            Started = spawn_worker(Fresh, Cand),
            Workers1 = lists:keyreplace(Idx, #worker.index, Workers, Started),
            maybe_request_nonces(State#st{ candidate = Cand#{nonces => Rest}
                                         , workers = Workers1 })
    end.
%% In a Gajumaru node, a typical worker config might look like this:
%% "cuckoo": {
%% "edge_bits": 29,
%% "miners": [{"executable": "mean29-avx2"},
%% {"executable": "lean29-avx2"},
%% {"executable": "lean29-avx2"},
%% {"executable": "lean29-avx2"}]
%% }
%% Stop every worker in the list; returns the list of stopped workers.
stop_workers(Workers) ->
    lists:map(fun stop_worker/1, Workers).
%% Stop the workers that are mining the candidate with sequence number Seq.
%% Returns the complete worker list: matching workers are replaced by their
%% stopped version, all others are returned unchanged. The caller stores the
%% result as the new worker list, so non-matching workers (including idle
%% ones without a candidate) must not be filtered out.
stop_workers_for_seq(Seq, Workers) ->
    [stop_if_seq(Seq, W) || W <- Workers].

%% Stop W only when its candidate carries exactly this sequence number.
stop_if_seq(Seq, #worker{cand = #{seq := Seq}} = W) ->
    stop_worker(W);
stop_if_seq(_Seq, W) ->
    W.
%% Stop a running worker process and wait until it is gone.
%% A worker without a pid is returned unchanged.
%%
%% A temporary monitor guarantees that we are notified even if the worker
%% has already unlinked from us. Once the worker is known to be dead, both
%% the temporary monitor and the worker's long-lived monitor are released
%% with the `flush' option. That way no monitor is leaked and no stale
%% 'DOWN' message for this worker is later delivered to handle_info/2.
stop_worker(#worker{pid = Pid, mref = OldMRef} = W) when is_pid(Pid) ->
    MRef = erlang:monitor(process, Pid),
    ?LOG_DEBUG("Will stop worker ~p (MRef = ~p)", [Pid, MRef]),
    exit(Pid, shutdown),
    receive
        {'EXIT', Pid, _} -> ok;
        {'DOWN', MRef, process, Pid, _} -> ok
    end,
    erlang:demonitor(MRef, [flush]),
    %% mref is undefined for a worker that was never monitored.
    _ = is_reference(OldMRef) andalso erlang:demonitor(OldMRef, [flush]),
    W#worker{pid = undefined, mref = undefined, nonce = undefined};
stop_worker(W) ->
    W.
%% Hand out nonces from the candidate's range to the workers, in order.
%% Returns the updated workers and the candidate with the remaining range.
assign_nonces(Workers, #{nonces := Range} = Cand) ->
    {Assigned, Remaining} = assign_nonces_(Workers, Range, []),
    {Assigned, maps:put(nonces, Remaining, Cand)}.
%% Worker loop behind assign_nonces/2. Each worker takes the next nonce from
%% the range; once the range is exhausted the remaining workers get
%% `undefined'. Returns {WorkersInOriginalOrder, RemainingRange}.
assign_nonces_([], Range, Done) ->
    {lists:reverse(Done), Range};
assign_nonces_([Worker | Rest], [], Done) ->
    assign_nonces_(Rest, [], [Worker#worker{nonce = undefined} | Done]);
assign_nonces_([#worker{nonces = Count} = Worker | Rest], Range, Done) ->
    {Nonce, Range1} = pick_nonce(Count, Range),
    assign_nonces_(Rest, Range1, [Worker#worker{nonce = Nonce} | Done]).
%% Number of nonces one run of a worker with this config consumes:
%% repeats multiplied by the number of addressed instances (1 when none
%% are configured).
-spec calc_nonces(gmhw_pow_cuckoo:config()) -> non_neg_integer().
calc_nonces(Cfg) ->
    Instances = case gmhw_pow_cuckoo:addressed_instances(Cfg) of
                    undefined -> 1;
                    Addressed -> length(Addressed)
                end,
    gmhw_pow_cuckoo:repeats(Cfg) * Instances.
%% Take the next nonce from an inclusive range [First, Last].
%% Count is how many nonces the taker consumes; the range start advances by
%% that much. Returns {Nonce, RemainingRange}, where the remaining range is
%% [] once its start would pass Last.
pick_nonce(_Count, [Last, Last]) ->
    {Last, []};
pick_nonce(Count, [First, Last]) when First < Last ->
    Remaining = case First + Count of
                    Next when Next > Last -> [];
                    Next -> [Next, Last]
                end,
    {First, Remaining}.
%% The fun handed to spawn_link/1 never returns normally (it reports its
%% result through its exit reason), which Dialyzer would otherwise flag.
-dialyzer({no_return, spawn_worker/2}).
%% Start a linked and monitored process for the worker, mining Cand from the
%% worker's assigned nonce. The worker is returned unchanged when it
%%   * has no nonce assigned,
%%   * has reached ?MAX_ERRORS, or
%%   * already has a process.
spawn_worker(#worker{nonce = undefined} = Worker, _Cand) ->
    Worker;
spawn_worker(#worker{errors = Errors} = Worker, _Cand) when Errors >= ?MAX_ERRORS ->
    ?LOG_DEBUG("Won't start worker - reached max error count: ~p", [Worker]),
    Worker;
spawn_worker(#worker{pid = undefined, nonce = Nonce, config = Cfg} = Worker, Cand) ->
    Parent = self(),
    #{candidate := Data, target := Target, edge_bits := EdgeBits} = Cand,
    Run = fun() ->
                  WorkerCfg = gmhw_pow_cuckoo:set_edge_bits(EdgeBits, Cfg),
                  init_worker(Data, Nonce, Target, WorkerCfg, Parent)
          end,
    Pid = spawn_link(Run),
    MRef = erlang:monitor(process, Pid),
    Worker#worker{pid = Pid, mref = MRef, cand = Cand, nonce = Nonce};
spawn_worker(Worker, _Cand) ->
    %% Worker already has work. Don't disturb it.
    Worker.
%% Body of a worker process. Publishes the puzzle, runs the solver, publishes
%% the result, and terminates with an exit reason describing the outcome:
%%   {worker_result, {solutions, L}}      - solver returned {ok, L};
%%   {worker_result, {no_solution, Nonce}} - solver found nothing;
%%   Other                                 - solver failed with {error, Other}.
%% Never returns.
-spec init_worker(binary(), integer(), integer(), tuple(), pid()) -> no_return().
init_worker(Data, Nonce, Target, Config, Parent) ->
    gmhc_events:publish(puzzle, {Data, Target, Nonce, Config}),
    Outcome = gmhc_workers:generate_from_hash(Data, Target, Nonce, Config, undefined),
    gmhc_events:publish(result, Outcome),
    case Outcome of
        {ok, Solutions} when is_list(Solutions) ->
            worker_result(Parent, {solutions, Solutions});
        {error, no_solution} ->
            %% TODO: If we are using repeats, then we might report
            %% no_solution for each nonce tried.
            worker_result(Parent, {no_solution, Nonce});
        {error, Failure} ->
            ?LOG_ERROR("Bad worker! {error, ~p}", [Failure]),
            ErrInfo = #{error => cannot_start_worker,
                        data => {error, Failure}},
            gmhc_events:publish(error, ?ERR_EVT(ErrInfo)),
            exit(Failure)
    end.
%% Deliver a worker's result to its parent via the exit reason.
%% The link is removed first, so the parent learns about the exit through
%% its monitor ('DOWN') rather than through an 'EXIT' message.
worker_result(Parent, Result) ->
    unlink(Parent),
    Reason = {worker_result, Result},
    exit(Reason).
%% Replace the encoded `candidate' field with its decoded bytearray.
%% Crashes with badmatch when decoding does not return {ok, _}.
decode_candidate_hash(#{candidate := Encoded} = Cand) ->
    {ok, Decoded} = gmser_api_encoder:safe_decode(bytearray, Encoded),
    maps:update(candidate, Decoded, Cand).
%% Signal liveness to the watchdog.
wd_ping() ->
    gmhc_watchdog:ping().