Add missing demonitor() calls #11

Merged
uwiger merged 4 commits from uw-demonitor into master 2025-08-27 19:28:50 +09:00
17 changed files with 42 additions and 11 deletions

View File

@ -1,6 +1,6 @@
{application,gmhive_client, {application,gmhive_client,
[{description,"Gajumaru Hive Client"}, [{description,"Gajumaru Hive Client"},
{vsn,"0.4.5"}, {vsn,"0.4.8"},
{registered,[]}, {registered,[]},
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise, {applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
gmconfig,gmhive_protocol,gmhive_worker]}, gmconfig,gmhive_protocol,gmhive_worker]},

View File

@ -10,7 +10,7 @@
{gmhive_protocol, {gmhive_protocol,
{git, "https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git", {git, "https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git",
{ref, "818ce33"}}}, {ref, "818ce33"}}},
{gmhive_worker, {git, "https://git.qpq.swiss/QPQ-AG/gmhive_worker", {ref, "255ef59"}}}, {gmhive_worker, {git, "https://git.qpq.swiss/QPQ-AG/gmhive_worker", {ref, "fac460714f"}}},
{gmconfig, {git, "https://git.qpq.swiss/QPQ-AG/gmconfig.git", {gmconfig, {git, "https://git.qpq.swiss/QPQ-AG/gmconfig.git",
{ref, "38620ff9e2"}}}, {ref, "38620ff9e2"}}},
{gproc, "1.0.0"}, {gproc, "1.0.0"},

View File

@ -29,7 +29,7 @@
0}, 0},
{<<"gmhive_worker">>, {<<"gmhive_worker">>,
{git,"https://git.qpq.swiss/QPQ-AG/gmhive_worker", {git,"https://git.qpq.swiss/QPQ-AG/gmhive_worker",
{ref,"255ef59ccd7f795d2d25f2d0ebcf24e3251b6f36"}}, {ref,"fac460714fc228eb0b723a3f292a44aec77f094a"}},
0}, 0},
{<<"gmserialization">>, {<<"gmserialization">>,
{git,"https://git.qpq.swiss/QPQ-AG/gmserialization.git", {git,"https://git.qpq.swiss/QPQ-AG/gmserialization.git",

View File

@ -1,5 +1,6 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(gmhc_app). -module(gmhc_app).
-vsn("0.4.8").
-behaviour(application). -behaviour(application).

View File

@ -1,4 +1,5 @@
-module(gmhc_config). -module(gmhc_config).
-vsn("0.4.8").
-export([ load_config/0 -export([ load_config/0
, get_config/1 , get_config/1

View File

@ -1,4 +1,5 @@
-module(gmhc_config_schema). -module(gmhc_config_schema).
-vsn("0.4.8").
-export([ schema/0 -export([ schema/0
, to_json/0 ]). , to_json/0 ]).

View File

@ -1,4 +1,5 @@
-module(gmhc_connector). -module(gmhc_connector).
-vsn("0.4.8").
-behaviour(gen_server). -behaviour(gen_server).

View File

@ -1,4 +1,5 @@
-module(gmhc_connectors_sup). -module(gmhc_connectors_sup).
-vsn("0.4.8").
-behavior(supervisor). -behavior(supervisor).
-export([ start_link/0 -export([ start_link/0

View File

@ -1,4 +1,5 @@
-module(gmhc_counters). -module(gmhc_counters).
-vsn("0.4.8").
-export([ initialize/0 ]). -export([ initialize/0 ]).

View File

@ -1,4 +1,5 @@
-module(gmhc_eureka). -module(gmhc_eureka).
-vsn("0.4.8").
-export([get_pool_address/0]). -export([get_pool_address/0]).

View File

@ -1,4 +1,5 @@
-module(gmhc_events). -module(gmhc_events).
-vsn("0.4.8").
-export([subscribe/1, -export([subscribe/1,
ensure_subscribed/1, ensure_subscribed/1,

View File

@ -1,4 +1,5 @@
-module(gmhc_handler). -module(gmhc_handler).
-vsn("0.4.8").
-behavior(gen_server). -behavior(gen_server).
-export([ start_link/0 -export([ start_link/0
@ -129,12 +130,15 @@ call_connector(Req0) ->
gmhc_connector:send(ViaId, #{call => Req#{ id => Id }}), gmhc_connector:send(ViaId, #{call => Req#{ id => Id }}),
receive receive
{from_pool, #{reply := #{ id := Id, result := Result }}} -> {from_pool, #{reply := #{ id := Id, result := Result }}} ->
erlang:demonitor(MRef),
Result; Result;
{from_pool, #{error := #{ id := Id } = Error}} -> {from_pool, #{error := #{ id := Id } = Error}} ->
erlang:demonitor(MRef),
{error, maps:remove(id, Error)}; {error, maps:remove(id, Error)};
{'DOWN', MRef, _, _, _} -> {'DOWN', MRef, _, _, _} ->
{error, no_connection} {error, no_connection}
after 5000 -> after 5000 ->
erlang:demonitor(MRef),
{error, {timeout, process_info(self(), messages)}} {error, {timeout, process_info(self(), messages)}}
end end
end. end.

View File

@ -1,4 +1,5 @@
-module(gmhc_server). -module(gmhc_server).
-vsn("0.4.8").
-behaviour(gen_server). -behaviour(gen_server).
@ -44,7 +45,7 @@
-define(CONNECTED(S), map_size(S#st.connected) > 0). -define(CONNECTED(S), map_size(S#st.connected) > 0).
-define(MAX_ERRORS, 5). -define(MAX_ERRORS, 50).
connected(Id, Type) -> connected(Id, Type) ->
gen_server:call(?MODULE, {connected, Id, Type}). gen_server:call(?MODULE, {connected, Id, Type}).
@ -120,7 +121,7 @@ handle_cast({from_pool, #{via := Connector,
catch catch
Cat:Err:St -> Cat:Err:St ->
?LOG_ERROR("CAUGHT ~p:~p / ~p", [Cat, Err, St]), ?LOG_ERROR("CAUGHT ~p:~p / ~p", [Cat, Err, St]),
{noreply, S} {noreply, S}
end; end;
handle_cast({disconnected, Id}, #st{connected = Conn} = S) -> handle_cast({disconnected, Id}, #st{connected = Conn} = S) ->
?LOG_DEBUG("disconnected: ~p", [Id]), ?LOG_DEBUG("disconnected: ~p", [Id]),
@ -154,11 +155,23 @@ handle_info({'EXIT', Pid, Reason}, #st{ workers = Workers
gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error, gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
data => Reason})), data => Reason})),
Ws1 = incr_worker_error(W, Workers), Ws1 = incr_worker_error(W, Workers),
erlang:start_timer(100, self(), check_workers),
{noreply, S#st{workers = Ws1}}; {noreply, S#st{workers = Ws1}};
false -> false ->
%% ?LOG_DEBUG("EXIT apparently not from worker?? (~p)", [Pid]), %% ?LOG_DEBUG("EXIT apparently not from worker?? (~p)", [Pid]),
{noreply, S} {noreply, S}
end; end;
handle_info({timeout, _, check_workers}, #st{workers = Workers} = S) ->
case [W || #worker{cand = undefined} = W <- Workers] of
[] ->
{noreply, S};
Idle ->
S1 = maybe_request_nonces(S),
S2 = lists:foldl(fun(W, Sx) ->
maybe_restart_worker(W, Sx)
end, S1, Idle),
{noreply, S2}
end;
handle_info(Msg, St) -> handle_info(Msg, St) ->
?LOG_DEBUG("Unknown msg: ~p", [Msg]), ?LOG_DEBUG("Unknown msg: ~p", [Msg]),
{noreply, St}. {noreply, St}.
@ -226,14 +239,14 @@ handle_worker_result({worker_result, Result}, W, S) ->
case Result of case Result of
{solutions, Solutions} -> {solutions, Solutions} ->
{Cont, S1} = report_solutions_(Solutions, W, S), {Cont, S1} = report_solutions_(Solutions, W, S),
maybe_continue(Cont, W, S1); maybe_continue(Cont, reset_errors(W), S1);
{solution, Nonce, Solution} -> {solution, Nonce, Solution} ->
%% report_solution(Nonce, Solution, W, S), %% report_solution(Nonce, Solution, W, S),
{Cont, S1} = report_solutions_([{Nonce, Solution}], W, S), {Cont, S1} = report_solutions_([{Nonce, Solution}], W, S),
maybe_continue(Cont, W, S1); maybe_continue(Cont, reset_errors(W), S1);
{no_solution, Nonce} -> {no_solution, Nonce} ->
report_no_solution(Nonce, W, S), report_no_solution(Nonce, W, S),
maybe_restart_worker(W, S); maybe_restart_worker(reset_errors(W), S);
{error, S} -> {error, S} ->
?LOG_DEBUG("Worker ~p reported error as normal", [W#worker.index]), ?LOG_DEBUG("Worker ~p reported error as normal", [W#worker.index]),
gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error, gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error,
@ -261,6 +274,9 @@ report_solutions_(Solutions, W, S) ->
{error, S} {error, S}
end. end.
reset_errors(#worker{} = W) ->
W#worker{errors = 0}.
reset_worker(#worker{index = I} = W, Ws) -> reset_worker(#worker{index = I} = W, Ws) ->
W1 = reset_worker_(W), W1 = reset_worker_(W),
lists:keyreplace(I, #worker.index, Ws, W1). lists:keyreplace(I, #worker.index, Ws, W1).

View File

@ -1,5 +1,6 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- %% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(gmhc_sup). -module(gmhc_sup).
-vsn("0.4.8").
-behaviour(supervisor). -behaviour(supervisor).

View File

@ -8,6 +8,7 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(gmhc_workers). -module(gmhc_workers).
-vsn("0.4.8").
-export([ -export([
get_worker_configs/0 get_worker_configs/0

View File

@ -1,4 +1,5 @@
-module(gmhive_client). -module(gmhive_client).
-vsn("0.4.8").
-export([ connect/1 -export([ connect/1
, disconnect/1 , disconnect/1

View File

@ -4,9 +4,9 @@
{prefix,"gmhc"}. {prefix,"gmhc"}.
{author,"Ulf Wiger, QPQ AG"}. {author,"Ulf Wiger, QPQ AG"}.
{desc,"Gajumaru Hive Client"}. {desc,"Gajumaru Hive Client"}.
{package_id,{"uwiger","gmhive_client",{0,4,5}}}. {package_id,{"uwiger","gmhive_client",{0,4,8}}}.
{deps,[{"uwiger","gmcuckoo",{1,2,3}}, {deps,[{"uwiger","gmhive_worker",{0,4,0}},
{"uwiger","gmhive_worker",{0,3,0}}, {"uwiger","gmcuckoo",{1,2,3}},
{"otpr","eblake2",{1,0,1}}, {"otpr","eblake2",{1,0,1}},
{"otpr","base58",{0,1,1}}, {"otpr","base58",{0,1,1}},
{"otpr","gmserialization",{0,1,3}}, {"otpr","gmserialization",{0,1,3}},