Skip to content

Commit

Permalink
Added AZAffinity to Node.js
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammad Awawdi <[email protected]>

az NODE tests

Signed-off-by: Muhammad Awawdi <[email protected]>

CR fixes

Signed-off-by: Muhammad Awawdi <[email protected]>

CR change

Signed-off-by: Muhammad Awawdi <[email protected]>

Modified tests to dynamically pull the replicas number, and added more replicas to cluster config

Signed-off-by: Muhammad Awawdi <[email protected]>

updated number of replicas for the cluster, dropped the part for CMD as its irrelevant

Signed-off-by: Muhammad Awawdi <[email protected]>

prettier

Signed-off-by: Muhammad Awawdi <[email protected]>

Created a new cluster for AzAffinity in CME, and added Standalone tests for CMD

Signed-off-by: Muhammad Awawdi <[email protected]>
  • Loading branch information
Muhammad-awawdi-amazon committed Nov 19, 2024
1 parent b8dc6a5 commit ad8f145
Show file tree
Hide file tree
Showing 3 changed files with 700 additions and 15 deletions.
15 changes: 15 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,19 @@ export interface BaseClientConfiguration {
* used.
*/
inflightRequestsLimit?: number;
/**
* Availability Zone of the client.
* This setting ensures that read operations are directed to nodes within the specified AZ.
* If not set, the AZAffinity strategy will not be applied, and read operations will follow the selected ReadFrom strategy without AZ-based routing.
*
* @example
* ```typescript
* // Example configuration for setting client availability zone and read strategy
* configuration.clientAz = 'us-east-1a'; // Sets the client's availability zone
* configuration.readFrom = 'AZAffinity'; // Directs read operations to nodes within the same AZ
* ```
*/
clientAz?: string;
}

/**
Expand Down Expand Up @@ -719,6 +732,7 @@ export class BaseClient {
private readonly pubsubFutures: [PromiseFunction, ErrorFunction][] = [];
private pendingPushNotification: response.Response[] = [];
private readonly inflightRequestsLimit: number;
private readonly clientAz: string | undefined;
private config: BaseClientConfiguration | undefined;

protected configurePubsub(
Expand Down Expand Up @@ -7578,6 +7592,7 @@ export class BaseClient {
readFrom,
authenticationInfo,
inflightRequestsLimit: options.inflightRequestsLimit,
clientAz: options.clientAz ?? null,
};
}

Expand Down
247 changes: 247 additions & 0 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
GlideString,
ListDirection,
ProtocolVersion,
ReadFrom,
RequestError,
Script,
Transaction,
Expand Down Expand Up @@ -1500,7 +1501,253 @@ describe("GlideClient", () => {
}
},
);
describe("GlideClient - AZAffinity Read Strategy Test", () => {
it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"should route GET commands to all replicas with the same AZ using protocol %p",
async (protocol) => {
const az = "us-east-1a";
const GET_CALLS = 3;
const get_cmdstat = `cmdstat_get:calls=${GET_CALLS}`;

let client_for_config_set;
let client_for_testing_az;

try {
// Stage 1: Configure nodes
client_for_config_set = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
),
);

// Skip test if version is below 8.0.0
if (cluster.checkIfServerVersionLessThan("8.0.0")) {
console.log(
"Skipping test: requires Valkey 8.0.0 or higher",
);
return;
}

await client_for_config_set.customCommand([
"CONFIG",
"RESETSTAT",
]);
await client_for_config_set.customCommand([
"CONFIG",
"SET",
"availability-zone",
az,
]);

// Stage 2: Create AZ affinity client and verify configuration
client_for_testing_az = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
{
readFrom: "AZAffinity" as ReadFrom,
clientAz: az,
},
),
);

const azs = await client_for_testing_az.customCommand([
"CONFIG",
"GET",
"availability-zone",
]);

if (Array.isArray(azs) && azs.length > 0) {
const configItem = azs[0] as {
key: string;
value: string;
};

if (
configItem &&
configItem.key === "availability-zone"
) {
expect(configItem.value).toBe(az);
} else {
throw new Error("Invalid config item format");
}
} else {
throw new Error(
"Unexpected response format from CONFIG GET command",
);
}

// Stage 3: Set test data and perform GET operations
await client_for_testing_az.set("foo", "testvalue");

for (let i = 0; i < GET_CALLS; i++) {
await client_for_testing_az.get("foo");
}

// Stage 4: Verify GET commands were routed correctly
const info_result =
await client_for_testing_az.customCommand([
"INFO",
"COMMANDSTATS",
]);

if (
typeof info_result === "string" &&
info_result.includes(get_cmdstat)
) {
expect(info_result).toContain(get_cmdstat);
} else {
throw new Error(
"Unexpected response format from INFO command in standalone mode",
);
}
} finally {
// Cleanup
await client_for_config_set?.close();
await client_for_testing_az?.close();
}
},
);
});
describe("GlideClient - AZAffinity Routing to 1 replica", () => {
it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"should route commands to single replica with AZ using protocol %p",
async (protocol) => {
const az = "us-east-1a";
const GET_CALLS = 3;
const get_cmdstat = `calls=${GET_CALLS}`;

let client_for_config_set;
let client_for_testing_az;

try {
// Stage 1: Configure nodes
client_for_config_set = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
),
);

// Skip test if version is below 8.0.0
if (cluster.checkIfServerVersionLessThan("8.0.0")) {
console.log(
"Skipping test: requires Valkey 8.0.0 or higher",
);
return;
}

await client_for_config_set.customCommand([
"CONFIG",
"SET",
"availability-zone",
az,
]);

await client_for_config_set.customCommand([
"CONFIG",
"RESETSTAT",
]);

// Stage 2: Create AZ affinity client and verify configuration
client_for_testing_az = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
{
readFrom: "AZAffinity",
clientAz: az,
},
),
);
await client_for_testing_az.set("foo", "testvalue");

for (let i = 0; i < GET_CALLS; i++) {
await client_for_testing_az.get("foo");
}

// Stage 4: Verify GET commands were routed correctly
const info_result =
await client_for_testing_az.customCommand([
"INFO",
"ALL",
]);

if (typeof info_result === "string") {
expect(info_result).toContain(get_cmdstat);
expect(info_result).toContain(
`availability_zone:${az}`,
);
} else {
throw new Error(
"Unexpected response format from INFO COMMANDSTATS command",
);
}
} finally {
await client_for_config_set?.close();
await client_for_testing_az?.close();
}
},
);
});
describe("GlideClient - AZAffinity with Non-existing AZ", () => {
it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"should route commands to a replica when AZ does not exist using protocol %p",
async (protocol) => {
const GET_CALLS = 4;
const get_cmdstat = `cmdstat_get:calls=${GET_CALLS}`;

let client_for_testing_az;

try {
if (cluster.checkIfServerVersionLessThan("8.0.0")) {
console.log(
"Skipping test: requires Valkey 8.0.0 or higher",
);
return;
}

client_for_testing_az = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
{
readFrom: "AZAffinity",
clientAz: "non-existing-az",
requestTimeout: 2000,
},
),
);

await client_for_testing_az.customCommand([
"CONFIG",
"RESETSTAT",
]);

for (let i = 0; i < GET_CALLS; i++) {
await client_for_testing_az.get("foo");
}

const info_result =
await client_for_testing_az.customCommand([
"INFO",
"COMMANDSTATS",
]);

if (typeof info_result === "string") {
expect(info_result).toContain(get_cmdstat);
} else {
throw new Error(
"Unexpected response format from INFO command",
);
}
} finally {
await client_for_testing_az?.close();
}
},
);
});
runBaseTests({
init: async (protocol, configOverrides) => {
const config = getClientConfigurationOption(
Expand Down
Loading

0 comments on commit ad8f145

Please sign in to comment.