Skip to content

Commit

Permalink
Some cleaning for unnecessary code to set the floor for next PR
Browse files Browse the repository at this point in the history
Signed-off-by: avifenesh <[email protected]>
  • Loading branch information
avifenesh committed Oct 20, 2024
1 parent 56b4401 commit b6629b0
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 183 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"node/rust-client/Cargo.toml",
"logger_core/Cargo.toml",
"csharp/lib/Cargo.toml",
"submodules/redis-rs/Cargo.toml",
"glide-core/redis-rs/Cargo.toml",
"benchmarks/rust/Cargo.toml",
"java/Cargo.toml"
],
Expand Down
31 changes: 24 additions & 7 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,42 @@ authors = ["Valkey GLIDE Maintainers"]
[dependencies]
bytes = "1"
futures = "^0.3"
redis = { path = "./redis-rs/redis", features = ["aio", "tokio-comp", "tokio-rustls-comp", "connection-manager","cluster", "cluster-async"] }
redis = { path = "./redis-rs/redis", features = [
"aio",
"tokio-comp",
"tokio-rustls-comp",
"connection-manager",
"cluster",
"cluster-async",
] }
tokio = { version = "1", features = ["macros", "time"] }
logger_core = {path = "../logger_core"}
logger_core = { path = "../logger_core" }
dispose = "0.5.0"
tokio-util = {version = "^0.7", features = ["rt"], optional = true}
tokio-util = { version = "^0.7", features = ["rt"], optional = true }
num_cpus = { version = "^1.15", optional = true }
tokio-retry = "0.3.0"
protobuf = { version= "3", features = ["bytes", "with-bytes"], optional = true }
protobuf = { version = "3", features = [
"bytes",
"with-bytes",
], optional = true }
integer-encoding = { version = "4.0.0", optional = true }
thiserror = "1"
rand = { version = "0.8.5" }
futures-intrusive = "0.5.0"
directories = { version = "4.0", optional = true }
once_cell = "1.18.0"
arcstr = "1.1.5"
sha1_smol = "1.0.0"
nanoid = "0.4.0"
async-trait = { version = "0.1.24" }

[features]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util"]
socket-layer = [
"directories",
"integer-encoding",
"num_cpus",
"protobuf",
"tokio-util",
]
standalone_heartbeat = []

[dev-dependencies]
Expand All @@ -45,7 +60,9 @@ ctor = "0.2.2"
redis = { path = "./redis-rs/redis", features = ["tls-rustls-insecure"] }
iai-callgrind = "0.9"
tokio = { version = "1", features = ["rt-multi-thread"] }
glide-core = { path = ".", features = ["socket-layer"] } # always enable this feature in tests.
glide-core = { path = ".", features = [
"socket-layer",
] } # always enable this feature in tests.

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(standalone_heartbeat)'] }
Expand Down
41 changes: 33 additions & 8 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,19 @@ dashmap = { version = "6.0", optional = true }
async-trait = { version = "0.1.24", optional = true }

# Only needed for tokio support
backoff-tokio = { package = "backoff", version = "0.4.0", optional = true, features = ["tokio"] }
backoff-tokio = { package = "backoff", version = "0.4.0", optional = true, features = [
"tokio",
] }

# Only needed for native tls
native-tls = { version = "0.2", optional = true }
tokio-native-tls = { version = "0.3", optional = true }
async-native-tls = { version = "0.4", optional = true }

# Only needed for rustls
rustls = { version = "0.22", optional = true }
webpki-roots = { version = "0.26", optional = true }
rustls-native-certs = { version = "0.7", optional = true }
tokio-rustls = { version = "0.25", optional = true }
futures-rustls = { version = "0.25", optional = true }
rustls-pemfile = { version = "2", optional = true }
rustls-pki-types = { version = "1", optional = true }

Expand All @@ -98,7 +98,6 @@ num-bigint = "0.4.4"
ahash = { version = "0.8.11", optional = true }

tracing = "0.1"
arcstr = "1.1.5"

# Optional uuid support
uuid = { version = "1.6.1", optional = true }
Expand All @@ -114,16 +113,34 @@ default = [
"tokio-rustls-comp",
"connection-manager",
"cluster",
"cluster-async"
"cluster-async",
]
acl = []
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "combine/tokio", "async-trait", "fast-math", "dispose"]
aio = [
"bytes",
"pin-project-lite",
"futures-util",
"futures-util/alloc",
"futures-util/sink",
"tokio/io-util",
"tokio-util",
"tokio-util/codec",
"combine/tokio",
"async-trait",
"fast-math",
"dispose",
]
geospatial = []
json = ["serde", "serde/derive", "serde_json"]
cluster = ["crc16", "rand", "derivative"]
script = ["sha1_smol"]
tls-native-tls = ["native-tls"]
tls-rustls = ["rustls", "rustls-native-certs", "rustls-pemfile", "rustls-pki-types"]
tls-rustls = [
"rustls",
"rustls-native-certs",
"rustls-pemfile",
"rustls-pki-types",
]
tls-rustls-insecure = ["tls-rustls"]
tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"]
tokio-comp = ["aio", "tokio/net", "backoff-tokio"]
Expand Down Expand Up @@ -154,7 +171,12 @@ futures-time = "3"
criterion = "0.4"
partial-io = { version = "0.5", features = ["tokio", "quickcheck1"] }
quickcheck = "1.0.3"
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread", "time"] }
tokio = { version = "1", features = [
"rt",
"macros",
"rt-multi-thread",
"time",
] }
tempfile = "=3.6.0"
once_cell = "1"
anyhow = "1"
Expand Down Expand Up @@ -225,3 +247,6 @@ required-features = ["connection-manager"]
[[example]]
name = "streams"
required-features = ["streams"]

[package.metadata.cargo-machete]
ignored = ["strum"]
32 changes: 0 additions & 32 deletions glide-core/redis-rs/redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,6 @@ fn test() {
assert_sync::<Connection>();
}

impl<C> Connection<C> {
pub(crate) fn map<D>(self, f: impl FnOnce(C) -> D) -> Connection<D> {
let Self {
con,
buf,
decoder,
db,
pubsub,
protocol,
} = self;
Connection {
con: f(con),
buf,
decoder,
db,
pubsub,
protocol,
}
}
}

impl<C> Connection<C>
where
C: Unpin + AsyncRead + AsyncWrite + Send,
Expand Down Expand Up @@ -190,17 +169,6 @@ where
}
}

pub(crate) async fn connect<C>(
connection_info: &ConnectionInfo,
socket_addr: Option<SocketAddr>,
) -> RedisResult<Connection<C>>
where
C: Unpin + RedisRuntime + AsyncRead + AsyncWrite + Send,
{
let (con, _ip) = connect_simple::<C>(connection_info, socket_addr).await?;
Connection::new(&connection_info.redis, con).await
}

impl<C> ConnectionLike for Connection<C>
where
C: Unpin + AsyncRead + AsyncWrite + Send,
Expand Down
102 changes: 0 additions & 102 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,6 @@ impl Client {
crate::aio::Connection::new(&self.connection_info.redis, con).await
}

/// Returns an async connection from the client.
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
#[deprecated(
note = "aio::Connection is deprecated. Use client::get_multiplexed_tokio_connection instead."
)]
#[allow(deprecated)]
pub async fn get_tokio_connection(&self) -> RedisResult<crate::aio::Connection> {
use crate::aio::RedisRuntime;
Ok(
crate::aio::connect::<crate::aio::tokio::Tokio>(&self.connection_info, None)
.await?
.map(RedisRuntime::boxed),
)
}

/// Returns an async connection from the client.
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
Expand Down Expand Up @@ -245,53 +229,6 @@ impl Client {
.await
}

/// Returns an async multiplexed connection from the client and a future which must be polled
/// to drive any requests submitted to it (see `get_multiplexed_tokio_connection`).
///
/// A multiplexed connection can be cloned, allowing requests to be be sent concurrently
/// on the same underlying connection (tcp/unix socket).
/// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
pub async fn create_multiplexed_tokio_connection_with_response_timeout(
&self,
response_timeout: std::time::Duration,
glide_connection_options: GlideConnectionOptions,
) -> RedisResult<(
crate::aio::MultiplexedConnection,
impl std::future::Future<Output = ()>,
)> {
self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
response_timeout,
None,
glide_connection_options,
)
.await
.map(|(conn, driver, _ip)| (conn, driver))
}

/// Returns an async multiplexed connection from the client and a future which must be polled
/// to drive any requests submitted to it (see `get_multiplexed_tokio_connection`).
///
/// A multiplexed connection can be cloned, allowing requests to be be sent concurrently
/// on the same underlying connection (tcp/unix socket).
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
pub async fn create_multiplexed_tokio_connection(
&self,
glide_connection_options: GlideConnectionOptions,
) -> RedisResult<(
crate::aio::MultiplexedConnection,
impl std::future::Future<Output = ()>,
)> {
self.create_multiplexed_tokio_connection_with_response_timeout(
std::time::Duration::MAX,
glide_connection_options,
)
.await
.map(|conn_res| (conn_res.0, conn_res.1))
}

/// Returns an async [`ConnectionManager`][connection-manager] from the client.
///
/// The connection manager wraps a
Expand Down Expand Up @@ -375,45 +312,6 @@ impl Client {
.await
}

/// Returns an async [`ConnectionManager`][connection-manager] from the client.
///
/// The connection manager wraps a
/// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
/// connection fails with a connection error, then a new connection is
/// established in the background and the error is returned to the caller.
///
/// This means that on connection loss at least one command will fail, but
/// the connection will be re-established automatically if possible. Please
/// refer to the [`ConnectionManager`][connection-manager] docs for
/// detailed reconnecting behavior.
///
/// A connection manager can be cloned, allowing requests to be be sent concurrently
/// on the same underlying connection (tcp/unix socket).
///
/// [connection-manager]: aio/struct.ConnectionManager.html
/// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
#[cfg(feature = "connection-manager")]
#[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
#[deprecated(note = "use get_connection_manager_with_backoff_and_timeouts instead")]
pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
&self,
exponent_base: u64,
factor: u64,
number_of_retries: usize,
response_timeout: std::time::Duration,
connection_timeout: std::time::Duration,
) -> RedisResult<crate::aio::ConnectionManager> {
crate::aio::ConnectionManager::new_with_backoff_and_timeouts(
self.clone(),
exponent_base,
factor,
number_of_retries,
response_timeout,
connection_timeout,
)
.await
}

/// Returns an async [`ConnectionManager`][connection-manager] from the client.
///
/// The connection manager wraps a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ mod tests {
}

fn create_container_with_strategy(
stragey: ReadFromReplicaStrategy,
strategy: ReadFromReplicaStrategy,
use_management_connections: bool,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
Expand Down Expand Up @@ -409,7 +409,7 @@ mod tests {
ConnectionsContainer {
slot_map,
connection_map,
read_from_replica_strategy: stragey,
read_from_replica_strategy: strategy,
topology_hash: 0,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
Connect,
};
use crate::{
aio::{ConnectionLike, DisconnectNotifier, Runtime},
aio::{ConnectionLike, DisconnectNotifier},
client::GlideConnectionOptions,
cluster::get_connection_info,
cluster_client::ClusterParams,
Expand Down Expand Up @@ -462,9 +462,7 @@ async fn check_connection<C>(conn: &mut C, timeout: std::time::Duration) -> Redi
where
C: ConnectionLike + Send + 'static,
{
Runtime::locate()
.timeout(timeout, crate::cmd("PING").query_async::<_, String>(conn))
.await??;
tokio::time::timeout(timeout, crate::cmd("PING").query_async::<_, String>(conn)).await??;
Ok(())
}

Expand Down
Loading

0 comments on commit b6629b0

Please sign in to comment.