From e0b2e32ed9785a66483671822e057aab095c3a87 Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sun, 22 Dec 2024 11:25:54 +0000 Subject: [PATCH] Refactor cluster readiness check and enhance scan error handling in tests Signed-off-by: avifenesh --- python/python/tests/test_scan.py | 76 ++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/python/python/tests/test_scan.py b/python/python/tests/test_scan.py index 87e025cedc..69d5243aaf 100644 --- a/python/python/tests/test_scan.py +++ b/python/python/tests/test_scan.py @@ -1,9 +1,8 @@ - import asyncio from typing import AsyncGenerator, List, cast import pytest -from glide import AllNodes, AllPrimaries, ByAddressRoute +from glide import ByAddressRoute from glide.async_commands.command_args import ObjectType from glide.config import ProtocolVersion from glide.exceptions import RequestError @@ -18,18 +17,40 @@ async def is_cluster_ready(client: GlideClusterClient, count: int) -> bool: # we allow max 20 seconds to get the nodes timeout = 20 - start = asyncio.get_event_loop().time() + start_time = asyncio.get_event_loop().time() + while True: - if asyncio.get_event_loop().time() - start > timeout: + if asyncio.get_event_loop().time() - start_time > timeout: return False - nodes_raw = await client.custom_command(["CLUSTER", "NODES"]) - node_bytes_raw = cast(bytes, nodes_raw) - parsed_nodes = [ - line for line in node_bytes_raw.decode().split("\n") if line.strip() - ] - if len(parsed_nodes) == count: - break - await asyncio.sleep(1) + + cluster_info = await client.custom_command(["CLUSTER", "INFO"]) + cluster_info_map = {} + + if cluster_info: + info_str = ( + cluster_info + if isinstance(cluster_info, str) + else ( + cluster_info.decode() + if isinstance(cluster_info, bytes) + else str(cluster_info) + ) + ) + cluster_info_lines = info_str.split("\n") + cluster_info_lines = [line for line in cluster_info_lines if line] + + for line in cluster_info_lines: + key, value = line.split(":") + cluster_info_map[key.strip()] = value.strip() + + if ( + cluster_info_map.get("cluster_state") == "ok" + and int(cluster_info_map.get("cluster_known_nodes", "0")) == count + ): + break + + await asyncio.sleep(2) + return True @@ -64,7 +85,6 @@ async def glide_client_scoped( True, valkey_cluster=function_scoped_cluster, protocol=protocol, - timeout=100, ) assert isinstance(client, GlideClusterClient) yield client @@ -319,11 +339,8 @@ async def test_cluster_scan_non_covered_slots( glide_client_scoped: GlideClusterClient, ): key = get_random_string(10) - for i in range(10000): - try: - await glide_client_scoped.set(f"{key}:{i}", "value") - except RequestError as e: - continue + for i in range(1000): + await glide_client_scoped.set(f"{key}:{i}", "value") cursor = ClusterScanCursor() result = await glide_client_scoped.scan(cursor) cursor = cast(ClusterScanCursor, result[0]) @@ -343,16 +360,21 @@ async def test_cluster_scan_non_covered_slots( ) # now we let it few seconds gossip to get the new cluster configuration await is_cluster_ready(glide_client_scoped, len(all_other_addresses)) - # Iterate scan to get missing slots error + # Iterate scan until error is returned, as it might take time for the inner core to forget the missing node cursor = ClusterScanCursor() - with pytest.raises(RequestError) as e_info: - while not cursor.is_finished(): - result = await glide_client_scoped.scan(cursor) - cursor = cast(ClusterScanCursor, result[0]) - assert ( - "Could not find an address covering a slot, SCAN operation cannot continue" - in str(e_info.value) - ) + while True: + try: + while not cursor.is_finished(): + result = await glide_client_scoped.scan(cursor) + cursor = cast(ClusterScanCursor, result[0]) + # Reset cursor for next iteration + cursor = ClusterScanCursor() + except RequestError as e_info: + assert ( + "Could not find an address covering a slot, SCAN operation cannot continue" + in str(e_info) + ) + break # Scan with allow_non_covered_slots=True while not cursor.is_finished(): result = await glide_client_scoped.scan(