Skip to content

Commit

Permalink
grpc-js: Pass channel args to LB policies with updates
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Nov 20, 2024
1 parent f621dc6 commit 1fa6d92
Show file tree
Hide file tree
Showing 20 changed files with 217 additions and 163 deletions.
9 changes: 5 additions & 4 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
import { ChannelOptions } from '@grpc/grpc-js';

grpc_xds.register();

Expand Down Expand Up @@ -88,7 +89,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper, options: grpc.ChannelOptions) {
constructor(channelControlHelper: ChannelControlHelper) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (connectivityState, picker) => {
if (connectivityState === grpc.connectivityState.READY && this.latestConfig) {
Expand All @@ -97,14 +98,14 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options);
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
}
exitIdle(): void {
this.child.exitIdle();
Expand Down
19 changes: 11 additions & 8 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { XdsConfig } from './xds-dependency-manager';
import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
import { AGGREGATE_CLUSTER_BACKWARDS_COMPAT, EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { XDS_CONFIG_KEY } from './resolver-xds';

const TRACER_NAME = 'cds_balancer';

Expand Down Expand Up @@ -91,6 +92,8 @@ export function localityToName(locality: Locality__Output) {
return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`;
}

export const ROOT_CLUSTER_KEY = 'grpc.internal.root_cluster';

export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;

Expand All @@ -99,8 +102,8 @@ export class CdsLoadBalancer implements LoadBalancer {
private priorityNames: string[] = [];
private nextPriorityChildNumber = 0;

constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options);
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
}

private getNextPriorityName(cluster: string) {
Expand All @@ -110,14 +113,14 @@ export class CdsLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const clusterName = lbConfig.getCluster();
const maybeClusterConfig = xdsConfig.clusters.get(clusterName);
if (!maybeClusterConfig) {
Expand Down Expand Up @@ -165,7 +168,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()}));
return;
}
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...attributes, rootCluster: clusterName});
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName});
} else {
if (!clusterConfig.children.endpoints) {
trace('Received update with no resolved endpoints for cluster ' + clusterName);
Expand All @@ -180,8 +183,8 @@ export class CdsLoadBalancer implements LoadBalancer {
if (clusterConfig.cluster.type === 'EDS') {
endpointPickingPolicy = clusterConfig.cluster.lbPolicyConfig;
if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) {
if (typeof attributes.rootCluster === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(attributes.rootCluster);
if (typeof options[ROOT_CLUSTER_KEY] === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]);
if (maybeRootClusterConfig?.success) {
endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig;
}
Expand Down Expand Up @@ -279,7 +282,7 @@ export class CdsLoadBalancer implements LoadBalancer {
return;
}
trace(JSON.stringify(typedChildConfig.toJsonObject(), undefined, 2));
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, attributes);
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, options);
}
}
exitIdle(): void {
Expand Down
14 changes: 7 additions & 7 deletions packages/grpc-js-xds/src/load-balancer-priority.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
this.parent.channelControlHelper.requestReresolution();
}
}
}), parent.options);
}));
this.picker = new QueuePicker(this.childBalancer);
this.startFailoverTimer();
}
Expand Down Expand Up @@ -307,7 +307,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
* The attributes object from the latest update, saved to be passed along to
* each new child as they are created
*/
private latestAttributes: { [key: string]: unknown } = {};
private latestOptions: ChannelOptions = {};
/**
* The latest load balancing policies and address lists for each child from
* the latest update
Expand All @@ -323,7 +323,7 @@ export class PriorityLoadBalancer implements LoadBalancer {

private updatesPaused = false;

constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}

private updateState(state: ConnectivityState, picker: Picker) {
trace(
Expand Down Expand Up @@ -392,7 +392,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
child.updateAddressList(
childUpdate.subchannelAddress,
childUpdate.lbConfig,
this.latestAttributes
this.latestOptions
);
} else {
/* We're going to try to use this child, so reactivate it if it has been
Expand Down Expand Up @@ -431,7 +431,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof PriorityLoadBalancingConfig)) {
// Reject a config of the wrong type
Expand Down Expand Up @@ -467,7 +467,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
childAddressList.push(childAddress);
}
this.latestAttributes = attributes;
this.latestOptions = options;
this.latestUpdates.clear();
this.priorities = lbConfig.getPriorities();
this.updatesPaused = true;
Expand All @@ -486,7 +486,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
existingChild.updateAddressList(
childAddresses,
childConfig.config,
attributes
options
);
}
}
Expand Down
22 changes: 10 additions & 12 deletions packages/grpc-js-xds/src/load-balancer-ring-hash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ class RingHashLoadBalancer implements LoadBalancer {
private updatesPaused = false;
private currentState: connectivityState = connectivityState.IDLE;
private ring: RingEntry[] = [];
private ringHashSizeCap = DEFAULT_RING_SIZE_CAP;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {
constructor(private channelControlHelper: ChannelControlHelper) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
Expand Down Expand Up @@ -254,9 +253,6 @@ class RingHashLoadBalancer implements LoadBalancer {
},
}
);
if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) {
this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'];
}
}

private calculateAndUpdateState() {
Expand Down Expand Up @@ -316,7 +312,8 @@ class RingHashLoadBalancer implements LoadBalancer {

private constructRing(
endpointList: Endpoint[],
config: RingHashLoadBalancingConfig
config: RingHashLoadBalancingConfig,
ringHashSizeCap: number
) {
this.ring = [];
const endpointWeights: EndpointWeight[] = [];
Expand All @@ -336,8 +333,8 @@ class RingHashLoadBalancer implements LoadBalancer {
minNormalizedWeight
);
}
const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap);
const minRingSize = Math.min(config.getMinRingSize(), ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), ringHashSizeCap);
/* Calculate a scale factor that meets the following conditions:
* 1. The result is between minRingSize and maxRingSize, inclusive
* 2. The smallest normalized weight is scaled to a whole number, if it
Expand Down Expand Up @@ -390,7 +387,7 @@ class RingHashLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand All @@ -403,11 +400,11 @@ class RingHashLoadBalancer implements LoadBalancer {
for (const endpoint of endpointList) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint);
leafBalancer.updateEndpoint(endpoint, options);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options)
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options)
);
}
const weight = this.leafWeightMap.get(endpoint);
Expand All @@ -420,8 +417,9 @@ class RingHashLoadBalancer implements LoadBalancer {
for (const leaf of removedLeaves) {
leaf.destroy();
}
const ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'] ?? DEFAULT_RING_SIZE_CAP
loadXxhashApi().then(() => {
this.constructRing(dedupedEndpointList, lbConfig);
this.constructRing(dedupedEndpointList, lbConfig, ringHashSizeCap);
this.updatesPaused = false;
this.calculateAndUpdateState();
});
Expand Down
12 changes: 6 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-weighted-target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
}), parent.options);
}));

this.picker = new QueuePicker(this.childBalancer);
}
Expand All @@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.parent.maybeUpdateState();
}

updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void {
this.weight = lbConfig.weight;
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down Expand Up @@ -243,7 +243,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
private targetList: string[] = [];
private updatesPaused = false;

constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}

private maybeUpdateState() {
if (!this.updatesPaused) {
Expand Down Expand Up @@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, picker);
}

updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand Down Expand Up @@ -365,7 +365,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
}
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
target.updateAddressList(targetEndpoints, targetConfig, attributes);
target.updateAddressList(targetEndpoints, targetConfig, options);
}

// Deactivate targets that are not in the new config
Expand Down
13 changes: 7 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import UnavailablePicker = experimental.UnavailablePicker;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";
import { XDS_CLIENT_KEY, XDS_CONFIG_KEY } from "./resolver-xds";

const TRACER_NAME = 'xds_cluster_impl';

Expand Down Expand Up @@ -211,7 +212,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
private xdsClient: XdsClient | null = null;
private latestClusterConfig: ClusterConfig | null = null;

constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
Expand Down Expand Up @@ -248,15 +249,15 @@ class XdsClusterImplBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
}
}), options);
}));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
Expand All @@ -281,7 +282,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
this.lastestEndpointList = endpointList;
this.latestConfig = lbConfig;
this.latestClusterConfig = clusterConfig;
this.xdsClient = attributes.xdsClient as XdsClient;
this.xdsClient = options[XDS_CLIENT_KEY] as XdsClient;
if (clusterConfig.cluster.lrsLoadReportingServer) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
clusterConfig.cluster.lrsLoadReportingServer,
Expand All @@ -290,7 +291,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
);
}

this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down
12 changes: 6 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class XdsClusterManager implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
}), parent.options);
}));

this.picker = new QueuePicker(this.childBalancer);
}
Expand All @@ -142,8 +142,8 @@ class XdsClusterManager implements LoadBalancer {
this.picker = picker;
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
this.childBalancer.updateAddressList(endpointList, childConfig, attributes);
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand All @@ -167,7 +167,7 @@ class XdsClusterManager implements LoadBalancer {
// Shutdown is a placeholder value that will never appear in normal operation.
private currentState: ConnectivityState = ConnectivityState.SHUTDOWN;
private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}

private maybeUpdateState() {
if (!this.updatesPaused) {
Expand Down Expand Up @@ -207,7 +207,7 @@ class XdsClusterManager implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap));
}

updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand All @@ -234,7 +234,7 @@ class XdsClusterManager implements LoadBalancer {
child = new this.XdsClusterManagerChildImpl(this, name);
this.children.set(name, child);
}
child.updateAddressList(endpointList, childConfig, attributes);
child.updateAddressList(endpointList, childConfig, options);
}
this.updatesPaused = false;
this.updateState();
Expand Down
Loading

0 comments on commit 1fa6d92

Please sign in to comment.