Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for Command filter #168

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ readme = "README.md"
name = "hello"
crate-type = ["cdylib"]

[[example]]
name = "command_filter"
crate-type = ["cdylib"]

[[example]]
name = "keys_pos"
crate-type = ["cdylib"]
Expand Down
40 changes: 40 additions & 0 deletions examples/command_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#[macro_use]
extern crate redis_module;

use redis_module::{CommandFilterContext, RedisString};
use std::time::SystemTime;

fn filter(ctx: &CommandFilterContext) {
// Prints every command to the console
let cmd = ctx.args_get(0).unwrap();
eprint!("{} ", cmd);
let count = ctx.args_count();
for index in 1..count {
eprint!("{} ", ctx.args_get(index).unwrap());
}
eprintln!("");

// Add time field for every Hash
if let Ok("HSET") = cmd.try_as_str() {
oshadmi marked this conversation as resolved.
Show resolved Hide resolved
ctx.args_insert(count, RedisString::create(std::ptr::null_mut(), "__TIME__"));
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
ctx.args_insert(
count + 1,
RedisString::create(std::ptr::null_mut(), &format!("{}", now.as_millis())),
);
}
}

//////////////////////////////////////////////////////

redis_module! {
name: "command_filter",
version: 1,
data_types: [],
commands: [],
filters:[
[filter, 0],
]
}
65 changes: 65 additions & 0 deletions src/commandfilter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::raw;
use crate::RedisString;
use crate::Status;
use std::os::raw::c_int;
use std::ptr;

pub struct CommandFilterContext {
pub ctx: *mut raw::RedisModuleCommandFilterCtx,
}

impl CommandFilterContext {
pub fn new(ctx: *mut raw::RedisModuleCommandFilterCtx) -> Self {
CommandFilterContext { ctx }
}

pub fn args_count(&self) -> usize {
unsafe { raw::RedisModule_CommandFilterArgsCount.unwrap()(self.ctx) as usize }
}

pub fn args_get(&self, pos: usize) -> Option<RedisString> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming is inconsistent with the Redis APIs: "args" vs "arg". Although the APIs are themselves a bit inconsistently named, let's at least not add even more to the confusion and use the same names here.

In any case, it's probably best to wrap this low-level API wrapper with a higher level idiomatic Rust API (e.g. providing an iterator over the args), but that's not urgent.

let arg = unsafe {
raw::RedisModule_CommandFilterArgGet.unwrap()(self.ctx, pos as c_int)
as *mut raw::RedisModuleString
};
if arg.is_null() {
None
} else {
Some(RedisString::new(ptr::null_mut(), arg))
}
}

pub fn args_insert(&self, pos: usize, arg: RedisString) -> Status {
// retain arg since RedisModule_CommandFilterArgInsert going to release it too
raw::string_retain_string(std::ptr::null_mut(), arg.inner);
gkorland marked this conversation as resolved.
Show resolved Hide resolved
let status = unsafe {
raw::RedisModule_CommandFilterArgInsert.unwrap()(self.ctx, pos as c_int, arg.inner)
.into()
};

// If the string wasn't inserted we have to release it ourself
if status == Status::Err {
unsafe { raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), arg.inner) };
}
status
}

pub fn args_replace(&self, pos: usize, arg: RedisString) -> Status {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The insert and replace functions are mostly identical; it may make sense to unify the common code (freeing the strings etc.)

// retain arg since RedisModule_CommandFilterArgReplace going to release it too
raw::string_retain_string(std::ptr::null_mut(), arg.inner);
gkorland marked this conversation as resolved.
Show resolved Hide resolved
let status = unsafe {
raw::RedisModule_CommandFilterArgReplace.unwrap()(self.ctx, pos as c_int, arg.inner)
.into()
};

// If the string wasn't replaced we have to release it ourself
if status == Status::Err {
unsafe { raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), arg.inner) };
}
status
}

pub fn args_delete(&self, pos: usize) -> Status {
unsafe { raw::RedisModule_CommandFilterArgDelete.unwrap()(self.ctx, pos as c_int).into() }
}
}
7 changes: 5 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ mod redismodule;
pub mod redisraw;
pub mod redisvalue;

#[cfg(feature = "experimental-api")]
mod commandfilter;
mod context;
pub mod key;
pub mod logging;
mod macros;

#[cfg(feature = "experimental-api")]
pub use crate::commandfilter::CommandFilterContext;
#[cfg(feature = "experimental-api")]
pub use crate::context::blocked::BlockedClient;
#[cfg(feature = "experimental-api")]
pub use crate::context::thread_safe::{DetachedFromClient, ThreadSafeContext};
pub use crate::context::Context;
#[cfg(feature = "experimental-api")]
pub use crate::raw::NotifyEvent;

pub use crate::context::Context;
pub use crate::raw::Status;
pub use crate::redismodule::*;

Expand Down
37 changes: 37 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ macro_rules! redis_event_handler {
}};
}

#[cfg(feature = "experimental-api")]
#[macro_export]
macro_rules! redis_commnad_filter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo:

Suggested change
macro_rules! redis_commnad_filter {
macro_rules! redis_command_filter {

($ctx:expr,
$filter_handler:expr,
$filter_flags:expr) => {{
/////////////////////
extern "C" fn __do_filter(ctx: *mut $crate::raw::RedisModuleCommandFilterCtx) {
let context = $crate::CommandFilterContext::new(ctx);
$filter_handler(&context);
}
/////////////////////
if unsafe {
$crate::raw::RedisModule_RegisterCommandFilter.unwrap()(
$ctx,
Some(__do_filter),
$filter_flags as c_int,
)
} == std::ptr::null_mut()
{
return $crate::raw::Status::Err as c_int;
}
}};
}

#[macro_export]
macro_rules! redis_module {
(
Expand All @@ -102,6 +127,12 @@ macro_rules! redis_module {
$keystep:expr
]),* $(,)*
] $(,)*
$(filters: [
$([
$filter_handler:expr,
$filter_flags:expr
]),* $(,)*
] $(,)*)?
$(event_handlers: [
$([
$(@$event_type:ident) +:
Expand Down Expand Up @@ -162,6 +193,12 @@ macro_rules! redis_module {
redis_command!(ctx, $name, $command, $flags, $firstkey, $lastkey, $keystep);
)*

$(
$(
redis_commnad_filter!(ctx, $filter_handler, $filter_flags);
)*
)?

$(
$(
redis_event_handler!(ctx, $(raw::NotifyEvent::$event_type |)+ raw::NotifyEvent::empty(), $event_handler);
Expand Down
2 changes: 1 addition & 1 deletion src/redismodule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
}

#[inline]
fn next_i64(&mut self) -> Result<i64, RedisError> {
fn next_i64(&mut self) -> Result<i64, RedisError> {
self.next()
.map_or(Err(RedisError::WrongArity), |v| v.parse_integer())
}
Expand Down
4 changes: 2 additions & 2 deletions src/redisvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ mod tests {

#[test]
fn from_vec() {
let v : Vec<u8> = vec![0,3,5,21,255];
let v: Vec<u8> = vec![0, 3, 5, 21, 255];
assert_eq!(
RedisValue::from(v),
RedisValue::StringBuffer(vec![0,3,5,21,255])
RedisValue::StringBuffer(vec![0, 3, 5, 21, 255])
);
}

Expand Down
25 changes: 25 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,28 @@ fn test_keys_pos() -> Result<()> {

Ok(())
}

#[cfg(feature = "experimental-api")]
#[test]
fn test_command_filter() -> Result<()> {
let _guards = vec![start_redis_server_with_module("command_filter", 6481)
.with_context(|| "failed to start redis server")?];
let mut con =
get_redis_connection(6481).with_context(|| "failed to connect to redis server")?;

let res: i32 = redis::cmd("HSET")
.arg(&["mykey", "first", "Don"])
.query(&mut con)
.with_context(|| "failed to run HSET")?;
assert_eq!(res, 2);

let res: Vec<String> = redis::cmd("HGETALL")
.arg(&["mykey"])
.query(&mut con)
.with_context(|| "failed to run HSET")?;

assert_eq!(res.len(), 4);
assert_eq!(res[0..3], ["first", "Don", "__TIME__"]);

Ok(())
}