Skip to content

Commit

Permalink
Refactor cluster readiness check and enhance scan error handling in t…
Browse files Browse the repository at this point in the history
…ests

Signed-off-by: avifenesh <[email protected]>
  • Loading branch information
avifenesh committed Dec 22, 2024
1 parent 0bf56e3 commit e0b2e32
Showing 1 changed file with 49 additions and 27 deletions.
76 changes: 49 additions & 27 deletions python/python/tests/test_scan.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@

import asyncio
from typing import AsyncGenerator, List, cast

import pytest
from glide import AllNodes, AllPrimaries, ByAddressRoute
from glide import ByAddressRoute
from glide.async_commands.command_args import ObjectType
from glide.config import ProtocolVersion
from glide.exceptions import RequestError
Expand All @@ -18,18 +17,40 @@
async def is_cluster_ready(client: GlideClusterClient, count: int) -> bool:
# we allow max 20 seconds to get the nodes
timeout = 20
start = asyncio.get_event_loop().time()
start_time = asyncio.get_event_loop().time()

while True:
if asyncio.get_event_loop().time() - start > timeout:
if asyncio.get_event_loop().time() - start_time > timeout:
return False
nodes_raw = await client.custom_command(["CLUSTER", "NODES"])
node_bytes_raw = cast(bytes, nodes_raw)
parsed_nodes = [
line for line in node_bytes_raw.decode().split("\n") if line.strip()
]
if len(parsed_nodes) == count:
break
await asyncio.sleep(1)

cluster_info = await client.custom_command(["CLUSTER", "INFO"])
cluster_info_map = {}

if cluster_info:
info_str = (
cluster_info
if isinstance(cluster_info, str)
else (
cluster_info.decode()
if isinstance(cluster_info, bytes)
else str(cluster_info)
)
)
cluster_info_lines = info_str.split("\n")
cluster_info_lines = [line for line in cluster_info_lines if line]

for line in cluster_info_lines:
key, value = line.split(":")
cluster_info_map[key.strip()] = value.strip()

if (
cluster_info_map.get("cluster_state") == "ok"
and int(cluster_info_map.get("cluster_known_nodes", "0")) == count
):
break

await asyncio.sleep(2)

return True


Expand Down Expand Up @@ -64,7 +85,6 @@ async def glide_client_scoped(
True,
valkey_cluster=function_scoped_cluster,
protocol=protocol,
timeout=100,
)
assert isinstance(client, GlideClusterClient)
yield client
Expand Down Expand Up @@ -319,11 +339,8 @@ async def test_cluster_scan_non_covered_slots(
glide_client_scoped: GlideClusterClient,
):
key = get_random_string(10)
for i in range(10000):
try:
await glide_client_scoped.set(f"{key}:{i}", "value")
except RequestError as e:
continue
for i in range(1000):
await glide_client_scoped.set(f"{key}:{i}", "value")
cursor = ClusterScanCursor()
result = await glide_client_scoped.scan(cursor)
cursor = cast(ClusterScanCursor, result[0])
Expand All @@ -343,16 +360,21 @@ async def test_cluster_scan_non_covered_slots(
)
# now we let it few seconds gossip to get the new cluster configuration
await is_cluster_ready(glide_client_scoped, len(all_other_addresses))
# Iterate scan to get missing slots error
# Iterate scan until error is returned, as it might take time for the inner core to forget the missing node
cursor = ClusterScanCursor()
with pytest.raises(RequestError) as e_info:
while not cursor.is_finished():
result = await glide_client_scoped.scan(cursor)
cursor = cast(ClusterScanCursor, result[0])
assert (
"Could not find an address covering a slot, SCAN operation cannot continue"
in str(e_info.value)
)
while True:
try:
while not cursor.is_finished():
result = await glide_client_scoped.scan(cursor)
cursor = cast(ClusterScanCursor, result[0])
# Reset cursor for next iteration
cursor = ClusterScanCursor()
except RequestError as e_info:
assert (
"Could not find an address covering a slot, SCAN operation cannot continue"
in str(e_info)
)
break
# Scan with allow_non_covered_slots=True
while not cursor.is_finished():
result = await glide_client_scoped.scan(
Expand Down

0 comments on commit e0b2e32

Please sign in to comment.