diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index bfe6af5f8..af9042698 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -20,6 +20,8 @@ jobs: - run: docker build -f docker/Dockerfile --build-arg SOLANA_VERSION=${{ env.SOLANA_VERSION }} --tag ${{ env.DOCKER_IMAGE }} . # publish to docker.io - run: echo "${{ secrets.DOCKER_IO_PASS }}" | docker login docker.io -u ${{ secrets.DOCKER_IO_USER }} --password-stdin + if: startsWith( github.ref, 'refs/tags/' ) - run: docker image tag ${DOCKER_IMAGE} docker.io/pythfoundation/pyth-client:${DOCKER_TAG} + if: startsWith( github.ref, 'refs/tags/' ) - run: docker image push docker.io/pythfoundation/pyth-client:${DOCKER_TAG} if: startsWith( github.ref, 'refs/tags/' ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1090c2aa3..16eb20a85 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,7 +81,7 @@ target_link_libraries( pyth_tx ${PC_DEP} ) # install( TARGETS pc DESTINATION lib ) -install( TARGETS pyth pythd pyth_csv DESTINATION bin ) +install( TARGETS pyth pythd pyth_csv pyth_tx DESTINATION bin ) install( FILES ${PC_HDR} DESTINATION include/pc ) install( FILES program/src/oracle/oracle.h DESTINATION include/oracle ) diff --git a/docker/Dockerfile b/docker/Dockerfile index 1cf7e8f49..3548ddaea 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -3,7 +3,10 @@ ARG SOLANA_VERSION FROM solanalabs/solana:v${SOLANA_VERSION} RUN apt-get update -RUN apt-get install -y cmake curl g++ git libzstd1 libzstd-dev zlib1g zlib1g-dev +RUN apt-get install -y cmake curl g++ git libzstd1 libzstd-dev sudo zlib1g zlib1g-dev + +# Grant sudo access to pyth user +RUN echo "pyth ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers RUN useradd -m pyth USER pyth diff --git a/pc/manager.cpp b/pc/manager.cpp index 2b9b308e3..09748ee31 100644 --- a/pc/manager.cpp +++ b/pc/manager.cpp @@ -62,7 +62,8 @@ manager::manager() do_cap_( false ), do_tx_( true ), is_pub_( false ), - cmt_( commitment::e_confirmed ) + cmt_( commitment::e_confirmed ), + sreq_{ { commitment::e_processed } } { tconn_.set_sub( this ); breq_->set_sub( this ); @@ -433,6 +434,15 @@ void manager::poll( bool do_wait ) // get current time curr_ts_ = get_now(); + // get current slot + if ( curr_ts_ - slot_ts_ > 200 * PC_NSECS_IN_MSEC ) { + if ( sreq_->get_is_recv() ) { + if ( has_status( PC_PYTH_RPC_CONNECTED ) ) { + clnt_.send( sreq_ ); + } + } + } + // try to (re)connect to tx proxy if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) { tconn_.reconnect(); @@ -489,8 +499,9 @@ void manager::reconnect_rpc() kidx_ = 0; ctimeout_ = PC_NSECS_IN_SEC; pub_ts_ = 0L; - slot_cnt_ = 0UL; slot_ = 0L; + slot_cnt_ = 0UL; + slot_ts_ = 0L; num_sub_ = 0; clnt_.reset(); for(;;) { @@ -507,9 +518,15 @@ void manager::reconnect_rpc() clnt_.send( sreq_ ); // subscribe to program updates - preq_->set_commitment( get_commitment() ); - preq_->set_program( get_program_pub_key() ); - clnt_.send( preq_ ); + pub_key *gpub = get_program_pub_key(); + if ( !gpub ) { + set_err_msg( "missing or invalid program public key [" + + get_program_pub_key_file() + "]" ); + } else { + preq_->set_commitment( get_commitment() ); + preq_->set_program( gpub ); + clnt_.send( preq_ ); + } // gather latest info on mapping accounts for( get_mapping *mptr: mvec_ ) { @@ -634,23 +651,30 @@ void manager::schedule( price_sched *kptr ) } } -void manager::on_response( rpc::slot_subscribe *res ) +void manager::on_response( rpc::get_slot *res ) { // check error if ( PC_UNLIKELY( res->get_is_err() ) ) { - set_err_msg( "failed to slot_subscribe [" + set_err_msg( "failed to get slot [" + res->get_err_msg() + "]" ); return; } // ignore slots that go back in time - uint64_t slot = res->get_slot(); + uint64_t slot = res->get_current_slot(); int64_t ts = res->get_recv_time(); if ( slot <= slot_ ) { return; } slot_ = slot; - PC_LOG_DBG( "receive slot" ).add( "slot", slot_ ).end(); + slot_ts_ = ts; + + int64_t ack_ts = res->get_recv_time() - res->get_sent_time(); + + PC_LOG_DBG( "received get_slot" ) + .add( "slot", slot_ ) + .add( "rount_trip_time(ms)", 1e-6*ack_ts ) + .end(); // submit block hash every N slots if ( slot_cnt_++ % PC_BLOCKHASH_TIMEOUT == 0 ) { diff --git a/pc/manager.hpp b/pc/manager.hpp index 3244c4867..d11b6660e 100644 --- a/pc/manager.hpp +++ b/pc/manager.hpp @@ -48,7 +48,7 @@ namespace pc public net_accept, public tx_sub, public rpc_sub, - public rpc_sub_i, + public rpc_sub_i, public rpc_sub_i, public rpc_sub_i { @@ -164,7 +164,7 @@ namespace pc bool get_is_tx_send() const; // rpc callbacks - void on_response( rpc::slot_subscribe * ) override; + void on_response( rpc::get_slot * ) override; void on_response( rpc::get_recent_block_hash * ) override; void on_response( rpc::program_subscribe * ) override; void set_status( int ); @@ -228,6 +228,7 @@ namespace pc int64_t ctimeout_; // connection timeout uint64_t slot_; // current slot uint64_t slot_cnt_; // slot count + int64_t slot_ts_; // current slot time int64_t curr_ts_; // current time int64_t pub_ts_; // start publish time int64_t pub_int_; // publish interval @@ -241,7 +242,7 @@ namespace pc commitment cmt_; // account get/subscribe commitment // requests - rpc::slot_subscribe sreq_[1]; // slot subscription + rpc::get_slot sreq_[1]; // slot subscription rpc::get_recent_block_hash breq_[1]; // block hash request rpc::program_subscribe preq_[1]; // program account subscription }; diff --git a/pc/request.cpp b/pc/request.cpp index fbd39be9d..88af42072 100644 --- a/pc/request.cpp +++ b/pc/request.cpp @@ -2145,10 +2145,12 @@ void price::update_pub() // update publishing index pub_idx_ = (unsigned)-1; pub_key *pkey = get_manager()->get_publish_pub_key(); - for( unsigned i=0; i != pptr_->num_; ++i ) { - if ( pc_pub_key_equal( &pptr_->comp_[i].pub_, (pc_pub_key_t*)pkey ) ) { - pub_idx_ = i; - break; + if ( pkey ) { + for( unsigned i=0; i != pptr_->num_; ++i ) { + if ( pc_pub_key_equal( &pptr_->comp_[i].pub_, (pc_pub_key_t*)pkey ) ) { + pub_idx_ = i; + break; + } } } } diff --git a/pc/rpc_client.cpp b/pc/rpc_client.cpp index bf850fb96..7d55e3e6b 100644 --- a/pc/rpc_client.cpp +++ b/pc/rpc_client.cpp @@ -637,6 +637,38 @@ void rpc::get_cluster_nodes::response( const jtree& jt ) on_response( this ); } +/////////////////////////////////////////////////////////////////////////// +// get_slot + +rpc::get_slot::get_slot( commitment const cmt ) +: cmt_{ cmt } +, cslot_( 0UL ) +{ +} + +uint64_t rpc::get_slot::get_current_slot() const +{ + return cslot_; +} + +void rpc::get_slot::request( json_wtr& msg ) +{ + msg.add_key( "method", "getSlot" ); + msg.add_key( "params", json_wtr::e_arr ); + msg.add_val( json_wtr::e_obj ); + msg.add_key( "commitment", commitment_to_str( cmt_ ) ); + msg.pop(); + msg.pop(); +} + +void rpc::get_slot::response( const jtree& jt ) +{ + if ( on_error( jt, this ) ) return; + uint32_t rtok = jt.find_val( 1, "result" ); + cslot_ = jt.get_uint( rtok ); + on_response( this ); +} + /////////////////////////////////////////////////////////////////////////// // get_slot_leaders diff --git a/pc/rpc_client.hpp b/pc/rpc_client.hpp index b14f9f820..86b29c470 100644 --- a/pc/rpc_client.hpp +++ b/pc/rpc_client.hpp @@ -366,6 +366,19 @@ namespace pc node_map_t nmap_; }; + // get current slot + class get_slot : public rpc_request + { + public: + get_slot( commitment = e_finalized ); + uint64_t get_current_slot() const; + void request( json_wtr& ) override; + void response( const jtree& ) override; + private: + commitment const cmt_; // param + uint64_t cslot_; // result + }; + // get id of leader node by slot class get_slot_leaders : public rpc_request { @@ -376,7 +389,7 @@ namespace pc pub_key *get_leader( uint64_t ); uint64_t get_last_slot() const; void request( json_wtr& ) override; - void response( const jtree&p) override; + void response( const jtree& ) override; private: typedef std::vector ldr_vec_t; uint64_t rslot_; diff --git a/pcapps/pyth.cpp b/pcapps/pyth.cpp index 8b391886b..15e36e00a 100644 --- a/pcapps/pyth.cpp +++ b/pcapps/pyth.cpp @@ -55,6 +55,7 @@ int usage() std::cerr << " get_block [options]" << std::endl; std::cerr << " get_product [options]" << std::endl; std::cerr << " get_product_list [options]" << std::endl; + std::cerr << " get_all_products [options]" << std::endl; std::cerr << " get_pub_key " << std::endl; std::cerr << " version" << std::endl; std::cerr << std::endl; @@ -74,6 +75,8 @@ int usage() std::cerr << " -j\n" << " Output results in json format where applicable\n" << std::endl; + std::cerr << " -d" << std::endl; + std::cerr << " Turn on debug logging\n" << std::endl; return 1; } @@ -311,12 +314,13 @@ int on_get_balance( int argc, char **argv ) commitment cmt = commitment::e_confirmed; std::string rpc_host = get_rpc_host(); std::string key_dir = get_key_store(); - while( (opt = ::getopt(argc,argv, "r:k:p:c:h" )) != -1 ) { + while( (opt = ::getopt(argc,argv, "r:k:p:c:dh" )) != -1 ) { switch(opt) { case 'r': rpc_host = optarg; break; case 'k': key_dir = optarg; break; case 'p': knm = optarg; break; case 'c': cmt = str_to_commitment(optarg); break; + case 'd': log::set_level( PC_LOG_DBG_LVL ); break; default: return usage(); } } @@ -1011,6 +1015,11 @@ int on_get_product_list( int argc, char **argv ) std::cerr << "pyth: " << mgr.get_err_msg() << std::endl; return 1; } + if ( !mgr.has_status( PC_PYTH_HAS_MAPPING ) ) { + std::cerr << "pyth: mapping not ready, check mapping key [" + << mgr.get_mapping_pub_key_file() << "]" << std::endl; + return 1; + } // list key/symbol pairs if ( !do_json ) { std::string astr; @@ -1203,6 +1212,11 @@ int on_get_product( int argc, char **argv ) std::cerr << "pyth: " << mgr.get_err_msg() << std::endl; return 1; } + if ( !mgr.has_status( PC_PYTH_HAS_MAPPING ) ) { + std::cerr << "pyth: mapping not ready, check mapping key [" + << mgr.get_mapping_pub_key_file() << "]" << std::endl; + return 1; + } // get product and serialize to stdout product *prod = mgr.get_product( pub ); if ( !prod ) { @@ -1217,6 +1231,68 @@ int on_get_product( int argc, char **argv ) return 0; } +int on_get_all_products( int argc, char **argv ) +{ + int opt = 0; + bool do_json = false; + commitment cmt = commitment::e_confirmed; + std::string rpc_host = get_rpc_host(); + std::string key_dir = get_key_store(); + while( (opt = ::getopt(argc,argv, "r:k:c:djh" )) != -1 ) { + switch(opt) { + case 'r': rpc_host = optarg; break; + case 'k': key_dir = optarg; break; + case 'd': log::set_level( PC_LOG_DBG_LVL ); break; + case 'j': do_json = true; break; + case 'c': cmt = str_to_commitment(optarg); break; + default: return usage(); + } + } + if ( cmt == commitment::e_unknown ) { + std::cerr << "pyth: unknown commitment level" << std::endl; + return usage(); + } + + // initialize connection to block-chain + manager mgr; + mgr.set_rpc_host( rpc_host ); + mgr.set_dir( key_dir ); + mgr.set_do_tx( false ); + mgr.set_commitment( cmt ); + if ( !mgr.init() || !mgr.bootstrap() ) { + std::cerr << "pyth: " << mgr.get_err_msg() << std::endl; + return 1; + } + if ( !mgr.has_status( PC_PYTH_HAS_MAPPING ) ) { + std::cerr << "pyth: mapping not ready, check mapping key [" + << mgr.get_mapping_pub_key_file() << "]" << std::endl; + return 1; + } + + // get all products and serialize to stdout + if ( !do_json ) { + for (unsigned i=0; i != mgr.get_num_product(); ++i ) { + product *prod = mgr.get_product(i); + print_product( prod ); + std::cout << std::endl; + } + } else { + std::cout << "["; + bool first = true; + for (unsigned i=0; i != mgr.get_num_product(); ++i ) { + product *prod = mgr.get_product(i); + if ( !first ) { + std::cout << ","; + } + print_product_json( prod ); + first = false; + } + std::cout << "]"; + } + + return 0; +} + class get_block_print : public get_block { public: @@ -1434,6 +1510,8 @@ int main(int argc, char **argv) rc = on_get_product( argc, argv ); } else if ( cmd == "get_product_list" ) { rc = on_get_product_list( argc, argv ); + } else if ( cmd == "get_all_products" ) { + rc = on_get_all_products( argc, argv ); } else if ( cmd == "get_block" ) { rc = on_get_block( argc, argv ); } else if ( cmd == "version" ) { diff --git a/pctest/slots_info.cpp b/pctest/slots_info.cpp index 8bf17e93b..1060cb504 100644 --- a/pctest/slots_info.cpp +++ b/pctest/slots_info.cpp @@ -13,7 +13,7 @@ class manager_slot : public manager, { public: manager_slot(); - void on_response( rpc::slot_subscribe * ) override; + void on_response( rpc::get_slot * ) override; void on_response( rpc::get_slot_leaders * ) override; private: rpc::get_slot_leaders ldr_[1]; @@ -27,10 +27,10 @@ manager_slot::manager_slot() ldr_->set_limit( PC_LEADER_MAX ); } -void manager_slot::on_response( rpc::slot_subscribe *res ) +void manager_slot::on_response( rpc::get_slot *res ) { manager::on_response( res ); - uint64_t slot = get_slot(); + uint64_t const slot = get_slot(); if ( slot != last_ ) { // request next slot leader schedule if ( PC_UNLIKELY( ldr_->get_is_recv() &&