Skip to content

Commit

Permalink
feat: aggregate in the same slot (#394)
Browse files Browse the repository at this point in the history
* update aggregation logic to aggregate in the same slot

* Add features.h to .gitignore

* Refactor test_upd_aggregate.rs: Clean up imports and formatting, update test cases

* add prev_twap_, prev_twac_ and prev_price_cumulative

* remove twap and twac from test_up_aggregate

* fix test_sizes

* fix borrow reference bug

* fix build

* fix logic

* fix test_publish

* format

* fix test_publish_batch

* fix test_upd_price_no_fail_on_error

* fix test_upd_price_v2

* fix logic

* refactor

* update function desc

* fix logic

* add comments

* fix tests

* add ema test

* reduce PC_NUM_COMP_PYTHNET to 64

* fix tests

* add comments

* revert to use PriceComponentArrayWrapper

* refactor

* update c format

* revert format

* revert format

* gitignore .clang-format

* remove clang-format

* revert format

* revert format

* format

* revert format

* revert format

* fix comment

* remove comment

* add comment

* refactor

* add guard for first price update after deployment

* add back deleted test in test_publish

* add tests for prev values to test_upd_price

* update comment

* add test

* use last_slot_ instead of agg_.pub_slot_

* add more asserts

* remove crank again asserts

* fix

* address comments

* address comments

* remove print statement

* add test to simulate program upgrade

* refactor

* address comments

* address comments

* address comments

* address comments

* merge test_upd_price with test_upd_price_v2

* add asserts
  • Loading branch information
cctdaniel authored Apr 16, 2024
1 parent 14fbdef commit 65949e0
Show file tree
Hide file tree
Showing 16 changed files with 701 additions and 667 deletions.
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ target
cmake-build-*
/venv

# CMake files
features.h

# IntelliJ / CLion configuration
.idea
*.iml

# CMake files
features.h
# Clang format
.clang-format
2 changes: 1 addition & 1 deletion program/c/src/oracle/oracle.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extern "C" {
#define PC_MAP_TABLE_SIZE 640

// Total price component slots available
#define PC_NUM_COMP_PYTHNET 128
#define PC_NUM_COMP_PYTHNET 127

// PC_NUM_COMP - number of price components in use
// Not whole PC_NUM_COMP_PYTHNET because of stack issues appearing in upd_aggregate()
Expand Down
8 changes: 0 additions & 8 deletions program/c/src/oracle/upd_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,6 @@ static inline void upd_twap(
// update aggregate price
static inline bool upd_aggregate( pc_price_t *ptr, uint64_t slot, int64_t timestamp )
{
// Update the value of the previous price, if it had TRADING status.
if ( ptr->agg_.status_ == PC_STATUS_TRADING ) {
ptr->prev_slot_ = ptr->agg_.pub_slot_;
ptr->prev_price_ = ptr->agg_.price_;
ptr->prev_conf_ = ptr->agg_.conf_;
ptr->prev_timestamp_ = ptr->timestamp_;
}

// update aggregate details ready for next slot
ptr->valid_slot_ = ptr->agg_.pub_slot_;// valid slot-time of agg. price
ptr->agg_.pub_slot_ = slot; // publish slot-time of agg. price
Expand Down
99 changes: 75 additions & 24 deletions program/rust/src/accounts/price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,62 +31,113 @@ mod price_pythnet {
},
error::OracleError,
},
std::ops::{
Deref,
DerefMut,
Index,
IndexMut,
},
};

#[repr(C)]
#[derive(Copy, Clone)]
pub struct PriceComponentArrayWrapper([PriceComponent; PC_NUM_COMP_PYTHNET as usize]);

// Implementing Index and IndexMut allows PriceComponentArrayWrapper to use array indexing directly,
// such as price_account.comp_[i], making it behave more like a native array or slice.
impl Index<usize> for PriceComponentArrayWrapper {
type Output = PriceComponent;

fn index(&self, index: usize) -> &Self::Output {
&self.0[index]
}
}
impl IndexMut<usize> for PriceComponentArrayWrapper {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
&mut self.0[index]
}
}

// Implementing Deref and DerefMut allows PriceComponentArrayWrapper to use slice methods directly,
// such as len(), making it behave more like a native array or slice.
impl Deref for PriceComponentArrayWrapper {
type Target = [PriceComponent];

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for PriceComponentArrayWrapper {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

unsafe impl Pod for PriceComponentArrayWrapper {
}
unsafe impl Zeroable for PriceComponentArrayWrapper {
}

/// Pythnet-only extended price account format. This extension is
/// an append-only change that adds extra publisher slots and
/// PriceCumulative for TWAP processing.
#[repr(C)]
#[derive(Copy, Clone, Pod, Zeroable)]
pub struct PriceAccountPythnet {
pub header: AccountHeader,
pub header: AccountHeader,
/// Type of the price account
pub price_type: u32,
pub price_type: u32,
/// Exponent for the published prices
pub exponent: i32,
pub exponent: i32,
/// Current number of authorized publishers
pub num_: u32,
pub num_: u32,
/// Number of valid quotes for the last aggregation
pub num_qt_: u32,
pub num_qt_: u32,
/// Last slot with a succesful aggregation (status : TRADING)
pub last_slot_: u64,
pub last_slot_: u64,
/// Second to last slot where aggregation was attempted
pub valid_slot_: u64,
pub valid_slot_: u64,
/// Ema for price
pub twap_: PriceEma,
pub twap_: PriceEma,
/// Ema for confidence
pub twac_: PriceEma,
pub twac_: PriceEma,
/// Last time aggregation was attempted
pub timestamp_: i64,
pub timestamp_: i64,
/// Minimum valid publisher quotes for a succesful aggregation
pub min_pub_: u8,
pub message_sent_: u8,
pub min_pub_: u8,
pub message_sent_: u8,
/// Configurable max latency in slots between send and receive
pub max_latency_: u8,
pub max_latency_: u8,
/// Unused placeholder for alignment
pub unused_2_: i8,
pub unused_3_: i32,
pub unused_2_: i8,
pub unused_3_: i32,
/// Corresponding product account
pub product_account: Pubkey,
pub product_account: Pubkey,
/// Next price account in the list
pub next_price_account: Pubkey,
pub next_price_account: Pubkey,
/// Second to last slot where aggregation was succesful (i.e. status : TRADING)
pub prev_slot_: u64,
pub prev_slot_: u64,
/// Aggregate price at prev_slot_
pub prev_price_: i64,
pub prev_price_: i64,
/// Confidence interval at prev_slot_
pub prev_conf_: u64,
pub prev_conf_: u64,
/// Timestamp of prev_slot_
pub prev_timestamp_: i64,
pub prev_timestamp_: i64,
/// Last attempted aggregate results
pub agg_: PriceInfo,
pub agg_: PriceInfo,
/// Publishers' price components. NOTE(2023-10-06): On Pythnet, not all
/// PC_NUM_COMP_PYTHNET slots are used due to stack size
/// issues in the C code. For iterating over price components,
/// PC_NUM_COMP must be used.
pub comp_: [PriceComponent; PC_NUM_COMP_PYTHNET as usize],
pub comp_: PriceComponentArrayWrapper,
/// Previous EMA for price and confidence
pub prev_twap_: PriceEma,
pub prev_twac_: PriceEma,
/// Previous TWAP cumulative values
pub prev_price_cumulative: PriceCumulative,
/// Cumulative sums of aggregative price and confidence used to compute arithmetic moving averages
pub price_cumulative: PriceCumulative,
pub price_cumulative: PriceCumulative,
}

impl PriceAccountPythnet {
Expand Down
109 changes: 73 additions & 36 deletions program/rust/src/processor/upd_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use {
crate::{
accounts::{
PriceAccount,
PriceInfo,
PythOracleSerialize,
UPD_PRICE_WRITE_SEED,
},
c_oracle_header::PC_STATUS_TRADING,
deserialize::{
load,
load_checked,
Expand Down Expand Up @@ -128,7 +128,8 @@ pub fn upd_price(
let clock = Clock::from_account_info(clock_account)?;

let mut publisher_index: usize = 0;
let latest_aggregate_price: PriceInfo;
let slots_since_last_successful_aggregate: u64;
let noninitial_price_update_after_program_upgrade: bool;

// The price_data borrow happens in a scope because it must be
// dropped before we borrow again as raw data pointer for the C
Expand All @@ -149,7 +150,16 @@ pub fn upd_price(
OracleError::PermissionViolation.into(),
)?;

latest_aggregate_price = price_data.agg_;
// We use last_slot_ to calculate slots_since_last_successful_aggregate. This is because last_slot_ is updated after the aggregate price is updated successfully.
slots_since_last_successful_aggregate = clock.slot - price_data.last_slot_;

// Check if the program upgrade has happened in the current slot and aggregate price has been updated, if so, use the old logic to update twap/twac/price_cumulative.
// This is to ensure that twap/twac/price_cumulative are calculated correctly during the migration.
// We check if prev_twap_.denom_ is == 0 because when the program upgrade has happened, denom_ is initialized to 0 and it can only stay the same or increase while numer_ can be negative if prices are negative.
// And we check if slots_since_last_successful_aggregate == 0 to check if the aggregate price has been updated in the current slot.
noninitial_price_update_after_program_upgrade =
price_data.prev_twap_.denom_ == 0 && slots_since_last_successful_aggregate == 0;

let latest_publisher_price = price_data.comp_[publisher_index].latest_;

// Check that publisher is publishing a more recent price
Expand All @@ -161,10 +171,38 @@ pub fn upd_price(
)?;
}

// Try to update the aggregate
#[allow(unused_variables)]
if clock.slot > latest_aggregate_price.pub_slot_ {
let updated = unsafe {
// Extend the scope of the mutable borrow of price_data
{
let mut price_data = load_checked::<PriceAccount>(price_account, cmd_args.header.version)?;

// Update the publisher's price
if is_component_update(cmd_args)? {
let status: u32 = get_status_for_conf_price_ratio(
cmd_args.price,
cmd_args.confidence,
cmd_args.status,
)?;
let publisher_price = &mut price_data.comp_[publisher_index].latest_;
publisher_price.price_ = cmd_args.price;
publisher_price.conf_ = cmd_args.confidence;
publisher_price.status_ = status;
publisher_price.pub_slot_ = cmd_args.publishing_slot;
}

// If the price update is the first in the slot and the aggregate is trading, update the previous slot, price, conf, and timestamp.
if slots_since_last_successful_aggregate > 0 && price_data.agg_.status_ == PC_STATUS_TRADING
{
price_data.prev_slot_ = price_data.agg_.pub_slot_;
price_data.prev_price_ = price_data.agg_.price_;
price_data.prev_conf_ = price_data.agg_.conf_;
price_data.prev_timestamp_ = price_data.timestamp_;
}
}

let updated = unsafe {
if noninitial_price_update_after_program_upgrade {
false
} else {
// NOTE: c_upd_aggregate must use a raw pointer to price
// data. Solana's `<account>.borrow_*` methods require exclusive
// access, i.e. no other borrow can exist for the account.
Expand All @@ -173,25 +211,41 @@ pub fn upd_price(
clock.slot,
clock.unix_timestamp,
)
};

// If the aggregate was successfully updated, calculate the difference and update TWAP.
if updated {
let agg_diff = (clock.slot as i64)
- load_checked::<PriceAccount>(price_account, cmd_args.header.version)?.prev_slot_
as i64;
// Encapsulate TWAP update logic in a function to minimize unsafe block scope.
unsafe {
c_upd_twap(price_account.try_borrow_mut_data()?.as_mut_ptr(), agg_diff);
}
}
};

// If the aggregate was successfully updated, calculate the difference and update TWAP.
if updated {
{
let mut price_data =
load_checked::<PriceAccount>(price_account, cmd_args.header.version)?;

// Multiple price updates may occur within the same slot. Updates within the same slot will
// use the previously calculated values (prev_twap, prev_twac, and prev_price_cumulative)
// from the last successful aggregated price update as their basis for recalculation. This
// ensures that each update within a slot builds upon the last and not the twap/twac/price_cumulative
// that is calculated right after the publishers' individual price updates.
if slots_since_last_successful_aggregate > 0 {
price_data.prev_twap_ = price_data.twap_;
price_data.prev_twac_ = price_data.twac_;
price_data.prev_price_cumulative = price_data.price_cumulative;
}
price_data.twap_ = price_data.prev_twap_;
price_data.twac_ = price_data.prev_twac_;
price_data.price_cumulative = price_data.prev_price_cumulative;

price_data.update_price_cumulative()?;
// We want to send a message every time the aggregate price updates. However, during the migration,
// not every publisher will necessarily provide the accumulator accounts. The message_sent_ flag
// ensures that after every aggregate update, the next publisher who provides the accumulator accounts
// will send the message.
price_data.message_sent_ = 0;
price_data.update_price_cumulative()?;
}
let agg_diff = (clock.slot as i64)
- load_checked::<PriceAccount>(price_account, cmd_args.header.version)?.prev_slot_
as i64;
unsafe {
c_upd_twap(price_account.try_borrow_mut_data()?.as_mut_ptr(), agg_diff);
}
}

Expand Down Expand Up @@ -261,23 +315,6 @@ pub fn upd_price(
}
}

// Try to update the publisher's price
if is_component_update(cmd_args)? {
// IMPORTANT: If the publisher does not meet the price/conf
// ratio condition, its price will not count for the next
// aggregate.
let status: u32 =
get_status_for_conf_price_ratio(cmd_args.price, cmd_args.confidence, cmd_args.status)?;

{
let publisher_price = &mut price_data.comp_[publisher_index].latest_;
publisher_price.price_ = cmd_args.price;
publisher_price.conf_ = cmd_args.confidence;
publisher_price.status_ = status;
publisher_price.pub_slot_ = cmd_args.publishing_slot;
}
}

Ok(())
}

Expand Down
5 changes: 1 addition & 4 deletions program/rust/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ mod test_publish_batch;
mod test_set_max_latency;
mod test_set_min_pub;
mod test_sizes;
mod test_twap;
mod test_upd_aggregate;
mod test_upd_permissions;
mod test_upd_price;
mod test_upd_price_no_fail_on_error;
mod test_upd_product;
mod test_utils;


mod test_twap;
mod test_upd_price_v2;
Loading

0 comments on commit 65949e0

Please sign in to comment.