From 5eff27367d5277a46c517fab0e73faf0673d82fd Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Thu, 9 Oct 2025 20:17:27 +0200 Subject: [PATCH] Cache connection info --- .gitignore | 2 +- Makefile | 4 +- ebin/gmhive_client.app | 2 +- gmhc_config-testnet-local.eterm | 1 + gmhc_config.eterm | 2 +- priv/gmhc_schema.json | 135 ++++++++++++++++++++++++++++++++ src/gmhc_connector.erl | 66 +++++++++++----- src/gmhc_eureka.erl | 85 ++++++++++++++++++-- src/gmhc_server.erl | 31 +++++--- zomp.meta | 4 +- 10 files changed, 289 insertions(+), 43 deletions(-) create mode 100644 priv/gmhc_schema.json diff --git a/.gitignore b/.gitignore index 844107a..5fb29c9 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,7 @@ VERSION *.aes~ *.config~ *.args~ -data/mnesia +data/* _checkouts*/ tmp/ rebar3.crashdump diff --git a/Makefile b/Makefile index f7f9a5e..0df1e96 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ -BUILD="_build/default/lib/gmhive_client" +BUILD=_build/default/lib/gmhive_client SCHEMA_OUT=$(BUILD)/priv/gmhc_schema.json schema: ERL_LIBS=_build/default/lib \ erl -run gmhc_config_schema export $(SCHEMA_OUT) -run init stop - scripts/update_readme_json.sh README.md $(SCHEMA_OUT) + scripts/update_readme_json.sh "README.md" "$(SCHEMA_OUT)" diff --git a/ebin/gmhive_client.app b/ebin/gmhive_client.app index ffa31b4..18ef1ab 100644 --- a/ebin/gmhive_client.app +++ b/ebin/gmhive_client.app @@ -1,6 +1,6 @@ {application,gmhive_client, [{description,"Gajumaru Hive Client"}, - {vsn,"0.6.3"}, + {vsn,"0.7.0"}, {registered,[]}, {applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise, gmconfig,gmhive_protocol,gmhive_worker]}, diff --git a/gmhc_config-testnet-local.eterm b/gmhc_config-testnet-local.eterm index 849f904..289d1f0 100644 --- a/gmhc_config-testnet-local.eterm +++ b/gmhc_config-testnet-local.eterm @@ -1,4 +1,5 @@ #{ pubkey => <<"ak_zjyvDmMbXafMADYrxDs4uMiYM5zEVJBqWgv619NUQ37Gkwt7z">> + , report => <<"progress">> , pool_admin => #{url => <<"local">>} , pool => #{id => <<"ct_26xqeE3YKmZV8jrks57JSgZRCHSuG4RGzpnvdz6AAiSSTVbJRM">>, host => <<"127.0.0.1">>} diff --git a/gmhc_config.eterm b/gmhc_config.eterm index aaff7ae..5c3ae7d 100644 --- a/gmhc_config.eterm +++ b/gmhc_config.eterm @@ -1,5 +1,5 @@ #{ pubkey => <<"ak_2KAcA2Pp1nrR8Wkt3FtCkReGzAi8vJ9Snxa4PcmrthVx8AhPe8">> , pool => #{id => <<"ct_LRbi65kmLtE7YMkG6mvG5TxAXTsPJDZjAtsPuaXtRyPA7gnfJ">>} - , mining => + , workers => [#{executable => <<"mean15-generic">>}] }. diff --git a/priv/gmhc_schema.json b/priv/gmhc_schema.json new file mode 100644 index 0000000..6c0371a --- /dev/null +++ b/priv/gmhc_schema.json @@ -0,0 +1,135 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "additionalProperties": false, + "properties": { + "extra_pubkeys": { + "default": [], + "description": "Additional worker pubkeys, sharing rewards", + "items": { + "pattern": "^ak_[1-9A-HJ-NP-Za-km-z]*$", + "type": "string" + }, + "type": "array" + }, + "network": { + "default": "mainnet", + "type": "string" + }, + "pool": { + "additionalProperties": false, + "properties": { + "host": { + "default": "127.0.0.1", + "description": "Hostname of hive server", + "example": "0.0.0.0", + "type": "string" + }, + "id": { + "description": "Pool contract id", + "pattern": "^ct_[1-9A-HJ-NP-Za-km-z]*$", + "type": "string" + }, + "port": { + "default": 17888, + "description": "Hive server listen port", + "minimum": 1, + "type": "integer" + } + }, + "type": "object" + }, + "pool_admin": { + "additionalProperties": false, + "properties": { + "default_per_network": { + "additionalProperties": false, + "properties": { + "mainnet": { + "default": "https://gajumining.com/api/workers/{CLIENT_ID}", + "type": "string" + }, + "testnet": { + "default": "https://test.gajumining.com/api/workers/{CLIENT_ID}", + "type": "string" + } + }, + "type": "object" + }, + "url": { + "default": "https://test.gajumining.com/api/workers/{CLIENT_ID}", + "description": "URL of Eureka worker api", + "type": "string" + } + }, + "type": "object" + }, + "pubkey": { + "description": "Primary client pubkey", + "pattern": "^ak_[1-9A-HJ-NP-Za-km-z]*$", + "type": "string" + }, + "report": { + "default": "silent", + "description": "Progress reporting", + "enum": [ + "debug", + "progress", + "silent" + ], + "type": "string" + }, + "type": { + "default": "worker", + "description": "monitor mode can be used to see if a pool is alive", + "enum": [ + "worker", + "monitor" + ], + "type": "string" + }, + "workers": { + "default": [ + { "executable": "mean29-generic" } + ], + "description": "Definitions of workers' configurations. If no worker are configured one worker is used as default, i.e. 'mean29-generic' executable without any extra args.", + "items": { + "additionalProperties": false, + "properties": { + "executable": { + "default": "mean29-generic", + "description": "Executable binary of the worker. Can be a fully qualified path, \nbut the software may apply default logic to locate a plain basename.", + "type": "string" + }, + "extra_args": { + "default": "", + "description": "Extra arguments to pass to the worker executable binary. The safest choice is specifying no arguments i.e. empty string.", + "type": "string" + }, + "hex_encoded_header": { + "default": false, + "description": "Hexadecimal encode the header argument that is send to the worker executable. CUDA executables expect hex encoded header.", + "type": "boolean" + }, + "instances": { + "description": "Instances used by the worker in case of Multi-GPU mining. Numbers on the configuration list represent GPU devices that are to be addressed by the worker.", + "example": [0,1,2,3], + "items": { "type": "integer" }, + "minItems": 1, + "type": "array" + }, + "repeats": { + "default": 1, + "description": "Number of tries to do in each worker context - WARNING: it should be set so the worker process runs for 3-5s or else the node risk missing out on new micro blocks.", + "type": "integer" + } + }, + "required": [ + "executable" + ], + "type": "object" + }, + "type": "array" + } + }, + "type": "object" +} diff --git a/src/gmhc_connector.erl b/src/gmhc_connector.erl index c59f2cc..d111671 100644 --- a/src/gmhc_connector.erl +++ b/src/gmhc_connector.erl @@ -56,7 +56,9 @@ id :: non_neg_integer() , auto_connect = true :: boolean() , econn + , connected = false :: boolean() , reconnect_timer :: timer_ref() | 'undefined' + , recache_timer :: reference() | 'undefined' , awaiting_connect = [] :: list() , protocol :: binary() | 'undefined' , version :: binary() | 'undefined' @@ -239,12 +241,22 @@ handle_info({timeout, TRef, {reconnect, Opts}}, #st{ reconnect_timer = {TRef, _, end; handle_info({tcp_closed, _Port}, #st{} = S) -> ?LOG_DEBUG("got tcp_closed", []), - disconnected(S#st.id), - S1 = case S#st.auto_connect of - true -> start_reconnect_timer(S#st{econn = undefined}); - false -> S#st{econn = undefined} + S1 = disconnected(S#st.id, S), + S2 = case S1#st.auto_connect of + true -> start_reconnect_timer(S1#st{econn = undefined}); + false -> S1#st{econn = undefined} end, - {noreply, S1}; + {noreply, S2}; +handle_info({timeout, _, recache}, #st{opts = Opts, econn = EConn} = S0) -> + S = S0#st{recache_timer = undefined}, + case (EConn =/= undefined) andalso S#st.connected of + false -> + {noreply, S}; + true -> + ?LOG_NOTICE("Recaching eureka info", []), + cache_eureka_info(Opts), + {noreply, ensure_recache_timer(S)} + end; handle_info(Msg, S) -> ?LOG_DEBUG("Discarding msg (auto_connect=~p): ~p", [S#st.auto_connect, Msg]), {noreply, S}. @@ -286,12 +298,12 @@ try_connect_(Opts0, S) -> eureka_get_host_port() -> case gmhc_eureka:get_pool_address() of - #{<<"address">> := Host, - <<"port">> := Port, - <<"pool_id">> := PoolId} -> - #{host => binary_to_list(Host), + {ok, #{host := Host, + port := Port, + pool_id := PoolId}} -> + #{host => unicode:characters_to_list(Host), port => Port, - pool_id => binary_to_list(PoolId)}; + pool_id => unicode:characters_to_list(PoolId)}; {error, _} = Error -> Error end. @@ -348,6 +360,7 @@ start_reconnect_timer(#st{} = S, Opts) -> end. restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) -> + gmhc_eureka:invalidate_cache(), NewT = max(T * 2, ?MAX_RETRY_INTERVAL), TRef = start_timer(NewT, Opts), S#st{reconnect_timer = {TRef, 10, NewT, Opts}}; @@ -373,11 +386,24 @@ notify_deadline(D, #st{awaiting_connect = Waiters} = S) -> end, [], Waiters), S#st{awaiting_connect = Waiters1}. -notify_connected(#st{id = Id, awaiting_connect = Waiters} = S) -> +cache_eureka_info(Opts) -> + gmhc_eureka:cache_good_address(maps:with([host, port, pool_id], Opts)). + +notify_connected(#st{id = Id, awaiting_connect = Waiters, opts = Opts} = S) -> gmhc_events:publish(connected, #{id => Id}), [gen_server:reply(From, ok) || {From, _} <- Waiters], gmhc_handler:pool_connected(S#st.id, S#st.opts), - S#st{awaiting_connect = []}. + cache_eureka_info(Opts), + ensure_recache_timer(S#st{awaiting_connect = []}). + +ensure_recache_timer(#st{recache_timer = T} = S) -> + case T of + undefined -> + TRef = erlang:start_timer(1*60*1000, self(), recache), + S#st{recache_timer = TRef}; + _ when is_reference(T) -> + S + end. cancel_reconnect_timer(#st{reconnect_timer = T} = S) -> case T of @@ -420,17 +446,19 @@ protocol_connect(Opts, #st{econn = EConn} = S) -> , result := #{connect_ack := #{ protocol := P , version := V }} }} -> - connected(S#st.id, Type), + S1 = connected(S#st.id, Type, S), Opts1 = Opts#{ pubkey => Pubkey , extra => Extra , pool_id => PoolId , type => Type }, - notify_connected(S#st{protocol = P, version = V, opts = Opts1}); + notify_connected(S1#st{protocol = P, version = V, opts = Opts1}); #{error := #{message := Msg}} -> ?LOG_ERROR("Connect error: ~s", [Msg]), + gmhc_eureka:invalidate_cache(), error(protocol_connect) end after 10000 -> + gmhc_eureka:invalidate_cache(), error(protocol_connect_timeout) end. @@ -443,12 +471,14 @@ to_atom(A) when is_atom(A) -> A; to_atom(S) -> binary_to_existing_atom(iolist_to_binary(S), utf8). -connected(Id, Type) when Type==worker; Type==monitor -> - gmhc_server:connected(Id, Type). +connected(Id, Type, S) when Type==worker; Type==monitor -> + gmhc_server:connected(Id, Type), + S#st{connected = true}. -disconnected(Id) -> +disconnected(Id, S) -> gmhc_events:publish(disconnected, #{id => Id}), - gmhc_server:disconnected(Id). + gmhc_server:disconnected(Id), + S#st{connected = false}. opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) -> Bool; diff --git a/src/gmhc_eureka.erl b/src/gmhc_eureka.erl index 14b9e05..b263b09 100644 --- a/src/gmhc_eureka.erl +++ b/src/gmhc_eureka.erl @@ -2,19 +2,88 @@ -vsn("0.6.1"). -export([get_pool_address/0]). +-export([cache_good_address/1, + invalidate_cache/0]). -include_lib("kernel/include/logger.hrl"). -include("gmhc_events.hrl"). get_pool_address() -> + case cached_address() of + {ok, _} = Ok -> Ok; + {error, _} -> + get_pool_address_() + end. + +cached_address() -> + CacheF = cache_filename(), + ?LOG_DEBUG("Eureka cache filename: ~p", [CacheF]), + case file:read_file(CacheF) of + {ok, Bin} -> + NowTS = erlang:system_time(seconds), + OldestTS = NowTS - 24*60*60, + try binary_to_term(Bin) of + #{ ts := TS + , host := _ + , port := _ + , pool_id := _} = Map -> + if TS >= OldestTS -> + Result = maps:remove(ts, Map), + ?LOG_DEBUG("Cached eureka info: ~p", [Result]), + {ok, Result}; + true -> + {error, outdated} + end; + Other -> + {error, {invalid_cache_term, Other}} + catch + error:E -> + {error, {invalid_cache_data, E}} + end; + {error, _} = Err -> + Err + end. + +cache_good_address(#{host := Addr, + port := Port, + pool_id := PoolId}) -> + CacheF = cache_filename(), + ToCache = #{ host => unicode:characters_to_binary(Addr) + , port => Port + , pool_id => unicode:characters_to_binary(PoolId) + , ts => erlang:system_time(seconds)}, + case file:write_file(CacheF, term_to_binary(ToCache)) of + ok -> + ?LOG_DEBUG("Cached eureka info in: ~p", [CacheF]), + {ok, ToCache}; + {error, _} = Err -> + ?LOG_DEBUG("Couldn't cache eureka in ~p: ~p", [CacheF, Err]), + Err + end. + +invalidate_cache() -> + CacheF = cache_filename(), + case file:delete_file(CacheF) of + ok -> + ?LOG_DEBUG("Eureka cache file removed (~p)", [CacheF]), + ok; + {error, _} = Err -> + ?LOG_DEBUG("Couldn't remove Eureka cache (~p): ~p", [CacheF, Err]), + Err + end. + +cache_filename() -> + filename:join(setup:data_dir(), "gmhc_eureka.cache"). + +get_pool_address_() -> case gmconfig:find_config([<<"pool_admin">>, <<"url">>], [user_config]) of {ok, URL0} -> case expand_url(URL0) of <<"local">> -> - #{<<"address">> => <<"127.0.0.1">>, - <<"port">> => gmconfig:get_config( - [<<"pool">>, <<"port">>], [schema_default]), - <<"pool_id">> => gmhc_config:get_config([<<"pool">>, <<"id">>]) }; + #{host => <<"127.0.0.1">>, + port => gmconfig:get_config( + [<<"pool">>, <<"port">>], [schema_default]), + pool_id => gmhc_config:get_config([<<"pool">>, <<"id">>]) }; URL -> ?LOG_INFO("Trying to connect to ~p", [URL]), connect1(URL) @@ -49,9 +118,13 @@ connect1(URL0) -> Error end. -get_host_port(Data) -> +get_host_port(#{ <<"address">> := Addr + , <<"port">> := Port + , <<"pool_id">> := PoolId } = Data) -> ?LOG_DEBUG("Data = ~p", [Data]), - maps:with([<<"address">>, <<"port">>, <<"pool_id">>], Data). + {ok, #{ host => Addr + , port => Port + , pool_id => PoolId }}. request(get, URL) -> case request(get, URL, []) of diff --git a/src/gmhc_server.erl b/src/gmhc_server.erl index a3856ed..67ccece 100644 --- a/src/gmhc_server.erl +++ b/src/gmhc_server.erl @@ -162,16 +162,11 @@ handle_info({'EXIT', Pid, Reason}, #st{ workers = Workers {noreply, S} end; handle_info({timeout, _, check_workers}, #st{workers = Workers} = S) -> - case [W || #worker{cand = undefined} = W <- Workers] of - [] -> - {noreply, S}; - Idle -> - S1 = maybe_request_nonces(S), - S2 = lists:foldl(fun(W, Sx) -> - maybe_restart_worker(W, Sx) - end, S1, Idle), - {noreply, S2} - end; + S1 = maybe_request_nonces(S), + S2 = lists:foldl(fun(W, Sx) -> + maybe_restart_worker(W, Sx) + end, S1, Workers), + {noreply, S2}; handle_info(Msg, St) -> ?LOG_DEBUG("Unknown msg: ~p", [Msg]), {noreply, St}. @@ -216,7 +211,6 @@ maybe_request_nonces(#st{ candidate = #{via := Via, seq := Seq, nonces := Nonces , nonces = N} = S) when ?CONNECTED(S) -> case Nonces == [] of true -> - %% ?LOG_DEBUG("Request more nonces, Seq = ~p, N = ~p", [Seq, N]), Res = gmhc_handler:call(#{via => Via, get_nonces => #{ seq => Seq , n => N }}), @@ -230,7 +224,6 @@ maybe_request_nonces(S) -> nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) -> case Seq == Seq0 of true -> - %% ?LOG_DEBUG("Got nonces = ~p", [Nonces]), #st{candidate = Cand} = S, S#st{candidate = Cand#{nonces => Nonces}}; false -> @@ -240,8 +233,22 @@ nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) -> nonces_result({error, Reason}, Seq0, S) -> ?LOG_DEBUG("Got error on nonce request: ~p", [Reason]), Workers = stop_workers_for_seq(Seq0, S#st.workers), + case Reason of + {timeout, _} -> + Timeout = retry_timeout(1000, 3000), + erlang:start_timer(Timeout, self(), check_workers); + _ -> + ok + end, S#st{workers = Workers}. +retry_timeout(Floor, Range) -> + Floor + rand(Range). + +rand(Range) -> + <> = crypto:strong_rand_bytes(4), + Range - (Rx rem Range). + handle_worker_result({worker_result, Result}, W, S) -> %% ?LOG_DEBUG("worker result: ~p", [Result]), case Result of diff --git a/zomp.meta b/zomp.meta index e467016..962e204 100644 --- a/zomp.meta +++ b/zomp.meta @@ -2,9 +2,9 @@ {type,app}. {modules,[]}. {prefix,"gmhc"}. -{desc,"Gajumaru Hive Client"}. {author,"Ulf Wiger, QPQ AG"}. -{package_id,{"uwiger","gmhive_client",{0,6,3}}}. +{desc,"Gajumaru Hive Client"}. +{package_id,{"uwiger","gmhive_client",{0,7,0}}}. {deps,[{"uwiger","gmhive_worker",{0,5,1}}, {"uwiger","gmcuckoo",{1,2,4}}, {"otpr","eblake2",{1,0,1}},