Skip to content

Commit

Permalink
Node allow uncovered slots cscan (#2815)
Browse files Browse the repository at this point in the history
* addresed comments

Signed-off-by: avifenesh <[email protected]>

* Go: add allow_non_covered_slots to ClusterScan and related commands

Signed-off-by: avifenesh <[email protected]>

* feat: Implement continuous slot scanning until next unscanned slot

Signed-off-by: jhpung <[email protected]>

* fix: improve slot scanning logic when address not found

Signed-off-by: jhpung <[email protected]>

* add allowNonCoveredSlots option to ScanOptions and update GlideClusterClient

Signed-off-by: avifenesh <[email protected]>

* test: add tests for GlideClusterClient scan with allowNonCoveredSlots option

Signed-off-by: avifenesh <[email protected]>

* refactor: enhance cluster readiness check and improve error handling in scan tests

Signed-off-by: avifenesh <[email protected]>

---------

Signed-off-by: avifenesh <[email protected]>
Signed-off-by: jhpung <[email protected]>
Co-authored-by: jhpung <[email protected]>
  • Loading branch information
avifenesh and jhpung authored Dec 22, 2024
1 parent 96d974d commit 210ba24
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node, Python: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815))
* Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
* Go: Add HINCRBYFLOAT command ([#2846](https://github.com/valkey-io/valkey-glide/pull/2846))
* Go: Add SUNIONSTORE command ([#2805](https://github.com/valkey-io/valkey-glide/pull/2805))
Expand Down
8 changes: 8 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import eslint from "@eslint/js";
import prettierConfig from "eslint-config-prettier";
import tseslint from "typescript-eslint";
import jsdoc from "eslint-plugin-jsdoc";

export default tseslint.config(
eslint.configs.recommended,
Expand Down Expand Up @@ -54,6 +55,13 @@ export default tseslint.config(
next: "*",
},
],
"@typescript-eslint/indent": ["error", 4, {
"SwitchCase": 1,
"ObjectExpression": 1,
"FunctionDeclaration": {"parameters": "first"},
"FunctionExpression": {"parameters": "first"},
"ignoredNodes": ["TSTypeParameterInstantiation"]
}],
},
},
prettierConfig,
Expand Down
1 change: 1 addition & 0 deletions node/DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ In order to run these tests, use:
```bash
npm run test-modules -- --cluster-endpoints=<address>:<port>
```
Note: these tests don't run with standalone server as of now.

### REPL (interactive shell)
Expand Down
13 changes: 13 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3857,6 +3857,19 @@ export interface ScanOptions extends BaseScanOptions {
type?: ObjectType;
}

/**
* Options for the SCAN command.
* `match`: The match filter is applied to the result of the command and will only include keys that match the pattern specified.
* `count`: `COUNT` is a just a hint for the command for how many elements to fetch from the server, the default is 10.
* `type`: The type of the object to scan.
* Types are the data types of Valkey: `string`, `list`, `set`, `zset`, `hash`, `stream`.
* `allowNonCoveredSlots`: If true, the scan will keep scanning even if slots are not covered by the cluster.
* By default, the scan will stop if slots are not covered by the cluster.
*/
export interface ClusterScanOptions extends ScanOptions {
allowNonCoveredSlots?: boolean;
}

/**
* Options specific to the ZSCAN command, extending from the base scan options.
*/
Expand Down
17 changes: 9 additions & 8 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
FunctionStatsSingleResponse,
InfoOptions,
LolwutOptions,
ScanOptions,
ClusterScanOptions,
createClientGetName,
createClientId,
createConfigGet,
Expand Down Expand Up @@ -146,7 +146,7 @@ export namespace GlideClusterClientConfiguration {
/**
* Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}.
*
* Extends `BaseClientConfiguration` with properties specific to `GlideClusterClient`, such as periodic topology checks
* Extends {@link BaseClientConfiguration | BaseClientConfiguration} with properties specific to `GlideClusterClient`, such as periodic topology checks
* and Pub/Sub subscription settings.
*
* @remarks
Expand Down Expand Up @@ -579,7 +579,7 @@ export class GlideClusterClient extends BaseClient {
*/
protected scanOptionsToProto(
cursor: string,
options?: ScanOptions,
options?: ClusterScanOptions,
): command_request.ClusterScan {
const command = command_request.ClusterScan.create();
command.cursor = cursor;
Expand All @@ -596,6 +596,7 @@ export class GlideClusterClient extends BaseClient {
command.objectType = options.type;
}

command.allowNonCoveredSlots = options?.allowNonCoveredSlots ?? false;
return command;
}

Expand All @@ -604,7 +605,7 @@ export class GlideClusterClient extends BaseClient {
*/
protected createClusterScanPromise(
cursor: ClusterScanCursor,
options?: ScanOptions & DecoderOption,
options?: ClusterScanOptions & DecoderOption,
): Promise<[ClusterScanCursor, GlideString[]]> {
// separate decoder option from scan options
const { decoder, ...scanOptions } = options || {};
Expand Down Expand Up @@ -633,7 +634,7 @@ export class GlideClusterClient extends BaseClient {
*
* @param cursor - The cursor object that wraps the scan state.
* To start a new scan, create a new empty `ClusterScanCursor` using {@link ClusterScanCursor}.
* @param options - (Optional) The scan options, see {@link ScanOptions} and {@link DecoderOption}.
* @param options - (Optional) The scan options, see {@link ClusterScanOptions} and {@link DecoderOption}.
* @returns A Promise resolving to an array containing the next cursor and an array of keys,
* formatted as [`ClusterScanCursor`, `string[]`].
*
Expand All @@ -651,14 +652,14 @@ export class GlideClusterClient extends BaseClient {
* console.log(allKeys); // ["key1", "key2", "key3"]
*
* // Iterate over keys matching a pattern
* await client.mset([{key: "key1", value: "value1"}, {key: "key2", value: "value2"}, {key: "notMykey", value: "value3"}, {key: "somethingElse", value: "value4"}]);
* await client.mset([{key: "key1", value: "value1"}, {key: "key2", value: "value2"}, {key: "notMyKey", value: "value3"}, {key: "somethingElse", value: "value4"}]);
* let cursor = new ClusterScanCursor();
* const matchedKeys: GlideString[] = [];
* while (!cursor.isFinished()) {
* const [cursor, keys] = await client.scan(cursor, { match: "*key*", count: 10 });
* matchedKeys.push(...keys);
* }
* console.log(matchedKeys); // ["key1", "key2", "notMykey"]
* console.log(matchedKeys); // ["key1", "key2", "notMyKey"]
*
* // Iterate over keys of a specific type
* await client.mset([{key: "key1", value: "value1"}, {key: "key2", value: "value2"}, {key: "key3", value: "value3"}]);
Expand All @@ -674,7 +675,7 @@ export class GlideClusterClient extends BaseClient {
*/
public async scan(
cursor: ClusterScanCursor,
options?: ScanOptions & DecoderOption,
options?: ClusterScanOptions & DecoderOption,
): Promise<[ClusterScanCursor, GlideString[]]> {
return this.createClusterScanPromise(cursor, options);
}
Expand Down
162 changes: 162 additions & 0 deletions node/tests/ScanTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import {
GlideString,
ObjectType,
ProtocolVersion,
GlideClusterClientConfiguration,
} from "..";
import { ValkeyCluster } from "../../utils/TestUtils.js";
import {
flushAndCloseClient,
getClientConfigurationOption,
getServerVersion,
parseEndpoints,
waitForClusterReady as isClusterReadyWithExpectedNodeCount,
} from "./TestUtilities";

const TIMEOUT = 50000;
Expand Down Expand Up @@ -376,6 +378,166 @@ describe("Scan GlideClusterClient", () => {
},
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with allowNonCoveredSlots %p`,
async (protocol) => {
const testCluster = await ValkeyCluster.createCluster(
true,
3,
0,
getServerVersion,
);
const config: GlideClusterClientConfiguration = {
addresses: testCluster
.getAddresses()
.map(([host, port]) => ({ host, port })),
protocol,
};
const testClient = await GlideClusterClient.createClient(config);

try {
for (let i = 0; i < 10000; i++) {
const result = await testClient.set(`${uuidv4()}`, "value");
expect(result).toBe("OK");
}

// Perform an initial scan to ensure all works as expected
let cursor = new ClusterScanCursor();
let result = await testClient.scan(cursor);
cursor = result[0];
expect(cursor.isFinished()).toBe(false);

// Set 'cluster-require-full-coverage' to 'no' to allow operations with missing slots
await testClient.configSet({
"cluster-require-full-coverage": "no",
});

// Forget one server to simulate a node failure
const addresses = testCluster.getAddresses();
const addressToForget = addresses[0];
const allOtherAddresses = addresses.slice(1);
const idToForget = await testClient.customCommand(
["CLUSTER", "MYID"],
{
route: {
type: "routeByAddress",
host: addressToForget[0],
port: addressToForget[1],
},
},
);

for (const address of allOtherAddresses) {
await testClient.customCommand(
["CLUSTER", "FORGET", idToForget as string],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
}

// Wait for the cluster to stabilize after forgetting a node
const ready = await isClusterReadyWithExpectedNodeCount(
testClient,
allOtherAddresses.length,
);
expect(ready).toBe(true);

// Attempt to scan without 'allowNonCoveredSlots', expecting an error
// Since it might take time for the inner core to forget the missing node,
// we retry the scan until the expected error is thrown.

const maxRetries = 10;
let retries = 0;
let errorReceived = false;

while (retries < maxRetries && !errorReceived) {
retries++;
cursor = new ClusterScanCursor();

try {
while (!cursor.isFinished()) {
result = await testClient.scan(cursor);
cursor = result[0];
}

// If scan completes without error, wait and retry
await new Promise((resolve) =>
setTimeout(resolve, 1000),
);
} catch (error) {
if (
error instanceof Error &&
error.message.includes(
"Could not find an address covering a slot, SCAN operation cannot continue",
)
) {
// Expected error occurred
errorReceived = true;
} else {
// Unexpected error, rethrow
throw error;
}
}
}

expect(errorReceived).toBe(true);

// Perform scan with 'allowNonCoveredSlots: true'
cursor = new ClusterScanCursor();

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
cursor = result[0];
}

expect(cursor.isFinished()).toBe(true);

// Get keys using 'KEYS *' from the remaining nodes
const keys: GlideString[] = [];

for (const address of allOtherAddresses) {
const result = await testClient.customCommand(
["KEYS", "*"],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
keys.push(...(result as GlideString[]));
}

// Scan again with 'allowNonCoveredSlots: true' and collect results
cursor = new ClusterScanCursor();
const results: GlideString[] = [];

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
results.push(...result[1]);
cursor = result[0];
}

// Compare the sets of keys obtained from 'KEYS *' and 'SCAN'
expect(new Set(results)).toEqual(new Set(keys));
} finally {
testClient.close();
await testCluster.close();
}
},
TIMEOUT,
);
});

//standalone tests
Expand Down
45 changes: 45 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,51 @@ function intoArrayInternal(obj: any, builder: string[]) {
}
}

// The function is used to check if the cluster is ready with the count nodes known command using the client supplied.
// The way it works is by parsing the response of the CLUSTER INFO command and checking if the cluster_state is ok and the cluster_known_nodes is equal to the count.
// If so, we know the cluster is ready, and it has the amount of nodes we expect.
export async function waitForClusterReady(
client: GlideClusterClient,
count: number,
): Promise<boolean> {
const timeout = 20000; // 20 seconds timeout in milliseconds
const startTime = Date.now();

while (true) {
if (Date.now() - startTime > timeout) {
return false;
}

const clusterInfo = await client.customCommand(["CLUSTER", "INFO"]);
// parse the response
const clusterInfoMap = new Map<string, string>();

if (clusterInfo) {
const clusterInfoLines = clusterInfo
.toString()
.split("\n")
.filter((line) => line.length > 0);

for (const line of clusterInfoLines) {
const [key, value] = line.split(":");

clusterInfoMap.set(key.trim(), value.trim());
}

if (
clusterInfoMap.get("cluster_state") == "ok" &&
Number(clusterInfoMap.get("cluster_known_nodes")) == count
) {
break;
}
}

await new Promise((resolve) => setTimeout(resolve, 2000));
}

return true;
}

/**
* accept any variable `v` and convert it into String, recursively
*/
Expand Down
12 changes: 7 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
{
"devDependencies": {
"@eslint/js": "^9.10.0",
"@eslint/js": "9.17.0",
"@types/eslint__js": "^8.42.3",
"@types/eslint-config-prettier": "^6.11.3",
"eslint": "9.14.0",
"eslint": "9.17.0",
"eslint-config-prettier": "^9.1.0",
"prettier": "^3.3.3",
"typescript": "^5.6.2",
"typescript-eslint": "^8.13"
"eslint-plugin-jsdoc": "^50.6.1",
"prettier": "3.4.2",
"prettier-eslint": "16.3.0",
"typescript": "5.7.2",
"typescript-eslint": "8.18.1"
}
}
Loading

0 comments on commit 210ba24

Please sign in to comment.