Skip to content

Commit

Permalink
Ocean: Catchup to tip on startup (#3105)
Browse files Browse the repository at this point in the history
* Ocean tries to catchup on startup if below tip height

* Keep hold of lock

* Fix RemoveOracle indexing

* Small perf improvements

* Revert "Keep hold of lock"

This reverts commit 07552f7.

* Create base txid out of loop

* Fix printf args order

* Don't hold ocean invalidation

* Revert "Revert "Keep hold of lock""

This reverts commit 29a423c.

* Bump setgovheight dftx buffer size

* Ocean: fix rm/update oracle index (#3107)

* fix

* update ocean ci

* revert to direct ffi:get_pp

* Remove clone

---------

Co-authored-by: canonbrother <[email protected]>
  • Loading branch information
Jouzo and canonbrother authored Nov 5, 2024
1 parent 2af7598 commit 53535ce
Show file tree
Hide file tree
Showing 19 changed files with 350 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests-ocean.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- ocean-refinements # TODO(): remove before merge to master
- ocean-catchup-on-startup

concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
Expand Down
2 changes: 1 addition & 1 deletion lib/ain-dftx/src/types/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Decodable for RawBytes {
fn consensus_decode<R: bitcoin::io::Read + ?Sized>(
reader: &mut R,
) -> Result<Self, bitcoin::consensus::encode::Error> {
let mut buf = [0u8; 512];
let mut buf = [0u8; 4096];
let v = reader.read(&mut buf)?;
Ok(Self(buf[..v].to_vec()))
}
Expand Down
2 changes: 2 additions & 0 deletions lib/ain-dftx/tests/data/setgovernanceheight.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
6a4d1b01446654786a144c505f4c4f414e5f544f4b454e5f53504c4954531c1180f0fa020000000012a0f64a0000000000190ce0360000000000205c4f13000000000021cc2a1d000000000023581416000000000024f834280000000000268c6e64000000000027587b3b00000000002850430d000000000029b8af0600000000002a54361600000000002bd4010c00000000002c18b30700000000002ddc9e0c00000000002ed82a0c00000000003550620c000000000036fcb921000000000037948d2e00000000003898401800000000003d98d71c00000000003e00ae1500000000003f80920b000000000040d4a81e000000000045ac930b000000000046305f0c000000000047785019000000000048c4d510000000000070f21a00
// {"ATTRIBUTES": {"v0/poolpairs/17/token_a_fee_pct": "0.005"},"startHeight": 1896000}
6a2b446654786a0a4154545249425554455301000000007011000000610100000020a107000000000040ee1c
// TX: 944634a0ebaecfef099460015e2e4288fbf84cd7c8d5bb8716a50bb1bd1164d9, block: 4163636
6a4d580a446654786a0a415454524942555445538e000000007021000000620100000020a1070000000000000000007021000000640800000000000000007023000000620100000020a1070000000000000000007023000000640800000000000000007024000000620100000020a1070000000000000000007024000000640800000000000000007026000000620100000020a1070000000000000000007026000000640800000000000000007027000000620100000020a1070000000000000000007027000000640800000000000000007028000000620100000020a1070000000000000000007028000000640800000000000000007029000000620100000020a107000000000000000000702900000064080000000000000000702a000000620100000020a107000000000000000000702a00000064080000000000000000702b000000620100000020a107000000000000000000702b00000064080000000000000000702c000000620100000020a107000000000000000000702c00000064080000000000000000702d000000620100000020a107000000000000000000702d00000064080000000000000000702e000000620100000020a107000000000000000000702e000000640800000000000000007035000000620100000020a1070000000000000000007035000000640800000000000000007037000000620100000020a1070000000000000000007037000000640800000000000000007038000000620100000020a107000000000000000000703800000064080000000000000000703d000000620100000020a107000000000000000000703d00000064080000000000000000703e000000620100000020a107000000000000000000703e00000064080000000000000000703f000000620100000020a107000000000000000000703f000000640800000000000000007040000000620100000020a1070000000000000000007040000000640800000000000000007045000000620100000020a1070000000000000000007045000000640800000000000000007046000000620100000020a1070000000000000000007046000000640800000000000000007047000000620100000020a1070000000000000000007047000000640800000000000000007048000000620100000020a107000000000000000000704800000064080000000000000000704d000000620100000020a107000000000000000000704d00000064080000000000000000704e000000620100000020a107000000000000000000704e00000064080000000000000000704f000000620100000020a107000000000000000000704f000000640800000000000000007050000000620100000020a1070000000000000000007050000000640800000000000000007055000000620100000020a1070000000000000000007055000000640800000000000000007056000000620100000020a1070000000000000000007056000000640800000000000000007057000000620100000020a1070000000000000000007057000000640800000000000000007058000000620100000020a107000000000000000000705800000064080000000000000000705a000000620100000020a107000000000000000000705a00000064080000000000000000705f000000620100000020a107000000000000000000705f000000640800000000000000007060000000620100000020a1070000000000000000007060000000640800000000000000007061000000620100000020a1070000000000000000007061000000640800000000000000007062000000620100000020a1070000000000000000007062000000640800000000000000007064000000620100000020a1070000000000000000007064000000640800000000000000007068000000620100000020a107000000000000000000706800000064080000000000000000706d000000620100000020a107000000000000000000706d00000064080000000000000000706e000000620100000020a107000000000000000000706e00000064080000000000000000706f000000620100000020a107000000000000000000706f000000640800000000000000007070000000620100000020a1070000000000000000007070000000640800000000000000007072000000620100000020a1070000000000000000007072000000640800000000000000007077000000620100000020a1070000000000000000007077000000640800000000000000007078000000620100000020a1070000000000000000007078000000640800000000000000007079000000620100000020a107000000000000000000707900000064080000000000000000707a000000620100000020a107000000000000000000707a0000006408000000000000000070cc000000620100000020a10700000000000000000070cc0000006408000000000000000070cd000000620100000020a10700000000000000000070cd0000006408000000000000000070ce000000620100000020a10700000000000000000070ce0000006408000000000000000070cf000000620100000020a10700000000000000000070cf0000006408000000000000000070d4000000620100000020a10700000000000000000070d40000006408000000000000000070d5000000620100000020a10700000000000000000070d50000006408000000000000000070d6000000620100000020a10700000000000000000070d60000006408000000000000000070d7000000620100000020a10700000000000000000070d70000006408000000000000000070ef000000620100000020a10700000000000000000070ef0000006408000000000000000070fe000000620100000020a10700000000000000000070fe0000006408000000000000000070ff000000620100000020a10700000000000000000070ff000000640800000000000000007000010000620100000020a1070000000000000000007000010000640800000000000000007001010000620100000020a1070000000000000000007001010000640800000000000000007002010000620100000020a1070000000000000000007002010000640800000000000000007003010000620100000020a1070000000000000000007003010000640800000000000000007004010000620100000020a1070000000000000000007004010000640800000000000000007005010000620100000020a1070000000000000000007005010000640800000000000000007006010000620100000020a1070000000000000000007006010000640800000000000000007007010000620100000020a1070000000000000000007007010000640800000000000000007008010000620100000020a1070000000000000000007008010000640800000000000000007009010000620100000020a107000000000000000000700901000064080000000000000000700a010000620100000020a107000000000000000000700a01000064080000000000000000700b010000620100000020a107000000000000000000700b010000640800000000000000007014010000620100000020a1070000000000000000007014010000640800000000b48e3f00
29 changes: 14 additions & 15 deletions lib/ain-ocean/src/api/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,34 +96,33 @@ async fn get_feed(

let key = (token, currency, txid);

let price_feed_list = ctx
let oracle_price_feeds = ctx
.services
.oracle_price_feed
.by_id
.list(None, SortOrder::Descending)?
.paginate(&query)
.flatten()
.collect::<Vec<_>>();

let mut oracle_price_feeds = Vec::new();

for ((token, currency, oracle_id, txid), feed) in &price_feed_list {
if key.0.eq(token) && key.1.eq(currency) && key.2.eq(oracle_id) {
.into_iter()
.filter(|((token, currency, oracle_id, _), _)| {
key.0.eq(token) && key.1.eq(currency) && key.2.eq(oracle_id)
})
.map(|((token, currency, oracle_id, txid), feed)| {
let amount = Decimal::from(feed.amount) / Decimal::from(COIN);
oracle_price_feeds.push(OraclePriceFeedResponse {
OraclePriceFeedResponse {
id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid),
key: format!("{}-{}-{}", token, currency, oracle_id),
sort: hex::encode(feed.block.height.to_string() + &txid.to_string()),
token: token.to_owned(),
currency: currency.to_owned(),
oracle_id: oracle_id.to_owned(),
txid: *txid,
token,
currency,
oracle_id,
txid,
time: feed.time,
amount: amount.normalize().to_string(),
block: feed.block.clone(),
});
}
}
}
})
.collect::<Vec<_>>();

Ok(ApiPagedResponse::of(
oracle_price_feeds,
Expand Down
4 changes: 2 additions & 2 deletions lib/ain-ocean/src/api/pool_pair/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,9 @@ async fn list_pool_swaps_verbose(
_ => true,
})
.map(|item| async {
let (_, swap) = item?;
let (key, swap) = item?;
let from = find_swap_from(&ctx, &swap).await?;
let to = find_swap_to(&ctx, &swap).await?;
let to = find_swap_to(&ctx, &key, &swap).await?;

let swap_type = check_swap_type(&ctx, &swap).await?;

Expand Down
16 changes: 14 additions & 2 deletions lib/ain-ocean/src/api/pool_pair/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
NotFoundKind, OtherSnafu,
},
indexer::PoolSwapAggregatedInterval,
model::{PoolSwap, PoolSwapAggregatedAggregated},
model::{PoolSwap, PoolSwapAggregatedAggregated, PoolSwapKey},
storage::{RepositoryOps, SecondaryIndex, SortOrder},
Result,
};
Expand Down Expand Up @@ -673,6 +673,7 @@ pub async fn find_swap_from(

pub async fn find_swap_to(
ctx: &Arc<AppContext>,
swap_key: &PoolSwapKey,
swap: &PoolSwap,
) -> Result<Option<PoolSwapFromToData>> {
let PoolSwap {
Expand All @@ -689,9 +690,20 @@ pub async fn find_swap_to(

let display_symbol = parse_display_symbol(&to_token);

// TODO Index to_amount if missing
if to_amount.is_none() {
let amount = 0;
let swap = PoolSwap {
to_amount: Some(amount),
..swap.clone()
};
ctx.services.pool.by_id.put(swap_key, &swap)?;
}

Ok(Some(PoolSwapFromToData {
address: to_address,
amount: Decimal::new(to_amount.to_owned(), 8).to_string(),
// amount: Decimal::new(to_amount.to_owned(), 8).to_string(), // Need fallback
amount: Decimal::new(to_amount.to_owned().unwrap_or_default(), 8).to_string(),
symbol: to_token.symbol,
display_symbol,
}))
Expand Down
20 changes: 15 additions & 5 deletions lib/ain-ocean/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ fn index_script(services: &Arc<Services>, ctx: &Context, txs: &[Transaction]) ->

return Err(Error::NotFoundIndex {
action: IndexAction::Index,
r#type: "Index script TransactionVout".to_string(),
r#type: "Script TransactionVout".to_string(),
id: format!("{}-{}", vin.txid, vin.vout),
});
};
Expand Down Expand Up @@ -386,8 +386,8 @@ fn invalidate_script(services: &Arc<Services>, ctx: &Context, txs: &[Transaction
};

return Err(Error::NotFoundIndex {
action: IndexAction::Index,
r#type: "Index script TransactionVout".to_string(),
action: IndexAction::Invalidate,
r#type: "Script TransactionVout".to_string(),
id: format!("{}-{}", vin.txid, vin.vout),
});
};
Expand Down Expand Up @@ -428,15 +428,15 @@ fn invalidate_script_unspent_vin(
let Some(transaction) = services.transaction.by_id.get(&vin.txid)? else {
return Err(Error::NotFoundIndex {
action: IndexAction::Invalidate,
r#type: "Transaction".to_string(),
r#type: "ScriptUnspentVin Transaction".to_string(),
id: vin.txid.to_string(),
});
};

let Some(vout) = services.transaction.vout_by_id.get(&(vin.txid, vin.vout))? else {
return Err(Error::NotFoundIndex {
action: IndexAction::Invalidate,
r#type: "TransactionVout".to_string(),
r#type: "ScriptUnspentVin TransactionVout".to_string(),
id: format!("{}{}", vin.txid, vin.vout),
});
};
Expand Down Expand Up @@ -530,6 +530,14 @@ fn invalidate_script_activity_vout(
Ok(())
}

pub fn get_block_height(services: &Arc<Services>) -> Result<u32> {
Ok(services
.block
.by_height
.get_highest()?
.map_or(0, |block| block.height))
}

pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Result<()> {
trace!("[index_block] Indexing block...");
let start = Instant::now();
Expand Down Expand Up @@ -601,6 +609,7 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
DfTx::SetLoanToken(data) => data.index(services, ctx)?,
DfTx::CompositeSwap(data) => data.index(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, ctx)?,
DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(),
_ => (),
}
log_elapsed(start, "Indexed dftx");
Expand Down Expand Up @@ -712,6 +721,7 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
DfTx::SetLoanToken(data) => data.invalidate(services, ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, ctx)?,
DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(),
_ => (),
}
log_elapsed(start, "Invalidate dftx");
Expand Down
81 changes: 44 additions & 37 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use snafu::OptionExt;

use crate::{
error::{
ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, ToPrimitiveSnafu,
ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, NotFoundIndexSnafu,
ToPrimitiveSnafu,
},
indexer::{Context, Index, Result},
model::{
Expand Down Expand Up @@ -105,27 +106,36 @@ impl Index for AppointOracle {
}

impl Index for RemoveOracle {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
let oracle_id = ctx.tx.txid;
fn index(self, services: &Arc<Services>, _ctx: &Context) -> Result<()> {
let oracle_id = self.oracle_id;
services.oracle.by_id.delete(&oracle_id)?;

let (_, previous) = get_previous_oracle(services, oracle_id)?;
let (_, mut previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Index,
r#type: "RemoveOracle".to_string(),
id: oracle_id.to_string(),
})?;

for price_feed in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
price_feed.token.to_owned(),
price_feed.currency.to_owned(),
oracle_id,
))?;
for PriceFeed { token, currency } in previous.price_feeds.drain(..) {
services
.oracle_token_currency
.by_id
.delete(&(token, currency, oracle_id))?;
}

Ok(())
}

fn invalidate(&self, services: &Arc<Services>, context: &Context) -> Result<()> {
fn invalidate(&self, services: &Arc<Services>, _ctx: &Context) -> Result<()> {
trace!("[RemoveOracle] Invalidating...");
let oracle_id = context.tx.txid;
let (_, previous) = get_previous_oracle(services, oracle_id)?;
let oracle_id = self.oracle_id;
let (_, previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Invalidate,
r#type: "RemoveOracle".to_string(),
id: oracle_id.to_string(),
})?;

let oracle = Oracle {
owner_address: previous.owner_address,
Expand Down Expand Up @@ -154,7 +164,7 @@ impl Index for RemoveOracle {

impl Index for UpdateOracle {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
let oracle_id = ctx.tx.txid;
let oracle_id = self.oracle_id;
let price_feeds = self
.price_feeds
.iter()
Expand All @@ -176,7 +186,12 @@ impl Index for UpdateOracle {
.by_id
.put(&(oracle_id, ctx.block.height), &oracle)?;

let (_, previous) = get_previous_oracle(services, oracle_id)?;
let (_, previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Index,
r#type: "UpdateOracle".to_string(),
id: oracle_id.to_string(),
})?;
for price_feed in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
price_feed.token.to_owned(),
Expand All @@ -201,7 +216,7 @@ impl Index for UpdateOracle {

fn invalidate(&self, services: &Arc<Services>, context: &Context) -> Result<()> {
trace!("[UpdateOracle] Invalidating...");
let oracle_id = context.tx.txid;
let oracle_id = self.oracle_id;
services
.oracle_history
.by_id
Expand All @@ -215,7 +230,12 @@ impl Index for UpdateOracle {
self.oracle_id,
))?;
}
let ((prev_oracle_id, _), previous) = get_previous_oracle(services, oracle_id)?;
let ((prev_oracle_id, _), previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Invalidate,
r#type: "UpdateOracle".to_string(),
id: oracle_id.to_string(),
})?;

let prev_oracle = Oracle {
owner_address: previous.owner_address,
Expand Down Expand Up @@ -262,17 +282,15 @@ fn map_price_aggregated(
)),
SortOrder::Descending,
)?
.take_while(|item| match item {
Ok((k, _)) => k.0 == token.clone() && k.1 == currency.clone(),
_ => true,
})
.take_while(|item| matches!(item, Ok((k, _)) if &k.0 == token && &k.1 == currency))
.flatten()
.collect::<Vec<_>>();

let mut aggregated_total = Decimal::zero();
let mut aggregated_count = Decimal::zero();
let mut aggregated_weightage = Decimal::zero();

let base_id = Txid::from_byte_array([0xffu8; 32]);
let oracles_len = oracles.len();
for (id, oracle) in oracles {
if oracle.weightage == 0 {
Expand All @@ -283,10 +301,7 @@ fn map_price_aggregated(
let feed = services
.oracle_price_feed
.by_id
.list(
Some((id.0, id.1, id.2, Txid::from_byte_array([0xffu8; 32]))),
SortOrder::Descending,
)?
.list(Some((id.0, id.1, id.2, base_id)), SortOrder::Descending)?
.next()
.transpose()?;

Expand Down Expand Up @@ -364,8 +379,8 @@ fn index_set_oracle_data(
let key = (
price_aggregated.aggregated.oracles.total,
price_aggregated.block.height,
token.clone(),
currency.clone(),
token,
currency,
);
ticker_repo.by_key.put(&key, pair)?;
ticker_repo.by_id.put(
Expand Down Expand Up @@ -542,7 +557,7 @@ pub fn index_interval_mapper(

if block.median_time - aggregated.block.median_time > interval.clone() as i64 {
return start_new_bucket(services, block, token, currency, aggregated, interval);
}
};

forward_aggregate(services, previous, aggregated)?;

Expand Down Expand Up @@ -709,21 +724,13 @@ fn backward_aggregate_value(
fn get_previous_oracle(
services: &Arc<Services>,
oracle_id: Txid,
) -> Result<(OracleHistoryId, Oracle)> {
) -> Result<Option<(OracleHistoryId, Oracle)>> {
let previous = services
.oracle_history
.by_id
.list(Some((oracle_id, u32::MAX)), SortOrder::Descending)?
.next()
.transpose()?;

let Some(previous) = previous else {
return Err(Error::NotFoundIndex {
action: IndexAction::Index,
r#type: "OracleHistory".to_string(),
id: oracle_id.to_string(),
});
};

Ok(previous)
}
Loading

0 comments on commit 53535ce

Please sign in to comment.