Skip to content

Commit

Permalink
fix(api): SQL: use = instead of ANY where possible in events-related …
Browse files Browse the repository at this point in the history
…queries (#1346)

We have the following request:

```
	FROM events 
	WHERE (miniblock_number >= $6) AND (miniblock_number <= $7) AND (address = ANY($1)) AND (topic1 = ANY($2)) AND (topic2 = ANY($3)) AND (topic3 = ANY($4)) 
	ORDER BY miniblock_number ASC, event_index_in_block ASC 
```

currently regardless of amount of elements in every filter, we apply ANY
clause. This PR changes this so that if there is exactly one element,
`ANY` is replaced with `=`.


**WHY**

we have a hypotesis that this will significantly improve DB performance
of this query
  • Loading branch information
RomanBrodetski committed Mar 6, 2024
1 parent dcb52bb commit ce39dff
Showing 1 changed file with 139 additions and 19 deletions.
158 changes: 139 additions & 19 deletions core/lib/dal/src/events_web3_dal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use sqlx::Row;
use sqlx::{
postgres::PgArguments,
query::{Query, QueryAs},
Postgres, Row,
};
use zksync_types::{
api::{GetLogsFilter, Log},
Address, MiniblockNumber, H256,
Expand Down Expand Up @@ -37,13 +41,17 @@ impl EventsWeb3Dal<'_, '_> {

let mut query = sqlx::query(&query);

if !filter.addresses.is_empty() {
let addresses: Vec<_> = filter.addresses.iter().map(Address::as_bytes).collect();
query = query.bind(addresses);
}
// Bind address params - noop if there are no addresses
query = Self::bind_params_for_optional_filter_query(
query,
filter.addresses.iter().map(Address::as_bytes).collect(),
);
for (_, topics) in &filter.topics {
let topics: Vec<_> = topics.iter().map(H256::as_bytes).collect();
query = query.bind(topics);
// Bind topic params - noop if there are no topics
query = Self::bind_params_for_optional_filter_query(
query,
topics.iter().map(H256::as_bytes).collect(),
);
}
query = query.bind(offset as i32);
let log = query
Expand Down Expand Up @@ -89,13 +97,18 @@ impl EventsWeb3Dal<'_, '_> {
);

let mut query = sqlx::query_as(&query);
if !filter.addresses.is_empty() {
let addresses: Vec<_> = filter.addresses.iter().map(Address::as_bytes).collect();
query = query.bind(addresses);
}

// Bind address params - noop if there are no addresses
query = Self::bind_params_for_optional_filter_query_as(
query,
filter.addresses.iter().map(Address::as_bytes).collect(),
);
for (_, topics) in &filter.topics {
let topics: Vec<_> = topics.iter().map(H256::as_bytes).collect();
query = query.bind(topics);
// Bind topic params - noop if there are no topics
query = Self::bind_params_for_optional_filter_query_as(
query,
topics.iter().map(H256::as_bytes).collect(),
);
}
query = query.bind(limit as i32);

Expand All @@ -118,18 +131,69 @@ impl EventsWeb3Dal<'_, '_> {

where_sql += &format!(" AND (miniblock_number <= {})", filter.to_block.0 as i64);

if !filter.addresses.is_empty() {
where_sql += &format!(" AND (address = ANY(${}))", arg_index);
// Add filters for address (like `address = ANY($1)` or `address = $1`)
if let Some(filter_sql) =
Self::build_sql_filter(filter.addresses.len() as u32, "address", arg_index)
{
where_sql += &filter_sql;
arg_index += 1;
}
for (topic_index, _) in filter.topics.iter() {
where_sql += &format!(" AND (topic{} = ANY(${}))", topic_index, arg_index);
arg_index += 1;

// Add filters for topics (like `topic0 = ANY($2)`)
for (topic_index, topics) in filter.topics.iter() {
if let Some(filter_sql) = Self::build_sql_filter(
topics.len() as u32,
&format!("topic{}", topic_index),
arg_index,
) {
where_sql += &filter_sql;
arg_index += 1;
}
}

(where_sql, arg_index)
}

// Builds SQL filter for optional filter (like address or topics).
fn build_sql_filter(
number_of_entities: u32,
field_name: &str,
arg_index: u8,
) -> Option<String> {
match number_of_entities {
0 => None,
1 => Some(format!(" AND ({} = ${})", field_name, arg_index)),
_ => Some(format!(" AND ({} = ANY(${}))", field_name, arg_index)),
}
}

// Binds parameters for optional filter (like address or topics).
// Noop if there are no values.
// Assumes `=$1` syntax for single value and `=ANY($1)` for multiple values.
// See the method above for details.
fn bind_params_for_optional_filter_query_as<'q, O>(
query: QueryAs<'q, Postgres, O, PgArguments>,
values: Vec<&'q [u8]>,
) -> QueryAs<'q, Postgres, O, PgArguments> {
match values.len() {
0 => query,
1 => query.bind(values[0]),
_ => query.bind(values),
}
}

// Same as `bind_params_for_optional_filter_query_as` but for `Query` instead of `QueryAs`.
fn bind_params_for_optional_filter_query<'q>(
query: Query<'q, Postgres, PgArguments>,
values: Vec<&'q [u8]>,
) -> Query<'q, Postgres, PgArguments> {
match values.len() {
0 => query,
1 => query.bind(values[0]),
_ => query.bind(values),
}
}

pub async fn get_all_logs(
&mut self,
from_block: MiniblockNumber,
Expand Down Expand Up @@ -210,12 +274,68 @@ mod tests {
topics: vec![(0, vec![H256::from_low_u64_be(456)])],
};

let expected_sql = "(miniblock_number >= 100) AND (miniblock_number <= 200) AND (address = ANY($1)) AND (topic0 = ANY($2))";
let expected_sql = "(miniblock_number >= 100) AND (miniblock_number <= 200) AND (address = $1) AND (topic0 = $2)";
let expected_arg_index = 3;

let (actual_sql, actual_arg_index) = events_web3_dal.build_get_logs_where_clause(&filter);

assert_eq!(actual_sql, expected_sql);
assert_eq!(actual_arg_index, expected_arg_index);
}

#[tokio::test]
async fn test_build_get_logs_with_multiple_topics_where_clause() {
let connection_pool = ConnectionPool::test_pool().await;
let storage = &mut connection_pool.access_storage().await.unwrap();
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![
Address::from_low_u64_be(123),
Address::from_low_u64_be(1233),
],
topics: vec![
(
0,
vec![
H256::from_low_u64_be(456),
H256::from_low_u64_be(789),
H256::from_low_u64_be(101),
],
),
(2, vec![H256::from_low_u64_be(789)]),
],
};

let expected_sql = "(miniblock_number >= 10) AND (miniblock_number <= 400) AND (address = ANY($1)) AND (topic0 = ANY($2)) AND (topic2 = $3)";
let expected_arg_index = 4;

let (actual_sql, actual_arg_index) = events_web3_dal.build_get_logs_where_clause(&filter);

assert_eq!(actual_sql, expected_sql);
assert_eq!(actual_arg_index, expected_arg_index);
}

#[tokio::test]
async fn test_build_get_logs_with_no_address_where_clause() {
let connection_pool = ConnectionPool::test_pool().await;
let storage = &mut connection_pool.access_storage().await.unwrap();
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![],
topics: vec![(2, vec![H256::from_low_u64_be(789)])],
};

let expected_sql =
"(miniblock_number >= 10) AND (miniblock_number <= 400) AND (topic2 = $1)";
let expected_arg_index = 2;

let (actual_sql, actual_arg_index) = events_web3_dal.build_get_logs_where_clause(&filter);

assert_eq!(actual_sql, expected_sql);
assert_eq!(actual_arg_index, expected_arg_index);
}
}

0 comments on commit ce39dff

Please sign in to comment.