Skip to content

Commit

Permalink
feat: fetch api (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
heilhead authored Mar 27, 2023
1 parent 1fb0f78 commit b454c5b
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 29 deletions.
41 changes: 38 additions & 3 deletions relay_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ use {
crate::{ConnectionOptions, Error},
relay_rpc::{
domain::{SubscriptionId, Topic},
rpc::{BatchSubscribe, BatchUnsubscribe, Publish, Subscribe, Subscription, Unsubscribe},
rpc::{
BatchFetchMessages,
BatchSubscribe,
BatchUnsubscribe,
FetchMessages,
Publish,
Subscribe,
Subscription,
Unsubscribe,
},
},
std::{sync::Arc, time::Duration},
tokio::sync::{
Expand All @@ -12,9 +21,10 @@ use {
},
tokio_tungstenite::tungstenite::protocol::CloseFrame,
};
pub use {inbound::*, outbound::*, stream::*};
pub use {fetch::*, inbound::*, outbound::*, stream::*};

mod connection;
mod fetch;
mod inbound;
mod outbound;
mod stream;
Expand Down Expand Up @@ -131,6 +141,20 @@ impl Client {
EmptyResponseFuture::new(response)
}

/// Fetch mailbox messages for a specific topic.
pub fn fetch(&self, topic: Topic) -> ResponseFuture<FetchMessages> {
let (request, response) = create_request(FetchMessages { topic });

self.request(request);

response
}

/// Fetch mailbox messages for a specific topic. Returns a [`Stream`].
pub fn fetch_stream(&self, topics: impl Into<Vec<Topic>>) -> FetchMessageStream {
FetchMessageStream::new(self.clone(), topics.into())
}

/// Subscribes on multiple topics to receive messages.
pub fn batch_subscribe(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
Expand All @@ -156,6 +180,17 @@ impl Client {
EmptyResponseFuture::new(response)
}

/// Fetch mailbox messages for multiple topics.
pub fn batch_fetch(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchFetchMessages> {
let (request, response) = create_request(BatchFetchMessages {
topics: topics.into(),
});

self.request(request);

response
}

/// Opens a connection to the Relay.
pub async fn connect(&self, opts: ConnectionOptions) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -187,7 +222,7 @@ impl Client {
}
}

fn request(&self, request: OutboundRequest) {
pub(crate) fn request(&self, request: OutboundRequest) {
if let Err(err) = self
.control_tx
.send(ConnectionControl::OutboundRequest(request))
Expand Down
104 changes: 104 additions & 0 deletions relay_client/src/client/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use {
crate::{create_request, Client, Error, ResponseFuture},
futures_util::{FutureExt, Stream},
relay_rpc::{
domain::Topic,
rpc::{BatchFetchMessages, SubscriptionData},
},
std::{
pin::Pin,
task::{Context, Poll},
},
};

/// Stream that uses the `irn_batchFetch` RPC method to retrieve messages from
/// the Relay.
pub struct FetchMessageStream {
client: Client,
request: BatchFetchMessages,
batch: Option<std::vec::IntoIter<SubscriptionData>>,
batch_fut: Option<ResponseFuture<BatchFetchMessages>>,
has_more: bool,
}

impl FetchMessageStream {
pub(super) fn new(client: Client, topics: impl Into<Vec<Topic>>) -> Self {
let request = BatchFetchMessages {
topics: topics.into(),
};

Self {
client,
request,
batch: None,
batch_fut: None,
has_more: true,
}
}

/// Clears all internal state so that on the next stream poll it returns
/// `None` and finishes data streaming.
#[inline]
fn clear(&mut self) {
self.batch = None;
self.batch_fut = None;
self.has_more = false;
}
}

impl Stream for FetchMessageStream {
type Item = Result<SubscriptionData, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(batch) = &mut self.batch {
// Drain the items from the batch, if we have one.
match batch.next() {
Some(data) => {
return Poll::Ready(Some(Ok(data)));
}

None => {
// No more items in the batch, fetch the next batch.
self.batch = None;
}
}
} else if let Some(batch_fut) = &mut self.batch_fut {
// Waiting for the next batch to arrive.
match batch_fut.poll_unpin(cx) {
// The next batch is ready. Update `has_more` flag and clear the batch future.
Poll::Ready(Ok(response)) => {
self.batch = Some(response.messages.into_iter());
self.batch_fut = None;
self.has_more = response.has_more;
}

// Error receiving the next batch. This is unrecoverable, so clear the state and
// end the stream.
Poll::Ready(Err(err)) => {
self.clear();

return Poll::Ready(Some(Err(err)));
}

// The batch is not ready yet.
Poll::Pending => {
return Poll::Pending;
}
};
} else if self.has_more {
// We have neither a batch, or a batch future, but `has_more` flag is set. Set
// up a future to receive the next batch.
let (request, batch_fut) = create_request(self.request.clone());

self.client.request(request);
self.batch_fut = Some(batch_fut);
} else {
// The stream can't produce any more items, since it doesn't have neither a
// batch of data or a future for receiving the next batch, and `has_more` flag
// is not set.
return Poll::Ready(None);
}
}
}
}
2 changes: 1 addition & 1 deletion relay_rpc/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub fn encode_auth_token(
let iss = {
let client_id = DecodedClientId(*key.public_key().as_bytes());

format!("{DID_PREFIX}{DID_DELIMITER}{DID_METHOD}{DID_DELIMITER}{client_id}",)
format!("{DID_PREFIX}{DID_DELIMITER}{DID_METHOD}{DID_DELIMITER}{client_id}")
};

let claims = {
Expand Down
126 changes: 112 additions & 14 deletions relay_rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ pub static JSON_RPC_VERSION: once_cell::sync::Lazy<Arc<str>> =
/// See <https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/servers/relay/relay-server-rpc.md>
pub const MAX_SUBSCRIPTION_BATCH_SIZE: usize = 500;

/// The maximum number of topics allowed for a batch fetch request.
///
/// See <https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/servers/relay/relay-server-rpc.md>
pub const MAX_FETCH_BATCH_SIZE: usize = 500;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Errors covering payload validation problems.
Expand All @@ -35,14 +40,11 @@ pub enum ValidationError {
#[error("Invalid JSON RPC version")]
JsonRpcVersion,

#[error(
"The batch contains too many items. Maximum number of subscriptions is {}",
MAX_SUBSCRIPTION_BATCH_SIZE
)]
BatchSubscriptionLimit,
#[error("The batch contains too many items ({actual}). Maximum number of items is {limit}")]
BatchLimitExceeded { limit: usize, actual: usize },

#[error("The batch contains no items")]
BatchSubscriptionListEmpty,
BatchEmpty,
}

/// Errors caught while processing the request. These are meant to be serialized
Expand Down Expand Up @@ -333,6 +335,42 @@ impl RequestPayload for Unsubscribe {
}
}

/// Data structure representing fetch request params.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FetchMessages {
/// The topic of the messages to fetch.
pub topic: Topic,
}

impl RequestPayload for FetchMessages {
type Error = GenericError;
type Response = FetchResponse;

fn validate(&self) -> Result<(), ValidationError> {
self.topic
.decode()
.map_err(ValidationError::TopicDecoding)?;

Ok(())
}

fn into_params(self) -> Params {
Params::FetchMessages(self)
}
}

/// Data structure representing fetch response.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
/// Array of messages fetched from the mailbox.
pub messages: Vec<SubscriptionData>,

/// Flag that indicates whether the client should keep fetching the
/// messages.
pub has_more: bool,
}

/// Multi-topic subscription request parameters.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribe {
Expand All @@ -345,12 +383,17 @@ impl RequestPayload for BatchSubscribe {
type Response = Vec<SubscriptionId>;

fn validate(&self) -> Result<(), ValidationError> {
if self.topics.is_empty() {
return Err(ValidationError::BatchSubscriptionListEmpty);
let batch_size = self.topics.len();

if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}

if self.topics.len() > MAX_SUBSCRIPTION_BATCH_SIZE {
return Err(ValidationError::BatchSubscriptionLimit);
if batch_size > MAX_SUBSCRIPTION_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_SUBSCRIPTION_BATCH_SIZE,
actual: batch_size,
});
}

for topic in &self.topics {
Expand All @@ -377,12 +420,17 @@ impl RequestPayload for BatchUnsubscribe {
type Response = bool;

fn validate(&self) -> Result<(), ValidationError> {
if self.subscriptions.is_empty() {
return Err(ValidationError::BatchSubscriptionListEmpty);
let batch_size = self.subscriptions.len();

if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}

if self.subscriptions.len() > MAX_SUBSCRIPTION_BATCH_SIZE {
return Err(ValidationError::BatchSubscriptionLimit);
if batch_size > MAX_SUBSCRIPTION_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_SUBSCRIPTION_BATCH_SIZE,
actual: batch_size,
});
}

for sub in &self.subscriptions {
Expand All @@ -397,6 +445,43 @@ impl RequestPayload for BatchUnsubscribe {
}
}

/// Data structure representing batch fetch request params.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchFetchMessages {
/// The topics of the messages to fetch.
pub topics: Vec<Topic>,
}

impl RequestPayload for BatchFetchMessages {
type Error = GenericError;
type Response = FetchResponse;

fn validate(&self) -> Result<(), ValidationError> {
let batch_size = self.topics.len();

if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}

if batch_size > MAX_FETCH_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_FETCH_BATCH_SIZE,
actual: batch_size,
});
}

for topic in &self.topics {
topic.decode().map_err(ValidationError::TopicDecoding)?;
}

Ok(())
}

fn into_params(self) -> Params {
Params::BatchFetchMessages(self)
}
}

/// Data structure representing publish request params.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Publish {
Expand Down Expand Up @@ -550,6 +635,10 @@ pub enum Params {
#[serde(rename = "irn_unsubscribe", alias = "iridium_unsubscribe")]
Unsubscribe(Unsubscribe),

/// Parameters to fetch.
#[serde(rename = "irn_fetchMessages", alias = "iridium_fetchMessages")]
FetchMessages(FetchMessages),

/// Parameters to batch subscribe.
#[serde(rename = "irn_batchSubscribe", alias = "iridium_batchSubscribe")]
BatchSubscribe(BatchSubscribe),
Expand All @@ -558,6 +647,13 @@ pub enum Params {
#[serde(rename = "irn_batchUnsubscribe", alias = "iridium_batchUnsubscribe")]
BatchUnsubscribe(BatchUnsubscribe),

/// Parameters to batch fetch.
#[serde(
rename = "irn_batchFetchMessages",
alias = "iridium_batchFetchMessages"
)]
BatchFetchMessages(BatchFetchMessages),

/// Parameters to publish.
#[serde(rename = "irn_publish", alias = "iridium_publish")]
Publish(Publish),
Expand Down Expand Up @@ -603,8 +699,10 @@ impl Request {
match &self.params {
Params::Subscribe(params) => params.validate(),
Params::Unsubscribe(params) => params.validate(),
Params::FetchMessages(params) => params.validate(),
Params::BatchSubscribe(params) => params.validate(),
Params::BatchUnsubscribe(params) => params.validate(),
Params::BatchFetchMessages(params) => params.validate(),
Params::Publish(params) => params.validate(),
Params::Subscription(params) => params.validate(),
}
Expand Down
Loading

0 comments on commit b454c5b

Please sign in to comment.