First commit

This commit is contained in:
Ulf Wiger
2025-05-13 23:56:46 +02:00
commit a979e18b51
30 changed files with 2255 additions and 0 deletions
+51
View File
@@ -0,0 +1,51 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(gmhc_app).
-behaviour(application).
-export([ start/1 ]).
-export([ start/2
, start_phase/3
, prep_stop/1
, stop/1
]).
-include_lib("kernel/include/logger.hrl").
-spec start([{atom(), any()}]) -> {ok, [atom()]} | {error, any()}.
start(Opts) ->
application:load(gmmp_client),
{error,_} = application:stop(gmmp_client),
_ = lists:foreach(fun({K, V}) ->
application:set_env(gmmp_client, K, V)
end, Opts),
application:ensure_all_started(gmmp_client).
start(_StartType, _StartArgs) ->
set_things_up(),
gmhc_sup:start_link().
start_phase(connect_to_primary, _StartType, []) ->
case application:get_env(gmmp_client, auto_connect, true) of
true ->
gmhc_connectors_sup:start_first_connector();
false ->
skip
end,
ok;
start_phase(_Phase, _StartType, _PhaseArgs) ->
ok.
prep_stop(_State) ->
ok.
stop(_State) ->
ok.
set_things_up() ->
gmhc_counters:initialize(),
gmhc_config:load_config(),
logger:set_module_level([gmhw_pow_cuckoo], notice),
?LOG_DEBUG("Config: ~p", [gmconfig:user_config()]),
ok.
+67
View File
@@ -0,0 +1,67 @@
-module(gmhc_config).
-export([ load_config/0
, get_config/1
, verify_cfg_props/1
]).
-include_lib("kernel/include/logger.hrl").
load_config() ->
instrument_gmconfig(),
?LOG_DEBUG("Schema: ~p", [gmconfig:schema()]),
gmconfig:load_user_config(report),
gmconfig:apply_os_env(),
gmconfig:process_plain_args(),
ok.
instrument_gmconfig() ->
gmconfig:set_gmconfig_env(gmconfig_env()).
-spec gmconfig_env() -> gmconfig:gmconfig().
gmconfig_env() ->
#{ os_env_prefix => "GMHC"
, config_file_basename => "gmhive_client_config"
, config_file_os_env => "GMHIVE_CLIENT_CONFIG"
, config_file_search_path => [".", fun setup:home/0, fun setup:data_dir/0 ]
, config_plain_args => "-gmhc"
, system_suffix => ""
, schema => fun gmhc_config_schema:to_json/0 }.
get_config(Path) ->
gmconfig:get_config(Path, cpath(Path)).
cpath([<<"pubkey">>]) -> [user_config, env(pubkey ) ];
cpath([<<"extra_pubkeys">>]) -> [user_config, env(extra_pubkeys ), schema_default];
cpath([<<"pool_admin">>, <<"url">>]) -> [user_config, env(pool_admin_url), schema_default];
cpath([<<"workers">>]) -> [user_config, env(workers ), schema_default];
cpath(_) -> [user_config, schema_default].
env2cpath(pubkey ) -> [<<"pubkey">>];
env2cpath(extra_pubkeys ) -> [<<"extra_pubkeys">>];
env2cpath(pool_admin_url) -> [<<"pool_admin">>, <<"url">>];
env2cpath(workers ) -> [<<"workers">>].
verify_cfg_props(PropList) ->
Cfg = lists:foldl(
fun({K,V}, M) ->
CfgK = env2cpath(K),
gmconfig:merge_config_maps(M, to_cfg_map(CfgK, V))
end, #{}, PropList),
case gmconfig_schema_utils:valid(Cfg) of
Ok when is_map(Ok) ->
ok;
Other ->
error({invalid_config, Other})
end.
to_cfg_map(K, V) ->
to_cfg_map(K, V, #{}).
to_cfg_map([H], V, M) ->
M#{H => V};
to_cfg_map([H|T], V, M) ->
M#{H => to_cfg_map(T, V, M)}.
env(K) ->
{env, gmhive_client, K}.
+98
View File
@@ -0,0 +1,98 @@
-module(gmhc_config_schema).
-export([ schema/0
, to_json/0 ]).
-import(gmconfig_schema_helpers,
[ str/1
, str/2
, pos_int/2
, int/2
, bool/2
, obj/1
, obj/2
, array/2
, schema_init/0
]).
-define(ACCOUNT_PATTERN, <<"^ak_[1-9A-HJ-NP-Za-km-z]*$">>).
-define(CONTRACT_PATTERN, <<"^ct_[1-9A-HJ-NP-Za-km-z]*$">>).
to_json() ->
json:encode(schema()).
schema() ->
obj(schema_init(),
#{
pubkey => str(#{pattern => ?ACCOUNT_PATTERN},
<<"Primary client pubkey">>)
, extra_pubkeys => array(#{ description =>
<<"Additional miner pubkeys, sharing rewards">>
, default => []
},
str(#{pattern => ?ACCOUNT_PATTERN}))
, type => str(#{ enum => [<<"miner">>, <<"monitor">>]
, default => <<"miner">>
, description => <<"monitor mode can be used to see if a pool is alive">>})
, pool => pool()
, pool_admin => pool_admin()
, workers => workers()
}).
pool() ->
obj(#{
id => str(#{ pattern => ?CONTRACT_PATTERN},
<<"Pool contract id">>),
host => str(#{ default => <<"127.0.0.1">>
, example => <<"0.0.0.0">>
, description => <<"Hostname of hive server">> })
, port => pos_int(17888, <<"Hive server listen port">>)
}).
pool_admin() ->
obj(#{
url => str(#{ default => <<"https://test.gajumining.com/api/workers/{CLIENT_ID}">>
, description => <<"URL of Eureka worker api">> })
}).
workers() ->
array(
#{default => [#{executable => <<"mean29-generic">>}],
description =>
<<"Definitions of workers' configurations. If no worker are configured one worker "
"is used as default, i.e. 'mean29-generic' executable without any extra args.">>},
obj(#{required => [<<"executable">>]},
#{executable =>
str(#{default => <<"mean29-generic">>,
description =>
<<"Executable binary of the worker. Options are: \"mean29-generic\""
" (memory-intensive), "
"\"mean29-avx2\" (memory-intensive, benefits from faster CPU supporting"
" AVX2 instructions), "
"\"lean29-generic\" (CPU-intensive, useful if memory-constrained), "
"\"lean29-avx2\" (CPU-intensive, useful if memory-constrained, benefits "
"from faster CPU supporting AVX2 instructions).">>}),
executable_group =>
str(#{description => <<"Group of executable binaries of the worker.">>,
enum => [ <<"aecuckoo">>, <<"aecuckooprebuilt">>, <<"gmcuckkoo">>, <<"cuda">>, <<"gajumine">> ],
default => <<"aecuckoo">>}),
extra_args =>
str(#{description => <<"Extra arguments to pass to the worker executable binary. "
"The safest choice is specifying no arguments i.e. empty string.">>,
default => <<>>}),
hex_encoded_header =>
bool(false, <<"Hexadecimal encode the header argument that is send to the worker executable. "
"CUDA executables expect hex encoded header.">>),
repeats =>
int(1, <<"Number of tries to do in each worker context - "
"WARNING: it should be set so the worker process "
"runs for 3-5s or else the node risk missing out on new micro blocks.">>),
instances =>
array(#{description =>
<<"Instances used by the worker in case of Multi-GPU mining. "
"Numbers on the configuration list represent GPU devices that are to "
"be addressed by the worker.">>,
minItems => 1,
example => [0,1,2,3]},
#{type => <<"integer">>})
})).
+468
View File
@@ -0,0 +1,468 @@
-module(gmhc_connector).
-behaviour(gen_server).
-export([ start_link/1
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([
connect/1
, disconnect/1
, status/0
, status/1
, send/2
]).
-export([ whereis_id/1 ]).
%% for outcoing messages
%% -export([ solution/3
%% , no_solution/2
%% , nonces/2
%% ]).
-include_lib("kernel/include/logger.hrl").
-define(MAX_RETRY_INTERVAL, 30000).
-type retry_opts() :: #{ deadline => pos_integer() }.
-type timer_ref() :: {reference(), non_neg_integer(), non_neg_integer(), retry_opts()}.
-type id() :: non_neg_integer().
-type connect_opts() :: #{ host => string()
, port => pos_integer()
, timeout => pos_integer() %% overrides deadline
, deadline => pos_integer() %% ms monotonic must be in the future
, nowait => boolean() %% default: false
, tcp_opts => list()
, enoise_opts => list() %% if present, MUST contain {'noise', _}
, tcp_opts => list()
, connect_timeout => pos_integer()
}.
-export_type([ id/0
, connect_opts/0 ]).
-record(st, {
id :: non_neg_integer()
, auto_connect = true :: boolean()
, econn
, reconnect_timer :: timer_ref() | 'undefined'
, awaiting_connect = [] :: list()
, protocol :: binary() | 'undefined'
, version :: binary() | 'undefined'
, opts = #{} :: map()
}).
-spec connect(connect_opts()) -> ok | {error, any()}.
connect(Opts) when is_map(Opts) ->
{Timeout, Opts1} = manual_connect_timeout(Opts),
gen_server:call(?MODULE, {connect, Opts1}, Timeout).
manual_connect_timeout(#{timeout := T} = Opts) ->
Deadline = erlang:monotonic_time(millisecond) + T,
{T, maps:remove(timeout, Opts#{deadline => Deadline})};
manual_connect_timeout(#{deadline := D} = Opts) ->
Timeout = D - erlang:monotonic_time(millisecond),
if Timeout < 0 ->
error(invalid_deadline);
true ->
{Timeout, Opts}
end;
manual_connect_timeout(Opts) ->
DefaultT = 10000,
Deadline = erlang:monotonic_time(millisecond) + DefaultT,
{DefaultT, Opts#{deadline => Deadline}}.
disconnect(Id) ->
gen_server:call(via(Id), disconnect).
send(Via, Msg) ->
gen_server:cast(via(Via), {send, Msg}).
status() ->
{via, gproc, HeadPat} = via('$1'),
Connectors = gproc:select([{{HeadPat,'_','_'}, [], ['$1']}]),
[{Id, status_(via(Id))} || Id <- Connectors].
status(Id) ->
{via, gproc, Req} = via(Id),
case gproc:where(Req) of
undefined ->
disconnected;
Pid when is_pid(Pid) ->
status_(Pid)
end.
status_(Proc) ->
try gen_server:call(Proc, status)
catch
error:_ ->
disconnected
end.
%% start_link() ->
%% start_link(#{}).
whereis_id(Id) ->
gproc:where(reg(Id)).
reg(Id) ->
{n, l, {?MODULE, Id}}.
via(Id) ->
{via, gproc, reg(Id)}.
start_link(#{id := Id} = Opts) ->
gen_server:start_link(via(Id), ?MODULE, Opts, []).
init(#{id := Id} = Opts) when is_map(Opts) ->
AutoConnect = opt_autoconnect(Opts),
S0 = #st{id = Id, auto_connect = AutoConnect},
Nowait = maps:get(nowait, Opts, false),
if Nowait ->
proc_lib:init_ack({ok, self()});
true ->
ok
end,
S1 =
case AutoConnect of
true ->
case try_connect(Opts, S0) of
{ok, S} ->
?LOG_DEBUG("Initial connect succeeded", []),
S;
{error, _} = Error ->
?LOG_WARNING("Could not connect to core server: ~p", [Error]),
start_reconnect_timer(S0#st{econn = undefined})
end;
false ->
S0
end,
if Nowait ->
gen_server:enter_loop(?MODULE, [], S1, via(Id));
true ->
{ok, S1}
end.
handle_call(status, _From, #st{econn = EConn} = S) ->
Status = case EConn of
undefined -> disconnected;
_ -> connected
end,
{reply, Status, S};
handle_call({connect, Opts}, From, #st{awaiting_connect = Waiters} = S) ->
Nowait = maps:get(nowait, Opts, false),
case Nowait of
true -> gen_server:reply(From, ok);
false -> ok
end,
case try_connect(Opts, S) of
{ok, S1} ->
if Nowait -> {noreply, S1};
true -> {reply, ok, S1}
end;
{error, _} = Error ->
case maps:get(retry, Opts, true) of
true ->
Waiters1 = if Nowait -> Waiters;
true -> [{From, retry_opts(Opts)}|Waiters]
end,
S1 = start_reconnect_timer(
S#st{ auto_connect = true
, awaiting_connect = Waiters1 }, Opts),
{noreply, S1};
false ->
if Nowait -> {noreply, S};
true -> {reply, Error, S}
end
end
end;
handle_call(disconnect, _From, #st{econn = EConn} = S) ->
case EConn of
undefined ->
ok;
_ ->
enoise:close(EConn)
end,
S1 = cancel_reconnect_timer(S#st{ auto_connect = false
, econn = undefined }),
{reply, ok, S1};
handle_call(_Req, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast({send, Msg0}, #st{ econn = EConn
, protocol = P
, version = V } = S) when EConn =/= undefined ->
try
Msg = maps:remove(via, Msg0),
Data = gmhp_msgs:encode(Msg, P, V),
enoise:send(EConn, Data)
catch
error:E:T ->
?LOG_ERROR("CAUGHT error:~p / ~p", [E, T])
end,
{noreply, S};
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info({noise, EConn, Data}, #st{ id = Id
, econn = EConn
, protocol = P, version = V} = S) ->
try gmhp_msgs:decode(Data, P, V) of
Msg ->
gmhc_handler:from_pool(Msg#{via => Id})
catch
error:E ->
?LOG_WARNING("Unknown message (~p): ~p", [E, Data])
end,
{noreply, S};
handle_info({timeout, TRef, {reconnect, Opts}}, #st{ reconnect_timer = {TRef, _, _, _}
, auto_connect = true } = S) ->
case try_connect(Opts, S) of
{ok, S1} ->
?LOG_DEBUG("protocol connected", []),
{noreply, S1};
{error, _} = Error ->
?LOG_DEBUG("Reconnect attempt failed: ~p", [Error]),
{noreply, restart_reconnect_timer(S)}
end;
handle_info({tcp_closed, _Port}, #st{} = S) ->
?LOG_DEBUG("got tcp_closed", []),
disconnected(S#st.id),
S1 = case S#st.auto_connect of
true -> start_reconnect_timer(S#st{econn = undefined});
false -> S#st{econn = undefined}
end,
{noreply, S1};
handle_info(Msg, S) ->
?LOG_DEBUG("Discarding msg (auto_connect=~p): ~p", [S#st.auto_connect, Msg]),
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_FromVsn, S, _Extra) ->
{ok, S}.
%% try_connect() ->
%% try_connect(#{}, #st{}).
%% try_connect(#st{} = S) ->
%% try_connect(#{}, S).
try_connect(Opts, S) ->
try try_connect_(Opts, S)
catch
error:E:T ->
?LOG_ERROR("Unexpected error connecting: ~p / ~p", [E, T]),
{error, E}
end.
try_connect_(Opts0, S) ->
case eureka_get_host_port() of
{error, _} = Error ->
Error;
PoolOpts when is_map(PoolOpts) ->
case try_noise_connect(maps:merge(Opts0, PoolOpts)) of
{ok, EConn, Opts1} ->
S1 = protocol_connect(Opts1, S#st{ econn = EConn
, reconnect_timer = undefined }),
{ok, S1};
{error, _} = Error ->
Error
end
end.
eureka_get_host_port() ->
case gmhc_eureka:get_pool_address() of
#{<<"address">> := Host,
<<"port">> := Port,
<<"pool_id">> := PoolId} ->
#{host => binary_to_list(Host),
port => Port,
pool_id => binary_to_list(PoolId)};
{error, _} = Error ->
Error
end.
try_noise_connect(Opts) ->
Host = binary_to_list(gmhc_config:get_config([<<"pool">>, <<"host">>])),
Port = gmhc_config:get_config([<<"pool">>, <<"port">>]),
noise_connect(maps:merge(#{host => Host, port => Port}, Opts)).
noise_connect(#{host := Host, port := Port} = Opts) ->
TcpOpts = maps:get(tcp_opts, Opts, default_tcp_opts()),
Timeout = maps:get(connect_timeout, Opts, 5000),
?LOG_DEBUG("TCP connect: Host=~p, Port=~p, Timeout=~p, Opts=~p",
[Host,Port,Timeout,TcpOpts]),
case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of
{ok, TcpSock} ->
?LOG_DEBUG("Connected, TcpSock = ~p", [TcpSock]),
EnoiseOpts = maps:get(enoise_opts, Opts, enoise_opts()),
case enoise:connect(TcpSock, EnoiseOpts) of
{ok, EConn, _FinalSt} ->
{ok, EConn, Opts#{ tcp_opts => TcpOpts
, timeout => Timeout
, enoise_opts => EnoiseOpts }};
{error, _} = Err ->
Err
end;
{error, _} = TcpErr ->
?LOG_DEBUG("TCP connection failed: ~p", [TcpErr]),
TcpErr
end.
default_tcp_opts() ->
[ {active, true}
, {reuseaddr, true}
, {mode, binary}
].
enoise_opts() ->
[{noise, <<"Noise_NN_25519_ChaChaPoly_BLAKE2b">>}].
start_reconnect_timer(#st{} = S) ->
start_reconnect_timer(S, #{}).
start_reconnect_timer(#st{} = S, Opts) ->
case deadline_reached(Opts) of
{true, D} ->
?LOG_DEBUG("timer deadline reached, not restarting timer", []),
notify_deadline(D, S#st{reconnect_timer = undefined});
false ->
?LOG_DEBUG("starting reconnect timer ...", []),
TRef = start_timer(1000, Opts),
S#st{reconnect_timer = {TRef, 10, 1000, Opts}}
end.
restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) ->
NewT = max(T * 2, ?MAX_RETRY_INTERVAL),
TRef = start_timer(NewT, Opts),
S#st{reconnect_timer = {TRef, 10, NewT, Opts}};
restart_reconnect_timer(#st{reconnect_timer = {_, N, T, Opts}} = S) ->
TRef = start_timer(T, Opts),
S#st{reconnect_timer = {TRef, N-1, T, Opts}}.
deadline_reached(#{deadline := D}) ->
case erlang:monotonic_time(millisecond) > D of
true -> {true, D};
false -> false
end;
deadline_reached(_) ->
false.
notify_deadline(D, #st{awaiting_connect = Waiters} = S) ->
Waiters1 =
lists:foldr(fun({From, D1}, Acc) when D1 == D ->
gen_server:reply(From, {error, timeout}),
Acc;
(Other, Acc) ->
[Other | Acc]
end, [], Waiters),
S#st{awaiting_connect = Waiters1}.
notify_connected(#st{id = Id, awaiting_connect = Waiters} = S) ->
gmhc_events:publish(connected, #{id => Id}),
[gen_server:reply(From, ok) || {From, _} <- Waiters],
gmhc_handler:pool_connected(S#st.id, S#st.opts),
S#st{awaiting_connect = []}.
cancel_reconnect_timer(#st{reconnect_timer = T} = S) ->
case T of
undefined -> S;
{TRef, _, _, _} ->
erlang:cancel_timer(TRef),
S#st{reconnect_timer = undefined}
end.
start_timer(T, Opts0) ->
Opts = retry_opts(Opts0),
erlang:start_timer(T, self(), {reconnect, Opts}).
retry_opts(Opts) ->
maps:with([deadline], Opts).
protocol_connect(Opts, #st{econn = EConn} = S) ->
Pubkey = to_bin(opt(pubkey, Opts, [<<"pubkey">>])),
Extra = [to_bin(X) || X <- opt(extra_pubkeys, Opts, [<<"extra_pubkeys">>])],
PoolId = to_bin(opt(pool_id, Opts, [<<"pool">>, <<"id">>])),
Type = to_atom(opt(type, Opts, [<<"type">>])),
RId = erlang:unique_integer(),
Vsns = gmhp_msgs:versions(),
Protocols = gmhp_msgs:protocols(hd(Vsns)),
ConnectReq = #{ protocols => Protocols
, versions => Vsns
, pool_id => PoolId
, pubkey => Pubkey
, extra_pubkeys => Extra
, type => Type
, nonces => gmhc_server:total_nonces()
, signature => ""},
?LOG_DEBUG("ConnectReq = ~p", [ConnectReq]),
Msg = gmhp_msgs:encode_connect(ConnectReq, RId),
enoise:send(EConn, Msg),
receive
{noise, EConn, Data} ->
case gmhp_msgs:decode_connect_ack(Data) of
#{reply := #{ id := RId
, result := #{connect_ack := #{ protocol := P
, version := V }}
}} ->
connected(S#st.id, Type),
Opts1 = Opts#{ pubkey => Pubkey
, extra => Extra
, pool_id => PoolId
, type => Type },
notify_connected(S#st{protocol = P, version = V, opts = Opts1});
#{error := #{message := Msg}} ->
?LOG_ERROR("Connect error: ~s", [Msg]),
error(protocol_connect)
end
after 10000 ->
error(protocol_connect_timeout)
end.
to_bin(A) when is_atom(A) ->
atom_to_binary(A, utf8);
to_bin(S) ->
iolist_to_binary(S).
to_atom(A) when is_atom(A) -> A;
to_atom(S) ->
binary_to_existing_atom(iolist_to_binary(S), utf8).
connected(Id, Type) when Type==miner; Type==monitor ->
gmhc_server:connected(Id, Type).
disconnected(Id) ->
gmhc_events:publish(disconnected, #{id => Id}),
gmhc_server:disconnected(Id).
opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) ->
Bool;
opt_autoconnect(#{id := Id}) ->
case Id of
1 ->
application:get_env(gmhive_client, auto_connect, true);
_ ->
true
end.
opt(OptsK, Opts, SchemaPath) ->
case maps:find(OptsK, Opts) of
error ->
gmhc_config:get_config(SchemaPath);
{ok, V} ->
V
end.
+34
View File
@@ -0,0 +1,34 @@
-module(gmhc_connectors_sup).
-behavior(supervisor).
-export([ start_link/0
, init/1 ]).
-export([ start_first_connector/0
, start_connector/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_first_connector() ->
start_connector(#{}).
start_connector(Opts0) ->
Opts = case maps:is_key(id, Opts0) of
true -> Opts0;
false -> Opts0#{id => gmhc_counters:add_read(connector)}
end,
supervisor:start_child(?MODULE, [Opts]).
init([]) ->
Mod = gmhc_connector,
SupFlags = #{ strategy => simple_one_for_one
, intensity => 3
, period => 10 },
ChildSpecs = [ #{ id => Mod
, start => {Mod, start_link, []}
, type => worker
, restart => transient
, shutdown => 5000
, modules => [Mod] } ],
{ok, {SupFlags, ChildSpecs}}.
+53
View File
@@ -0,0 +1,53 @@
-module(gmhc_counters).
-export([ initialize/0 ]).
-export([ add/1
, add/2
, add_read/1
, add_read/2
, get_value/1
, values/0 ]).
counters() ->
#{ connector => 1 }.
initialize() ->
Counters = counters(),
Size = map_size(Counters),
CRef = counters:new(Size, []),
put_counters(#{ref => CRef, counters => Counters}).
add(Counter) ->
add(Counter, 1).
add(Counter, Incr) when is_integer(Incr), Incr >= 0 ->
{CRef, Ix} = counter_ix(Counter),
counters:add(CRef, Ix, Incr).
add_read(Counter) ->
add_read(Counter, 1).
add_read(Counter, Incr) when is_integer(Incr), Incr >= 0 ->
{CRef, Ix} = counter_ix(Counter),
counters:add(CRef, Ix, Incr),
counters:get(CRef, Ix).
get_value(Counter) ->
{CRef, Ix} = counter_ix(Counter),
counters:get(CRef, Ix).
values() ->
#{ref := CRef, counters := Counters} = get_counters(),
maps:map(fun(_,Ix) -> counters:get(CRef, Ix) end, Counters).
counter_ix(Counter) ->
#{ref := CRef, counters := #{Counter := Ix}} = get_counters(),
{CRef, Ix}.
put_counters(C) ->
persistent_term:put(?MODULE, C).
get_counters() ->
persistent_term:get(?MODULE).
+145
View File
@@ -0,0 +1,145 @@
-module(gmhc_eureka).
-export([get_pool_address/0]).
-include_lib("kernel/include/logger.hrl").
-include("gmhc_events.hrl").
get_pool_address() ->
URL0 = gmhc_config:get_config([<<"pool_admin">>, <<"url">>]),
case expand_url(URL0) of
<<"local">> ->
#{<<"address">> => <<"127.0.0.1">>,
<<"port">> => gmconfig:get_config([<<"pool">>, <<"port">>], [schema_default]),
<<"pool_id">> => gmhc_config:get_config([<<"pool">>, <<"id">>]) };
URL ->
?LOG_INFO("Trying to connect to ~p", [URL]),
connect1(URL)
end.
connect1(URL0) ->
URL = binary_to_list(URL0),
Res = request(get, URL),
?LOG_DEBUG("Res = ~p", [Res]),
case Res of
{ok, Body} ->
try get_host_port(json:decode(iolist_to_binary(Body)))
catch
error:_ ->
gmhc_events:publish(error, ?ERR_EVT(#{error => invalid_json,
data => Body})),
{error, invalid_json}
end;
{error, _} = Error ->
gmhc_events:publish(error, ?ERR_EVT(#{error => connect_failure,
data => Error})),
Error
end.
get_host_port(Data) ->
?LOG_DEBUG("Data = ~p", [Data]),
maps:with([<<"address">>, <<"port">>, <<"pool_id">>], Data).
request(get, URL) ->
case request(get, URL, []) of
{ok, #{body := Body}} ->
{ok, Body};
Other ->
%% TODO: perhaps return a more informative reason?
gmhc_events:publish(error, ?ERR_EVT(#{error => get_failed,
url => URL,
data => Other })),
{error, failed}
end.
request(get, URL, []) ->
Headers = [],
HttpOpts = [{timeout, 15000}],
Opts = [],
Profile = default,
request_result(httpc:request(get, {URL, Headers}, HttpOpts, Opts, Profile));
request(post, URL, Body) ->
post_request(URL, Body).
expand_url(URL) ->
case re:run(URL, <<"{[^}]+}">>, []) of
{match, _} ->
expand_vars(URL);
nomatch ->
URL
end.
expand_vars(S) ->
expand_vars(S, <<>>).
expand_vars(<<"{", Rest/binary>>, Acc) ->
{Var, Rest1} = get_var_name(Rest),
expand_vars(Rest1, <<Acc/binary, (expand_var(Var))/binary>>);
expand_vars(<<H, T/binary>>, Acc) ->
expand_vars(T, <<Acc/binary, H>>);
expand_vars(<<>>, Acc) ->
Acc.
expand_var(<<"CLIENT_ID">>) ->
gmhc_config:get_config([<<"pubkey">>]).
get_var_name(S) ->
get_var_name(S, <<>>).
get_var_name(<<"}", Rest/binary>>, Acc) ->
{Acc, Rest};
get_var_name(<<H, T/binary>>, Acc) ->
get_var_name(T, <<Acc/binary, H>>).
%% From hz.erl ==========================================================
% This is Bitcoin's variable-length unsigned integer encoding
% See: https://en.bitcoin.it/wiki/Protocol_documentation#Variable_length_integer
%% vencode(N) when N =< 0 ->
%% {error, {non_pos_N, N}};
%% vencode(N) when N < 16#FD ->
%% {ok, <<N>>};
%% vencode(N) when N =< 16#FFFF ->
%% NBytes = eu(N, 2),
%% {ok, <<16#FD, NBytes/binary>>};
%% vencode(N) when N =< 16#FFFF_FFFF ->
%% NBytes = eu(N, 4),
%% {ok, <<16#FE, NBytes/binary>>};
%% vencode(N) when N < (2 bsl 64) ->
%% NBytes = eu(N, 8),
%% {ok, <<16#FF, NBytes/binary>>}.
% eu = encode unsigned (little endian with a given byte width)
% means add zero bytes to the end as needed
%% eu(N, Size) ->
%% Bytes = binary:encode_unsigned(N, little),
%% NExtraZeros = Size - byte_size(Bytes),
%% ExtraZeros = << <<0>> || _ <- lists:seq(1, NExtraZeros) >>,
%% <<Bytes/binary, ExtraZeros/binary>>.
%% ======================================================================
%% From gmplugin_web_demo_handler.erl ===================================
post_request(URL, Map) ->
?LOG_DEBUG("Map = ~p", [Map]),
Body = json:encode(Map),
?LOG_DEBUG("Body = ~s", [Body]),
PostRes = httpc:request(post, {URL, [], "application/json", Body}, [], []),
request_result(PostRes).
%% ======================================================================
request_result(Result) ->
?LOG_DEBUG("Request result: ~p", [Result]),
request_result_(Result).
request_result_({ok, {{_, C200, Ok}, _Hdrs, Body}}) when C200 >= 200, C200 < 300 ->
{ok, #{code => C200, msg => Ok, body => Body}};
request_result_({ok, {{_, C200, Ok}, Body}}) when C200 >= 200, C200 < 300 ->
{ok, #{code => C200, msg => Ok, body => Body}};
request_result_({ok, {{_, Code, Error}, _Hdrs, Body}}) ->
{error, #{code => Code, msg => Error, body => Body}};
request_result_(_) ->
{error, #{code => 500, msg => <<"Internal error">>, body => <<>>}}.
+76
View File
@@ -0,0 +1,76 @@
-module(gmhc_events).
-export([subscribe/1,
ensure_subscribed/1,
unsubscribe/1,
ensure_unsubscribed/1,
publish/2]).
-export([debug/0]).
-export_type([event/0]).
-type event() :: pool_notification
| {pool_notification, atom()}
| error
| connected
| disconnected.
-spec publish(event(), any()) -> ok.
publish(Event, Info) ->
Data = #{sender => self(),
time => os:timestamp(),
info => Info},
_ = gproc_ps:publish(l, Event, Data),
ok.
-spec subscribe(event()) -> true.
subscribe(Event) ->
gproc_ps:subscribe(l, Event).
-spec ensure_subscribed(event()) -> true.
%% @doc Subscribes to Event. Will not crash if called more than once.
ensure_subscribed(Event) ->
try subscribe(Event)
catch
error:badarg ->
%% This assertion should not be needed, since there is
%% no other scenario that would cause subscribe/1 to fail,
%% other than gproc not running (would also cause a badarg)
_ = gproc:get_value({p,l,{gproc_ps_event, Event}}, self()),
true
end.
-spec unsubscribe(event()) -> true.
unsubscribe(Event) ->
gproc_ps:unsubscribe(l, Event).
-spec ensure_unsubscribed(event()) -> true.
ensure_unsubscribed(Event) ->
case lists:member(self(), gproc_ps:list_subs(l,Event)) of
true ->
unsubscribe(Event);
false ->
true
end.
debug() ->
ok = application:ensure_started(gproc),
spawn(fun() ->
subscribe(pool_notification),
subscribe({pool_notification, new_generation}),
subscribe(connected),
subscribe(error),
subscribe(disconnected),
loop()
end).
loop() ->
receive
stop -> ok;
{gproc_ps_event, E, Data} ->
io:fwrite("EVENT ~p: ~p~n", [E, Data]),
loop()
end.
+2
View File
@@ -0,0 +1,2 @@
-define(ERR_EVT(Info), #{module => ?MODULE, line => ?LINE, info => Info}).
+170
View File
@@ -0,0 +1,170 @@
-module(gmhc_handler).
-behavior(gen_server).
-export([ start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([ call/1
, notify/1
, pool_connected/2
, from_pool/1 ]).
-record(pool, { id :: gmhc_connector:id()
, pid :: pid() | 'undefined'
, mref :: reference()
, connected = false :: boolean()
, keep = true :: boolean()
, host :: string()
, port :: pos_integer()
, opts = #{} :: map() }).
-record(st, {pools = [], opts = #{}}).
-define(CALL_TIMEOUT, 5000).
-include_lib("kernel/include/logger.hrl").
call(Req) ->
try gen_server:call(?MODULE, {call, Req}, ?CALL_TIMEOUT)
catch
exit:Reason ->
{error, Reason}
end.
notify(Msg) ->
gen_server:cast(?MODULE, {notify, Msg}).
pool_connected(Id, Opts) ->
gen_server:cast(?MODULE, {pool_connected, Id, self(), Opts}).
from_pool(Msg) ->
ToSend = {from_pool, Msg},
?MODULE ! ToSend.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({call, Req}, _From, #st{} = S) ->
{reply, call_connector(Req), S};
handle_call(_Req, _From, S) ->
{reply, {error, unknown_method}, S}.
handle_cast({pool_connected, Id, Pid, Opts}, #st{pools = Pools} = S) ->
MRef = erlang:monitor(process, Pid),
case lists:keyfind(Id, #pool.id, Pools) of
#pool{} = P ->
P1 = P#pool{ connected = true
, pid = Pid
, mref = MRef },
Pools1 = lists:keyreplace(Id, #pool.id, Pools, P1),
{noreply, S#st{pools = Pools1}};
false ->
P = #pool{ id = Id
, pid = Pid
, mref = MRef
, host = maps:get(host, Opts)
, port = maps:get(port, Opts)
, opts = Opts },
{noreply, S#st{pools = [P | Pools]}}
end;
handle_cast({notify, Msg}, #st{} = S) ->
notify_connector(Msg),
{noreply, S};
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info({from_pool, Msg}, S) ->
maybe_publish(Msg),
case Msg of
#{notification := #{new_server := #{ host := _
, port := _
, keep := _ } = Server}} ->
{noreply, start_pool_connector(Server, S)};
_ ->
gmhc_server:from_pool(Msg),
{noreply, S}
end;
handle_info(Msg, S) ->
?LOG_DEBUG("Unknown msg: ~p", [Msg]),
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_FromVsn, S, _Extra) ->
{ok, S}.
maybe_publish(#{notification := Msg} = N) ->
Info = maybe_via(N, #{msg => Msg}),
gmhc_events:publish(pool_notification, Info),
if map_size(Msg) == 1 ->
[Tag] = maps:keys(Msg),
gmhc_events:publish({pool_notification, Tag}, Info);
true ->
ok
end;
maybe_publish(_) ->
ok.
maybe_via(#{via := Via}, Info) ->
Info#{via => Via}.
call_connector(Req0) ->
{ViaId, Req} = maps:take(via, Req0),
case gmhc_connector:whereis_id(ViaId) of
undefined ->
{error, no_connection};
Pid when is_pid(Pid) ->
Id = erlang:unique_integer(),
MRef = erlang:monitor(process, Pid),
gmhc_connector:send(ViaId, #{call => Req#{ id => Id }}),
receive
{from_pool, #{reply := #{ id := Id, result := Result }}} ->
Result;
{from_pool, #{error := #{ id := Id } = Error}} ->
{error, maps:remove(id, Error)};
{'DOWN', MRef, _, _, _} ->
{error, no_connection}
after 5000 ->
{error, {timeout, process_info(self(), messages)}}
end
end.
notify_connector(Msg0) ->
{Via, Msg} = maps:take(via, Msg0),
gmhc_connector:send(Via, #{notification => #{msg => Msg}}).
start_pool_connector(#{ host := Host
, port := Port
, keep := Keep }, #st{pools = Pools, opts = Opts} = S) ->
case [P || #pool{host = H1, port = P1} = P <- Pools,
H1 == Host,
P1 == Port] of
[] ->
case gmhive_client:connect(Opts#{ host => Host
, port => Port
, nowait => true }) of
{ok, CId} ->
Pid = gmhc_connector:whereis_id(CId),
MRef = erlang:monitor(process, Pid),
P = #pool{ id = CId, host = Host, port = Port,
keep = Keep, pid = Pid, mref = MRef },
S#st{pools = [P | Pools]};
{error, _} = Error ->
?LOG_WARNING("Could not start connector to ~p:~p - ~p",
[Host, Port, Error]),
S
end;
[#pool{} = _P] ->
?LOG_DEBUG("Already have a pool entry for ~p:~p", [Host, Port]),
S
end.
+392
View File
@@ -0,0 +1,392 @@
-module(gmhc_server).
-behaviour(gen_server).
-export([ connected/2
, disconnected/1
, from_pool/1
, new_candidate/1
]).
-export([ total_nonces/0 ]).
-export([
start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-include_lib("kernel/include/logger.hrl").
-include("gmhc_events.hrl").
-record(worker, { config
, nonces = 0
, index
, pid
, mref
, cand
, nonce
, errors = 0}).
-type worker() :: #worker{}.
-type type() :: monitor | worker.
-record(st, {
connected = #{} :: #{non_neg_integer() => {pid(), type()}}
, working = false :: boolean()
, candidate :: map() | 'undefined'
, nonces = 1 :: pos_integer()
, workers = [] :: [worker()]
}).
-define(CONNECTED(S), map_size(S#st.connected) > 0).
-define(MAX_ERRORS, 5).
connected(Id, Type) ->
gen_server:call(?MODULE, {connected, Id, Type}).
disconnected(Id) ->
gen_server:cast(?MODULE, {disconnected, Id}).
from_pool(Msg) ->
ToSend = {from_pool, Msg},
%% ?LOG_DEBUG("Sending to server: ~p", [ToSend]),
gen_server:cast(?MODULE, ToSend).
new_candidate(Cand) ->
gen_server:cast(?MODULE, {new_candidate, Cand}).
total_nonces() ->
gen_server:call(?MODULE, total_nonces).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
WorkerConfigs = gmhc_workers:get_worker_configs(),
?LOG_DEBUG("WorkerConfigs = ~p", [WorkerConfigs]),
%% IdleWorkers = [#worker{executable = E} || E <- Instances],
{IdleWorkers,_} = lists:mapfoldl(
fun(C, N) ->
NNonces = calc_nonces(C),
{#worker{index = N, config = C, nonces = NNonces}, N+1}
end, 1, WorkerConfigs),
TotalNonces = lists:foldl(fun(#worker{nonces = N}, Acc) ->
N + Acc
end, 0, IdleWorkers),
process_flag(trap_exit, true),
{ok, #st{workers = IdleWorkers, nonces = TotalNonces}}.
handle_call(total_nonces, _From, #st{nonces = Nonces} = S) ->
{reply, Nonces, S};
handle_call({connected, Id, Type}, {Pid,_}, #st{connected = Conn} = S) ->
?LOG_DEBUG("connected: ~p, ~p", [Id, Type]),
erlang:monitor(process, Pid),
S1 = S#st{connected = Conn#{Id => {Pid, Type}}},
S2 = case Type of
monitor ->
stop_workers(S1#st.workers), % shouldn't be any running
S1#st{workers = [], working = false};
worker -> S1#st{working = true}
end,
{reply, ok, S2};
handle_call(_Req, _From, S) ->
{reply, unknown_call, S}.
handle_cast({from_pool, #{via := Connector,
notification :=
#{candidate := Cand0}}},
#st{workers = Workers} = S) ->
Cand = maps:put(via, Connector, decode_candidate_hash(Cand0)),
%% ?LOG_DEBUG("Got new candidate; will mine it: ~p", [Cand]),
%% For now, stop all workers, restart with new candidate
Workers1 = stop_workers(Workers),
{Workers2, Cand1} = assign_nonces(Workers1, Cand),
#st{candidate = Cand2} = S1 = maybe_request_nonces(S#st{candidate = Cand1}),
NewWorkers = [spawn_worker(W, Cand2) || W <- Workers2],
{noreply, S1#st{workers = NewWorkers}};
handle_cast({disconnected, Id}, #st{connected = Conn} = S) ->
?LOG_DEBUG("disconnected: ~p", [Id]),
Conn1 = maps:remove(Id, Conn),
S1 = if map_size(Conn1) == 0 ->
Ws = stop_workers(S#st.workers),
S#st{connected = Conn1, workers = Ws};
true -> S#st{connected = Conn1}
end,
{noreply, S1};
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info({'DOWN', MRef, process, Pid, Reason}, #st{ workers = Workers
, connected = Connected
, working = Working} = S)
when ?CONNECTED(S), Working ->
%% ?LOG_DEBUG("DOWN from ~p: ~p", [Pid, Reason]),
case lists:keyfind(Pid, #worker.pid, Workers) of
#worker{mref = MRef} = W ->
S1 = handle_worker_result(Reason, W, S),
{noreply, S1};
false ->
Conn1 = maps:filter(fun(_, {P,_}) -> P =/= Pid end, Connected),
{noreply, S#st{connected = Conn1}}
end;
handle_info({'EXIT', Pid, Reason}, #st{ workers = Workers
, working = Working} = S)
when ?CONNECTED(S), Working ->
case lists:keyfind(Pid, #worker.pid, Workers) of
#worker{} = W ->
%% ?LOG_DEBUG("EXIT from worker ~p: ~p", [W#worker.index, Reason]),
gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
data => Reason})),
Ws1 = incr_worker_error(W, Workers),
{noreply, S#st{workers = Ws1}};
false ->
%% ?LOG_DEBUG("EXIT apparently not from worker?? (~p)", [Pid]),
{noreply, S}
end;
handle_info(Msg, St) ->
?LOG_DEBUG("Unknown msg: ~p", [Msg]),
{noreply, St}.
terminate(_Reason, _St) ->
ok.
code_change(_FromVsn, S, _Extra) ->
{ok, S}.
report_solutions(Solutions, W, #st{} = S) when ?CONNECTED(S) ->
#{via := Via, seq := Seq} = W#worker.cand,
gmhc_handler:call(
#{via => Via,
solutions => #{ seq => Seq
, found => [#{ nonce => Nonce
, evidence => Evd }
|| {Nonce, Evd} <- Solutions] }}).
%% report_solution(Nonce, Solution, W, #st{connected = true}) ->
%% #{seq := Seq} = W#worker.cand,
%% gmhc_handler:call(#{solution => #{ seq => Seq
%% , nonce => Nonce
%% , evidence => Solution }}).
report_no_solution(Nonce, W, #st{} = S) when ?CONNECTED(S) ->
#{via := Via, seq := Seq} = W#worker.cand,
%% ?LOG_DEBUG("report no_solution Seq = ~p, Nonce = ~p", [Seq, Nonce]),
gmhc_handler:call(#{via => Via,
no_solution => #{ seq => Seq
, nonce => Nonce}}).
maybe_request_nonces(#st{ candidate = #{via := Via, seq := Seq, nonces := Nonces}
, nonces = N} = S) when ?CONNECTED(S) ->
case Nonces == [] of
true ->
%% ?LOG_DEBUG("Request more nonces, Seq = ~p, N = ~p", [Seq, N]),
Res = gmhc_handler:call(#{via => Via,
get_nonces => #{ seq => Seq
, n => N }}),
nonces_result(Res, Seq, S);
false ->
S
end;
maybe_request_nonces(S) ->
S.
nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) ->
case Seq == Seq0 of
true ->
%% ?LOG_DEBUG("Got nonces = ~p", [Nonces]),
#st{candidate = Cand} = S,
S#st{candidate = Cand#{nonces => Nonces}};
false ->
?LOG_DEBUG("Seq mismatch - wtf?? ~p - ~p", [Seq, Seq0]),
S
end;
nonces_result({error, #{message := <<"outdated">>}}, Seq0, S) ->
Workers = stop_workers_for_seq(Seq0, S#st.workers),
S#st{workers = Workers}.
handle_worker_result({worker_result, Result}, W, S) ->
%% ?LOG_DEBUG("worker result: ~p", [Result]),
case Result of
{solutions, Solutions} ->
{Cont, S1} = report_solutions_(Solutions, W, S),
maybe_continue(Cont, W, S1);
{solution, Nonce, Solution} ->
%% report_solution(Nonce, Solution, W, S),
{Cont, S1} = report_solutions_([{Nonce, Solution}], W, S),
maybe_continue(Cont, W, S1);
{no_solution, Nonce} ->
report_no_solution(Nonce, W, S),
maybe_restart_worker(W, S);
{error, S} ->
?LOG_DEBUG("Worker ~p reported error as normal", [W#worker.index]),
gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
data => Result})),
Ws = incr_worker_error(W, S#st.workers),
S#st{workers = Ws}
end;
handle_worker_result(Error, W, S) ->
?LOG_DEBUG("Got worker error from ~p: ~p", [W#worker.index, Error]),
gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
data => Error})),
Ws = incr_worker_error(W, S#st.workers),
S#st{workers = Ws}.
report_solutions_(Solutions, W, S) ->
case report_solutions(Solutions, W, S) of
ok ->
Ws = reset_worker(W, S#st.workers),
Ws1 = stop_workers(Ws),
{stopped, S#st{workers = Ws1}};
continue ->
{continue, S};
{error, _} ->
{error, S}
end.
reset_worker(#worker{index = I} = W, Ws) ->
W1 = reset_worker_(W),
lists:keyreplace(I, #worker.index, Ws, W1).
reset_worker_(W) ->
%% ?LOG_DEBUG("reset_worker ~p", [W#worker.index]),
W#worker{pid = undefined, mref = undefined,
nonce = undefined, cand = undefined}.
incr_worker_error(#worker{errors = Es, index = I} = W, Ws) ->
%% ?LOG_DEBUG("Increment worker (~p) error count: ~p", [I, Es+1]),
W1 = reset_worker_(W#worker{errors = Es+1}),
lists:keyreplace(I, #worker.index, Ws, W1).
maybe_continue(stopped, _, S) ->
S;
maybe_continue(continue, W, S) ->
maybe_restart_worker(W, S);
maybe_continue(error, W, S) ->
?LOG_INFO("Won't restart worker ~p due to error", [W#worker.index]),
S.
maybe_restart_worker(#worker{index = I} = W, #st{candidate = C} = S) ->
case maps:get(nonces, C) of
[] ->
%% Waiting for nonces
Ws = reset_worker(W, S#st.workers),
S#st{workers = Ws};
Ns ->
{Nonce, Ns1} = pick_nonce(W#worker.nonces, Ns),
%% ?LOG_DEBUG("restart worker ~p with nonce ~p", [I, Nonce]),
W1 = reset_worker_(W),
W2 = spawn_worker(W1#worker{nonce = Nonce}, C),
Ws = lists:keyreplace(I, #worker.index, S#st.workers, W2),
S1 = S#st{candidate = C#{nonces => Ns1}, workers = Ws},
maybe_request_nonces(S1)
end.
%% In a Gajumaru node, a typical worker config might look like this:
%% "cuckoo": {
%% "edge_bits": 29,
%% "miners": [{"executable": "mean29-avx2"},
%% {"executable": "lean29-avx2"},
%% {"executable": "lean29-avx2"},
%% {"executable": "lean29-avx2"}]
%% }
stop_workers(Workers) ->
[stop_worker(W) || W <- Workers].
stop_workers_for_seq(Seq, Workers) ->
[stop_worker(W) || #worker{cand = #{seq := Seq1}} = W <- Workers,
Seq1 =:= Seq].
stop_worker(#worker{pid = Pid, mref = MRef} = W) when is_pid(Pid) ->
exit(Pid, kill),
receive
{'EXIT', Pid, _} -> ok;
{'DOWN', MRef, process, Pid, _} -> ok
end,
W#worker{pid = undefined, mref = undefined, nonce = undefined};
stop_worker(W) ->
W.
assign_nonces(Ws, #{nonces := Ns} = C) ->
{Ws1, Nonces1} = assign_nonces_(Ws, Ns, []),
{Ws1, C#{nonces => Nonces1}}.
assign_nonces_([W | Ws], [], Acc) ->
assign_nonces_(Ws, [], [W#worker{nonce = undefined}|Acc]);
assign_nonces_([#worker{nonces = N} = W | Ws], Ns, Acc) ->
{Nonce, Ns1} = pick_nonce(N, Ns),
assign_nonces_(Ws, Ns1, [W#worker{nonce = Nonce}|Acc]);
assign_nonces_([], Ns, Acc) ->
{lists:reverse(Acc), Ns}.
-spec calc_nonces(gmhw_pow_cuckoo:config()) -> non_neg_integer().
calc_nonces(Cfg) ->
NInstances = case gmhw_pow_cuckoo:addressed_instances(Cfg) of
undefined -> 1;
L -> length(L)
end,
Repeats = gmhw_pow_cuckoo:repeats(Cfg),
Repeats * NInstances.
pick_nonce(_, [A, A]) ->
%% ?LOG_DEBUG("last nonce (~p)", [A]),
{A, []};
pick_nonce(N, [A, B]) when A < B ->
A1 = A + N,
New = if A1 > B -> [];
true -> [A1, B]
end,
%% ?LOG_DEBUG("Remanining nonces: ~p", [New]),
{A, New}.
%% Dialyzer doesn't like that the fun passed to spawn_link/1
%% doesn't have a local return (it communicates its result via the exit reason).
-dialyzer({no_return, spawn_worker/2}).
spawn_worker(#worker{nonce = undefined} = W, _) ->
W;
spawn_worker(#worker{errors = Es} = W, _) when Es >= ?MAX_ERRORS ->
?LOG_DEBUG("Won't start worker - reached max error count: ~p", [W]),
W;
spawn_worker(#worker{pid = undefined, nonce = Nonce, config = Cfg} = W, Cand) ->
Me = self(),
#{candidate := Data, target := Target, edge_bits := EdgeBits} = Cand,
Pid = spawn_link(
fun() ->
Cfg1 = gmhw_pow_cuckoo:set_edge_bits(EdgeBits, Cfg),
init_worker(Data, Nonce, Target, Cfg1, Me)
end),
MRef = erlang:monitor(process, Pid),
W#worker{pid = Pid, mref = MRef, cand = Cand, nonce = Nonce}.
-spec init_worker(binary(), integer(), integer(), tuple(), pid()) -> no_return().
init_worker(Data, Nonce, Target, Config, Parent) ->
Res = gmhc_workers:generate_from_hash(Data, Target, Nonce, Config, undefined),
%% ?LOG_DEBUG("worker result: ~p", [Res]),
case Res of
{ok, Solutions} when is_list(Solutions) ->
worker_result(Parent, {solutions, Solutions});
%% {ok, {Nonce1, Solution}} ->
%% worker_result(Parent, {solution, Nonce1, Solution});
{error, no_solution} ->
%% TODO: If we are using repeats, then we might report
%% no_solution for each nonce tried.
worker_result(Parent, {no_solution, Nonce});
{error, Other} ->
?LOG_ERROR("Bad worker! {error, ~p}", [Other]),
gmhc_events:publish(error, ?ERR_EVT(#{error => cannot_start_worker,
data => {error, Other}})),
exit(Other)
end.
worker_result(Pid, Result) ->
unlink(Pid),
exit({worker_result, Result}).
decode_candidate_hash(#{candidate := C} = Cand) ->
{ok, Hash} = gmser_api_encoder:safe_decode(bytearray, C),
Cand#{candidate := Hash}.
+37
View File
@@ -0,0 +1,37 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(gmhc_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ChildSpecs = [ worker(gmhc_server)
, worker(gmhc_handler)
, supervisor(gmhc_connectors_sup) ],
SupFlags = #{ strategy => one_for_one
, intensity => 1
, period => 5
, auto_shutdown => never },
{ok, {SupFlags, ChildSpecs}}.
worker (Mod) -> child(Mod, worker).
supervisor(Mod) -> child(Mod, supervisor).
child(Mod, Type) ->
#{ id => Mod
, type => Type
, start => {Mod, start_link, []}
, restart => permanent
, shutdown => 5000
, significant => false
, modules => [Mod] }.
+74
View File
@@ -0,0 +1,74 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2025, QPQ AG
%%% @copyright (C) 2017, Aeternity Anstalt
%%%
%%% @doc Hive worker configuration logic, based on aec_mining.erl
%%% in the Gajumaru platform (https://git.qpq.swiss/QPQ-AG/gajumaru)
%%% @end
%%%-------------------------------------------------------------------
-module(gmhc_workers).
-export([
get_worker_configs/0
, generate_from_hash/5
]).
-include_lib("kernel/include/logger.hrl").
-define(DEFAULT_EXECUTABLE_GROUP , <<"gajumine">>).
-define(DEFAULT_EXTRA_ARGS , <<>>).
-define(DEFAULT_HEX_ENCODED_HEADER , false).
-define(DEFAULT_REPEATS , 1).
-define(DEFAULT_EDGE_BITS , 29).
%%------------------------------------------------------------------------------
%% Read and parse worker configs.
%%
%% Workers defined in gajumaru.{json,yaml} user config file take precedence.
%% If there are no workers defined in the user config, sys.config cuckoo
%% workers are read. If there are neither user config nor sys.config workers
%% ?DEFAULT_CUCKOO_ENV is used as the last resort option (i.e. mean29-generic
%% without any extra args).
%%------------------------------------------------------------------------------
-spec get_worker_configs() -> [gmhw_pow_cuckoo:config()].
get_worker_configs() ->
ConfigMaps = worker_config_map(),
?LOG_DEBUG("ConfigMaps = ~p", [ConfigMaps]),
lists:foldl(
fun(Cfg, Configs) ->
[build_worker_config(Cfg) | Configs]
end, [], ConfigMaps).
-spec generate_from_hash(gmhw_pow_cuckoo:hash(), gmhw_pow:sci_target(),
gmhw_pow:nonce(), gmhw_pow_cuckoo:config(),
gmhw_pow:instance() | undefined) ->
{ok, [{gmhw_pow:nonce(), gmhw_pow_cuckoo:solution()}]} | {error, term()}.
generate_from_hash(Hash, Target, Nonce, Config, WorkerInstance) ->
gmhw_pow_cuckoo:generate_from_hash(Hash, Target, Nonce, Config, WorkerInstance, true).
%% Internal functions.
%%------------------------------------------------------------------------------
%% Config handling
%%------------------------------------------------------------------------------
build_worker_config(Config) when is_map(Config) ->
Exec = maps:get(<<"executable">>, Config),
ExecGroup = maps:get(<<"executable_group">>, Config, ?DEFAULT_EXECUTABLE_GROUP),
ExtraArgs = maps:get(<<"extra_args">>, Config, ?DEFAULT_EXTRA_ARGS),
HexEncHdr = maps:get(<<"hex_encoded_header">>, Config,
hex_encoding_default(ExecGroup, Exec)),
Repeats = maps:get(<<"repeats">>, Config, ?DEFAULT_REPEATS),
Instances = maps:get(<<"addressed_instances">>, Config, undefined),
EdgeBits = ?DEFAULT_EDGE_BITS,
gmhw_pow_cuckoo:config(Exec, ExecGroup, ExtraArgs, HexEncHdr, Repeats, EdgeBits, Instances);
build_worker_config({Exec, ExtraArgs, HexEncHdr, Repeats, Instances, ExecGroup}) ->
EdgeBits = ?DEFAULT_EDGE_BITS,
gmhw_pow_cuckoo:config(Exec, ExecGroup, ExtraArgs, HexEncHdr, Repeats, EdgeBits, Instances).
worker_config_map() ->
gmhc_config:get_config([<<"workers">>]).
hex_encoding_default(_, <<"cuda29">>) -> true;
hex_encoding_default(_, _) -> ?DEFAULT_HEX_ENCODED_HEADER.
+26
View File
@@ -0,0 +1,26 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
{application, gmhive_client,
[{description, "Gajumaru Hive Client"},
{vsn, "0.1.0"},
{registered, []},
{applications,
[
kernel
, stdlib
, sasl
, gproc
, inets
, ssl
, enoise
, gmconfig
, gmhive_protocol
, gmhive_worker
]},
{mod, {gmhc_app, []}},
{start_phases, [ {connect_to_primary, []} ]},
{env, []},
{modules, []},
{maintainers, ["QPQ IaaS AG"]},
{licensens, ["ISC"]},
{links, [{"gitea", "https://git.qpq.swiss/gmhive_client"}]}
]}.
+36
View File
@@ -0,0 +1,36 @@
-module(gmhive_client).
-export([ connect/1
, disconnect/1
, status/0
, status/1 ]).
-type connect_opts() :: gmhc_connector:connect_opts().
-type id() :: non_neg_integer().
-type status() :: connected | disconnected.
-export_type([ connect_opts/0 ]).
-spec connect(connect_opts()) -> {ok, id()} | {error, any()}.
connect(Opts) when is_map(Opts) ->
Id = gmhc_counters:add_read(connector),
case gmhc_connectors_sup:start_connector(Opts#{id => Id}) of
{ok, _Pid} ->
{ok, Id};
{error, _} = Error ->
Error
end.
%% gmhc_connector:connect(Opts).
-spec disconnect(id()) -> ok.
disconnect(Id) ->
gmhc_connector:disconnect(Id).
-spec status() -> [{id(), status()}].
status() ->
gmhc_connector:status().
-spec status(id()) -> status().
status(Id) ->
gmhc_connector:status(Id).