Skip to content

Commit

Permalink
Wipe expired messages
Browse files Browse the repository at this point in the history
  • Loading branch information
p1gp1g committed Dec 19, 2024
1 parent 39a6d09 commit 7404e34
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 13 deletions.
1 change: 1 addition & 0 deletions autopush-common/src/db/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/// `autopush/co/{uaid}` u64 to store the last time the user has interacted with the server
/// `autopush/channels/{uaid}` List to store the list of the channels of the user
/// `autopush/msgs/{uaid}` SortedSet to store the list of the pending message ids for the user
/// `autopush/msgs_exp/{uaid}` SortedSet to store the list of the pending message ids, ordered by expiry date, this is because SortedSet elements can't have independant expiry date
/// `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store
/// the content of the messages
/// `autopush/topic/{uaid}/{chid}/{topic}` String to store the (last) message id of a given topic
Expand Down
103 changes: 90 additions & 13 deletions autopush-common/src/db/redis/redis_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ impl RedisClientImpl {
format!("autopush/msgs/{}", uaid.as_hyphenated())
}

fn message_exp_list_key(&self, uaid: &Uuid) -> String {
format!("autopush/msgs_exp/{}", uaid.as_hyphenated())
}

fn message_key(&self, uaid: &Uuid, chidmessageid: &str) -> String {
format!("autopush/msg/{}/{}", uaid.as_hyphenated(), chidmessageid)
}
Expand Down Expand Up @@ -192,11 +196,13 @@ impl DbClient for RedisClientImpl {
let co_key = self.last_co_key(&uaid);
let chan_list_key = self.channel_list_key(&uaid);
let msg_list_key = self.message_list_key(&uaid);
let exp_list_key = self.message_exp_list_key(&uaid);
redis::pipe()
.del(&user_key)
.del(&co_key)
.del(&chan_list_key)
.del(&msg_list_key)
.del(&exp_list_key)
.exec(&mut con)
.unwrap();
Ok(())
Expand Down Expand Up @@ -287,6 +293,7 @@ impl DbClient for RedisClientImpl {
async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
let mut con = self.connection()?;
let msg_list_key = self.message_list_key(&uaid);
let exp_list_key = self.message_exp_list_key(&uaid);
let msg_key = self.message_key(&uaid, &message.chidmessageid());
// message.ttl is already min(headers.ttl, MAX_NOTIFICATION_TTL)
// see autoendpoint/src/extractors/notification_headers.rs
Expand All @@ -300,14 +307,11 @@ impl DbClient for RedisClientImpl {

// Remember, `timestamp` is effectively the time to kill the message, not the
// current time.
let expiry = SystemTime::now() + Duration::from_secs(message.ttl);
trace!(
"🉑 Message Expiry {}",
expiry
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
let expiry = (SystemTime::now() + Duration::from_secs(message.ttl))
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
trace!("🉑 Message Expiry {}", expiry);

let mut pipe = redis::pipe();

Expand All @@ -318,8 +322,11 @@ impl DbClient for RedisClientImpl {
// If a message is already stored for that topic, we remove it
if let Some(id) = old_msg_id {
trace!("🉑 The topic had a message: {}", &id);
pipe.zrem(&msg_list_key, &id)
.del(self.message_key(&uaid, &id));
// We remove the id from the exp list at the end, to be sure
// it can't be removed from the list before the message is removed
pipe.del(self.message_key(&uaid, &id))
.zrem(&msg_list_key, &id)
.zrem(&exp_list_key, &id);
}
// Setting the key replace the old one if any
pipe.set_options(&topic_key, &message.chidmessageid(), opts);
Expand All @@ -339,6 +346,7 @@ impl DbClient for RedisClientImpl {
)
// The function [fecth_timestamp_messages] takes a timestamp in input,
// here we use the timestamp of the record (in ms)
.zadd(&exp_list_key, &msg_id, expiry)
.zadd(&msg_list_key, &msg_id, ms_since_epoch());

let _: () = pipe.exec(&mut con).unwrap();
Expand All @@ -362,8 +370,22 @@ impl DbClient for RedisClientImpl {
Ok(())
}

/// Doesn't seem to be useful for redis
async fn increment_storage(&self, _uaid: &Uuid, _timestamp: u64) -> DbResult<()> {
/// Delete expired messages
async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
debug!("🉑🔥 Incrementing storage to {}", timestamp);
let msg_list_key = self.message_list_key(&uaid);
let exp_list_key = self.message_exp_list_key(&uaid);
let mut con = self.connection()?;
let exp_id_list: Vec<String> = con.zrangebyscore(&exp_list_key, 0, timestamp).unwrap();
if exp_id_list.len() > 0 {
trace!("🉑🔥 Deleting {} expired msgs", exp_id_list.len());
redis::pipe()
.del(&exp_id_list)
.zrem(&msg_list_key, &exp_id_list)
.zrem(&exp_list_key, &exp_id_list)
.exec(&mut con)
.unwrap();
}
Ok(())
}

Expand All @@ -376,11 +398,15 @@ impl DbClient for RedisClientImpl {
);
let msg_key = self.message_key(&uaid, &chidmessageid);
let msg_list_key = self.message_list_key(&uaid);
let exp_list_key = self.message_exp_list_key(&uaid);
debug!("🉑🔥 Deleting message {}", &msg_key);
let mut con = self.connection()?;
// We remove the id from the exp list at the end, to be sure
// it can't be removed from the list before the message is removed
redis::pipe()
.zrem(&msg_list_key, &chidmessageid)
.del(&msg_key)
.zrem(&msg_list_key, &chidmessageid)
.zrem(&exp_list_key, &chidmessageid)
.exec(&mut con)
.unwrap();
self.metrics
Expand Down Expand Up @@ -533,6 +559,57 @@ mod tests {
assert!(result.unwrap());
}

/// Test if [increment_storage] correctly wipe expired messages
#[actix_rt::test]
async fn wipe_expired() -> DbResult<()> {
init_test_logging();
let client = new_client()?;

let connected_at = ms_since_epoch();

let uaid = Uuid::parse_str(TEST_USER).unwrap();
let chid = Uuid::parse_str(TEST_CHID).unwrap();

let node_id = "test_node".to_owned();

// purge the user record if it exists.
let _ = client.remove_user(&uaid).await;

let test_user = User {
uaid,
router_type: "webpush".to_owned(),
connected_at,
router_data: None,
node_id: Some(node_id.clone()),
..Default::default()
};

// purge the old user (if present)
// in case a prior test failed for whatever reason.
let _ = client.remove_user(&uaid).await;

// can we add the user?
let timestamp = now();
let fetch_timestamp = ms_since_epoch();
client.add_user(&test_user).await?;
let test_notification = crate::db::Notification {
channel_id: chid,
version: "test".to_owned(),
ttl: 1,
timestamp,
data: Some("Encrypted".into()),
sortkey_timestamp: Some(timestamp),
..Default::default()
};
client.save_message(&uaid, test_notification).await?;
client
.increment_storage(&uaid, fetch_timestamp + 10000)
.await?;
let msgs = client.fetch_timestamp_messages(&uaid, None, 999).await?;
assert_eq!(msgs.messages.len(), 0);
Ok(())
}

/// run a gauntlet of testing. These are a bit linear because they need
/// to run in sequence.
#[actix_rt::test]
Expand Down

0 comments on commit 7404e34

Please sign in to comment.