Skip to content

Commit

Permalink
Python: Use RESP3 by default, allow RESP2 opt-in. (#731)
Browse files Browse the repository at this point in the history
* Add more type conversions and commands.

* Python: Use RESP3 by default, allow RESP2 opt-in.

---------

Co-authored-by: nihohit <[email protected]>
  • Loading branch information
shachlanAmazon and nihohit authored Dec 31, 2023
1 parent 078068b commit 724edc9
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 46 deletions.
20 changes: 17 additions & 3 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ enum ExpectedReturnType {
Map,
Double,
Boolean,
Set,
}

fn convert_to_expected_type(
Expand Down Expand Up @@ -148,19 +149,32 @@ fn convert_to_expected_type(
)
.into()),
},
ExpectedReturnType::Set => match value {
Value::Nil => Ok(value),
Value::Set(_) => Ok(value),
Value::Array(array) => Ok(Value::Set(array)),
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to set",
format!("(response was {:?})", value),
)
.into()),
},
ExpectedReturnType::Double => Ok(Value::Double(from_redis_value::<f64>(&value)?.into())),
ExpectedReturnType::Boolean => Ok(Value::Boolean(from_redis_value::<bool>(&value)?)),
}
}

fn expected_type_for_cmd(cmd: &redis::Cmd) -> Option<ExpectedReturnType> {
let command = cmd.arg_idx(0)?;
match command {
b"HGETALL" | b"XREAD" => Some(ExpectedReturnType::Map),
let command = cmd.command()?;

match command.as_slice() {
b"HGETALL" | b"XREAD" | b"CONFIG GET" | b"HELLO" => Some(ExpectedReturnType::Map),
b"INCRBYFLOAT" | b"HINCRBYFLOAT" => Some(ExpectedReturnType::Double),
b"HEXISTS" | b"EXPIRE" | b"EXPIREAT" | b"PEXPIRE" | b"PEXPIREAT" => {
Some(ExpectedReturnType::Boolean)
}
b"SMEMBERS" => Some(ExpectedReturnType::Set),
_ => None,
}
}
Expand Down
16 changes: 8 additions & 8 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import List, Mapping, Optional, cast
from typing import Dict, List, Mapping, Optional, cast

from glide.async_commands.core import CoreCommands, InfoSection
from glide.async_commands.transaction import BaseTransaction, ClusterTransaction
Expand Down Expand Up @@ -159,7 +159,7 @@ async def ping(

async def config_get(
self, parameters: List[str], route: Optional[Route] = None
) -> TClusterResponse[List[str]]:
) -> TClusterResponse[Dict[str, str]]:
"""Get the values of configuration parameters.
See https://redis.io/commands/config-get/ for details.
Expand All @@ -170,19 +170,19 @@ async def config_get(
in which case the client will route the command to the nodes defined by `route`.
Returns:
TClusterResponse[List[str]]: A list of values corresponding to the
TClusterResponse[Dict[str, str]]: A dictionary of values corresponding to the
configuration parameters.
When specifying a route other than a single node, response will be : {Address (str) : response (List[str]) , ... }
with type of Dict[str, List[str]].
When specifying a route other than a single node, response will be : {Address (str) : response (Dict[str, str]) , ... }
with type of Dict[str, Dict[str, str]].
Examples:
>>> await client.config_get(["timeout"] , RandomNode())
['timeout', '1000']
{'timeout': '1000'}
>>> await client.config_get(["timeout" , "maxmemory"])
['timeout', '1000', "maxmemory", "1GB"]
{'timeout': '1000', "maxmemory": "1GB"}
"""
return cast(
TClusterResponse[List[str]],
TClusterResponse[Dict[str, str]],
await self._execute_command(RequestType.ConfigGet, parameters, route),
)

Expand Down
9 changes: 5 additions & 4 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Mapping,
Optional,
Protocol,
Set,
Tuple,
Type,
Union,
Expand Down Expand Up @@ -732,23 +733,23 @@ async def srem(self, key: str, members: List[str]) -> int:
"""
return cast(int, await self._execute_command(RequestType.SRem, [key] + members))

async def smembers(self, key: str) -> List[str]:
async def smembers(self, key: str) -> Set[str]:
"""Retrieve all the members of the set value stored at `key`.
See https://redis.io/commands/smembers/ for details.
Args:
key (str): The key from which to retrieve the set members.
Returns:
List[str]: A list of all members of the set.
Set[str]: A set of all members of the set.
If `key` does not exist an empty list will be returned.
If `key` holds a value that is not a set, an error is returned.
Examples:
>>> await client.smembers("my_set")
["member1", "member2", "member3"]
{"member1", "member2", "member3"}
"""
return cast(List[str], await self._execute_command(RequestType.SMembers, [key]))
return cast(Set[str], await self._execute_command(RequestType.SMembers, [key]))

async def scard(self, key: str) -> int:
"""Retrieve the set cardinality (number of elements) of the set stored at `key`.
Expand Down
17 changes: 9 additions & 8 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import List, Mapping, Optional, cast
from typing import Dict, List, Mapping, Optional, cast

from glide.async_commands.core import CoreCommands, InfoSection
from glide.async_commands.transaction import BaseTransaction, Transaction
Expand Down Expand Up @@ -120,25 +120,26 @@ async def ping(self, message: Optional[str] = None) -> str:
argument = [] if message is None else [message]
return cast(str, await self._execute_command(RequestType.Ping, argument))

async def config_get(self, parameters: List[str]) -> List[str]:
async def config_get(self, parameters: List[str]) -> Dict[str, str]:
"""Get the values of configuration parameters.
See https://redis.io/commands/config-get/ for details.
Args:
parameters (List[str]): A list of configuration parameter names to retrieve values for.
Returns:
List[str]: A list of values corresponding to the configuration parameters.
Dict[str, str]: A dictionary of values corresponding to the configuration parameters.
Examples:
>>> config_get(["timeout"])
["timeout", "1000"]
>>> config_get(["timeout", "maxmemory"])
["timeout", "1000", "maxmemory", "1GB"]
>>> await client.config_get(["timeout"] , RandomNode())
{'timeout': '1000'}
>>> await client.config_get(["timeout" , "maxmemory"])
{'timeout': '1000', "maxmemory": "1GB"}
"""
return cast(
List[str], await self._execute_command(RequestType.ConfigGet, parameters)
Dict[str, str],
await self._execute_command(RequestType.ConfigGet, parameters),
)

async def config_set(self, parameters_map: Mapping[str, str]) -> TOK:
Expand Down
25 changes: 24 additions & 1 deletion python/python/glide/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ class ReadFrom(Enum):
"""


class ProtocolVersion(Enum):
"""
Represents the communication protocol with the server.
"""

RESP2 = SentProtocolVersion.RESP2
"""
Communicate using Redis RESP2.
"""
RESP3 = SentProtocolVersion.RESP3
"""
Communicate using Redis RESP3.
"""


class BackoffStrategy:
def __init__(self, num_of_retries: int, factor: int, exponent_base: int):
"""
Expand Down Expand Up @@ -84,6 +99,7 @@ def __init__(
read_from: ReadFrom = ReadFrom.PRIMARY,
request_timeout: Optional[int] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
):
"""
Represents the configuration settings for a Redis client.
Expand Down Expand Up @@ -115,6 +131,7 @@ def __init__(
self.read_from = read_from
self.request_timeout = request_timeout
self.client_name = client_name
self.protocol = protocol

def _create_a_protobuf_conn_request(
self, cluster_mode: bool = False
Expand Down Expand Up @@ -144,7 +161,7 @@ def _create_a_protobuf_conn_request(
request.authentication_info.password = self.credentials.password
if self.client_name:
request.client_name = self.client_name
request.protocol = SentProtocolVersion.RESP2
request.protocol = self.protocol.value

return request

Expand Down Expand Up @@ -175,6 +192,7 @@ class RedisClientConfiguration(BaseClientConfiguration):
If not set, a default backoff strategy will be used.
database_id (Optional[Int]): index of the logical database to connect to.
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment.
protocol (ProtocolVersion): The version of the Redis RESP protocol to communicate with the server.
"""

def __init__(
Expand All @@ -187,6 +205,7 @@ def __init__(
reconnect_strategy: Optional[BackoffStrategy] = None,
database_id: Optional[int] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
):
super().__init__(
addresses=addresses,
Expand All @@ -195,6 +214,7 @@ def __init__(
read_from=read_from,
request_timeout=request_timeout,
client_name=client_name,
protocol=protocol,
)
self.reconnect_strategy = reconnect_strategy
self.database_id = database_id
Expand Down Expand Up @@ -238,6 +258,7 @@ class ClusterClientConfiguration(BaseClientConfiguration):
This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries.
If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not set, a default value will be used.
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment.
protocol (ProtocolVersion): The version of the Redis RESP protocol to communicate with the server.
Notes:
Currently, the reconnection strategy in cluster mode is not configurable, and exponential backoff
Expand All @@ -252,6 +273,7 @@ def __init__(
read_from: ReadFrom = ReadFrom.PRIMARY,
request_timeout: Optional[int] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
):
super().__init__(
addresses=addresses,
Expand All @@ -260,4 +282,5 @@ def __init__(
read_from=read_from,
request_timeout=request_timeout,
client_name=client_name,
protocol=protocol,
)
6 changes: 4 additions & 2 deletions python/python/glide/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Literal, TypeVar, Union
from typing import Dict, List, Literal, Set, TypeVar, Union

from glide.protobuf.connection_request_pb2 import ConnectionRequest
from glide.protobuf.redis_request_pb2 import RedisRequest
Expand All @@ -9,7 +9,9 @@
# Typing
T = TypeVar("T")
TOK = Literal["OK"]
TResult = Union[TOK, str, List[str], List[List[str]], int, None, Dict[str, T], float]
TResult = Union[
TOK, str, List[str], List[List[str]], int, None, Dict[str, T], float, Set[T]
]
TRequest = Union[RedisRequest, ConnectionRequest]
# When routing to a single node, response will be T
# Otherwise, response will be : {Address : response , ... } with type of Dict[str, T].
Expand Down
7 changes: 5 additions & 2 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from glide.config import (
ClusterClientConfiguration,
NodeAddress,
ProtocolVersion,
RedisClientConfiguration,
RedisCredentials,
)
Expand Down Expand Up @@ -92,8 +93,7 @@ def pytest_sessionfinish(session, exitstatus):

@pytest.fixture()
async def redis_client(
request,
cluster_mode: bool,
request, cluster_mode: bool
) -> AsyncGenerator[TRedisClient, None]:
"Get async socket client for tests"
client = await create_client(request, cluster_mode)
Expand All @@ -108,6 +108,7 @@ async def create_client(
database_id: int = 0,
addresses: Optional[List[NodeAddress]] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
) -> Union[RedisClient, RedisClusterClient]:
# Create async socket client
use_tls = request.config.getoption("--tls")
Expand All @@ -120,6 +121,7 @@ async def create_client(
use_tls=use_tls,
credentials=credentials,
client_name=client_name,
protocol=protocol,
)
return await RedisClusterClient.create(cluster_config)
else:
Expand All @@ -132,5 +134,6 @@ async def create_client(
credentials=credentials,
database_id=database_id,
client_name=client_name,
protocol=protocol,
)
return await RedisClient.create(config)
37 changes: 29 additions & 8 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import string
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, TypeVar, Union
from typing import Dict, List, TypeVar, Union, cast

import pytest
from glide import ClosingError, RequestError, TimeoutError
Expand All @@ -16,7 +16,7 @@
ExpiryType,
InfoSection,
)
from glide.config import RedisCredentials
from glide.config import ProtocolVersion, RedisCredentials
from glide.constants import OK
from glide.redis_client import RedisClient, RedisClusterClient, TRedisClient
from glide.routes import (
Expand Down Expand Up @@ -258,6 +258,24 @@ async def test_socket_set_get(self, redis_client: TRedisClient):
assert await redis_client.set(key, value) == OK
assert await redis_client.get(key) == value

@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_use_resp3_protocol(self, redis_client: TRedisClient):
result = cast(Dict[str, str], await redis_client.custom_command(["HELLO"]))

assert int(result["proto"]) == 3

@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_allow_opt_in_to_resp2_protocol(self, cluster_mode, request):
redis_client = await create_client(
request,
cluster_mode,
protocol=ProtocolVersion.RESP2,
)

result = cast(Dict[str, str], await redis_client.custom_command(["HELLO"]))

assert int(result["proto"]) == 2

@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_conditional_set(self, redis_client: TRedisClient):
key = get_random_string(10)
Expand Down Expand Up @@ -334,7 +352,7 @@ async def test_request_error_raises_exception(self, redis_client: TRedisClient):
await redis_client.custom_command(["HSET", key, "1", "bar"])
assert "WRONGTYPE" in str(e)

@pytest.mark.parametrize("cluster_mode", [False, True])
@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_info_server_replication(self, redis_client: TRedisClient):
sections = [InfoSection.SERVER, InfoSection.REPLICATION]
info = get_first_result(await redis_client.info(sections))
Expand Down Expand Up @@ -497,11 +515,14 @@ async def test_ping(self, redis_client: TRedisClient):
async def test_config_get_set(self, redis_client: TRedisClient):
previous_timeout = await redis_client.config_get(["timeout"])
assert await redis_client.config_set({"timeout": "1000"}) == OK
assert await redis_client.config_get(["timeout"]) == ["timeout", "1000"]
assert await redis_client.config_get(["timeout"]) == {"timeout": "1000"}
# revert changes to previous timeout
assert isinstance(previous_timeout, list)
assert isinstance(previous_timeout[-1], str)
assert await redis_client.config_set({"timeout": previous_timeout[-1]}) == OK
assert isinstance(previous_timeout, dict)
assert isinstance(previous_timeout["timeout"], str)
assert (
await redis_client.config_set({"timeout": previous_timeout["timeout"]})
== OK
)

@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_decr_decrby_existing_key(self, redis_client: TRedisClient):
Expand Down Expand Up @@ -719,7 +740,7 @@ async def test_sadd_srem_smembers_scard_non_existing_key(
non_existing_key = get_random_string(10)
assert await redis_client.srem(non_existing_key, ["member"]) == 0
assert await redis_client.scard(non_existing_key) == 0
assert await redis_client.smembers(non_existing_key) == []
assert await redis_client.smembers(non_existing_key) == set()

@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_sadd_srem_smembers_scard_wrong_type_raise_error(
Expand Down
Loading

0 comments on commit 724edc9

Please sign in to comment.