commit a979e18b5126275b2c3090f2a427b8cbaca6ecb8 Author: Ulf Wiger Date: Tue May 13 23:56:46 2025 +0200 First commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..844107a --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +.rebar3 +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +.directory +ebin/*.beam +log +erl_crash.dump +.rebar +logs +_build +REVISION +VERSION +*.erl~ +*.aes~ +*.config~ +*.args~ +data/mnesia +_checkouts*/ +tmp/ +rebar3.crashdump +eqc-lib +eqc +eunit_report/*.xml +.history +.vscode diff --git a/Emakefile b/Emakefile new file mode 100644 index 0000000..68c7b67 --- /dev/null +++ b/Emakefile @@ -0,0 +1 @@ +{"src/*", [debug_info, {i, "include/"}, {outdir, "ebin/"}]}. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0a04128 --- /dev/null +++ b/LICENSE @@ -0,0 +1,165 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/README.md b/README.md new file mode 100644 index 0000000..8fefd3f --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +# Gajumaru Hive Client + +When running tests locally: + +`rebar3 shell` + +If connecting to the `testnet` hive: +`GMMP_CLIENT_CONFIG=gmhc_config-testnet.eterm rebar3 shell` + +If connecting to a `testnet` hive running locally: +`GMHC_CONFIG=gmhc_config-testnet-local.eterm rebar3 shell` diff --git a/config/sys.config b/config/sys.config new file mode 100644 index 0000000..43b4869 --- /dev/null +++ b/config/sys.config @@ -0,0 +1,28 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- + +[ + { kernel, + [ + {inet_dist_use_interface, {127,0,0,1}}, + {logger_level, all}, + {logger, + [ {handler, default, logger_std_h, + #{ + formatter => + {logger_formatter, #{ legacy_header => false + , single_line => true + , template => [ "[", time, "] " + , mfa, ":", line, " " + , pid, " " + , "[", level, "] " + , msg, "\n"] + }} + }} + ]} + ]}, + + { setup, [ + {data_dir, "$HOME/data"}, + {log_dir, "$HOME/log"} + ]} +]. diff --git a/config/vm.args b/config/vm.args new file mode 100644 index 0000000..ee38dc0 --- /dev/null +++ b/config/vm.args @@ -0,0 +1,7 @@ +-sname gmhc@localhost + +-setcookie gmhc_cookie + +# This setting should not be changed, since this will not do any harm if the +# system doesn't support kernel polling. ++K true diff --git a/ebin/gmhive_client.app b/ebin/gmhive_client.app new file mode 100644 index 0000000..c5b3ad3 --- /dev/null +++ b/ebin/gmhive_client.app @@ -0,0 +1,102 @@ +{application,gmhive_client, + [{description,"Gajumaru Hive client"}, + {vsn,"0.1.0"}, + {registered,[]}, + {applications,[kernel,stdlib,sasl,gproc,enoise,gmconfig, + gmhive_protocol,gmhive_worker]}, + {mod,{gmhive_client,[]}}, + {start_phases,[{connect_to_primary,[]}]}, + {env,[]}, + {modules,[base58,eblake2,eblake2_tests,timing,enacl_eqc, + enacl_ext_eqc,enacl,enacl_ext,enacl_nif,enacl_SUITE, + enoise,enoise_cipher_state,enoise_connection, + enoise_crypto,enoise_hs_state,enoise_keypair, + enoise_protocol,enoise_sym_state, + enoise_bad_data_tests,enoise_chiper_state_tests, + enoise_crypto_tests,enoise_hs_state_tests, + enoise_protocol_tests,enoise_sym_state_tests, + enoise_tests,enoise_utils,test_utils,gmconfig, + gmconfig_schema_helpers,gmconfig_schema_utils, + gmcuckoo,gmhc_app,gmhc_config,gmhc_config_schema, + gmhc_connector,gmhc_connectors_sup,gmhc_counters, + gmhc_eureka,gmhc_events,gmhc_handler,gmhc_server, + gmhc_sup,gmhc_workers,gmhive_client,gmhp_msgs, + gmhw_blake2b_256,gmhw_pow,gmhw_pow_cuckoo, + gmhw_siphash24,gmcuckoo_SUITE,gmhw_pow_cuckoo_tests, + gmhw_pow_tests,gmser_api_encoder,gmser_chain_objects, + gmser_contract_code,gmser_delegation,gmser_dyn, + gmser_id,gmser_rlp,gmserialization, + gmser_api_encoder_tests,gmser_chain_objects_tests, + gmser_contract_code_tests,gmser_delegation_tests, + gmser_rlp_tests,gproc,gproc_app,gproc_bcast, + gproc_dist,gproc_info,gproc_init,gproc_lib, + gproc_monitor,gproc_pool,gproc_ps,gproc_pt,gproc_sup, + setup,setup_app,setup_file,setup_file_io_server, + setup_gen,setup_lib,setup_srv,setup_sup,setup_zomp, + testapp_app,testapp_sup,testapp_app,testapp_sup, + hex_api,hex_api_auth,hex_api_key,hex_api_organization, + hex_api_organization_member,hex_api_package, + hex_api_package_owner,hex_api_release,hex_api_user, + hex_core,hex_erl_tar,hex_filename,hex_http, + hex_http_httpc,hex_licenses,hex_pb_names, + hex_pb_package,hex_pb_signed,hex_pb_versions, + hex_registry,hex_repo,hex_tarball,safe_erl_term,pc, + pc_compilation,pc_port_env,pc_port_specs,pc_prv_clean, + pc_prv_compile,pc_util,rebar3_hex,rebar3_hex_app, + rebar3_hex_build,rebar3_hex_client,rebar3_hex_config, + rebar3_hex_cut,rebar3_hex_error,rebar3_hex_file, + rebar3_hex_httpc_adapter,rebar3_hex_io,rebar3_hex_key, + rebar3_hex_organization,rebar3_hex_owner, + rebar3_hex_publish,rebar3_hex_results, + rebar3_hex_retire,rebar3_hex_search,rebar3_hex_user, + rebar3_hex_version, + rebar_aecuckooprebuilt_app_with_priv_from_git_resource, + rebar_aecuckooprebuilt_dep,verl,verl_parser,base58, + eblake2,eblake2_tests,timing,enacl_eqc,enacl_ext_eqc, + enacl,enacl_ext,enacl_nif,enacl_SUITE,enoise, + enoise_cipher_state,enoise_connection,enoise_crypto, + enoise_hs_state,enoise_keypair,enoise_protocol, + enoise_sym_state,enoise_bad_data_tests, + enoise_chiper_state_tests,enoise_crypto_tests, + enoise_hs_state_tests,enoise_protocol_tests, + enoise_sym_state_tests,enoise_tests,enoise_utils, + test_utils,gmconfig,gmconfig_schema_helpers, + gmconfig_schema_utils,gmcuckoo,gmhp_msgs, + gmhw_blake2b_256,gmhw_pow,gmhw_pow_cuckoo, + gmhw_siphash24,gmcuckoo_SUITE,gmhw_pow_cuckoo_tests, + gmhw_pow_tests,gmser_api_encoder,gmser_chain_objects, + gmser_contract_code,gmser_delegation,gmser_dyn, + gmser_id,gmser_rlp,gmserialization, + gmser_api_encoder_tests,gmser_chain_objects_tests, + gmser_contract_code_tests,gmser_delegation_tests, + gmser_rlp_tests,gproc,gproc_app,gproc_bcast, + gproc_dist,gproc_info,gproc_init,gproc_lib, + gproc_monitor,gproc_pool,gproc_ps,gproc_pt,gproc_sup, + setup,setup_app,setup_file,setup_file_io_server, + setup_gen,setup_lib,setup_srv,setup_sup,setup_zomp, + testapp_app,testapp_sup,testapp_app,testapp_sup, + hex_api,hex_api_auth,hex_api_key,hex_api_organization, + hex_api_organization_member,hex_api_package, + hex_api_package_owner,hex_api_release,hex_api_user, + hex_core,hex_erl_tar,hex_filename,hex_http, + hex_http_httpc,hex_licenses,hex_pb_names, + hex_pb_package,hex_pb_signed,hex_pb_versions, + hex_registry,hex_repo,hex_tarball,safe_erl_term,pc, + pc_compilation,pc_port_env,pc_port_specs,pc_prv_clean, + pc_prv_compile,pc_util,rebar3_hex,rebar3_hex_app, + rebar3_hex_build,rebar3_hex_client,rebar3_hex_config, + rebar3_hex_cut,rebar3_hex_error,rebar3_hex_file, + rebar3_hex_httpc_adapter,rebar3_hex_io,rebar3_hex_key, + rebar3_hex_organization,rebar3_hex_owner, + rebar3_hex_publish,rebar3_hex_results, + rebar3_hex_retire,rebar3_hex_search,rebar3_hex_user, + rebar3_hex_version, + rebar_aecuckooprebuilt_app_with_priv_from_git_resource, + rebar_aecuckooprebuilt_dep,verl,verl_parser,gmhc_app, + gmhc_config,gmhc_config_schema,gmhc_connector, + gmhc_connectors_sup,gmhc_counters,gmhc_eureka, + gmhc_events,gmhc_handler,gmhc_server,gmhc_sup, + gmhc_workers,gmhive_client]}, + {maintainers,["QPQ IaaS AG"]}, + {licensens,["ISC"]}, + {links,[{"gitea","https://git.qpq.swiss/gmhive_client"}]}]}. diff --git a/gmhc_config-testnet-local.eterm b/gmhc_config-testnet-local.eterm new file mode 100644 index 0000000..44f6888 --- /dev/null +++ b/gmhc_config-testnet-local.eterm @@ -0,0 +1,11 @@ +#{ pubkey => <<"ak_zjyvDmMbXafMADYrxDs4uMiYM5zEVJBqWgv619NUQ37Gkwt7z">> + , pool_admin => #{url => <<"local">>} + , pool => #{id => <<"ct_26xqeE3YKmZV8jrks57JSgZRCHSuG4RGzpnvdz6AAiSSTVbJRM">>, + host => <<"127.0.0.1">>} + , mining => + [#{executable => <<"mean15-generic">>} + ,#{executable => <<"mean15-generic">>} + ,#{executable => <<"mean15-generic">>} + ,#{executable => <<"mean15-generic">>} + ] + }. diff --git a/gmhc_config-testnet.eterm b/gmhc_config-testnet.eterm new file mode 100644 index 0000000..ce137d1 --- /dev/null +++ b/gmhc_config-testnet.eterm @@ -0,0 +1,10 @@ +#{ pubkey => <<"ak_zjyvDmMbXafMADYrxDs4uMiYM5zEVJBqWgv619NUQ37Gkwt7z">> + , pool => #{id => <<"ct_26xqeE3YKmZV8jrks57JSgZRCHSuG4RGzpnvdz6AAiSSTVbJRM">>, + host => <<"62.171.150.201">>} + , mining => + [#{executable => <<"mean15-generic">>} + ,#{executable => <<"mean15-generic">>} + ,#{executable => <<"mean15-generic">>} + ,#{executable => <<"mean15-generic">>} + ] + }. diff --git a/gmhc_config.eterm b/gmhc_config.eterm new file mode 100644 index 0000000..aaff7ae --- /dev/null +++ b/gmhc_config.eterm @@ -0,0 +1,5 @@ +#{ pubkey => <<"ak_2KAcA2Pp1nrR8Wkt3FtCkReGzAi8vJ9Snxa4PcmrthVx8AhPe8">> + , pool => #{id => <<"ct_LRbi65kmLtE7YMkG6mvG5TxAXTsPJDZjAtsPuaXtRyPA7gnfJ">>} + , mining => + [#{executable => <<"mean15-generic">>}] + }. diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..6345fbc --- /dev/null +++ b/rebar.config @@ -0,0 +1,40 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- + +{minimum_otp_vsn, "27.1"}. + +{erl_opts, [debug_info]}. +{plugins, [rebar3_hex]}. + +{deps, [ + {enoise, {git, "https://git.qpq.swiss/QPQ-AG/enoise.git", {ref, "029292817e"}}}, + {gmhive_protocol, + {git, "https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git", + {ref, "ea549141f5"}}}, + {gmhive_worker, {git, "https://git.qpq.swiss/QPQ-AG/gmhive_worker", {ref, "02ab56dec4"}}}, + {gmconfig, {git, "https://git.qpq.swiss/QPQ-AG/gmconfig.git", + {ref, "32c1ed5c4b"}}}, + {gproc, "1.0.0"}, + {setup, {git, "https://github.com/uwiger/setup", {ref, "3ad83ed"}}} + ]}. + +{post_hooks, [{compile, "./zompify.sh"}]}. + +{relx, [ + {release, { gmhive_client, "0.1.0" }, + [sasl, enacl, enoise, gmhive_worker, + {gmhive_protocol, load}, gmserialization, gproc, gmconfig, gmhive_client ]}, + {dev_mode, true}, + {sys_config, "./config/sys.config"}, + {vm_args, "./config/vm.args"}, + {include_erts, false}, + {generate_start_script, true} + ]}. + +{xref_checks, [undefined_function_calls, undefined_functions, + locals_not_used, + deprecated_function_calls, deprecated_functions]}. + +{dialyzer, [ {warnings, [unknown]} + , {plt_apps, all_deps} + , {base_plt_apps, [erts, kernel, stdlib]} + ]}. diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..ebe53c0 --- /dev/null +++ b/rebar.lock @@ -0,0 +1,53 @@ +{"1.2.0", +[{<<"aecuckooprebuilt">>, + {aecuckooprebuilt_app_with_priv_from_git, + {git,"https://github.com/aeternity/cuckoo-prebuilt.git", + {ref,"90afb699dc9cc41d033a7c8551179d32b3bd569d"}}}, + 1}, + {<<"base58">>, + {git,"https://git.qpq.swiss/QPQ-AG/erl-base58.git", + {ref,"e6aa62eeae3d4388311401f06e4b939bf4e94b9c"}}, + 2}, + {<<"eblake2">>, + {git,"https://git.qpq.swiss/QPQ-AG/eblake2.git", + {ref,"b29d585b8760746142014884007eb8441a3b6a14"}}, + 1}, + {<<"enacl">>, + {git,"https://git.qpq.swiss/QPQ-AG/enacl.git", + {ref,"4eb7ec70084ba7c87b1af8797c4c4e90c84f95a2"}}, + 2}, + {<<"enoise">>, + {git,"https://git.qpq.swiss/QPQ-AG/enoise.git", + {ref,"029292817ea1c685b6377bfae667976c0f4a36bc"}}, + 0}, + {<<"gmconfig">>, + {git,"https://git.qpq.swiss/QPQ-AG/gmconfig.git", + {ref,"32c1ed5c4bc0913adc175fb18b1c9251acc7b606"}}, + 0}, + {<<"gmcuckoo">>, + {git,"https://git.qpq.swiss/QPQ-AG/gmcuckoo.git", + {ref,"9f3aef96d8660e83ef8b5a7ee64908060a8c6572"}}, + 1}, + {<<"gmhive_protocol">>, + {git,"https://git.qpq.swiss/QPQ-AG/gmhive_protocol.git", + {ref,"ea549141f5943b2b1b109a2653b8cd60778107c7"}}, + 0}, + {<<"gmhive_worker">>, + {git,"https://git.qpq.swiss/QPQ-AG/gmhive_worker", + {ref,"02ab56dec4b50947afc118712c96e8c7b9f6b3fe"}}, + 0}, + {<<"gmserialization">>, + {git,"https://git.qpq.swiss/QPQ-AG/gmserialization.git", + {ref,"0288719ae15814f3a53114c657502a24376bebfa"}}, + 1}, + {<<"gproc">>,{pkg,<<"gproc">>,<<"1.0.0">>},0}, + {<<"setup">>, + {git,"https://github.com/uwiger/setup", + {ref,"3ad83ed3cade6780897d11f5f0d987012b93345b"}}, + 0}]}. +[ +{pkg_hash,[ + {<<"gproc">>, <<"AA9EC57F6C9FF065B16D96924168D7C7157CD1FD457680EFE4B1274F456FA500">>}]}, +{pkg_hash_ext,[ + {<<"gproc">>, <<"109F253C2787DE8A371A51179D4973230CBEC6239EE673FA12216A5CE7E4F902">>}]} +]. diff --git a/src/gmhc_app.erl b/src/gmhc_app.erl new file mode 100644 index 0000000..40fbf2b --- /dev/null +++ b/src/gmhc_app.erl @@ -0,0 +1,51 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- +-module(gmhc_app). + +-behaviour(application). + +-export([ start/1 ]). + +-export([ start/2 + , start_phase/3 + , prep_stop/1 + , stop/1 + ]). + +-include_lib("kernel/include/logger.hrl"). + +-spec start([{atom(), any()}]) -> {ok, [atom()]} | {error, any()}. +start(Opts) -> + application:load(gmmp_client), + {error,_} = application:stop(gmmp_client), + _ = lists:foreach(fun({K, V}) -> + application:set_env(gmmp_client, K, V) + end, Opts), + application:ensure_all_started(gmmp_client). + +start(_StartType, _StartArgs) -> + set_things_up(), + gmhc_sup:start_link(). + +start_phase(connect_to_primary, _StartType, []) -> + case application:get_env(gmmp_client, auto_connect, true) of + true -> + gmhc_connectors_sup:start_first_connector(); + false -> + skip + end, + ok; +start_phase(_Phase, _StartType, _PhaseArgs) -> + ok. + +prep_stop(_State) -> + ok. + +stop(_State) -> + ok. + +set_things_up() -> + gmhc_counters:initialize(), + gmhc_config:load_config(), + logger:set_module_level([gmhw_pow_cuckoo], notice), + ?LOG_DEBUG("Config: ~p", [gmconfig:user_config()]), + ok. diff --git a/src/gmhc_config.erl b/src/gmhc_config.erl new file mode 100644 index 0000000..174b55a --- /dev/null +++ b/src/gmhc_config.erl @@ -0,0 +1,67 @@ +-module(gmhc_config). + +-export([ load_config/0 + , get_config/1 + , verify_cfg_props/1 + ]). + +-include_lib("kernel/include/logger.hrl"). + +load_config() -> + instrument_gmconfig(), + ?LOG_DEBUG("Schema: ~p", [gmconfig:schema()]), + gmconfig:load_user_config(report), + gmconfig:apply_os_env(), + gmconfig:process_plain_args(), + ok. + +instrument_gmconfig() -> + gmconfig:set_gmconfig_env(gmconfig_env()). + +-spec gmconfig_env() -> gmconfig:gmconfig(). +gmconfig_env() -> + #{ os_env_prefix => "GMHC" + , config_file_basename => "gmhive_client_config" + , config_file_os_env => "GMHIVE_CLIENT_CONFIG" + , config_file_search_path => [".", fun setup:home/0, fun setup:data_dir/0 ] + , config_plain_args => "-gmhc" + , system_suffix => "" + , schema => fun gmhc_config_schema:to_json/0 }. + +get_config(Path) -> + gmconfig:get_config(Path, cpath(Path)). + +cpath([<<"pubkey">>]) -> [user_config, env(pubkey ) ]; +cpath([<<"extra_pubkeys">>]) -> [user_config, env(extra_pubkeys ), schema_default]; +cpath([<<"pool_admin">>, <<"url">>]) -> [user_config, env(pool_admin_url), schema_default]; +cpath([<<"workers">>]) -> [user_config, env(workers ), schema_default]; +cpath(_) -> [user_config, schema_default]. + +env2cpath(pubkey ) -> [<<"pubkey">>]; +env2cpath(extra_pubkeys ) -> [<<"extra_pubkeys">>]; +env2cpath(pool_admin_url) -> [<<"pool_admin">>, <<"url">>]; +env2cpath(workers ) -> [<<"workers">>]. + +verify_cfg_props(PropList) -> + Cfg = lists:foldl( + fun({K,V}, M) -> + CfgK = env2cpath(K), + gmconfig:merge_config_maps(M, to_cfg_map(CfgK, V)) + end, #{}, PropList), + case gmconfig_schema_utils:valid(Cfg) of + Ok when is_map(Ok) -> + ok; + Other -> + error({invalid_config, Other}) + end. + +to_cfg_map(K, V) -> + to_cfg_map(K, V, #{}). + +to_cfg_map([H], V, M) -> + M#{H => V}; +to_cfg_map([H|T], V, M) -> + M#{H => to_cfg_map(T, V, M)}. + +env(K) -> + {env, gmhive_client, K}. diff --git a/src/gmhc_config_schema.erl b/src/gmhc_config_schema.erl new file mode 100644 index 0000000..ef8b14b --- /dev/null +++ b/src/gmhc_config_schema.erl @@ -0,0 +1,98 @@ +-module(gmhc_config_schema). + +-export([ schema/0 + , to_json/0 ]). + +-import(gmconfig_schema_helpers, + [ str/1 + , str/2 + , pos_int/2 + , int/2 + , bool/2 + , obj/1 + , obj/2 + , array/2 + , schema_init/0 + ]). + +-define(ACCOUNT_PATTERN, <<"^ak_[1-9A-HJ-NP-Za-km-z]*$">>). +-define(CONTRACT_PATTERN, <<"^ct_[1-9A-HJ-NP-Za-km-z]*$">>). + +to_json() -> + json:encode(schema()). + +schema() -> + obj(schema_init(), + #{ + pubkey => str(#{pattern => ?ACCOUNT_PATTERN}, + <<"Primary client pubkey">>) + , extra_pubkeys => array(#{ description => + <<"Additional miner pubkeys, sharing rewards">> + , default => [] + }, + str(#{pattern => ?ACCOUNT_PATTERN})) + , type => str(#{ enum => [<<"miner">>, <<"monitor">>] + , default => <<"miner">> + , description => <<"monitor mode can be used to see if a pool is alive">>}) + , pool => pool() + , pool_admin => pool_admin() + , workers => workers() + }). + +pool() -> + obj(#{ + id => str(#{ pattern => ?CONTRACT_PATTERN}, + <<"Pool contract id">>), + host => str(#{ default => <<"127.0.0.1">> + , example => <<"0.0.0.0">> + , description => <<"Hostname of hive server">> }) + , port => pos_int(17888, <<"Hive server listen port">>) + }). + +pool_admin() -> + obj(#{ + url => str(#{ default => <<"https://test.gajumining.com/api/workers/{CLIENT_ID}">> + , description => <<"URL of Eureka worker api">> }) + }). + +workers() -> + array( + #{default => [#{executable => <<"mean29-generic">>}], + description => + <<"Definitions of workers' configurations. If no worker are configured one worker " + "is used as default, i.e. 'mean29-generic' executable without any extra args.">>}, + obj(#{required => [<<"executable">>]}, + #{executable => + str(#{default => <<"mean29-generic">>, + description => + <<"Executable binary of the worker. Options are: \"mean29-generic\"" + " (memory-intensive), " + "\"mean29-avx2\" (memory-intensive, benefits from faster CPU supporting" + " AVX2 instructions), " + "\"lean29-generic\" (CPU-intensive, useful if memory-constrained), " + "\"lean29-avx2\" (CPU-intensive, useful if memory-constrained, benefits " + "from faster CPU supporting AVX2 instructions).">>}), + executable_group => + str(#{description => <<"Group of executable binaries of the worker.">>, + enum => [ <<"aecuckoo">>, <<"aecuckooprebuilt">>, <<"gmcuckkoo">>, <<"cuda">>, <<"gajumine">> ], + default => <<"aecuckoo">>}), + extra_args => + str(#{description => <<"Extra arguments to pass to the worker executable binary. " + "The safest choice is specifying no arguments i.e. empty string.">>, + default => <<>>}), + hex_encoded_header => + bool(false, <<"Hexadecimal encode the header argument that is send to the worker executable. " + "CUDA executables expect hex encoded header.">>), + repeats => + int(1, <<"Number of tries to do in each worker context - " + "WARNING: it should be set so the worker process " + "runs for 3-5s or else the node risk missing out on new micro blocks.">>), + instances => + array(#{description => + <<"Instances used by the worker in case of Multi-GPU mining. " + "Numbers on the configuration list represent GPU devices that are to " + "be addressed by the worker.">>, + minItems => 1, + example => [0,1,2,3]}, + #{type => <<"integer">>}) + })). diff --git a/src/gmhc_connector.erl b/src/gmhc_connector.erl new file mode 100644 index 0000000..df9adda --- /dev/null +++ b/src/gmhc_connector.erl @@ -0,0 +1,468 @@ +-module(gmhc_connector). + +-behaviour(gen_server). + +-export([ start_link/1 + , init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([ + connect/1 + , disconnect/1 + , status/0 + , status/1 + , send/2 + ]). + +-export([ whereis_id/1 ]). + +%% for outcoing messages +%% -export([ solution/3 +%% , no_solution/2 +%% , nonces/2 +%% ]). + +-include_lib("kernel/include/logger.hrl"). + +-define(MAX_RETRY_INTERVAL, 30000). + +-type retry_opts() :: #{ deadline => pos_integer() }. + +-type timer_ref() :: {reference(), non_neg_integer(), non_neg_integer(), retry_opts()}. + +-type id() :: non_neg_integer(). + +-type connect_opts() :: #{ host => string() + , port => pos_integer() + , timeout => pos_integer() %% overrides deadline + , deadline => pos_integer() %% ms monotonic must be in the future + , nowait => boolean() %% default: false + , tcp_opts => list() + , enoise_opts => list() %% if present, MUST contain {'noise', _} + , tcp_opts => list() + , connect_timeout => pos_integer() + }. + +-export_type([ id/0 + , connect_opts/0 ]). + +-record(st, { + id :: non_neg_integer() + , auto_connect = true :: boolean() + , econn + , reconnect_timer :: timer_ref() | 'undefined' + , awaiting_connect = [] :: list() + , protocol :: binary() | 'undefined' + , version :: binary() | 'undefined' + , opts = #{} :: map() + }). + +-spec connect(connect_opts()) -> ok | {error, any()}. +connect(Opts) when is_map(Opts) -> + {Timeout, Opts1} = manual_connect_timeout(Opts), + gen_server:call(?MODULE, {connect, Opts1}, Timeout). + +manual_connect_timeout(#{timeout := T} = Opts) -> + Deadline = erlang:monotonic_time(millisecond) + T, + {T, maps:remove(timeout, Opts#{deadline => Deadline})}; +manual_connect_timeout(#{deadline := D} = Opts) -> + Timeout = D - erlang:monotonic_time(millisecond), + if Timeout < 0 -> + error(invalid_deadline); + true -> + {Timeout, Opts} + end; +manual_connect_timeout(Opts) -> + DefaultT = 10000, + Deadline = erlang:monotonic_time(millisecond) + DefaultT, + {DefaultT, Opts#{deadline => Deadline}}. + +disconnect(Id) -> + gen_server:call(via(Id), disconnect). + +send(Via, Msg) -> + gen_server:cast(via(Via), {send, Msg}). + +status() -> + {via, gproc, HeadPat} = via('$1'), + Connectors = gproc:select([{{HeadPat,'_','_'}, [], ['$1']}]), + [{Id, status_(via(Id))} || Id <- Connectors]. + +status(Id) -> + {via, gproc, Req} = via(Id), + case gproc:where(Req) of + undefined -> + disconnected; + Pid when is_pid(Pid) -> + status_(Pid) + end. + +status_(Proc) -> + try gen_server:call(Proc, status) + catch + error:_ -> + disconnected + end. + +%% start_link() -> +%% start_link(#{}). + +whereis_id(Id) -> + gproc:where(reg(Id)). + +reg(Id) -> + {n, l, {?MODULE, Id}}. + +via(Id) -> + {via, gproc, reg(Id)}. + +start_link(#{id := Id} = Opts) -> + gen_server:start_link(via(Id), ?MODULE, Opts, []). + +init(#{id := Id} = Opts) when is_map(Opts) -> + AutoConnect = opt_autoconnect(Opts), + S0 = #st{id = Id, auto_connect = AutoConnect}, + Nowait = maps:get(nowait, Opts, false), + if Nowait -> + proc_lib:init_ack({ok, self()}); + true -> + ok + end, + S1 = + case AutoConnect of + true -> + case try_connect(Opts, S0) of + {ok, S} -> + ?LOG_DEBUG("Initial connect succeeded", []), + S; + {error, _} = Error -> + ?LOG_WARNING("Could not connect to core server: ~p", [Error]), + start_reconnect_timer(S0#st{econn = undefined}) + end; + false -> + S0 + end, + if Nowait -> + gen_server:enter_loop(?MODULE, [], S1, via(Id)); + true -> + {ok, S1} + end. + +handle_call(status, _From, #st{econn = EConn} = S) -> + Status = case EConn of + undefined -> disconnected; + _ -> connected + end, + {reply, Status, S}; +handle_call({connect, Opts}, From, #st{awaiting_connect = Waiters} = S) -> + Nowait = maps:get(nowait, Opts, false), + case Nowait of + true -> gen_server:reply(From, ok); + false -> ok + end, + case try_connect(Opts, S) of + {ok, S1} -> + if Nowait -> {noreply, S1}; + true -> {reply, ok, S1} + end; + {error, _} = Error -> + case maps:get(retry, Opts, true) of + true -> + Waiters1 = if Nowait -> Waiters; + true -> [{From, retry_opts(Opts)}|Waiters] + end, + S1 = start_reconnect_timer( + S#st{ auto_connect = true + , awaiting_connect = Waiters1 }, Opts), + {noreply, S1}; + false -> + if Nowait -> {noreply, S}; + true -> {reply, Error, S} + end + end + end; +handle_call(disconnect, _From, #st{econn = EConn} = S) -> + case EConn of + undefined -> + ok; + _ -> + + enoise:close(EConn) + end, + S1 = cancel_reconnect_timer(S#st{ auto_connect = false + , econn = undefined }), + {reply, ok, S1}; +handle_call(_Req, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast({send, Msg0}, #st{ econn = EConn + , protocol = P + , version = V } = S) when EConn =/= undefined -> + try + Msg = maps:remove(via, Msg0), + Data = gmhp_msgs:encode(Msg, P, V), + enoise:send(EConn, Data) + catch + error:E:T -> + ?LOG_ERROR("CAUGHT error:~p / ~p", [E, T]) + end, + {noreply, S}; +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info({noise, EConn, Data}, #st{ id = Id + , econn = EConn + , protocol = P, version = V} = S) -> + try gmhp_msgs:decode(Data, P, V) of + Msg -> + gmhc_handler:from_pool(Msg#{via => Id}) + catch + error:E -> + ?LOG_WARNING("Unknown message (~p): ~p", [E, Data]) + end, + {noreply, S}; +handle_info({timeout, TRef, {reconnect, Opts}}, #st{ reconnect_timer = {TRef, _, _, _} + , auto_connect = true } = S) -> + case try_connect(Opts, S) of + {ok, S1} -> + ?LOG_DEBUG("protocol connected", []), + {noreply, S1}; + {error, _} = Error -> + ?LOG_DEBUG("Reconnect attempt failed: ~p", [Error]), + {noreply, restart_reconnect_timer(S)} + end; +handle_info({tcp_closed, _Port}, #st{} = S) -> + ?LOG_DEBUG("got tcp_closed", []), + disconnected(S#st.id), + S1 = case S#st.auto_connect of + true -> start_reconnect_timer(S#st{econn = undefined}); + false -> S#st{econn = undefined} + end, + {noreply, S1}; +handle_info(Msg, S) -> + ?LOG_DEBUG("Discarding msg (auto_connect=~p): ~p", [S#st.auto_connect, Msg]), + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_FromVsn, S, _Extra) -> + {ok, S}. + +%% try_connect() -> +%% try_connect(#{}, #st{}). + +%% try_connect(#st{} = S) -> +%% try_connect(#{}, S). + +try_connect(Opts, S) -> + try try_connect_(Opts, S) + catch + error:E:T -> + ?LOG_ERROR("Unexpected error connecting: ~p / ~p", [E, T]), + {error, E} + end. + +try_connect_(Opts0, S) -> + case eureka_get_host_port() of + {error, _} = Error -> + Error; + PoolOpts when is_map(PoolOpts) -> + case try_noise_connect(maps:merge(Opts0, PoolOpts)) of + {ok, EConn, Opts1} -> + S1 = protocol_connect(Opts1, S#st{ econn = EConn + , reconnect_timer = undefined }), + {ok, S1}; + {error, _} = Error -> + Error + end + end. + +eureka_get_host_port() -> + case gmhc_eureka:get_pool_address() of + #{<<"address">> := Host, + <<"port">> := Port, + <<"pool_id">> := PoolId} -> + #{host => binary_to_list(Host), + port => Port, + pool_id => binary_to_list(PoolId)}; + {error, _} = Error -> + Error + end. + + +try_noise_connect(Opts) -> + Host = binary_to_list(gmhc_config:get_config([<<"pool">>, <<"host">>])), + Port = gmhc_config:get_config([<<"pool">>, <<"port">>]), + noise_connect(maps:merge(#{host => Host, port => Port}, Opts)). + +noise_connect(#{host := Host, port := Port} = Opts) -> + TcpOpts = maps:get(tcp_opts, Opts, default_tcp_opts()), + Timeout = maps:get(connect_timeout, Opts, 5000), + ?LOG_DEBUG("TCP connect: Host=~p, Port=~p, Timeout=~p, Opts=~p", + [Host,Port,Timeout,TcpOpts]), + case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of + {ok, TcpSock} -> + ?LOG_DEBUG("Connected, TcpSock = ~p", [TcpSock]), + EnoiseOpts = maps:get(enoise_opts, Opts, enoise_opts()), + case enoise:connect(TcpSock, EnoiseOpts) of + {ok, EConn, _FinalSt} -> + {ok, EConn, Opts#{ tcp_opts => TcpOpts + , timeout => Timeout + , enoise_opts => EnoiseOpts }}; + {error, _} = Err -> + Err + end; + {error, _} = TcpErr -> + ?LOG_DEBUG("TCP connection failed: ~p", [TcpErr]), + TcpErr + end. + +default_tcp_opts() -> + [ {active, true} + , {reuseaddr, true} + , {mode, binary} + ]. + +enoise_opts() -> + [{noise, <<"Noise_NN_25519_ChaChaPoly_BLAKE2b">>}]. + +start_reconnect_timer(#st{} = S) -> + start_reconnect_timer(S, #{}). + +start_reconnect_timer(#st{} = S, Opts) -> + case deadline_reached(Opts) of + {true, D} -> + ?LOG_DEBUG("timer deadline reached, not restarting timer", []), + notify_deadline(D, S#st{reconnect_timer = undefined}); + false -> + ?LOG_DEBUG("starting reconnect timer ...", []), + TRef = start_timer(1000, Opts), + S#st{reconnect_timer = {TRef, 10, 1000, Opts}} + end. + +restart_reconnect_timer(#st{reconnect_timer = {_, 0, T, Opts}} = S) -> + NewT = max(T * 2, ?MAX_RETRY_INTERVAL), + TRef = start_timer(NewT, Opts), + S#st{reconnect_timer = {TRef, 10, NewT, Opts}}; +restart_reconnect_timer(#st{reconnect_timer = {_, N, T, Opts}} = S) -> + TRef = start_timer(T, Opts), + S#st{reconnect_timer = {TRef, N-1, T, Opts}}. + +deadline_reached(#{deadline := D}) -> + case erlang:monotonic_time(millisecond) > D of + true -> {true, D}; + false -> false + end; +deadline_reached(_) -> + false. + +notify_deadline(D, #st{awaiting_connect = Waiters} = S) -> + Waiters1 = + lists:foldr(fun({From, D1}, Acc) when D1 == D -> + gen_server:reply(From, {error, timeout}), + Acc; + (Other, Acc) -> + [Other | Acc] + end, [], Waiters), + S#st{awaiting_connect = Waiters1}. + +notify_connected(#st{id = Id, awaiting_connect = Waiters} = S) -> + gmhc_events:publish(connected, #{id => Id}), + [gen_server:reply(From, ok) || {From, _} <- Waiters], + gmhc_handler:pool_connected(S#st.id, S#st.opts), + S#st{awaiting_connect = []}. + +cancel_reconnect_timer(#st{reconnect_timer = T} = S) -> + case T of + undefined -> S; + {TRef, _, _, _} -> + erlang:cancel_timer(TRef), + S#st{reconnect_timer = undefined} + end. + +start_timer(T, Opts0) -> + Opts = retry_opts(Opts0), + erlang:start_timer(T, self(), {reconnect, Opts}). + +retry_opts(Opts) -> + maps:with([deadline], Opts). + +protocol_connect(Opts, #st{econn = EConn} = S) -> + Pubkey = to_bin(opt(pubkey, Opts, [<<"pubkey">>])), + Extra = [to_bin(X) || X <- opt(extra_pubkeys, Opts, [<<"extra_pubkeys">>])], + PoolId = to_bin(opt(pool_id, Opts, [<<"pool">>, <<"id">>])), + Type = to_atom(opt(type, Opts, [<<"type">>])), + RId = erlang:unique_integer(), + Vsns = gmhp_msgs:versions(), + Protocols = gmhp_msgs:protocols(hd(Vsns)), + ConnectReq = #{ protocols => Protocols + , versions => Vsns + , pool_id => PoolId + , pubkey => Pubkey + , extra_pubkeys => Extra + , type => Type + , nonces => gmhc_server:total_nonces() + , signature => ""}, + ?LOG_DEBUG("ConnectReq = ~p", [ConnectReq]), + Msg = gmhp_msgs:encode_connect(ConnectReq, RId), + enoise:send(EConn, Msg), + receive + {noise, EConn, Data} -> + case gmhp_msgs:decode_connect_ack(Data) of + #{reply := #{ id := RId + , result := #{connect_ack := #{ protocol := P + , version := V }} + }} -> + connected(S#st.id, Type), + Opts1 = Opts#{ pubkey => Pubkey + , extra => Extra + , pool_id => PoolId + , type => Type }, + notify_connected(S#st{protocol = P, version = V, opts = Opts1}); + #{error := #{message := Msg}} -> + ?LOG_ERROR("Connect error: ~s", [Msg]), + error(protocol_connect) + end + after 10000 -> + error(protocol_connect_timeout) + end. + +to_bin(A) when is_atom(A) -> + atom_to_binary(A, utf8); +to_bin(S) -> + iolist_to_binary(S). + +to_atom(A) when is_atom(A) -> A; +to_atom(S) -> + binary_to_existing_atom(iolist_to_binary(S), utf8). + +connected(Id, Type) when Type==miner; Type==monitor -> + gmhc_server:connected(Id, Type). + +disconnected(Id) -> + gmhc_events:publish(disconnected, #{id => Id}), + gmhc_server:disconnected(Id). + +opt_autoconnect(#{auto_connect := Bool}) when is_boolean(Bool) -> + Bool; +opt_autoconnect(#{id := Id}) -> + case Id of + 1 -> + application:get_env(gmhive_client, auto_connect, true); + _ -> + true + end. + +opt(OptsK, Opts, SchemaPath) -> + case maps:find(OptsK, Opts) of + error -> + gmhc_config:get_config(SchemaPath); + {ok, V} -> + V + end. diff --git a/src/gmhc_connectors_sup.erl b/src/gmhc_connectors_sup.erl new file mode 100644 index 0000000..45c15f0 --- /dev/null +++ b/src/gmhc_connectors_sup.erl @@ -0,0 +1,34 @@ +-module(gmhc_connectors_sup). +-behavior(supervisor). + +-export([ start_link/0 + , init/1 ]). + +-export([ start_first_connector/0 + , start_connector/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_first_connector() -> + start_connector(#{}). + +start_connector(Opts0) -> + Opts = case maps:is_key(id, Opts0) of + true -> Opts0; + false -> Opts0#{id => gmhc_counters:add_read(connector)} + end, + supervisor:start_child(?MODULE, [Opts]). + +init([]) -> + Mod = gmhc_connector, + SupFlags = #{ strategy => simple_one_for_one + , intensity => 3 + , period => 10 }, + ChildSpecs = [ #{ id => Mod + , start => {Mod, start_link, []} + , type => worker + , restart => transient + , shutdown => 5000 + , modules => [Mod] } ], + {ok, {SupFlags, ChildSpecs}}. diff --git a/src/gmhc_counters.erl b/src/gmhc_counters.erl new file mode 100644 index 0000000..9855f20 --- /dev/null +++ b/src/gmhc_counters.erl @@ -0,0 +1,53 @@ +-module(gmhc_counters). + +-export([ initialize/0 ]). + +-export([ add/1 + , add/2 + , add_read/1 + , add_read/2 + , get_value/1 + , values/0 ]). + +counters() -> + #{ connector => 1 }. + +initialize() -> + Counters = counters(), + Size = map_size(Counters), + CRef = counters:new(Size, []), + put_counters(#{ref => CRef, counters => Counters}). + +add(Counter) -> + add(Counter, 1). + +add(Counter, Incr) when is_integer(Incr), Incr >= 0 -> + {CRef, Ix} = counter_ix(Counter), + counters:add(CRef, Ix, Incr). + +add_read(Counter) -> + add_read(Counter, 1). + +add_read(Counter, Incr) when is_integer(Incr), Incr >= 0 -> + {CRef, Ix} = counter_ix(Counter), + counters:add(CRef, Ix, Incr), + counters:get(CRef, Ix). + +get_value(Counter) -> + {CRef, Ix} = counter_ix(Counter), + counters:get(CRef, Ix). + +values() -> + #{ref := CRef, counters := Counters} = get_counters(), + maps:map(fun(_,Ix) -> counters:get(CRef, Ix) end, Counters). + + +counter_ix(Counter) -> + #{ref := CRef, counters := #{Counter := Ix}} = get_counters(), + {CRef, Ix}. + +put_counters(C) -> + persistent_term:put(?MODULE, C). + +get_counters() -> + persistent_term:get(?MODULE). diff --git a/src/gmhc_eureka.erl b/src/gmhc_eureka.erl new file mode 100644 index 0000000..090c661 --- /dev/null +++ b/src/gmhc_eureka.erl @@ -0,0 +1,145 @@ +-module(gmhc_eureka). + +-export([get_pool_address/0]). + +-include_lib("kernel/include/logger.hrl"). +-include("gmhc_events.hrl"). + +get_pool_address() -> + URL0 = gmhc_config:get_config([<<"pool_admin">>, <<"url">>]), + case expand_url(URL0) of + <<"local">> -> + #{<<"address">> => <<"127.0.0.1">>, + <<"port">> => gmconfig:get_config([<<"pool">>, <<"port">>], [schema_default]), + <<"pool_id">> => gmhc_config:get_config([<<"pool">>, <<"id">>]) }; + URL -> + ?LOG_INFO("Trying to connect to ~p", [URL]), + connect1(URL) + end. + +connect1(URL0) -> + URL = binary_to_list(URL0), + Res = request(get, URL), + ?LOG_DEBUG("Res = ~p", [Res]), + case Res of + {ok, Body} -> + try get_host_port(json:decode(iolist_to_binary(Body))) + catch + error:_ -> + gmhc_events:publish(error, ?ERR_EVT(#{error => invalid_json, + data => Body})), + {error, invalid_json} + end; + {error, _} = Error -> + gmhc_events:publish(error, ?ERR_EVT(#{error => connect_failure, + data => Error})), + Error + end. + +get_host_port(Data) -> + ?LOG_DEBUG("Data = ~p", [Data]), + maps:with([<<"address">>, <<"port">>, <<"pool_id">>], Data). + +request(get, URL) -> + case request(get, URL, []) of + {ok, #{body := Body}} -> + {ok, Body}; + Other -> + %% TODO: perhaps return a more informative reason? + gmhc_events:publish(error, ?ERR_EVT(#{error => get_failed, + url => URL, + data => Other })), + {error, failed} + end. + +request(get, URL, []) -> + Headers = [], + HttpOpts = [{timeout, 15000}], + Opts = [], + Profile = default, + request_result(httpc:request(get, {URL, Headers}, HttpOpts, Opts, Profile)); +request(post, URL, Body) -> + post_request(URL, Body). + +expand_url(URL) -> + case re:run(URL, <<"{[^}]+}">>, []) of + {match, _} -> + expand_vars(URL); + nomatch -> + URL + end. + +expand_vars(S) -> + expand_vars(S, <<>>). + +expand_vars(<<"{", Rest/binary>>, Acc) -> + {Var, Rest1} = get_var_name(Rest), + expand_vars(Rest1, <>); +expand_vars(<>, Acc) -> + expand_vars(T, <>); +expand_vars(<<>>, Acc) -> + Acc. + +expand_var(<<"CLIENT_ID">>) -> + gmhc_config:get_config([<<"pubkey">>]). + +get_var_name(S) -> + get_var_name(S, <<>>). + +get_var_name(<<"}", Rest/binary>>, Acc) -> + {Acc, Rest}; +get_var_name(<>, Acc) -> + get_var_name(T, <>). + +%% From hz.erl ========================================================== + +% This is Bitcoin's variable-length unsigned integer encoding +% See: https://en.bitcoin.it/wiki/Protocol_documentation#Variable_length_integer +%% vencode(N) when N =< 0 -> +%% {error, {non_pos_N, N}}; +%% vencode(N) when N < 16#FD -> +%% {ok, <>}; +%% vencode(N) when N =< 16#FFFF -> +%% NBytes = eu(N, 2), +%% {ok, <<16#FD, NBytes/binary>>}; +%% vencode(N) when N =< 16#FFFF_FFFF -> +%% NBytes = eu(N, 4), +%% {ok, <<16#FE, NBytes/binary>>}; +%% vencode(N) when N < (2 bsl 64) -> +%% NBytes = eu(N, 8), +%% {ok, <<16#FF, NBytes/binary>>}. + + +% eu = encode unsigned (little endian with a given byte width) +% means add zero bytes to the end as needed +%% eu(N, Size) -> +%% Bytes = binary:encode_unsigned(N, little), +%% NExtraZeros = Size - byte_size(Bytes), +%% ExtraZeros = << <<0>> || _ <- lists:seq(1, NExtraZeros) >>, +%% <>. + +%% ====================================================================== + +%% From gmplugin_web_demo_handler.erl =================================== + +post_request(URL, Map) -> + ?LOG_DEBUG("Map = ~p", [Map]), + Body = json:encode(Map), + ?LOG_DEBUG("Body = ~s", [Body]), + PostRes = httpc:request(post, {URL, [], "application/json", Body}, [], []), + request_result(PostRes). + +%% ====================================================================== + +request_result(Result) -> + ?LOG_DEBUG("Request result: ~p", [Result]), + request_result_(Result). + +request_result_({ok, {{_, C200, Ok}, _Hdrs, Body}}) when C200 >= 200, C200 < 300 -> + {ok, #{code => C200, msg => Ok, body => Body}}; +request_result_({ok, {{_, C200, Ok}, Body}}) when C200 >= 200, C200 < 300 -> + {ok, #{code => C200, msg => Ok, body => Body}}; +request_result_({ok, {{_, Code, Error}, _Hdrs, Body}}) -> + {error, #{code => Code, msg => Error, body => Body}}; +request_result_(_) -> + {error, #{code => 500, msg => <<"Internal error">>, body => <<>>}}. diff --git a/src/gmhc_events.erl b/src/gmhc_events.erl new file mode 100644 index 0000000..d2d1110 --- /dev/null +++ b/src/gmhc_events.erl @@ -0,0 +1,76 @@ +-module(gmhc_events). + +-export([subscribe/1, + ensure_subscribed/1, + unsubscribe/1, + ensure_unsubscribed/1, + publish/2]). + +-export([debug/0]). + +-export_type([event/0]). + +-type event() :: pool_notification + | {pool_notification, atom()} + | error + | connected + | disconnected. + +-spec publish(event(), any()) -> ok. +publish(Event, Info) -> + Data = #{sender => self(), + time => os:timestamp(), + info => Info}, + _ = gproc_ps:publish(l, Event, Data), + ok. + +-spec subscribe(event()) -> true. +subscribe(Event) -> + gproc_ps:subscribe(l, Event). + +-spec ensure_subscribed(event()) -> true. +%% @doc Subscribes to Event. Will not crash if called more than once. +ensure_subscribed(Event) -> + try subscribe(Event) + catch + error:badarg -> + %% This assertion should not be needed, since there is + %% no other scenario that would cause subscribe/1 to fail, + %% other than gproc not running (would also cause a badarg) + _ = gproc:get_value({p,l,{gproc_ps_event, Event}}, self()), + true + end. + +-spec unsubscribe(event()) -> true. +unsubscribe(Event) -> + gproc_ps:unsubscribe(l, Event). + +-spec ensure_unsubscribed(event()) -> true. +ensure_unsubscribed(Event) -> + case lists:member(self(), gproc_ps:list_subs(l,Event)) of + true -> + unsubscribe(Event); + false -> + true + end. + + +debug() -> + ok = application:ensure_started(gproc), + spawn(fun() -> + subscribe(pool_notification), + subscribe({pool_notification, new_generation}), + subscribe(connected), + subscribe(error), + subscribe(disconnected), + loop() + end). + +loop() -> + receive + stop -> ok; + {gproc_ps_event, E, Data} -> + io:fwrite("EVENT ~p: ~p~n", [E, Data]), + loop() + end. + diff --git a/src/gmhc_events.hrl b/src/gmhc_events.hrl new file mode 100644 index 0000000..bae0ff6 --- /dev/null +++ b/src/gmhc_events.hrl @@ -0,0 +1,2 @@ + +-define(ERR_EVT(Info), #{module => ?MODULE, line => ?LINE, info => Info}). diff --git a/src/gmhc_handler.erl b/src/gmhc_handler.erl new file mode 100644 index 0000000..9a58c75 --- /dev/null +++ b/src/gmhc_handler.erl @@ -0,0 +1,170 @@ +-module(gmhc_handler). +-behavior(gen_server). + +-export([ start_link/0 + , init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([ call/1 + , notify/1 + , pool_connected/2 + , from_pool/1 ]). + +-record(pool, { id :: gmhc_connector:id() + , pid :: pid() | 'undefined' + , mref :: reference() + , connected = false :: boolean() + , keep = true :: boolean() + , host :: string() + , port :: pos_integer() + , opts = #{} :: map() }). + +-record(st, {pools = [], opts = #{}}). + +-define(CALL_TIMEOUT, 5000). + +-include_lib("kernel/include/logger.hrl"). + +call(Req) -> + try gen_server:call(?MODULE, {call, Req}, ?CALL_TIMEOUT) + catch + exit:Reason -> + {error, Reason} + end. + +notify(Msg) -> + gen_server:cast(?MODULE, {notify, Msg}). + +pool_connected(Id, Opts) -> + gen_server:cast(?MODULE, {pool_connected, Id, self(), Opts}). + +from_pool(Msg) -> + ToSend = {from_pool, Msg}, + ?MODULE ! ToSend. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #st{}}. + +handle_call({call, Req}, _From, #st{} = S) -> + {reply, call_connector(Req), S}; +handle_call(_Req, _From, S) -> + {reply, {error, unknown_method}, S}. + +handle_cast({pool_connected, Id, Pid, Opts}, #st{pools = Pools} = S) -> + MRef = erlang:monitor(process, Pid), + case lists:keyfind(Id, #pool.id, Pools) of + #pool{} = P -> + P1 = P#pool{ connected = true + , pid = Pid + , mref = MRef }, + Pools1 = lists:keyreplace(Id, #pool.id, Pools, P1), + {noreply, S#st{pools = Pools1}}; + false -> + P = #pool{ id = Id + , pid = Pid + , mref = MRef + , host = maps:get(host, Opts) + , port = maps:get(port, Opts) + , opts = Opts }, + {noreply, S#st{pools = [P | Pools]}} + end; +handle_cast({notify, Msg}, #st{} = S) -> + notify_connector(Msg), + {noreply, S}; +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info({from_pool, Msg}, S) -> + maybe_publish(Msg), + case Msg of + #{notification := #{new_server := #{ host := _ + , port := _ + , keep := _ } = Server}} -> + {noreply, start_pool_connector(Server, S)}; + _ -> + gmhc_server:from_pool(Msg), + {noreply, S} + end; +handle_info(Msg, S) -> + ?LOG_DEBUG("Unknown msg: ~p", [Msg]), + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_FromVsn, S, _Extra) -> + {ok, S}. + +maybe_publish(#{notification := Msg} = N) -> + Info = maybe_via(N, #{msg => Msg}), + gmhc_events:publish(pool_notification, Info), + if map_size(Msg) == 1 -> + [Tag] = maps:keys(Msg), + gmhc_events:publish({pool_notification, Tag}, Info); + true -> + ok + end; +maybe_publish(_) -> + ok. + +maybe_via(#{via := Via}, Info) -> + Info#{via => Via}. + +call_connector(Req0) -> + {ViaId, Req} = maps:take(via, Req0), + case gmhc_connector:whereis_id(ViaId) of + undefined -> + {error, no_connection}; + Pid when is_pid(Pid) -> + Id = erlang:unique_integer(), + MRef = erlang:monitor(process, Pid), + gmhc_connector:send(ViaId, #{call => Req#{ id => Id }}), + receive + {from_pool, #{reply := #{ id := Id, result := Result }}} -> + Result; + {from_pool, #{error := #{ id := Id } = Error}} -> + {error, maps:remove(id, Error)}; + {'DOWN', MRef, _, _, _} -> + {error, no_connection} + after 5000 -> + {error, {timeout, process_info(self(), messages)}} + end + end. + +notify_connector(Msg0) -> + {Via, Msg} = maps:take(via, Msg0), + gmhc_connector:send(Via, #{notification => #{msg => Msg}}). + +start_pool_connector(#{ host := Host + , port := Port + , keep := Keep }, #st{pools = Pools, opts = Opts} = S) -> + case [P || #pool{host = H1, port = P1} = P <- Pools, + H1 == Host, + P1 == Port] of + [] -> + case gmhive_client:connect(Opts#{ host => Host + , port => Port + , nowait => true }) of + {ok, CId} -> + Pid = gmhc_connector:whereis_id(CId), + MRef = erlang:monitor(process, Pid), + P = #pool{ id = CId, host = Host, port = Port, + keep = Keep, pid = Pid, mref = MRef }, + S#st{pools = [P | Pools]}; + {error, _} = Error -> + ?LOG_WARNING("Could not start connector to ~p:~p - ~p", + [Host, Port, Error]), + S + end; + [#pool{} = _P] -> + ?LOG_DEBUG("Already have a pool entry for ~p:~p", [Host, Port]), + S + end. diff --git a/src/gmhc_server.erl b/src/gmhc_server.erl new file mode 100644 index 0000000..e7ae439 --- /dev/null +++ b/src/gmhc_server.erl @@ -0,0 +1,392 @@ +-module(gmhc_server). + +-behaviour(gen_server). + +-export([ connected/2 + , disconnected/1 + , from_pool/1 + , new_candidate/1 + ]). + +-export([ total_nonces/0 ]). + +-export([ + start_link/0 + , init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-include_lib("kernel/include/logger.hrl"). +-include("gmhc_events.hrl"). + +-record(worker, { config + , nonces = 0 + , index + , pid + , mref + , cand + , nonce + , errors = 0}). +-type worker() :: #worker{}. +-type type() :: monitor | worker. + +-record(st, { + connected = #{} :: #{non_neg_integer() => {pid(), type()}} + , working = false :: boolean() + , candidate :: map() | 'undefined' + , nonces = 1 :: pos_integer() + , workers = [] :: [worker()] + }). + +-define(CONNECTED(S), map_size(S#st.connected) > 0). + +-define(MAX_ERRORS, 5). + +connected(Id, Type) -> + gen_server:call(?MODULE, {connected, Id, Type}). + +disconnected(Id) -> + gen_server:cast(?MODULE, {disconnected, Id}). + +from_pool(Msg) -> + ToSend = {from_pool, Msg}, + %% ?LOG_DEBUG("Sending to server: ~p", [ToSend]), + gen_server:cast(?MODULE, ToSend). + +new_candidate(Cand) -> + gen_server:cast(?MODULE, {new_candidate, Cand}). + +total_nonces() -> + gen_server:call(?MODULE, total_nonces). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + WorkerConfigs = gmhc_workers:get_worker_configs(), + ?LOG_DEBUG("WorkerConfigs = ~p", [WorkerConfigs]), + %% IdleWorkers = [#worker{executable = E} || E <- Instances], + {IdleWorkers,_} = lists:mapfoldl( + fun(C, N) -> + NNonces = calc_nonces(C), + {#worker{index = N, config = C, nonces = NNonces}, N+1} + end, 1, WorkerConfigs), + TotalNonces = lists:foldl(fun(#worker{nonces = N}, Acc) -> + N + Acc + end, 0, IdleWorkers), + process_flag(trap_exit, true), + {ok, #st{workers = IdleWorkers, nonces = TotalNonces}}. + +handle_call(total_nonces, _From, #st{nonces = Nonces} = S) -> + {reply, Nonces, S}; +handle_call({connected, Id, Type}, {Pid,_}, #st{connected = Conn} = S) -> + ?LOG_DEBUG("connected: ~p, ~p", [Id, Type]), + erlang:monitor(process, Pid), + S1 = S#st{connected = Conn#{Id => {Pid, Type}}}, + S2 = case Type of + monitor -> + stop_workers(S1#st.workers), % shouldn't be any running + S1#st{workers = [], working = false}; + worker -> S1#st{working = true} + end, + {reply, ok, S2}; +handle_call(_Req, _From, S) -> + {reply, unknown_call, S}. + +handle_cast({from_pool, #{via := Connector, + notification := + #{candidate := Cand0}}}, + #st{workers = Workers} = S) -> + Cand = maps:put(via, Connector, decode_candidate_hash(Cand0)), + %% ?LOG_DEBUG("Got new candidate; will mine it: ~p", [Cand]), + %% For now, stop all workers, restart with new candidate + Workers1 = stop_workers(Workers), + {Workers2, Cand1} = assign_nonces(Workers1, Cand), + #st{candidate = Cand2} = S1 = maybe_request_nonces(S#st{candidate = Cand1}), + NewWorkers = [spawn_worker(W, Cand2) || W <- Workers2], + {noreply, S1#st{workers = NewWorkers}}; +handle_cast({disconnected, Id}, #st{connected = Conn} = S) -> + ?LOG_DEBUG("disconnected: ~p", [Id]), + Conn1 = maps:remove(Id, Conn), + S1 = if map_size(Conn1) == 0 -> + Ws = stop_workers(S#st.workers), + S#st{connected = Conn1, workers = Ws}; + true -> S#st{connected = Conn1} + end, + {noreply, S1}; +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info({'DOWN', MRef, process, Pid, Reason}, #st{ workers = Workers + , connected = Connected + , working = Working} = S) + when ?CONNECTED(S), Working -> + %% ?LOG_DEBUG("DOWN from ~p: ~p", [Pid, Reason]), + case lists:keyfind(Pid, #worker.pid, Workers) of + #worker{mref = MRef} = W -> + S1 = handle_worker_result(Reason, W, S), + {noreply, S1}; + false -> + Conn1 = maps:filter(fun(_, {P,_}) -> P =/= Pid end, Connected), + {noreply, S#st{connected = Conn1}} + end; +handle_info({'EXIT', Pid, Reason}, #st{ workers = Workers + , working = Working} = S) + when ?CONNECTED(S), Working -> + case lists:keyfind(Pid, #worker.pid, Workers) of + #worker{} = W -> + %% ?LOG_DEBUG("EXIT from worker ~p: ~p", [W#worker.index, Reason]), + gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error, + data => Reason})), + Ws1 = incr_worker_error(W, Workers), + {noreply, S#st{workers = Ws1}}; + false -> + %% ?LOG_DEBUG("EXIT apparently not from worker?? (~p)", [Pid]), + {noreply, S} + end; +handle_info(Msg, St) -> + ?LOG_DEBUG("Unknown msg: ~p", [Msg]), + {noreply, St}. + +terminate(_Reason, _St) -> + ok. + +code_change(_FromVsn, S, _Extra) -> + {ok, S}. + +report_solutions(Solutions, W, #st{} = S) when ?CONNECTED(S) -> + #{via := Via, seq := Seq} = W#worker.cand, + gmhc_handler:call( + #{via => Via, + solutions => #{ seq => Seq + , found => [#{ nonce => Nonce + , evidence => Evd } + || {Nonce, Evd} <- Solutions] }}). + +%% report_solution(Nonce, Solution, W, #st{connected = true}) -> +%% #{seq := Seq} = W#worker.cand, +%% gmhc_handler:call(#{solution => #{ seq => Seq +%% , nonce => Nonce +%% , evidence => Solution }}). + +report_no_solution(Nonce, W, #st{} = S) when ?CONNECTED(S) -> + #{via := Via, seq := Seq} = W#worker.cand, + %% ?LOG_DEBUG("report no_solution Seq = ~p, Nonce = ~p", [Seq, Nonce]), + gmhc_handler:call(#{via => Via, + no_solution => #{ seq => Seq + , nonce => Nonce}}). + +maybe_request_nonces(#st{ candidate = #{via := Via, seq := Seq, nonces := Nonces} + , nonces = N} = S) when ?CONNECTED(S) -> + case Nonces == [] of + true -> + %% ?LOG_DEBUG("Request more nonces, Seq = ~p, N = ~p", [Seq, N]), + Res = gmhc_handler:call(#{via => Via, + get_nonces => #{ seq => Seq + , n => N }}), + nonces_result(Res, Seq, S); + false -> + S + end; +maybe_request_nonces(S) -> + S. + +nonces_result(#{nonces := #{seq := Seq, nonces := Nonces}}, Seq0, S) -> + case Seq == Seq0 of + true -> + %% ?LOG_DEBUG("Got nonces = ~p", [Nonces]), + #st{candidate = Cand} = S, + S#st{candidate = Cand#{nonces => Nonces}}; + false -> + ?LOG_DEBUG("Seq mismatch - wtf?? ~p - ~p", [Seq, Seq0]), + S + end; +nonces_result({error, #{message := <<"outdated">>}}, Seq0, S) -> + Workers = stop_workers_for_seq(Seq0, S#st.workers), + S#st{workers = Workers}. + +handle_worker_result({worker_result, Result}, W, S) -> + %% ?LOG_DEBUG("worker result: ~p", [Result]), + case Result of + {solutions, Solutions} -> + {Cont, S1} = report_solutions_(Solutions, W, S), + maybe_continue(Cont, W, S1); + {solution, Nonce, Solution} -> + %% report_solution(Nonce, Solution, W, S), + {Cont, S1} = report_solutions_([{Nonce, Solution}], W, S), + maybe_continue(Cont, W, S1); + {no_solution, Nonce} -> + report_no_solution(Nonce, W, S), + maybe_restart_worker(W, S); + {error, S} -> + ?LOG_DEBUG("Worker ~p reported error as normal", [W#worker.index]), + gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error, + data => Result})), + Ws = incr_worker_error(W, S#st.workers), + S#st{workers = Ws} + end; +handle_worker_result(Error, W, S) -> + ?LOG_DEBUG("Got worker error from ~p: ~p", [W#worker.index, Error]), + gmhc_events:publish(error, ?ERR_EVT(#{error => worker_error, + data => Error})), + Ws = incr_worker_error(W, S#st.workers), + S#st{workers = Ws}. + +report_solutions_(Solutions, W, S) -> + case report_solutions(Solutions, W, S) of + ok -> + Ws = reset_worker(W, S#st.workers), + Ws1 = stop_workers(Ws), + {stopped, S#st{workers = Ws1}}; + continue -> + {continue, S}; + {error, _} -> + {error, S} + end. + +reset_worker(#worker{index = I} = W, Ws) -> + W1 = reset_worker_(W), + lists:keyreplace(I, #worker.index, Ws, W1). + +reset_worker_(W) -> + %% ?LOG_DEBUG("reset_worker ~p", [W#worker.index]), + W#worker{pid = undefined, mref = undefined, + nonce = undefined, cand = undefined}. + +incr_worker_error(#worker{errors = Es, index = I} = W, Ws) -> + %% ?LOG_DEBUG("Increment worker (~p) error count: ~p", [I, Es+1]), + W1 = reset_worker_(W#worker{errors = Es+1}), + lists:keyreplace(I, #worker.index, Ws, W1). + +maybe_continue(stopped, _, S) -> + S; +maybe_continue(continue, W, S) -> + maybe_restart_worker(W, S); +maybe_continue(error, W, S) -> + ?LOG_INFO("Won't restart worker ~p due to error", [W#worker.index]), + S. + +maybe_restart_worker(#worker{index = I} = W, #st{candidate = C} = S) -> + case maps:get(nonces, C) of + [] -> + %% Waiting for nonces + Ws = reset_worker(W, S#st.workers), + S#st{workers = Ws}; + Ns -> + {Nonce, Ns1} = pick_nonce(W#worker.nonces, Ns), + %% ?LOG_DEBUG("restart worker ~p with nonce ~p", [I, Nonce]), + W1 = reset_worker_(W), + W2 = spawn_worker(W1#worker{nonce = Nonce}, C), + Ws = lists:keyreplace(I, #worker.index, S#st.workers, W2), + S1 = S#st{candidate = C#{nonces => Ns1}, workers = Ws}, + maybe_request_nonces(S1) + end. + +%% In a Gajumaru node, a typical worker config might look like this: +%% "cuckoo": { +%% "edge_bits": 29, +%% "miners": [{"executable": "mean29-avx2"}, +%% {"executable": "lean29-avx2"}, +%% {"executable": "lean29-avx2"}, +%% {"executable": "lean29-avx2"}] +%% } +stop_workers(Workers) -> + [stop_worker(W) || W <- Workers]. + +stop_workers_for_seq(Seq, Workers) -> + [stop_worker(W) || #worker{cand = #{seq := Seq1}} = W <- Workers, + Seq1 =:= Seq]. + +stop_worker(#worker{pid = Pid, mref = MRef} = W) when is_pid(Pid) -> + exit(Pid, kill), + receive + {'EXIT', Pid, _} -> ok; + {'DOWN', MRef, process, Pid, _} -> ok + end, + W#worker{pid = undefined, mref = undefined, nonce = undefined}; +stop_worker(W) -> + W. + +assign_nonces(Ws, #{nonces := Ns} = C) -> + {Ws1, Nonces1} = assign_nonces_(Ws, Ns, []), + {Ws1, C#{nonces => Nonces1}}. + +assign_nonces_([W | Ws], [], Acc) -> + assign_nonces_(Ws, [], [W#worker{nonce = undefined}|Acc]); +assign_nonces_([#worker{nonces = N} = W | Ws], Ns, Acc) -> + {Nonce, Ns1} = pick_nonce(N, Ns), + assign_nonces_(Ws, Ns1, [W#worker{nonce = Nonce}|Acc]); +assign_nonces_([], Ns, Acc) -> + {lists:reverse(Acc), Ns}. + +-spec calc_nonces(gmhw_pow_cuckoo:config()) -> non_neg_integer(). +calc_nonces(Cfg) -> + NInstances = case gmhw_pow_cuckoo:addressed_instances(Cfg) of + undefined -> 1; + L -> length(L) + end, + Repeats = gmhw_pow_cuckoo:repeats(Cfg), + Repeats * NInstances. + +pick_nonce(_, [A, A]) -> + %% ?LOG_DEBUG("last nonce (~p)", [A]), + {A, []}; +pick_nonce(N, [A, B]) when A < B -> + A1 = A + N, + New = if A1 > B -> []; + true -> [A1, B] + end, + %% ?LOG_DEBUG("Remanining nonces: ~p", [New]), + {A, New}. + +%% Dialyzer doesn't like that the fun passed to spawn_link/1 +%% doesn't have a local return (it communicates its result via the exit reason). +-dialyzer({no_return, spawn_worker/2}). + +spawn_worker(#worker{nonce = undefined} = W, _) -> + W; +spawn_worker(#worker{errors = Es} = W, _) when Es >= ?MAX_ERRORS -> + ?LOG_DEBUG("Won't start worker - reached max error count: ~p", [W]), + W; +spawn_worker(#worker{pid = undefined, nonce = Nonce, config = Cfg} = W, Cand) -> + Me = self(), + #{candidate := Data, target := Target, edge_bits := EdgeBits} = Cand, + Pid = spawn_link( + fun() -> + Cfg1 = gmhw_pow_cuckoo:set_edge_bits(EdgeBits, Cfg), + init_worker(Data, Nonce, Target, Cfg1, Me) + end), + MRef = erlang:monitor(process, Pid), + W#worker{pid = Pid, mref = MRef, cand = Cand, nonce = Nonce}. + +-spec init_worker(binary(), integer(), integer(), tuple(), pid()) -> no_return(). +init_worker(Data, Nonce, Target, Config, Parent) -> + Res = gmhc_workers:generate_from_hash(Data, Target, Nonce, Config, undefined), + %% ?LOG_DEBUG("worker result: ~p", [Res]), + case Res of + {ok, Solutions} when is_list(Solutions) -> + worker_result(Parent, {solutions, Solutions}); + %% {ok, {Nonce1, Solution}} -> + %% worker_result(Parent, {solution, Nonce1, Solution}); + {error, no_solution} -> + %% TODO: If we are using repeats, then we might report + %% no_solution for each nonce tried. + worker_result(Parent, {no_solution, Nonce}); + {error, Other} -> + ?LOG_ERROR("Bad worker! {error, ~p}", [Other]), + gmhc_events:publish(error, ?ERR_EVT(#{error => cannot_start_worker, + data => {error, Other}})), + exit(Other) + end. + +worker_result(Pid, Result) -> + unlink(Pid), + exit({worker_result, Result}). + +decode_candidate_hash(#{candidate := C} = Cand) -> + {ok, Hash} = gmser_api_encoder:safe_decode(bytearray, C), + Cand#{candidate := Hash}. diff --git a/src/gmhc_sup.erl b/src/gmhc_sup.erl new file mode 100644 index 0000000..3a20791 --- /dev/null +++ b/src/gmhc_sup.erl @@ -0,0 +1,37 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- +-module(gmhc_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ChildSpecs = [ worker(gmhc_server) + , worker(gmhc_handler) + , supervisor(gmhc_connectors_sup) ], + SupFlags = #{ strategy => one_for_one + , intensity => 1 + , period => 5 + , auto_shutdown => never }, + {ok, {SupFlags, ChildSpecs}}. + + +worker (Mod) -> child(Mod, worker). +supervisor(Mod) -> child(Mod, supervisor). + +child(Mod, Type) -> + #{ id => Mod + , type => Type + , start => {Mod, start_link, []} + , restart => permanent + , shutdown => 5000 + , significant => false + , modules => [Mod] }. diff --git a/src/gmhc_workers.erl b/src/gmhc_workers.erl new file mode 100644 index 0000000..2d2eaff --- /dev/null +++ b/src/gmhc_workers.erl @@ -0,0 +1,74 @@ +%%%------------------------------------------------------------------- +%%% @copyright (C) 2025, QPQ AG +%%% @copyright (C) 2017, Aeternity Anstalt +%%% +%%% @doc Hive worker configuration logic, based on aec_mining.erl +%%% in the Gajumaru platform (https://git.qpq.swiss/QPQ-AG/gajumaru) +%%% @end +%%%------------------------------------------------------------------- + +-module(gmhc_workers). + +-export([ + get_worker_configs/0 + , generate_from_hash/5 + ]). + +-include_lib("kernel/include/logger.hrl"). + +-define(DEFAULT_EXECUTABLE_GROUP , <<"gajumine">>). +-define(DEFAULT_EXTRA_ARGS , <<>>). +-define(DEFAULT_HEX_ENCODED_HEADER , false). +-define(DEFAULT_REPEATS , 1). +-define(DEFAULT_EDGE_BITS , 29). + +%%------------------------------------------------------------------------------ +%% Read and parse worker configs. +%% +%% Workers defined in gajumaru.{json,yaml} user config file take precedence. +%% If there are no workers defined in the user config, sys.config cuckoo +%% workers are read. If there are neither user config nor sys.config workers +%% ?DEFAULT_CUCKOO_ENV is used as the last resort option (i.e. mean29-generic +%% without any extra args). +%%------------------------------------------------------------------------------ +-spec get_worker_configs() -> [gmhw_pow_cuckoo:config()]. +get_worker_configs() -> + ConfigMaps = worker_config_map(), + ?LOG_DEBUG("ConfigMaps = ~p", [ConfigMaps]), + lists:foldl( + fun(Cfg, Configs) -> + [build_worker_config(Cfg) | Configs] + end, [], ConfigMaps). + +-spec generate_from_hash(gmhw_pow_cuckoo:hash(), gmhw_pow:sci_target(), + gmhw_pow:nonce(), gmhw_pow_cuckoo:config(), + gmhw_pow:instance() | undefined) -> + {ok, [{gmhw_pow:nonce(), gmhw_pow_cuckoo:solution()}]} | {error, term()}. +generate_from_hash(Hash, Target, Nonce, Config, WorkerInstance) -> + gmhw_pow_cuckoo:generate_from_hash(Hash, Target, Nonce, Config, WorkerInstance, true). + +%% Internal functions. + +%%------------------------------------------------------------------------------ +%% Config handling +%%------------------------------------------------------------------------------ + +build_worker_config(Config) when is_map(Config) -> + Exec = maps:get(<<"executable">>, Config), + ExecGroup = maps:get(<<"executable_group">>, Config, ?DEFAULT_EXECUTABLE_GROUP), + ExtraArgs = maps:get(<<"extra_args">>, Config, ?DEFAULT_EXTRA_ARGS), + HexEncHdr = maps:get(<<"hex_encoded_header">>, Config, + hex_encoding_default(ExecGroup, Exec)), + Repeats = maps:get(<<"repeats">>, Config, ?DEFAULT_REPEATS), + Instances = maps:get(<<"addressed_instances">>, Config, undefined), + EdgeBits = ?DEFAULT_EDGE_BITS, + gmhw_pow_cuckoo:config(Exec, ExecGroup, ExtraArgs, HexEncHdr, Repeats, EdgeBits, Instances); +build_worker_config({Exec, ExtraArgs, HexEncHdr, Repeats, Instances, ExecGroup}) -> + EdgeBits = ?DEFAULT_EDGE_BITS, + gmhw_pow_cuckoo:config(Exec, ExecGroup, ExtraArgs, HexEncHdr, Repeats, EdgeBits, Instances). + +worker_config_map() -> + gmhc_config:get_config([<<"workers">>]). + +hex_encoding_default(_, <<"cuda29">>) -> true; +hex_encoding_default(_, _) -> ?DEFAULT_HEX_ENCODED_HEADER. diff --git a/src/gmhive_client.app.src b/src/gmhive_client.app.src new file mode 100644 index 0000000..7a66063 --- /dev/null +++ b/src/gmhive_client.app.src @@ -0,0 +1,26 @@ +%% -*- mode: erlang; erlang-indent-level: 4; indent-tabs-mode: nil -*- +{application, gmhive_client, + [{description, "Gajumaru Hive Client"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, + [ + kernel + , stdlib + , sasl + , gproc + , inets + , ssl + , enoise + , gmconfig + , gmhive_protocol + , gmhive_worker + ]}, + {mod, {gmhc_app, []}}, + {start_phases, [ {connect_to_primary, []} ]}, + {env, []}, + {modules, []}, + {maintainers, ["QPQ IaaS AG"]}, + {licensens, ["ISC"]}, + {links, [{"gitea", "https://git.qpq.swiss/gmhive_client"}]} + ]}. diff --git a/src/gmhive_client.erl b/src/gmhive_client.erl new file mode 100644 index 0000000..765ebd5 --- /dev/null +++ b/src/gmhive_client.erl @@ -0,0 +1,36 @@ +-module(gmhive_client). + +-export([ connect/1 + , disconnect/1 + , status/0 + , status/1 ]). + +-type connect_opts() :: gmhc_connector:connect_opts(). +-type id() :: non_neg_integer(). +-type status() :: connected | disconnected. + +-export_type([ connect_opts/0 ]). + + +-spec connect(connect_opts()) -> {ok, id()} | {error, any()}. +connect(Opts) when is_map(Opts) -> + Id = gmhc_counters:add_read(connector), + case gmhc_connectors_sup:start_connector(Opts#{id => Id}) of + {ok, _Pid} -> + {ok, Id}; + {error, _} = Error -> + Error + end. + %% gmhc_connector:connect(Opts). + +-spec disconnect(id()) -> ok. +disconnect(Id) -> + gmhc_connector:disconnect(Id). + +-spec status() -> [{id(), status()}]. +status() -> + gmhc_connector:status(). + +-spec status(id()) -> status(). +status(Id) -> + gmhc_connector:status(Id). diff --git a/zomp.ignore b/zomp.ignore new file mode 100644 index 0000000..0f9ec96 --- /dev/null +++ b/zomp.ignore @@ -0,0 +1,4 @@ +_build +_checkouts +*config.eterm +rebar.* \ No newline at end of file diff --git a/zomp.meta b/zomp.meta new file mode 100644 index 0000000..4d11000 --- /dev/null +++ b/zomp.meta @@ -0,0 +1,17 @@ +{name,"gmhive_client"}. +{type,app}. +{modules,[]}. +{prefix,"gmhc"}. +{author,"Ulf Wiger, QPQ AG"}. +{desc,"Gajumaru Hive Client"}. +{package_id,{"uwiger","gmhive_client",{0,1,0}}}. +{deps,[]}. +{key_name,none}. +{a_email,"ulf@wiger.net"}. +{c_email,"ulf@wiger.net"}. +{copyright,"Ulf Wiger, QPQ AG"}. +{file_exts,[]}. +{license,"LGPL-3.0-or-later"}. +{repo_url,"https://git.qpq.swiss/QPQ-AG/gmhive_client"}. +{tags,[]}. +{ws_url,[]}. diff --git a/zompify.sh b/zompify.sh new file mode 100755 index 0000000..66aca1c --- /dev/null +++ b/zompify.sh @@ -0,0 +1,42 @@ +#!/bin/sh +set -e + +APP=$(basename "$PWD") + +SRC="_build/default/lib/$APP" +DST="$PWD/_build/zomp/lib/$APP" +IGNORE_FILE="zomp.ignore" + +mkdir -p "$DST" + +# Remove broken symlinks +find "$SRC" -type l ! -exec test -e {} \; -delete || true + +# Build ignore matcher +IGNORE_TEMP=$(mktemp) +trap "rm -f $IGNORE_TEMP" EXIT + +# Expand globs in zomp.ignore to patterns suitable for grep +if [ -e "$IGNORE_FILE" ]; then + grep -v '^\s*#' "$IGNORE_FILE" | sed 's#/#\\/#g' | sed 's/\./\\./g' | sed 's/\*/.*/g' > "$IGNORE_TEMP" +fi + +# Copy Git-tracked and Zomp-allowed files +git ls-files -z | while IFS= read -r -d '' file; do + # Skip if ignored + echo "$file" | grep -Eq -f "$IGNORE_TEMP" && continue + # Only copy if file exists in the build dir + if [ -e "$SRC/$file" ]; then + mkdir -p "$DST/$(dirname "$file")" + cp -a "$SRC/$file" "$DST/$file" + fi +done + +rm "$IGNORE_TEMP" + +# Copy metadata +cp "$PWD/zomp.meta" "$DST/" +cp "$PWD/Emakefile" "$DST/" + +# Clean up beam files just in case +[ -d "$DST/ebin" ] && find "$DST/ebin" -name '*.beam' -exec rm -f {} + || true