Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into devnet
Browse files Browse the repository at this point in the history
  • Loading branch information
dragonfrond committed Aug 12, 2021
2 parents fe43353 + b958865 commit 521c617
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 23 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ jobs:
- run: docker build -f docker/Dockerfile --build-arg SOLANA_VERSION=${{ env.SOLANA_VERSION }} --tag ${{ env.DOCKER_IMAGE }} .
# publish to docker.io
- run: echo "${{ secrets.DOCKER_IO_PASS }}" | docker login docker.io -u ${{ secrets.DOCKER_IO_USER }} --password-stdin
if: startsWith( github.ref, 'refs/tags/' )
- run: docker image tag ${DOCKER_IMAGE} docker.io/pythfoundation/pyth-client:${DOCKER_TAG}
if: startsWith( github.ref, 'refs/tags/' )
- run: docker image push docker.io/pythfoundation/pyth-client:${DOCKER_TAG}
if: startsWith( github.ref, 'refs/tags/' )
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ target_link_libraries( pyth_tx ${PC_DEP} )
#

install( TARGETS pc DESTINATION lib )
install( TARGETS pyth pythd pyth_csv DESTINATION bin )
install( TARGETS pyth pythd pyth_csv pyth_tx DESTINATION bin )
install( FILES ${PC_HDR} DESTINATION include/pc )
install( FILES program/src/oracle/oracle.h DESTINATION include/oracle )

Expand Down
5 changes: 4 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ ARG SOLANA_VERSION
FROM solanalabs/solana:v${SOLANA_VERSION}

RUN apt-get update
RUN apt-get install -y cmake curl g++ git libzstd1 libzstd-dev zlib1g zlib1g-dev
RUN apt-get install -y cmake curl g++ git libzstd1 libzstd-dev sudo zlib1g zlib1g-dev

# Grant sudo access to pyth user
RUN echo "pyth ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

RUN useradd -m pyth
USER pyth
Expand Down
42 changes: 33 additions & 9 deletions pc/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ manager::manager()
do_cap_( false ),
do_tx_( true ),
is_pub_( false ),
cmt_( commitment::e_confirmed )
cmt_( commitment::e_confirmed ),
sreq_{ { commitment::e_processed } }
{
tconn_.set_sub( this );
breq_->set_sub( this );
Expand Down Expand Up @@ -433,6 +434,15 @@ void manager::poll( bool do_wait )
// get current time
curr_ts_ = get_now();

// get current slot
if ( curr_ts_ - slot_ts_ > 200 * PC_NSECS_IN_MSEC ) {
if ( sreq_->get_is_recv() ) {
if ( has_status( PC_PYTH_RPC_CONNECTED ) ) {
clnt_.send( sreq_ );
}
}
}

// try to (re)connect to tx proxy
if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) {
tconn_.reconnect();
Expand Down Expand Up @@ -489,8 +499,9 @@ void manager::reconnect_rpc()
kidx_ = 0;
ctimeout_ = PC_NSECS_IN_SEC;
pub_ts_ = 0L;
slot_cnt_ = 0UL;
slot_ = 0L;
slot_cnt_ = 0UL;
slot_ts_ = 0L;
num_sub_ = 0;
clnt_.reset();
for(;;) {
Expand All @@ -507,9 +518,15 @@ void manager::reconnect_rpc()
clnt_.send( sreq_ );

// subscribe to program updates
preq_->set_commitment( get_commitment() );
preq_->set_program( get_program_pub_key() );
clnt_.send( preq_ );
pub_key *gpub = get_program_pub_key();
if ( !gpub ) {
set_err_msg( "missing or invalid program public key [" +
get_program_pub_key_file() + "]" );
} else {
preq_->set_commitment( get_commitment() );
preq_->set_program( gpub );
clnt_.send( preq_ );
}

// gather latest info on mapping accounts
for( get_mapping *mptr: mvec_ ) {
Expand Down Expand Up @@ -634,23 +651,30 @@ void manager::schedule( price_sched *kptr )
}
}

void manager::on_response( rpc::slot_subscribe *res )
void manager::on_response( rpc::get_slot *res )
{
// check error
if ( PC_UNLIKELY( res->get_is_err() ) ) {
set_err_msg( "failed to slot_subscribe ["
set_err_msg( "failed to get slot ["
+ res->get_err_msg() + "]" );
return;
}

// ignore slots that go back in time
uint64_t slot = res->get_slot();
uint64_t slot = res->get_current_slot();
int64_t ts = res->get_recv_time();
if ( slot <= slot_ ) {
return;
}
slot_ = slot;
PC_LOG_DBG( "receive slot" ).add( "slot", slot_ ).end();
slot_ts_ = ts;

int64_t ack_ts = res->get_recv_time() - res->get_sent_time();

PC_LOG_DBG( "received get_slot" )
.add( "slot", slot_ )
.add( "rount_trip_time(ms)", 1e-6*ack_ts )
.end();

// submit block hash every N slots
if ( slot_cnt_++ % PC_BLOCKHASH_TIMEOUT == 0 ) {
Expand Down
7 changes: 4 additions & 3 deletions pc/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace pc
public net_accept,
public tx_sub,
public rpc_sub,
public rpc_sub_i<rpc::slot_subscribe>,
public rpc_sub_i<rpc::get_slot>,
public rpc_sub_i<rpc::get_recent_block_hash>,
public rpc_sub_i<rpc::program_subscribe>
{
Expand Down Expand Up @@ -164,7 +164,7 @@ namespace pc
bool get_is_tx_send() const;

// rpc callbacks
void on_response( rpc::slot_subscribe * ) override;
void on_response( rpc::get_slot * ) override;
void on_response( rpc::get_recent_block_hash * ) override;
void on_response( rpc::program_subscribe * ) override;
void set_status( int );
Expand Down Expand Up @@ -228,6 +228,7 @@ namespace pc
int64_t ctimeout_; // connection timeout
uint64_t slot_; // current slot
uint64_t slot_cnt_; // slot count
int64_t slot_ts_; // current slot time
int64_t curr_ts_; // current time
int64_t pub_ts_; // start publish time
int64_t pub_int_; // publish interval
Expand All @@ -241,7 +242,7 @@ namespace pc
commitment cmt_; // account get/subscribe commitment

// requests
rpc::slot_subscribe sreq_[1]; // slot subscription
rpc::get_slot sreq_[1]; // slot subscription
rpc::get_recent_block_hash breq_[1]; // block hash request
rpc::program_subscribe preq_[1]; // program account subscription
};
Expand Down
10 changes: 6 additions & 4 deletions pc/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2145,10 +2145,12 @@ void price::update_pub()
// update publishing index
pub_idx_ = (unsigned)-1;
pub_key *pkey = get_manager()->get_publish_pub_key();
for( unsigned i=0; i != pptr_->num_; ++i ) {
if ( pc_pub_key_equal( &pptr_->comp_[i].pub_, (pc_pub_key_t*)pkey ) ) {
pub_idx_ = i;
break;
if ( pkey ) {
for( unsigned i=0; i != pptr_->num_; ++i ) {
if ( pc_pub_key_equal( &pptr_->comp_[i].pub_, (pc_pub_key_t*)pkey ) ) {
pub_idx_ = i;
break;
}
}
}
}
Expand Down
32 changes: 32 additions & 0 deletions pc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,38 @@ void rpc::get_cluster_nodes::response( const jtree& jt )
on_response( this );
}

///////////////////////////////////////////////////////////////////////////
// get_slot

rpc::get_slot::get_slot( commitment const cmt )
: cmt_{ cmt }
, cslot_( 0UL )
{
}

uint64_t rpc::get_slot::get_current_slot() const
{
return cslot_;
}

void rpc::get_slot::request( json_wtr& msg )
{
msg.add_key( "method", "getSlot" );
msg.add_key( "params", json_wtr::e_arr );
msg.add_val( json_wtr::e_obj );
msg.add_key( "commitment", commitment_to_str( cmt_ ) );
msg.pop();
msg.pop();
}

void rpc::get_slot::response( const jtree& jt )
{
if ( on_error( jt, this ) ) return;
uint32_t rtok = jt.find_val( 1, "result" );
cslot_ = jt.get_uint( rtok );
on_response( this );
}

///////////////////////////////////////////////////////////////////////////
// get_slot_leaders

Expand Down
15 changes: 14 additions & 1 deletion pc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ namespace pc
node_map_t nmap_;
};

// get current slot
class get_slot : public rpc_request
{
public:
get_slot( commitment = e_finalized );
uint64_t get_current_slot() const;
void request( json_wtr& ) override;
void response( const jtree& ) override;
private:
commitment const cmt_; // param
uint64_t cslot_; // result
};

// get id of leader node by slot
class get_slot_leaders : public rpc_request
{
Expand All @@ -376,7 +389,7 @@ namespace pc
pub_key *get_leader( uint64_t );
uint64_t get_last_slot() const;
void request( json_wtr& ) override;
void response( const jtree&p) override;
void response( const jtree& ) override;
private:
typedef std::vector<pub_key> ldr_vec_t;
uint64_t rslot_;
Expand Down
80 changes: 79 additions & 1 deletion pcapps/pyth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ int usage()
std::cerr << " get_block <slot_number> [options]" << std::endl;
std::cerr << " get_product <prod_key> [options]" << std::endl;
std::cerr << " get_product_list [options]" << std::endl;
std::cerr << " get_all_products [options]" << std::endl;
std::cerr << " get_pub_key <key_pair_file>" << std::endl;
std::cerr << " version" << std::endl;
std::cerr << std::endl;
Expand All @@ -74,6 +75,8 @@ int usage()
std::cerr << " -j\n"
<< " Output results in json format where applicable\n"
<< std::endl;
std::cerr << " -d" << std::endl;
std::cerr << " Turn on debug logging\n" << std::endl;
return 1;
}

Expand Down Expand Up @@ -311,12 +314,13 @@ int on_get_balance( int argc, char **argv )
commitment cmt = commitment::e_confirmed;
std::string rpc_host = get_rpc_host();
std::string key_dir = get_key_store();
while( (opt = ::getopt(argc,argv, "r:k:p:c:h" )) != -1 ) {
while( (opt = ::getopt(argc,argv, "r:k:p:c:dh" )) != -1 ) {
switch(opt) {
case 'r': rpc_host = optarg; break;
case 'k': key_dir = optarg; break;
case 'p': knm = optarg; break;
case 'c': cmt = str_to_commitment(optarg); break;
case 'd': log::set_level( PC_LOG_DBG_LVL ); break;
default: return usage();
}
}
Expand Down Expand Up @@ -1011,6 +1015,11 @@ int on_get_product_list( int argc, char **argv )
std::cerr << "pyth: " << mgr.get_err_msg() << std::endl;
return 1;
}
if ( !mgr.has_status( PC_PYTH_HAS_MAPPING ) ) {
std::cerr << "pyth: mapping not ready, check mapping key ["
<< mgr.get_mapping_pub_key_file() << "]" << std::endl;
return 1;
}
// list key/symbol pairs
if ( !do_json ) {
std::string astr;
Expand Down Expand Up @@ -1203,6 +1212,11 @@ int on_get_product( int argc, char **argv )
std::cerr << "pyth: " << mgr.get_err_msg() << std::endl;
return 1;
}
if ( !mgr.has_status( PC_PYTH_HAS_MAPPING ) ) {
std::cerr << "pyth: mapping not ready, check mapping key ["
<< mgr.get_mapping_pub_key_file() << "]" << std::endl;
return 1;
}
// get product and serialize to stdout
product *prod = mgr.get_product( pub );
if ( !prod ) {
Expand All @@ -1217,6 +1231,68 @@ int on_get_product( int argc, char **argv )
return 0;
}

int on_get_all_products( int argc, char **argv )
{
int opt = 0;
bool do_json = false;
commitment cmt = commitment::e_confirmed;
std::string rpc_host = get_rpc_host();
std::string key_dir = get_key_store();
while( (opt = ::getopt(argc,argv, "r:k:c:djh" )) != -1 ) {
switch(opt) {
case 'r': rpc_host = optarg; break;
case 'k': key_dir = optarg; break;
case 'd': log::set_level( PC_LOG_DBG_LVL ); break;
case 'j': do_json = true; break;
case 'c': cmt = str_to_commitment(optarg); break;
default: return usage();
}
}
if ( cmt == commitment::e_unknown ) {
std::cerr << "pyth: unknown commitment level" << std::endl;
return usage();
}

// initialize connection to block-chain
manager mgr;
mgr.set_rpc_host( rpc_host );
mgr.set_dir( key_dir );
mgr.set_do_tx( false );
mgr.set_commitment( cmt );
if ( !mgr.init() || !mgr.bootstrap() ) {
std::cerr << "pyth: " << mgr.get_err_msg() << std::endl;
return 1;
}
if ( !mgr.has_status( PC_PYTH_HAS_MAPPING ) ) {
std::cerr << "pyth: mapping not ready, check mapping key ["
<< mgr.get_mapping_pub_key_file() << "]" << std::endl;
return 1;
}

// get all products and serialize to stdout
if ( !do_json ) {
for (unsigned i=0; i != mgr.get_num_product(); ++i ) {
product *prod = mgr.get_product(i);
print_product( prod );
std::cout << std::endl;
}
} else {
std::cout << "[";
bool first = true;
for (unsigned i=0; i != mgr.get_num_product(); ++i ) {
product *prod = mgr.get_product(i);
if ( !first ) {
std::cout << ",";
}
print_product_json( prod );
first = false;
}
std::cout << "]";
}

return 0;
}

class get_block_print : public get_block
{
public:
Expand Down Expand Up @@ -1434,6 +1510,8 @@ int main(int argc, char **argv)
rc = on_get_product( argc, argv );
} else if ( cmd == "get_product_list" ) {
rc = on_get_product_list( argc, argv );
} else if ( cmd == "get_all_products" ) {
rc = on_get_all_products( argc, argv );
} else if ( cmd == "get_block" ) {
rc = on_get_block( argc, argv );
} else if ( cmd == "version" ) {
Expand Down
6 changes: 3 additions & 3 deletions pctest/slots_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class manager_slot : public manager,
{
public:
manager_slot();
void on_response( rpc::slot_subscribe * ) override;
void on_response( rpc::get_slot * ) override;
void on_response( rpc::get_slot_leaders * ) override;
private:
rpc::get_slot_leaders ldr_[1];
Expand All @@ -27,10 +27,10 @@ manager_slot::manager_slot()
ldr_->set_limit( PC_LEADER_MAX );
}

void manager_slot::on_response( rpc::slot_subscribe *res )
void manager_slot::on_response( rpc::get_slot *res )
{
manager::on_response( res );
uint64_t slot = get_slot();
uint64_t const slot = get_slot();
if ( slot != last_ ) {
// request next slot leader schedule
if ( PC_UNLIKELY( ldr_->get_is_recv() &&
Expand Down

0 comments on commit 521c617

Please sign in to comment.