Compare commits

...

21 Commits

Author SHA1 Message Date
e85df7c27c Merge pull request 'Add client name to connect msg, more descriptive 'disconnected' event' (#20) from uw-restart-logic into master
Reviewed-on: #20
2025-11-19 21:40:00 +09:00
Ulf Wiger
2078ae7115 More descriptive disconnected event, client info 2025-11-19 08:46:05 +01:00
Ulf Wiger
8e795da00d Raise timeouts and restart allowances 2025-10-25 09:32:43 +02:00
Ulf Wiger
bfccda2c3f Protect against accidental multiple concurrent connections 2025-10-24 20:30:41 +02:00
0e4b6d7873 Merge pull request 'Fix restart logic and add watchdog' (#18) from uw-watchdog into master
Reviewed-on: #18
2025-10-24 15:42:50 +09:00
Ulf Wiger
02eaf1ee47 Fix bug in 'unwatch()' function 2025-10-24 07:14:16 +02:00
Ulf Wiger
1f6066705c Fix restart logic and add watchdog 2025-10-23 22:22:59 +02:00
283e0274e0 Merge pull request 'Cache connection info for faster and more robust reconnect' (#17) from uw-cache-conn into master
Reviewed-on: #17
Reviewed-by: Craig Everett <craigeverett@qpq.swiss>
2025-10-16 00:20:01 +09:00
Ulf Wiger
847ffc810a Fix cache_dir() problem, made it configurable 2025-10-15 12:41:00 +02:00
Ulf Wiger
1e60f35dd3 Handle case where connection meta-data are faulty 2025-10-14 13:02:38 +02:00
Ulf Wiger
570f31ab3c Network specific cache filename + more sanity checks 2025-10-13 13:57:34 +02:00
Ulf Wiger
cb0d8f3689 Account-specific cache file, bugfixes, better error info
add gmhc_lib.erl
2025-10-12 19:59:29 +02:00
Ulf Wiger
5eff27367d Cache connection info 2025-10-09 20:17:27 +02:00
Ulf Wiger
968f9d92f2 Update README.md with latest json schema (automated) 2025-10-08 15:39:48 +02:00
34f3c93aaa Merge pull request 'Start tty logger if headless, check config for duplicate ids' (#15) from uw-check-for-dups into master
Reviewed-on: #15
2025-10-01 18:33:30 +09:00
Ulf Wiger
c37ee1c3af Start tty logger if headless, check config for duplicate ids 2025-09-30 19:55:12 +02:00
408bd9fc18 Merge pull request 'Report no_solution for all relevant nonces with repeats' (#14) from uw-repeats-reporting into master
Reviewed-on: #14
2025-09-27 23:27:56 +09:00
Ulf Wiger
2a33d06bd6 Report no_solution for all relevant nonces with repeats 2025-09-25 23:25:00 +02:00
8cb2c76614 Merge pull request 'Configurable progress reporting' (#13) from uw-progress-reporting into master
Reviewed-on: #13
Reviewed-by: Jarvis Carroll <jarviscarrol@qpq.swiss>
Reviewed-by: Craig Everett <craigeverett@qpq.swiss>
2025-09-24 14:44:09 +09:00
Ulf Wiger
07b658d509 Configurable progress reporting 2025-09-23 16:44:46 +02:00
8a68244c90 Merge pull request 'Update gmhive_worker and gmcuckoo deps, miner returns in debug' (#12) from uw-miner-returns into master
Reviewed-on: #12
2025-09-23 21:46:32 +09:00
27 changed files with 954 additions and 120 deletions

2
.gitignore vendored
View File

@ -19,7 +19,7 @@ VERSION
*.aes~
*.config~
*.args~
data/mnesia
data/*
_checkouts*/
tmp/
rebar3.crashdump

10
Makefile Normal file
View File

@ -0,0 +1,10 @@
BUILD=_build/default/lib/gmhive_client
SCHEMA_OUT=$(BUILD)/priv/gmhc_schema.json
schema:
ERL_LIBS=_build/default/lib \
erl -run gmhc_config_schema export $(SCHEMA_OUT) -run init stop
scripts/update_readme_json.sh "README.md" "$(SCHEMA_OUT)"

View File

@ -82,6 +82,20 @@ only does this inform the client of the network for which the client should
mine, but it also changes what URL the client will use to find the hive
server. (For `"testnet"` this will be `"test.gajumining.com"`.)
### Caching of connection data
The client retrieves the Hive Server connection info from gajumining.com.
This information is account-specific, and may change over time.
To speed up the reconnecting process, should the Hive Server restart,
this connection information is cached locally for up to 24 hours.
The normal place for the cached data would be
`$ZOMP_DIR//var/uwiger/gmhive_client/Vsn/setup.data/mainnet/gmhc_eureka.XXXXXXXX.cache`,
where `Vsn` is the current version of `gmhive_client`, and `XXXXXXXX` is a fragment
of the current account ID. The data location can be changed with the command-line
option `-setup data_dir Dir`.
## JSON Schema
```json
@ -89,6 +103,10 @@ server. (For `"testnet"` this will be `"test.gajumining.com"`.)
"$schema": "http://json-schema.org/draft-04/schema#",
"additionalProperties": false,
"properties": {
"cache_dir": {
"description": "Location of cache, default is 'setup:data_dir()'",
"type": "string"
},
"extra_pubkeys": {
"default": [],
"description": "Additional worker pubkeys, sharing rewards",
@ -155,6 +173,16 @@ server. (For `"testnet"` this will be `"test.gajumining.com"`.)
"pattern": "^ak_[1-9A-HJ-NP-Za-km-z]*$",
"type": "string"
},
"report": {
"default": "silent",
"description": "Progress reporting",
"enum": [
"debug",
"progress",
"silent"
],
"type": "string"
},
"type": {
"default": "worker",
"description": "monitor mode can be used to see if a pool is alive",
@ -174,7 +202,7 @@ server. (For `"testnet"` this will be `"test.gajumining.com"`.)
"properties": {
"executable": {
"default": "mean29-generic",
"description": "Executable binary of the worker. Can be a fully qualified path, but the software may apply default logic to locate a plain basename.",
"description": "Executable binary of the worker. Can be a fully qualified path, \nbut the software may apply default logic to locate a plain basename.",
"type": "string"
},
"extra_args": {

View File

@ -1,6 +1,6 @@
{application,gmhive_client,
[{description,"Gajumaru Hive Client"},
{vsn,"0.5.1"},
{vsn,"0.10.0"},
{registered,[]},
{applications,[kernel,stdlib,sasl,gproc,inets,ssl,enoise,
gmconfig,gmhive_protocol,gmhive_worker]},

View File

@ -1,4 +1,5 @@
#{ pubkey => <<"ak_zjyvDmMbXafMADYrxDs4uMiYM5zEVJBqWgv619NUQ37Gkwt7z">>
, report => <<"progress">>
, pool_admin => #{url => <<"local">>}
, pool => #{id => <<"ct_26xqeE3YKmZV8jrks57JSgZRCHSuG4RGzpnvdz6AAiSSTVbJRM">>,
host => <<"127.0.0.1">>}

View File

@ -1,5 +1,5 @@
#{ pubkey => <<"ak_2KAcA2Pp1nrR8Wkt3FtCkReGzAi8vJ9Snxa4PcmrthVx8AhPe8">>
, pool => #{id => <<"ct_LRbi65kmLtE7YMkG6mvG5TxAXTsPJDZjAtsPuaXtRyPA7gnfJ">>}
, mining =>
, workers =>
[#{executable => <<"mean15-generic">>}]
}.

139
priv/gmhc_schema.json Normal file
View File

@ -0,0 +1,139 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"additionalProperties": false,
"properties": {
"cache_dir": {
"description": "Location of cache, default is 'setup:data_dir()'",
"type": "string"
},
"extra_pubkeys": {
"default": [],
"description": "Additional worker pubkeys, sharing rewards",
"items": {
"pattern": "^ak_[1-9A-HJ-NP-Za-km-z]*$",
"type": "string"
},
"type": "array"
},
"network": {
"default": "mainnet",
"type": "string"
},
"pool": {
"additionalProperties": false,
"properties": {
"host": {
"default": "127.0.0.1",
"description": "Hostname of hive server",
"example": "0.0.0.0",
"type": "string"
},
"id": {
"description": "Pool contract id",
"pattern": "^ct_[1-9A-HJ-NP-Za-km-z]*$",
"type": "string"
},
"port": {
"default": 17888,
"description": "Hive server listen port",
"minimum": 1,
"type": "integer"
}
},
"type": "object"
},
"pool_admin": {
"additionalProperties": false,
"properties": {
"default_per_network": {
"additionalProperties": false,
"properties": {
"mainnet": {
"default": "https://gajumining.com/api/workers/{CLIENT_ID}",
"type": "string"
},
"testnet": {
"default": "https://test.gajumining.com/api/workers/{CLIENT_ID}",
"type": "string"
}
},
"type": "object"
},
"url": {
"default": "https://test.gajumining.com/api/workers/{CLIENT_ID}",
"description": "URL of Eureka worker api",
"type": "string"
}
},
"type": "object"
},
"pubkey": {
"description": "Primary client pubkey",
"pattern": "^ak_[1-9A-HJ-NP-Za-km-z]*$",
"type": "string"
},
"report": {
"default": "silent",
"description": "Progress reporting",
"enum": [
"debug",
"progress",
"silent"
],
"type": "string"
},
"type": {
"default": "worker",
"description": "monitor mode can be used to see if a pool is alive",
"enum": [
"worker",
"monitor"
],
"type": "string"
},
"workers": {
"default": [
{ "executable": "mean29-generic" }
],
"description": "Definitions of workers' configurations. If no worker are configured one worker is used as default, i.e. 'mean29-generic' executable without any extra args.",
"items": {
"additionalProperties": false,
"properties": {
"executable": {
"default": "mean29-generic",
"description": "Executable binary of the worker. Can be a fully qualified path, \nbut the software may apply default logic to locate a plain basename.",
"type": "string"
},
"extra_args": {
"default": "",
"description": "Extra arguments to pass to the worker executable binary. The safest choice is specifying no arguments i.e. empty string.",
"type": "string"
},
"hex_encoded_header": {
"default": false,
"description": "Hexadecimal encode the header argument that is send to the worker executable. CUDA executables expect hex encoded header.",
"type": "boolean"
},
"instances": {
"description": "Instances used by the worker in case of Multi-GPU mining. Numbers on the configuration list represent GPU devices that are to be addressed by the worker.",
"example": [0,1,2,3],
"items": { "type": "integer" },
"minItems": 1,
"type": "array"
},
"repeats": {
"default": 1,
"description": "Number of tries to do in each worker context - WARNING: it should be set so the worker process runs for 3-5s or else the node risk missing out on new micro blocks.",
"type": "integer"
}
},
"required": [
"executable"
],
"type": "object"
},
"type": "array"
}
},
"type": "object"
}

View File

@ -5,11 +5,13 @@
{erl_opts, [debug_info]}.
{plugins, [rebar3_hex]}.
{post_hooks, [{compile, "make schema"}]}.
{deps, [
{enoise, {git, "https://git.qpq.swiss/QPQ-AG/enoise.git", {ref, "029292817e"}}},
{gmhive_protocol,
{git, "https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git",
{ref, "818ce33"}}},
{ref, "8d4652a"}}},
{gmhive_worker, {git, "https://git.qpq.swiss/QPQ-AG/gmhive_worker", {ref, "cabd104114"}}},
{gmconfig, {git, "https://git.qpq.swiss/QPQ-AG/gmconfig.git",
{ref, "38620ff9e2"}}},

View File

@ -25,7 +25,7 @@
1},
{<<"gmhive_protocol">>,
{git,"https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git",
{ref,"818ce33cc1dec74c020515be48fb548a5207befd"}},
{ref,"8d4652a79a2ad8f51e1fe560c15c698d4c92485c"}},
0},
{<<"gmhive_worker">>,
{git,"https://git.qpq.swiss/QPQ-AG/gmhive_worker",

51
scripts/update_readme_json.sh Executable file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env bash
set -euo pipefail
README="$1"
SCHEMA="$2"
if [[ ! -f "$README" || ! -f "$SCHEMA" ]]; then
echo "Usage: $0 README.md schema.json"
exit 1
fi
tmpfile=$(mktemp)
awk -v schema_file="$SCHEMA" '
BEGIN {
in_json_block = 0
in_schema_section = 0
}
/^##[[:space:]]+JSON[[:space:]]+Schema/ {
in_schema_section = 1
print
next
}
/^##[[:space:]]+/ && in_schema_section {
# Another section starts, end the schema section
in_schema_section = 0
}
in_schema_section && /^```json/ {
print "```json"
while ((getline line < schema_file) > 0) print line
close(schema_file)
in_json_block = 1
next
}
in_json_block && /^```$/ {
print "```"
in_json_block = 0
next
}
!(in_schema_section && in_json_block) {
print
}
' "$README" > "$tmpfile"
mv "$tmpfile" "$README"
echo "✅ Updated JSON schema block in $README"

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(gmhc_app).
-vsn("0.4.8").
-vsn("0.8.3").
-behaviour(application).
@ -17,11 +17,11 @@
-spec start([{atom(), any()}]) -> {ok, [atom()]} | {error, any()}.
start(Opts) ->
application:load(gmhive_client),
{error,_} = application:stop(gmhive_client),
_ = application:stop(gmhive_client),
_ = lists:foreach(fun({K, V}) ->
application:set_env(gmhive_client, K, V)
end, Opts),
application:ensure_all_started(gmhive_client).
application:ensure_all_started(gmhive_client, permanent).
start(_StartType, _StartArgs) ->
set_things_up(),
@ -45,8 +45,31 @@ stop(_State) ->
ok.
set_things_up() ->
maybe_add_logger_handler(),
gmhc_counters:initialize(),
gmhc_config:load_config(),
logger:set_module_level([gmhw_pow_cuckoo], notice),
?LOG_DEBUG("Config: ~p", [gmconfig:user_config()]),
case gmhc_config:get_config([<<"report">>]) of
<<"debug">> ->
?LOG_NOTICE("Starting debug reporter", []),
gmhc_events:debug();
<<"progress">> ->
?LOG_NOTICE("Starting progress reporter", []),
gmhc_events:progress();
_ -> ok
end,
ok.
maybe_add_logger_handler() ->
case is_headless() orelse application:get_env(gmhive_client, tty_logger, false) of
true ->
Level = application:get_env(gmhive_client, tty_logger_level, error),
io:fwrite("Adding logger handler: ~p~n", [Level]),
logger:add_handler(gmhc_tty, logger_std_h, #{level => Level});
false ->
ok
end.
is_headless() ->
undefined == application:get_key(gajumine, vsn).

View File

@ -1,5 +1,5 @@
-module(gmhc_config).
-vsn("0.4.8").
-vsn("0.8.3").
-export([ load_config/0
, get_config/1
@ -14,8 +14,20 @@ load_config() ->
gmconfig:apply_os_env(),
gmconfig:process_plain_args(),
check_application_env(),
check_final_config(),
ok.
check_final_config() ->
ExtraPubkeys = get_config([<<"extra_pubkeys">>]),
AllKeys = [get_config([<<"pubkey">>]) | ExtraPubkeys],
case AllKeys -- lists:usort(AllKeys) of
[] ->
ok;
Duplicates ->
?LOG_ERROR("Duplicate account ids found: ~p", [Duplicates]),
error({duplicate_account_ids, Duplicates})
end.
instrument_gmconfig() ->
gmconfig:set_gmconfig_env(gmconfig_env()).

View File

@ -1,8 +1,9 @@
-module(gmhc_config_schema).
-vsn("0.4.8").
-vsn("0.8.3").
-export([ schema/0
, to_json/0 ]).
, to_json/0
, export/1 ]).
-import(gmconfig_schema_helpers,
[ str/1
@ -22,6 +23,17 @@
to_json() ->
json:encode(schema()).
export(ToFile) ->
case file:open(ToFile, [write]) of
{ok, Fd} ->
try ok = io:put_chars(Fd, json:format(schema(), #{indent => 4}))
after
file:close(Fd)
end;
{error, _} = Error ->
Error
end.
schema() ->
obj(schema_init(),
#{
@ -39,6 +51,10 @@ schema() ->
, pool => pool()
, pool_admin => pool_admin()
, workers => workers()
, cache_dir => str(#{description => <<"Location of cache, default is 'setup:data_dir()'">>})
, report => str(#{ enum => [<<"debug">>, <<"progress">>, <<"silent">>]
, default => <<"silent">>
, description => <<"Progress reporting">> })
}).
pool() ->

View File

@ -1,5 +1,5 @@
-module(gmhc_connector).
-vsn("0.4.8").
-vsn("0.8.3").
-behaviour(gen_server).
@ -56,7 +56,11 @@
id :: non_neg_integer()
, auto_connect = true :: boolean()
, econn
, connected = false :: boolean()
, status = disconnected :: disconnected | connecting | connected
, reconnect = true :: boolean()
, reconnect_timer :: timer_ref() | 'undefined'
, recache_timer :: reference() | 'undefined'
, awaiting_connect = [] :: list()
, protocol :: binary() | 'undefined'
, version :: binary() | 'undefined'
@ -141,6 +145,9 @@ init(#{id := Id} = Opts) when is_map(Opts) ->
{ok, S} ->
?LOG_DEBUG("Initial connect succeeded", []),
S;
{error, rejected} ->
?LOG_WARNING("Connection rejected; will not retry", []),
S0#st{econn = undefined, reconnect = false};
{error, _} = Error ->
?LOG_WARNING("Could not connect to core server: ~p", [Error]),
start_reconnect_timer(S0#st{econn = undefined})
@ -179,6 +186,7 @@ handle_call({connect, Opts}, From, #st{awaiting_connect = Waiters} = S) ->
end,
S1 = start_reconnect_timer(
S#st{ auto_connect = true
, reconnect = true
, awaiting_connect = Waiters1 }, Opts),
{noreply, S1};
false ->
@ -239,12 +247,21 @@ handle_info({timeout, TRef, {reconnect, Opts}}, #st{ reconnect_timer = {TRef, _,
end;
handle_info({tcp_closed, _Port}, #st{} = S) ->
?LOG_DEBUG("got tcp_closed", []),
disconnected(S#st.id),
S1 = case S#st.auto_connect of
true -> start_reconnect_timer(S#st{econn = undefined});
false -> S#st{econn = undefined}
S1 = disconnected(S#st.id, S),
S2 = case S1#st.auto_connect of
true -> start_reconnect_timer(S1#st{econn = undefined});
false -> S1#st{econn = undefined}
end,
{noreply, S1};
{noreply, S2};
handle_info({timeout, _, recache}, #st{opts = Opts, econn = EConn} = S0) ->
S = S0#st{recache_timer = undefined},
case (EConn =/= undefined) andalso S#st.connected of
false ->
{noreply, S};
true ->
cache_eureka_info(Opts),
{noreply, ensure_recache_timer(S)}
end;
handle_info(Msg, S) ->
?LOG_DEBUG("Discarding msg (auto_connect=~p): ~p", [S#st.auto_connect, Msg]),
{noreply, S}.
@ -264,6 +281,8 @@ code_change(_FromVsn, S, _Extra) ->
try_connect(Opts, S) ->
try try_connect_(Opts, S)
catch
error:rejected:_ ->
{error, rejected};
error:E:T ->
?LOG_ERROR("Unexpected error connecting: ~p / ~p", [E, T]),
{error, E}
@ -277,8 +296,9 @@ try_connect_(Opts0, S) ->
case try_noise_connect(maps:merge(Opts0, PoolOpts)) of
{ok, EConn, Opts1} ->
S1 = protocol_connect(Opts1, S#st{ econn = EConn
, status = connecting
, reconnect_timer = undefined }),
{ok, S1};
{ok, S1#st{status = connected}};
{error, _} = Error ->
Error
end
@ -286,12 +306,12 @@ try_connect_(Opts0, S) ->
eureka_get_host_port() ->
case gmhc_eureka:get_pool_address() of
#{<<"address">> := Host,
<<"port">> := Port,
<<"pool_id">> := PoolId} ->
#{host => binary_to_list(Host),
{ok, #{host := Host,
port := Port,
pool_id := PoolId}} ->
#{host => unicode:characters_to_list(Host),
port => Port,
pool_id => binary_to_list(PoolId)};
pool_id => unicode:characters_to_list(PoolId)};
{error, _} = Error ->
Error
end.
@ -333,6 +353,8 @@ default_tcp_opts() ->
enoise_opts() ->
[{noise, <<"Noise_NN_25519_ChaChaPoly_BLAKE2b">>}].
start_reconnect_timer(#st{reconnect = false} = S) ->
S;
start_reconnect_timer(#st{} = S) ->
start_reconnect_timer(S, #{}).
@ -344,11 +366,18 @@ start_reconnect_timer(#st{} = S, Opts) ->
false ->
?LOG_DEBUG("starting reconnect timer ...", []),
TRef = start_timer(1000, Opts),
S#st{reconnect_timer = {TRef, 10, 1000, Opts}}
S#st{reconnect_timer = {TRef, 10, 8000 + gmhc_lib:rand(3000), Opts}}
end.
restart_reconnect_timer(#st{reconnect = false} = S) ->
cancel_reconnect_timer(S);
restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) ->
NewT = max(T * 2, ?MAX_RETRY_INTERVAL),
NewT = max(T + 5000 + gmhc_lib:rand(1000), ?MAX_RETRY_INTERVAL),
if (NewT > T andalso NewT =:= ?MAX_RETRY_INTERVAL) ->
gmhc_eureka:invalidate_cache();
true ->
ok
end,
TRef = start_timer(NewT, Opts),
S#st{reconnect_timer = {TRef, 10, NewT, Opts}};
restart_reconnect_timer(#st{reconnect_timer = {_, N, T, Opts}} = S) ->
@ -373,11 +402,25 @@ notify_deadline(D, #st{awaiting_connect = Waiters} = S) ->
end, [], Waiters),
S#st{awaiting_connect = Waiters1}.
notify_connected(#st{id = Id, awaiting_connect = Waiters} = S) ->
cache_eureka_info(Opts) ->
gmhc_eureka:cache_good_address(maps:with([host, port, pool_id], Opts)).
notify_connected(#st{id = Id, awaiting_connect = Waiters, opts = Opts} = S) ->
gmhc_events:publish(connected, #{id => Id}),
[gen_server:reply(From, ok) || {From, _} <- Waiters],
gmhc_handler:pool_connected(S#st.id, S#st.opts),
S#st{awaiting_connect = []}.
cache_eureka_info(Opts),
gmhc_connectors_sup:add_restart_info(Id, Opts),
ensure_recache_timer(S#st{awaiting_connect = []}).
ensure_recache_timer(#st{recache_timer = T} = S) ->
case T of
undefined ->
TRef = erlang:start_timer(1*60*1000, self(), recache),
S#st{recache_timer = TRef};
_ when is_reference(T) ->
S
end.
cancel_reconnect_timer(#st{reconnect_timer = T} = S) ->
case T of
@ -401,17 +444,32 @@ protocol_connect(Opts, #st{econn = EConn} = S) ->
Type = to_atom(opt(type, Opts, [<<"type">>])),
RId = erlang:unique_integer(),
Vsns = gmhp_msgs:versions(),
Client = client_name(),
Protocols = gmhp_msgs:protocols(hd(Vsns)),
ConnectReq = #{ protocols => Protocols
, versions => Vsns
, pool_id => PoolId
, pubkey => Pubkey
, extra_pubkeys => Extra
, client => Client
, type => Type
, nonces => gmhc_server:total_nonces()
, signature => ""},
?LOG_DEBUG("ConnectReq = ~p", [ConnectReq]),
Msg = gmhp_msgs:encode_connect(ConnectReq, RId),
try gmhp_msgs:encode_connect(ConnectReq, RId) of
Msg ->
send_connect(EConn, RId, Msg, ConnectReq, Opts, S)
catch error:Error ->
ErrMsg = unicode:characters_to_binary(io_lib:fwrite("~p", [Error])),
disconnected(S#st.id, #{error =>
#{code => gmhp_msgs:error_code(invalid_input),
message => ErrMsg}}, S)
end.
send_connect(EConn, RId, Msg, #{pubkey := Pubkey,
extra_pubkeys := Extra,
pool_id := PoolId,
type := Type}, Opts, S) ->
enoise:send(EConn, Msg),
receive
{noise, EConn, Data} ->
@ -420,20 +478,41 @@ protocol_connect(Opts, #st{econn = EConn} = S) ->
, result := #{connect_ack := #{ protocol := P
, version := V }}
}} ->
connected(S#st.id, Type),
S1 = connected(S#st.id, Type, S),
Opts1 = Opts#{ pubkey => Pubkey
, extra => Extra
, pool_id => PoolId
, type => Type },
notify_connected(S#st{protocol = P, version = V, opts = Opts1});
#{error := #{message := Msg}} ->
?LOG_ERROR("Connect error: ~s", [Msg]),
error(protocol_connect)
notify_connected(S1#st{protocol = P, version = V, opts = Opts1});
#{error := #{code := _, message := ErrMsg}} = ErrReply ->
?LOG_ERROR("Connect error: ~s", [ErrMsg]),
%% TODO: fix the flow so that we send one disconnected event,
%% and set the reconnect in the right place. For now, stuff
%% the `reconnect = false`.
disconnected(S#st.id, ErrReply, S#st{reconnect = false}),
gmhc_eureka:invalidate_cache(),
error(rejected)
end
after 10000 ->
gmhc_eureka:invalidate_cache(),
error(protocol_connect_timeout)
end.
client_name() ->
MyStr = app_string(gmhive_client),
case app_string(gajumine) of
<<>> -> MyStr;
GMStr -> <<MyStr/binary,",",GMStr/binary>>
end.
app_string(App) ->
case application:get_key(App, vsn) of
undefined ->
<<>>;
{ok, Vsn} ->
unicode:characters_to_binary([atom_to_binary(App),"-",Vsn])
end.
to_bin(A) when is_atom(A) ->
atom_to_binary(A, utf8);
to_bin(S) ->
@ -443,12 +522,18 @@ to_atom(A) when is_atom(A) -> A;
to_atom(S) ->
binary_to_existing_atom(iolist_to_binary(S), utf8).
connected(Id, Type) when Type==worker; Type==monitor ->
gmhc_server:connected(Id, Type).
connected(Id, Type, S) when Type==worker; Type==monitor ->
gmhc_server:connected(Id, Type),
S#st{connected = true}.
disconnected(Id) ->
gmhc_events:publish(disconnected, #{id => Id}),
gmhc_server:disconnected(Id).
disconnected(Id, S) ->
disconnected(Id, #{}, S).
disconnected(_, _, #st{status = disconnected} = S) -> S;
disconnected(Id, Msg, #st{reconnect = Bool} = S) ->
gmhc_events:publish(disconnected, Msg#{id => Id, reconnecting => Bool}),
gmhc_server:disconnected(Id),
S#st{connected = false, status = disconnected}.
opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) ->
Bool;

View File

@ -1,15 +1,37 @@
-module(gmhc_connectors_sup).
-vsn("0.4.8").
-vsn("0.8.3").
-behavior(supervisor).
-export([ start_link/0
, init/1 ]).
-export([ start_first_connector/0
, start_connector/1]).
, start_connector/1
, add_restart_info/2 ]).
-export([ sort_restart_info/1 ]).
-include_lib("kernel/include/logger.hrl").
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of
{ok, _} = Ok ->
RI0 = gmhc_watchdog:get_restart_info(),
?LOG_ERROR("Restart info: ~p", [RI0]),
RI = sort_restart_info(RI0),
RemoveIds = maps:keys(maps:without(maps:keys(RI), RI0)),
gmhc_watchdog:remove_restart_info(RemoveIds),
?LOG_ERROR("Sorted restart info: ~p", [RI]),
maps:foreach(fun restart_connector/2, RI),
Ok;
Other ->
Other
end.
restart_connector({?MODULE, _}, Opts) ->
start_connector(Opts);
restart_connector(_, _) ->
ok.
start_first_connector() ->
start_connector(#{}).
@ -21,11 +43,14 @@ start_connector(Opts0) ->
end,
supervisor:start_child(?MODULE, [Opts]).
add_restart_info(Id, Opts) ->
gmhc_watchdog:note_started({?MODULE, Id}, Opts).
init([]) ->
Mod = gmhc_connector,
SupFlags = #{ strategy => simple_one_for_one
, intensity => 3
, period => 10 },
, intensity => 5
, period => 60 },
ChildSpecs = [ #{ id => Mod
, start => {Mod, start_link, []}
, type => worker
@ -33,3 +58,13 @@ init([]) ->
, shutdown => 5000
, modules => [Mod] } ],
{ok, {SupFlags, ChildSpecs}}.
sort_restart_info(RI) ->
L = lists:sort(
[{Id, {T,H,P}, I}
|| {{?MODULE,_} = Id, #{type := T, host := H, port := P} = I} <- maps:to_list(RI)]),
ConnTypes = [CT || {_, CT, _} <- lists:ukeysort(2, L)],
lists:foldl(fun(CT, Acc) ->
{Id, _, I} = lists:keyfind(CT, 2, L),
Acc#{Id => I}
end, #{}, ConnTypes).

View File

@ -1,5 +1,5 @@
-module(gmhc_counters).
-vsn("0.4.8").
-vsn("0.8.3").
-export([ initialize/0 ]).

View File

@ -1,20 +1,136 @@
-module(gmhc_eureka).
-vsn("0.4.8").
-vsn("0.8.3").
-export([get_pool_address/0]).
-export([cache_good_address/1,
invalidate_cache/0,
cached_address/0,
cache_filename/0,
cache_dir/0]).
-include_lib("kernel/include/logger.hrl").
-include("gmhc_events.hrl").
get_pool_address() ->
case cached_address() of
{ok, _} = Ok -> Ok;
{error, _} ->
get_pool_address_()
end.
cached_address() ->
I0 = cache_info(),
CacheF = cache_filename(I0),
?LOG_DEBUG("Eureka cache filename: ~p", [CacheF]),
case file:read_file(CacheF) of
{ok, Bin} ->
NowTS = erlang:system_time(seconds),
OldestTS = NowTS - 24*60*60,
try binary_to_term(Bin) of
#{ ts := TS
, network := N
, pubkey := PK
, host := _
, port := _
, pool_id := _} = Map when N == map_get(network, I0),
PK == map_get(pubkey, I0) ->
if TS >= OldestTS ->
Result = maps:remove(ts, Map),
?LOG_DEBUG("Cached eureka info: ~p", [Result]),
{ok, Result};
true ->
{error, outdated}
end;
Other ->
{error, {invalid_cache_term, Other}}
catch
error:E ->
{error, {invalid_cache_data, E}}
end;
{error, _} = Err ->
Err
end.
cache_good_address(#{host := _,
port := _,
pool_id := _} = I0) ->
CacheInfo = cache_info(I0),
CacheF = cache_filename(CacheInfo),
ToCache = CacheInfo#{ts => erlang:system_time(seconds)},
case filelib:ensure_dir(CacheF) of
ok ->
case file:write_file(CacheF, term_to_binary(ToCache)) of
ok ->
{ok, ToCache};
{error, _} = Err ->
?LOG_DEBUG("Couldn't cache eureka in ~p: ~p", [CacheF, Err]),
Err
end;
{error, _} = Err ->
?LOG_ERROR("Cannot save cached info to ~s", [CacheF]),
Err
end.
invalidate_cache() ->
CacheF = cache_filename(),
case file:delete(CacheF) of
ok ->
?LOG_DEBUG("Eureka cache file removed (~p)", [CacheF]),
ok;
{error, _} = Err ->
?LOG_DEBUG("Couldn't remove Eureka cache (~p): ~p", [CacheF, Err]),
Err
end.
cache_info(#{ host := Addr
, port := Port
, pool_id := PoolId }) ->
I0 = cache_info(),
I0#{ host => unicode:characters_to_binary(Addr)
, port => Port
, pool_id => unicode:characters_to_binary(PoolId)}.
cache_info() ->
Pubkey = gmhc_config:get_config([<<"pubkey">>]),
Network = gmhc_config:get_config([<<"network">>]),
#{ pubkey => Pubkey
, network => Network }.
cache_filename() ->
cache_filename(cache_info()).
cache_filename(#{network := Network, pubkey := Pubkey}) ->
Path = filename:join(cache_dir(), Network),
<<"ak_", PKShort:8/binary, _/binary>> = Pubkey,
filename:join(Path, "gmhc_eureka." ++ binary_to_list(PKShort) ++ ".cache").
cache_dir() ->
case gmconfig:find_config([<<"cache_dir">>]) of
{ok, D} ->
D;
undefined ->
case setup_zomp:is_zomp_context() of
true ->
cache_dir_zomp();
false ->
filename:join(setup:data_dir(), "gmhive.cache")
end
end.
cache_dir_zomp() ->
#{package_id := {Realm, App, _}} = zx_daemon:meta(),
filename:join(zx_lib:ppath(var, {Realm, App}), "gmhive.cache").
get_pool_address_() ->
case gmconfig:find_config([<<"pool_admin">>, <<"url">>], [user_config]) of
{ok, URL0} ->
case expand_url(URL0) of
<<"local">> ->
#{<<"address">> => <<"127.0.0.1">>,
<<"port">> => gmconfig:get_config(
[<<"pool">>, <<"port">>], [schema_default]),
<<"pool_id">> => gmhc_config:get_config([<<"pool">>, <<"id">>]) };
{ok, #{host => <<"127.0.0.1">>,
port => gmconfig:get_config(
[<<"pool">>, <<"port">>], [schema_default]),
pool_id => gmhc_config:get_config([<<"pool">>, <<"id">>]) }};
URL ->
?LOG_INFO("Trying to connect to ~p", [URL]),
connect1(URL)
@ -49,9 +165,13 @@ connect1(URL0) ->
Error
end.
get_host_port(Data) ->
get_host_port(#{ <<"address">> := Addr
, <<"port">> := Port
, <<"pool_id">> := PoolId } = Data) ->
?LOG_DEBUG("Data = ~p", [Data]),
maps:with([<<"address">>, <<"port">>, <<"pool_id">>], Data).
{ok, #{ host => Addr
, port => Port
, pool_id => PoolId }}.
request(get, URL) ->
case request(get, URL, []) of

View File

@ -1,5 +1,5 @@
-module(gmhc_events).
-vsn("0.4.8").
-vsn("0.8.3").
-export([subscribe/1,
ensure_subscribed/1,
@ -7,7 +7,17 @@
ensure_unsubscribed/1,
publish/2]).
-export([debug/0]).
-export([debug/0,
progress/0,
stop/0]).
-export([rpt_debug/2,
rpt_progress/2]).
%% internal
-export([init_reporter/2]).
-include_lib("kernel/include/logger.hrl").
-export_type([event/0]).
@ -60,23 +70,100 @@ ensure_unsubscribed(Event) ->
debug() ->
ok = application:ensure_started(gproc),
spawn(fun() ->
subscribe(pool_notification),
subscribe({pool_notification, new_generation}),
subscribe(connected),
subscribe(puzzle),
subscribe(result),
subscribe(error),
subscribe(disconnected),
gmhive_worker:subscribe_returns(),
loop()
end).
spawn_reporter(fun() ->
sub(),
gmhive_worker:subscribe_returns(),
loop(fun rpt_debug/2, false)
end).
loop() ->
progress() ->
ok = application:ensure_started(gproc),
spawn_reporter(fun() ->
sub(),
loop(fun rpt_progress/2, true)
end).
spawn_reporter(F) ->
Parent = self(),
proc_lib:start_link(?MODULE, init_reporter, [F, Parent]).
init_reporter(F, Parent) ->
try_register_reporter(),
proc_lib:init_ack(Parent, self()),
F().
stop() ->
case whereis(gmhc_reporter) of
undefined ->
not_running;
Pid ->
exit(Pid, kill)
end.
sub() ->
subscribe(pool_notification),
subscribe({pool_notification, new_generation}),
subscribe(connected),
subscribe(puzzle),
subscribe(result),
subscribe(error),
subscribe(disconnected).
loop(F, Ts) ->
receive
stop -> ok;
{gproc_ps_event, E, Data} ->
io:fwrite("EVENT ~p: ~p~n", [E, Data]),
loop()
maybe_print(F(E, Data), Ts),
loop(F, Ts)
end.
try_register_reporter() ->
try register(gmhc_reporter, self())
catch
error:_ ->
?LOG_ERROR("Reporter already running. Try gmhc_events:stop().", []),
error(already_running)
end.
maybe_print([], _) ->
ok;
maybe_print(String, Ts) when is_boolean(Ts) ->
TSstr = [[ts(), " "] || Ts],
io:put_chars([TSstr, String, "\n"]).
ts() ->
calendar:system_time_to_rfc3339(erlang:system_time(millisecond),
[{unit, millisecond}, {offset, "Z"}]).
rpt_debug(E, Data) ->
io_lib:fwrite("EVENT ~p: ~p", [E, Data]).
rpt_progress(puzzle, #{info := {_Data, _Target, Nonce, _Config}}) ->
w("Trying nonce: ~p", [Nonce]);
rpt_progress(result, #{info := Info}) ->
case Info of
{error, no_solution} ->
[];
{ok, Cycles} ->
w("Found! Reporting ~w cycles to leader.", [length(Cycles)]);
Other ->
w("Unexpected 'result': ~tp", [Other])
end;
rpt_progress(pool_notification, #{info := #{msg := Msg}}) ->
case Msg of
#{solution_accepted := #{seq := Seq}} ->
w("The hive has produced a solution! Sequence: ~w", [Seq]);
#{new_generation := _} -> [];
#{candidate := _} -> [];
Other ->
w("Unexpected 'pool_notification': ~tp", [Other])
end;
rpt_progress(connected, _) ->
w("Connected!", []);
rpt_progress(disconnected, _) ->
w("Disconnected!", []);
rpt_progress(_, _) ->
[].
w(Fmt, Args) ->
io_lib:fwrite(Fmt, Args).

View File

@ -1,5 +1,5 @@
-module(gmhc_handler).
-vsn("0.4.8").
-vsn("0.8.3").
-behavior(gen_server).
-export([ start_link/0
@ -12,6 +12,7 @@
]).
-export([ call/1
, async_call/1
, notify/1
, pool_connected/2
, from_pool/1 ]).
@ -27,7 +28,7 @@
-record(st, {pools = [], opts = #{}}).
-define(CALL_TIMEOUT, 5000).
-define(CALL_TIMEOUT, 10_000).
-include_lib("kernel/include/logger.hrl").
@ -38,6 +39,13 @@ call(Req) ->
{error, Reason}
end.
async_call(Req) ->
try gen_server:call(?MODULE, {async_call, Req}, ?CALL_TIMEOUT)
catch
exit:Reason ->
{error, Reason}
end.
notify(Msg) ->
gen_server:cast(?MODULE, {notify, Msg}).
@ -56,6 +64,8 @@ init([]) ->
handle_call({call, Req}, _From, #st{} = S) ->
{reply, call_connector(Req), S};
handle_call({async_call, Req}, _From, #st{} = S) ->
{reply, call_connector(Req, false), S};
handle_call(_Req, _From, S) ->
{reply, {error, unknown_method}, S}.
@ -119,27 +129,39 @@ maybe_publish(_) ->
maybe_via(#{via := Via}, Info) ->
Info#{via => Via}.
call_connector(Req0) ->
call_connector(Req) ->
call_connector(Req, true).
call_connector(Req0, Wait) ->
{ViaId, Req} = maps:take(via, Req0),
case gmhc_connector:whereis_id(ViaId) of
undefined ->
{error, no_connection};
Pid when is_pid(Pid) ->
Id = erlang:unique_integer(),
MRef = erlang:monitor(process, Pid),
MRef = case Wait of
true -> erlang:monitor(process, Pid);
false -> none
end,
gmhc_connector:send(ViaId, #{call => Req#{ id => Id }}),
receive
{from_pool, #{reply := #{ id := Id, result := Result }}} ->
erlang:demonitor(MRef),
Result;
{from_pool, #{error := #{ id := Id } = Error}} ->
erlang:demonitor(MRef),
{error, maps:remove(id, Error)};
{'DOWN', MRef, _, _, _} ->
{error, no_connection}
after 5000 ->
erlang:demonitor(MRef),
{error, {timeout, process_info(self(), messages)}}
case Wait of
true ->
receive
{from_pool, #{reply := #{ id := Id
, result := Result }}} ->
erlang:demonitor(MRef),
Result;
{from_pool, #{error := #{ id := Id } = Error}} ->
erlang:demonitor(MRef),
{error, maps:remove(id, Error)};
{'DOWN', MRef, _, _, _} ->
{error, no_connection}
after 5000 ->
erlang:demonitor(MRef),
{error, {timeout, process_info(self(), messages)}}
end;
false ->
ok
end
end.

9
src/gmhc_lib.erl Normal file
View File

@ -0,0 +1,9 @@
-module(gmhc_lib).
-vsn("0.8.3").
-export([ rand/1 ]).
rand(Range) ->
<<Rx:32>> = crypto:strong_rand_bytes(4),
Range - (Rx rem Range).

View File

@ -1,5 +1,5 @@
-module(gmhc_server).
-vsn("0.4.8").
-vsn("0.8.3").
-behaviour(gen_server).
@ -92,7 +92,9 @@ handle_call({connected, Id, Type}, {Pid,_}, #st{connected = Conn} = S) ->
monitor ->
stop_workers(S1#st.workers), % shouldn't be any running
S1#st{workers = [], working = false};
worker -> S1#st{working = true}
worker ->
gmhc_watchdog:watch(5*60_000, 1), %% 5 minutes, one-shot
S1#st{working = true}
end,
{reply, ok, S2};
handle_call(_Req, _From, S) ->
@ -126,6 +128,7 @@ handle_cast({from_pool, #{via := Connector,
handle_cast({disconnected, Id}, #st{connected = Conn} = S) ->
?LOG_DEBUG("disconnected: ~p", [Id]),
Conn1 = maps:remove(Id, Conn),
gmhc_watchdog:unwatch(),
S1 = if map_size(Conn1) == 0 ->
Ws = stop_workers(S#st.workers),
S#st{connected = Conn1, workers = Ws};
@ -162,16 +165,11 @@ handle_info({'EXIT', Pid, Reason}, #st{ workers = Workers
{noreply, S}
end;
handle_info({timeout, _, check_workers}, #st{workers = Workers} = S) ->
case [W || #worker{cand = undefined} = W <- Workers] of
[] ->
{noreply, S};
Idle ->
S1 = maybe_request_nonces(S),
S2 = lists:foldl(fun(W, Sx) ->
maybe_restart_worker(W, Sx)
end, S1, Idle),
{noreply, S2}
end;
S1 = maybe_request_nonces(S),
S2 = lists:foldl(fun(W, Sx) ->
maybe_restart_worker(W, Sx)
end, S1, Workers),
{noreply, S2};
handle_info(Msg, St) ->
?LOG_DEBUG("Unknown msg: ~p", [Msg]),
{noreply, St}.
@ -184,6 +182,9 @@ code_change(_FromVsn, S, _Extra) ->
report_solutions(Solutions, W, #st{} = S) when ?CONNECTED(S) ->
#{via := Via, seq := Seq} = W#worker.cand,
Nonces = all_nonces(W),
[report_no_solution_(Via, Seq, N)
|| N <- Nonces, not lists:keymember(N, 1, Solutions)],
gmhc_handler:call(
#{via => Via,
solutions => #{ seq => Seq
@ -191,24 +192,28 @@ report_solutions(Solutions, W, #st{} = S) when ?CONNECTED(S) ->
, evidence => Evd }
|| {Nonce, Evd} <- Solutions] }}).
%% report_solution(Nonce, Solution, W, #st{connected = true}) ->
%% #{seq := Seq} = W#worker.cand,
%% gmhc_handler:call(#{solution => #{ seq => Seq
%% , nonce => Nonce
%% , evidence => Solution }}).
report_no_solution(Nonce, W, #st{} = S) when ?CONNECTED(S) ->
report_no_solution(_Nonce, W, #st{} = S) when ?CONNECTED(S) ->
#{via := Via, seq := Seq} = W#worker.cand,
Nonces = all_nonces(W),
%% ?LOG_DEBUG("report no_solution Seq = ~p, Nonce = ~p", [Seq, Nonce]),
gmhc_handler:call(#{via => Via,
no_solution => #{ seq => Seq
, nonce => Nonce}}).
[report_no_solution_(Via, Seq, Nonce1) || Nonce1 <- Nonces],
ok.
report_no_solution_(Via, Seq, Nonce) ->
gmhc_handler:async_call(#{via => Via,
no_solution => #{ seq => Seq
, nonce => Nonce}}).
all_nonces(#worker{nonce = Nonce, config = Config}) ->
case gmhw_pow_cuckoo:repeats(Config) of
1 -> [Nonce];
Rs -> lists:seq(Nonce, Nonce + Rs - 1)
end.
maybe_request_nonces(#st{ candidate = #{via := Via, seq := Seq, nonces := Nonces}
, nonces = N} = S) when ?CONNECTED(S) ->
case Nonces == [] of
true ->
%% ?LOG_DEBUG("Request more nonces, Seq = ~p, N = ~p", [Seq, N]),
Res = gmhc_handler:call(#{via => Via,
get_nonces => #{ seq => Seq
, n => N }}),
@ -222,7 +227,7 @@ maybe_request_nonces(S) ->
nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) ->
case Seq == Seq0 of
true ->
%% ?LOG_DEBUG("Got nonces = ~p", [Nonces]),
wd_ping(),
#st{candidate = Cand} = S,
S#st{candidate = Cand#{nonces => Nonces}};
false ->
@ -232,10 +237,21 @@ nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) ->
nonces_result({error, Reason}, Seq0, S) ->
?LOG_DEBUG("Got error on nonce request: ~p", [Reason]),
Workers = stop_workers_for_seq(Seq0, S#st.workers),
case Reason of
{timeout, _} ->
Timeout = retry_timeout(1000, 3000),
erlang:start_timer(Timeout, self(), check_workers);
_ ->
ok
end,
S#st{workers = Workers}.
retry_timeout(Floor, Range) ->
Floor + gmhc_lib:rand(Range).
handle_worker_result({worker_result, Result}, W, S) ->
%% ?LOG_DEBUG("worker result: ~p", [Result]),
wd_ping(),
case Result of
{solutions, Solutions} ->
{Cont, S1} = report_solutions_(Solutions, W, S),
@ -427,3 +443,6 @@ worker_result(Pid, Result) ->
decode_candidate_hash(#{candidate := C} = Cand) ->
{ok, Hash} = gmser_api_encoder:safe_decode(bytearray, C),
Cand#{candidate := Hash}.
wd_ping() ->
gmhc_watchdog:ping().

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*-
-module(gmhc_sup).
-vsn("0.4.8").
-vsn("0.8.3").
-behaviour(supervisor).
@ -15,12 +15,13 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ChildSpecs = [ worker(gmhc_server)
ChildSpecs = [ worker(gmhc_watchdog)
, worker(gmhc_server)
, worker(gmhc_handler)
, supervisor(gmhc_connectors_sup) ],
SupFlags = #{ strategy => one_for_one
, intensity => 1
, period => 5
SupFlags = #{ strategy => rest_for_one
, intensity => 5 %% We really want the hive client to sort itself out
, period => 5*60 %% Timemout issues can happen infrequently
, auto_shutdown => never },
{ok, {SupFlags, ChildSpecs}}.

167
src/gmhc_watchdog.erl Normal file
View File

@ -0,0 +1,167 @@
-module(gmhc_watchdog).
-behavior(gen_server).
-export([ watch/2
, unwatch/0
, ping/0 ]).
-export([ note_started/2
, remove_restart_info/1
, get_restart_info/0 ]).
-export([ start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3 ]).
-record(st, {services = #{}}).
-record(svc, { n = 5 :: pos_integer()
, n0 = 5 :: pos_integer()
, interval = 5000 :: pos_integer()
, tref :: reference()
, mref :: reference()
}).
-include_lib("kernel/include/logger.hrl").
watch(Interval, N) ->
gen_server:call(?MODULE, {watch, self(), Interval, N}).
unwatch() ->
gen_server:cast(?MODULE, {unwatch, self()}).
ping() ->
gen_server:cast(?MODULE, {ping, self()}).
note_started(Id, Info) ->
gen_server:call(?MODULE, {note_started, Id, Info}).
get_restart_info() ->
gen_server:call(?MODULE, get_restart_info).
remove_restart_info([]) ->
ok;
remove_restart_info(IDs) ->
gen_server:call(?MODULE, {remove_restart_info, IDs}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
handle_call({note_started, Id, Info}, _From, S) ->
update_pt(Id, Info),
{reply, ok, S};
handle_call({remove_restart_info, IDs}, _From, S) ->
remove_restart_info_(IDs),
{reply, ok, S};
handle_call(get_restart_info, _From, S) ->
{reply, get_pt(), S};
handle_call({watch, Pid, Interval, N}, _From, S) ->
{reply, ok, add_watch(Pid, Interval, N, S)};
handle_call(_Req, _From, S) ->
{reply, {error, unknown_method}, S}.
handle_cast({ping, Pid}, S) ->
{noreply, reset_watch(Pid, S)};
handle_cast({unwatch, Pid}, S) ->
{noreply, delete_watch(Pid, S)};
handle_cast(Msg, S) ->
?LOG_DEBUG("Unknown cast: ~p", [Msg]),
{noreply, S}.
handle_info({timeout, TRef, Pid}, S) ->
?LOG_INFO("Timeout for pid ~p", [Pid]),
{noreply, ping_timeout(Pid, TRef, S)};
handle_info({'DOWN', _, process, Pid, _}, S) ->
{noreply, delete_watch(Pid, S)};
handle_info(Msg, S) ->
?LOG_DEBUG("Unknown msg: ~p", [Msg]),
{noreply, S}.
terminate(_, _) ->
ok.
code_change(_FromVsn, S, _Extra) ->
{ok, S}.
add_watch(Pid, Interval, N, #st{services = Svcs} = S) ->
MRef = erlang:monitor(process, Pid),
Svc0 = #svc{ interval = Interval
, mref = MRef
, n = N
, n0 = N},
Svc = start_timer(Pid, Svc0),
S#st{services = Svcs#{Pid => Svc}}.
reset_watch(Pid, #st{services = Svcs} = S) ->
case maps:find(Pid, Svcs) of
{ok, #svc{ n0 = N0 } = Svc} ->
Svc1 = restart_timer(Pid, Svc#svc{n = N0}),
S#st{services = Svcs#{Pid := Svc1}};
error ->
S
end.
delete_watch(Pid, #st{services = Svcs} = S) ->
case maps:find(Pid, Svcs) of
{ok, #svc{tref = TRef, mref = MRef}} ->
erlang:cancel_timer(TRef),
erlang:demonitor(MRef),
S#st{services = maps:remove(Pid, Svcs)};
error ->
S
end.
ping_timeout(Pid, TRef, #st{services = Svcs} = S) ->
case maps:find(Pid, Svcs) of
{ok, #svc{ n = N, tref = TRef} = Svc} ->
N1 = N-1,
if N1 =< 0 ->
?LOG_ERROR("Will exit Pid ~p", [Pid]),
exit(Pid, kill),
S#st{services = maps:remove(Pid, Svcs)};
true ->
Svc1 = restart_timer(Pid, Svc#svc{n = N1}),
S#st{services = Svcs#{Pid := Svc1}}
end;
{ok, _} ->
?LOG_DEBUG("Timeout didn't match TRef - ignoring", []),
S;
_ ->
S
end.
start_timer(Pid, #svc{interval = T} = Svc) ->
TRef = erlang:start_timer(T, self(), Pid),
Svc#svc{tref = TRef}.
restart_timer(Pid, #svc{tref = TRef} = Svc) ->
erlang:cancel_timer(TRef),
start_timer(Pid, Svc#svc{tref = undefined}).
update_pt(Id, Info) ->
Pt = get_pt(),
put_pt(Pt#{Id => Info}).
remove_restart_info_(IDs) ->
RI = get_pt(),
case maps:without(IDs, RI) of
RI ->
ok;
NewRI ->
put_pt(NewRI)
end.
get_pt() ->
persistent_term:get(pt_key(), #{}).
put_pt(Pt) ->
persistent_term:put(pt_key(), Pt).
pt_key() ->
{?MODULE, restart_info}.

View File

@ -8,7 +8,7 @@
%%%-------------------------------------------------------------------
-module(gmhc_workers).
-vsn("0.4.8").
-vsn("0.8.3").
-export([
get_worker_configs/0

View File

@ -1,5 +1,5 @@
-module(gmhive_client).
-vsn("0.4.8").
-vsn("0.8.3").
-export([ connect/1
, disconnect/1

View File

@ -2,15 +2,15 @@
{type,app}.
{modules,[]}.
{prefix,"gmhc"}.
{desc,"Gajumaru Hive Client"}.
{author,"Ulf Wiger, QPQ AG"}.
{package_id,{"uwiger","gmhive_client",{0,5,1}}}.
{deps,[{"uwiger","gmhive_worker",{0,5,1}},
{desc,"Gajumaru Hive Client"}.
{package_id,{"uwiger","gmhive_client",{0,10,0}}}.
{deps,[{"uwiger","gmhive_protocol",{0,3,1}},
{"uwiger","gmhive_worker",{0,5,1}},
{"uwiger","gmcuckoo",{1,2,4}},
{"otpr","eblake2",{1,0,1}},
{"otpr","base58",{0,1,1}},
{"otpr","gmserialization",{0,1,3}},
{"uwiger","gmhive_protocol",{0,1,1}},
{"uwiger","setup",{2,2,4}},
{"uwiger","gproc",{1,0,1}},
{"uwiger","gmconfig",{0,1,2}},

View File

@ -38,5 +38,12 @@ rm "$IGNORE_TEMP"
cp "$PWD/zomp.meta" "$DST/"
cp "$PWD/Emakefile" "$DST/"
# copy generated schema
SCHEMA="$SRC/priv/gmhc_schema.json"
if [ -e "$SCHEMA" ]; then
mkdir -p "$DST/priv"
cp -a "$SCHEMA" "$DST/priv/$(basename "$SCHEMA")"
fi
# Clean up beam files just in case
[ -d "$DST/ebin" ] && find "$DST/ebin" -name '*.beam' -exec rm -f {} + || true