From cb0d8f3689c97a84158bcd1c23581fc41a1f4633 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Sun, 12 Oct 2025 19:58:22 +0200 Subject: [PATCH] Account-specific cache file, bugfixes, better error info add gmhc_lib.erl --- ebin/gmhive_client.app | 2 +- rebar.config | 2 +- rebar.lock | 2 +- src/gmhc_connector.erl | 33 ++++++++++++++++++++++++++------- src/gmhc_eureka.erl | 13 +++++++------ src/gmhc_lib.erl | 8 ++++++++ src/gmhc_server.erl | 6 +----- zomp.meta | 8 ++++---- 8 files changed, 49 insertions(+), 25 deletions(-) create mode 100644 src/gmhc_lib.erl diff --git a/ebin/gmhive_client.app b/ebin/gmhive_client.app index 18ef1ab..4af5114 100644 --- a/ebin/gmhive_client.app +++ b/ebin/gmhive_client.app @@ -1,6 +1,6 @@ {application,gmhive_client, [{description,"Gajumaru Hive Client"}, - {vsn,"0.7.0"}, + {vsn,"0.8.0"}, {registered,[]}, {applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise, gmconfig,gmhive_protocol,gmhive_worker]}, diff --git a/rebar.config b/rebar.config index cbfeeeb..0e2aa69 100644 --- a/rebar.config +++ b/rebar.config @@ -11,7 +11,7 @@ {enoise, {git, "https://git.qpq.swiss/QPQ-AG/enoise.git", {ref, "029292817e"}}}, {gmhive_protocol, {git, "https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git", - {ref, "818ce33"}}}, + {ref, "8d4652a"}}}, {gmhive_worker, {git, "https://git.qpq.swiss/QPQ-AG/gmhive_worker", {ref, "cabd104114"}}}, {gmconfig, {git, "https://git.qpq.swiss/QPQ-AG/gmconfig.git", {ref, "38620ff9e2"}}}, diff --git a/rebar.lock b/rebar.lock index bd78cfe..50980b1 100644 --- a/rebar.lock +++ b/rebar.lock @@ -25,7 +25,7 @@ 1}, {<<"gmhive_protocol">>, {git,"https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git", - {ref,"818ce33cc1dec74c020515be48fb548a5207befd"}}, + {ref,"8d4652a79a2ad8f51e1fe560c15c698d4c92485c"}}, 0}, {<<"gmhive_worker">>, {git,"https://git.qpq.swiss/QPQ-AG/gmhive_worker", diff --git a/src/gmhc_connector.erl b/src/gmhc_connector.erl index d111671..904bd7b 100644 --- a/src/gmhc_connector.erl +++ b/src/gmhc_connector.erl @@ -57,6 +57,7 @@ , auto_connect = true :: boolean() , econn , connected = false :: boolean() + , reconnect = true :: boolean() , reconnect_timer :: timer_ref() | 'undefined' , recache_timer :: reference() | 'undefined' , awaiting_connect = [] :: list() @@ -143,6 +144,9 @@ init(#{id := Id} = Opts) when is_map(Opts) -> {ok, S} -> ?LOG_DEBUG("Initial connect succeeded", []), S; + {error, rejected} -> + ?LOG_WARNING("Connection rejected; will not retry", []), + S0#st{econn = undefined, reconnect = false}; {error, _} = Error -> ?LOG_WARNING("Could not connect to core server: ~p", [Error]), start_reconnect_timer(S0#st{econn = undefined}) @@ -181,6 +185,7 @@ handle_call({connect, Opts}, From, #st{awaiting_connect = Waiters} = S) -> end, S1 = start_reconnect_timer( S#st{ auto_connect = true + , reconnect = true , awaiting_connect = Waiters1 }, Opts), {noreply, S1}; false -> @@ -276,6 +281,8 @@ code_change(_FromVsn, S, _Extra) -> try_connect(Opts, S) -> try try_connect_(Opts, S) catch + error:rejected:_ -> + {error, rejected}; error:E:T -> ?LOG_ERROR("Unexpected error connecting: ~p / ~p", [E, T]), {error, E} @@ -345,6 +352,8 @@ default_tcp_opts() -> enoise_opts() -> [{noise, <<"Noise_NN_25519_ChaChaPoly_BLAKE2b">>}]. +start_reconnect_timer(#st{reconnect = false} = S) -> + S; start_reconnect_timer(#st{} = S) -> start_reconnect_timer(S, #{}). @@ -356,12 +365,18 @@ start_reconnect_timer(#st{} = S, Opts) -> false -> ?LOG_DEBUG("starting reconnect timer ...", []), TRef = start_timer(1000, Opts), - S#st{reconnect_timer = {TRef, 10, 1000, Opts}} + S#st{reconnect_timer = {TRef, 10, 8000 + gmhc_lib:rand(3000), Opts}} end. +restart_reconnect_timer(#st{reconnect = false} = S) -> + cancel_reconnect_timer(S); restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) -> - gmhc_eureka:invalidate_cache(), - NewT = max(T * 2, ?MAX_RETRY_INTERVAL), + NewT = max(T + 5000 + gmhc_lib:rand(1000), ?MAX_RETRY_INTERVAL), + if (NewT > T andalso NewT =:= ?MAX_RETRY_INTERVAL) -> + gmhc_eureka:invalidate_cache(); + true -> + ok + end, TRef = start_timer(NewT, Opts), S#st{reconnect_timer = {TRef, 10, NewT, Opts}}; restart_reconnect_timer(#st{reconnect_timer = {_, N, T, Opts}} = S) -> @@ -452,10 +467,11 @@ protocol_connect(Opts, #st{econn = EConn} = S) -> , pool_id => PoolId , type => Type }, notify_connected(S1#st{protocol = P, version = V, opts = Opts1}); - #{error := #{message := Msg}} -> - ?LOG_ERROR("Connect error: ~s", [Msg]), + #{error := #{code := _, message := ErrMsg}} = ErrReply -> + ?LOG_ERROR("Connect error: ~s", [ErrMsg]), + disconnected(S#st.id, ErrReply, S), gmhc_eureka:invalidate_cache(), - error(protocol_connect) + error(rejected) end after 10000 -> gmhc_eureka:invalidate_cache(), @@ -476,7 +492,10 @@ connected(Id, Type, S) when Type==worker; Type==monitor -> S#st{connected = true}. disconnected(Id, S) -> - gmhc_events:publish(disconnected, #{id => Id}), + disconnected(Id, #{}, S). + +disconnected(Id, Msg, S) -> + gmhc_events:publish(disconnected, Msg#{id => Id}), gmhc_server:disconnected(Id), S#st{connected = false}. diff --git a/src/gmhc_eureka.erl b/src/gmhc_eureka.erl index b263b09..a9ee860 100644 --- a/src/gmhc_eureka.erl +++ b/src/gmhc_eureka.erl @@ -63,7 +63,7 @@ cache_good_address(#{host := Addr, invalidate_cache() -> CacheF = cache_filename(), - case file:delete_file(CacheF) of + case file:delete(CacheF) of ok -> ?LOG_DEBUG("Eureka cache file removed (~p)", [CacheF]), ok; @@ -73,17 +73,18 @@ invalidate_cache() -> end. cache_filename() -> - filename:join(setup:data_dir(), "gmhc_eureka.cache"). + <<"ak_", PKShort:8/binary, _/binary>> = gmhc_config:get_config([<<"pubkey">>]), + filename:join(setup:data_dir(), "gmhc_eureka." ++ binary_to_list(PKShort) ++ ".cache"). get_pool_address_() -> case gmconfig:find_config([<<"pool_admin">>, <<"url">>], [user_config]) of {ok, URL0} -> case expand_url(URL0) of <<"local">> -> - #{host => <<"127.0.0.1">>, - port => gmconfig:get_config( - [<<"pool">>, <<"port">>], [schema_default]), - pool_id => gmhc_config:get_config([<<"pool">>, <<"id">>]) }; + {ok, #{host => <<"127.0.0.1">>, + port => gmconfig:get_config( + [<<"pool">>, <<"port">>], [schema_default]), + pool_id => gmhc_config:get_config([<<"pool">>, <<"id">>]) }}; URL -> ?LOG_INFO("Trying to connect to ~p", [URL]), connect1(URL) diff --git a/src/gmhc_lib.erl b/src/gmhc_lib.erl new file mode 100644 index 0000000..48ecf92 --- /dev/null +++ b/src/gmhc_lib.erl @@ -0,0 +1,8 @@ +-module(gmhc_lib). + +-export([ rand/1 ]). + + +rand(Range) -> + <> = crypto:strong_rand_bytes(4), + Range - (Rx rem Range). diff --git a/src/gmhc_server.erl b/src/gmhc_server.erl index 67ccece..a9efc33 100644 --- a/src/gmhc_server.erl +++ b/src/gmhc_server.erl @@ -243,11 +243,7 @@ nonces_result({error, Reason}, Seq0, S) -> S#st{workers = Workers}. retry_timeout(Floor, Range) -> - Floor + rand(Range). - -rand(Range) -> - <> = crypto:strong_rand_bytes(4), - Range - (Rx rem Range). + Floor + gmhc_lib:rand(Range). handle_worker_result({worker_result, Result}, W, S) -> %% ?LOG_DEBUG("worker result: ~p", [Result]), diff --git a/zomp.meta b/zomp.meta index 962e204..5a7c7d4 100644 --- a/zomp.meta +++ b/zomp.meta @@ -2,15 +2,15 @@ {type,app}. {modules,[]}. {prefix,"gmhc"}. -{author,"Ulf Wiger, QPQ AG"}. {desc,"Gajumaru Hive Client"}. -{package_id,{"uwiger","gmhive_client",{0,7,0}}}. -{deps,[{"uwiger","gmhive_worker",{0,5,1}}, +{author,"Ulf Wiger, QPQ AG"}. +{package_id,{"uwiger","gmhive_client",{0,8,0}}}. +{deps,[{"uwiger","gmhive_protocol",{0,2,0}}, + {"uwiger","gmhive_worker",{0,5,1}}, {"uwiger","gmcuckoo",{1,2,4}}, {"otpr","eblake2",{1,0,1}}, {"otpr","base58",{0,1,1}}, {"otpr","gmserialization",{0,1,3}}, - {"uwiger","gmhive_protocol",{0,1,1}}, {"uwiger","setup",{2,2,4}}, {"uwiger","gproc",{1,0,1}}, {"uwiger","gmconfig",{0,1,2}},