From 86e62f2426e95a723040f7bc0652bc3f57982730 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 19 Nov 2024 15:03:18 -0800 Subject: [PATCH] grpc-js: Pass channel args to LB policies with updates --- .../grpc-js-xds/interop/xds-interop-client.ts | 9 +- packages/grpc-js-xds/src/load-balancer-cds.ts | 19 ++- .../grpc-js-xds/src/load-balancer-priority.ts | 14 +- .../src/load-balancer-ring-hash.ts | 22 ++- .../src/load-balancer-weighted-target.ts | 12 +- .../src/load-balancer-xds-cluster-impl.ts | 13 +- .../src/load-balancer-xds-cluster-manager.ts | 12 +- .../src/load-balancer-xds-wrr-locality.ts | 8 +- packages/grpc-js-xds/src/resolver-xds.ts | 7 +- packages/grpc-js-xds/src/xds-client.ts | 9 ++ .../test/test-custom-lb-policies.ts | 8 +- packages/grpc-js/src/experimental.ts | 1 + packages/grpc-js/src/internal-channel.ts | 10 +- .../src/load-balancer-child-handler.ts | 11 +- .../src/load-balancer-outlier-detection.ts | 10 +- .../grpc-js/src/load-balancer-pick-first.ts | 35 +++-- .../grpc-js/src/load-balancer-round-robin.ts | 8 +- packages/grpc-js/src/load-balancer.ts | 13 +- .../grpc-js/src/resolving-load-balancer.ts | 7 +- packages/grpc-js/test/test-pick-first.ts | 145 +++++++++++------- 20 files changed, 212 insertions(+), 161 deletions(-) diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index a245ad09f..dda8bfe92 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult; import PickResultType = grpc.experimental.PickResultType; import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper; import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig; +import { ChannelOptions } from '@grpc/grpc-js'; grpc_xds.register(); @@ -88,7 +89,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}}); class RpcBehaviorLoadBalancer implements LoadBalancer { private child: ChildLoadBalancerHandler; private latestConfig: RpcBehaviorLoadBalancingConfig | null = null; - constructor(channelControlHelper: ChannelControlHelper, options: grpc.ChannelOptions) { + constructor(channelControlHelper: ChannelControlHelper) { const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, { updateState: (connectivityState, picker) => { if (connectivityState === grpc.connectivityState.READY && this.latestConfig) { @@ -97,14 +98,14 @@ class RpcBehaviorLoadBalancer implements LoadBalancer { channelControlHelper.updateState(connectivityState, picker); } }); - this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options); + this.child = new ChildLoadBalancerHandler(childChannelControlHelper); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { return; } this.latestConfig = lbConfig; - this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes); + this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options); } exitIdle(): void { this.child.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index 882b60696..1cb598228 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -30,6 +30,7 @@ import { XdsConfig } from './xds-dependency-manager'; import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority'; import { Locality__Output } from './generated/envoy/config/core/v3/Locality'; import { AGGREGATE_CLUSTER_BACKWARDS_COMPAT, EXPERIMENTAL_OUTLIER_DETECTION } from './environment'; +import { XDS_CONFIG_KEY } from './resolver-xds'; const TRACER_NAME = 'cds_balancer'; @@ -91,6 +92,8 @@ export function localityToName(locality: Locality__Output) { return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; } +export const ROOT_CLUSTER_KEY = 'grpc.internal.root_cluster'; + export class CdsLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; @@ -99,8 +102,8 @@ export class CdsLoadBalancer implements LoadBalancer { private priorityNames: string[] = []; private nextPriorityChildNumber = 0; - constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) { - this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options); + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); } private getNextPriorityName(cluster: string) { @@ -110,14 +113,14 @@ export class CdsLoadBalancer implements LoadBalancer { updateAddressList( endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + options: ChannelOptions ): void { if (!(lbConfig instanceof CdsLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); return; } trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2)); - const xdsConfig = attributes.xdsConfig as XdsConfig; + const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig; const clusterName = lbConfig.getCluster(); const maybeClusterConfig = xdsConfig.clusters.get(clusterName); if (!maybeClusterConfig) { @@ -165,7 +168,7 @@ export class CdsLoadBalancer implements LoadBalancer { this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()})); return; } - this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...attributes, rootCluster: clusterName}); + this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName}); } else { if (!clusterConfig.children.endpoints) { trace('Received update with no resolved endpoints for cluster ' + clusterName); @@ -180,8 +183,8 @@ export class CdsLoadBalancer implements LoadBalancer { if (clusterConfig.cluster.type === 'EDS') { endpointPickingPolicy = clusterConfig.cluster.lbPolicyConfig; if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) { - if (typeof attributes.rootCluster === 'string') { - const maybeRootClusterConfig = xdsConfig.clusters.get(attributes.rootCluster); + if (typeof options[ROOT_CLUSTER_KEY] === 'string') { + const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]); if (maybeRootClusterConfig?.success) { endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig; } @@ -279,7 +282,7 @@ export class CdsLoadBalancer implements LoadBalancer { return; } trace(JSON.stringify(typedChildConfig.toJsonObject(), undefined, 2)); - this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, attributes); + this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, options); } } exitIdle(): void { diff --git a/packages/grpc-js-xds/src/load-balancer-priority.ts b/packages/grpc-js-xds/src/load-balancer-priority.ts index 54e01fa84..57c8ca7c4 100644 --- a/packages/grpc-js-xds/src/load-balancer-priority.ts +++ b/packages/grpc-js-xds/src/load-balancer-priority.ts @@ -197,7 +197,7 @@ export class PriorityLoadBalancer implements LoadBalancer { this.parent.channelControlHelper.requestReresolution(); } } - }), parent.options); + })); this.picker = new QueuePicker(this.childBalancer); this.startFailoverTimer(); } @@ -307,7 +307,7 @@ export class PriorityLoadBalancer implements LoadBalancer { * The attributes object from the latest update, saved to be passed along to * each new child as they are created */ - private latestAttributes: { [key: string]: unknown } = {}; + private latestOptions: ChannelOptions = {}; /** * The latest load balancing policies and address lists for each child from * the latest update @@ -323,7 +323,7 @@ export class PriorityLoadBalancer implements LoadBalancer { private updatesPaused = false; - constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {} + constructor(private channelControlHelper: ChannelControlHelper) {} private updateState(state: ConnectivityState, picker: Picker) { trace( @@ -392,7 +392,7 @@ export class PriorityLoadBalancer implements LoadBalancer { child.updateAddressList( childUpdate.subchannelAddress, childUpdate.lbConfig, - this.latestAttributes + this.latestOptions ); } else { /* We're going to try to use this child, so reactivate it if it has been @@ -431,7 +431,7 @@ export class PriorityLoadBalancer implements LoadBalancer { updateAddressList( endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + options: ChannelOptions ): void { if (!(lbConfig instanceof PriorityLoadBalancingConfig)) { // Reject a config of the wrong type @@ -467,7 +467,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } childAddressList.push(childAddress); } - this.latestAttributes = attributes; + this.latestOptions = options; this.latestUpdates.clear(); this.priorities = lbConfig.getPriorities(); this.updatesPaused = true; @@ -486,7 +486,7 @@ export class PriorityLoadBalancer implements LoadBalancer { existingChild.updateAddressList( childAddresses, childConfig.config, - attributes + options ); } } diff --git a/packages/grpc-js-xds/src/load-balancer-ring-hash.ts b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts index a124d3b88..77f0ab317 100644 --- a/packages/grpc-js-xds/src/load-balancer-ring-hash.ts +++ b/packages/grpc-js-xds/src/load-balancer-ring-hash.ts @@ -225,8 +225,7 @@ class RingHashLoadBalancer implements LoadBalancer { private updatesPaused = false; private currentState: connectivityState = connectivityState.IDLE; private ring: RingEntry[] = []; - private ringHashSizeCap = DEFAULT_RING_SIZE_CAP; - constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) { + constructor(private channelControlHelper: ChannelControlHelper) { this.childChannelControlHelper = createChildChannelControlHelper( channelControlHelper, { @@ -254,9 +253,6 @@ class RingHashLoadBalancer implements LoadBalancer { }, } ); - if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) { - this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap']; - } } private calculateAndUpdateState() { @@ -316,7 +312,8 @@ class RingHashLoadBalancer implements LoadBalancer { private constructRing( endpointList: Endpoint[], - config: RingHashLoadBalancingConfig + config: RingHashLoadBalancingConfig, + ringHashSizeCap: number ) { this.ring = []; const endpointWeights: EndpointWeight[] = []; @@ -336,8 +333,8 @@ class RingHashLoadBalancer implements LoadBalancer { minNormalizedWeight ); } - const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap); - const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap); + const minRingSize = Math.min(config.getMinRingSize(), ringHashSizeCap); + const maxRingSize = Math.min(config.getMaxRingSize(), ringHashSizeCap); /* Calculate a scale factor that meets the following conditions: * 1. The result is between minRingSize and maxRingSize, inclusive * 2. The smallest normalized weight is scaled to a whole number, if it @@ -390,7 +387,7 @@ class RingHashLoadBalancer implements LoadBalancer { updateAddressList( endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + options: ChannelOptions ): void { if (!(lbConfig instanceof RingHashLoadBalancingConfig)) { trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); @@ -403,11 +400,11 @@ class RingHashLoadBalancer implements LoadBalancer { for (const endpoint of endpointList) { const leafBalancer = this.leafMap.get(endpoint); if (leafBalancer) { - leafBalancer.updateEndpoint(endpoint); + leafBalancer.updateEndpoint(endpoint, options); } else { this.leafMap.set( endpoint, - new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options) + new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options) ); } const weight = this.leafWeightMap.get(endpoint); @@ -420,8 +417,9 @@ class RingHashLoadBalancer implements LoadBalancer { for (const leaf of removedLeaves) { leaf.destroy(); } + const ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'] ?? DEFAULT_RING_SIZE_CAP loadXxhashApi().then(() => { - this.constructRing(dedupedEndpointList, lbConfig); + this.constructRing(dedupedEndpointList, lbConfig, ringHashSizeCap); this.updatesPaused = false; this.calculateAndUpdateState(); }); diff --git a/packages/grpc-js-xds/src/load-balancer-weighted-target.ts b/packages/grpc-js-xds/src/load-balancer-weighted-target.ts index 89192b622..16a6fb151 100644 --- a/packages/grpc-js-xds/src/load-balancer-weighted-target.ts +++ b/packages/grpc-js-xds/src/load-balancer-weighted-target.ts @@ -178,7 +178,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.updateState(connectivityState, picker); }, - }), parent.options); + })); this.picker = new QueuePicker(this.childBalancer); } @@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.parent.maybeUpdateState(); } - updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void { this.weight = lbConfig.weight; - this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes); + this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options); } exitIdle(): void { this.childBalancer.exitIdle(); @@ -243,7 +243,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { private targetList: string[] = []; private updatesPaused = false; - constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {} + constructor(private channelControlHelper: ChannelControlHelper) {} private maybeUpdateState() { if (!this.updatesPaused) { @@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.channelControlHelper.updateState(connectivityState, picker); } - updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); @@ -365,7 +365,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { } const targetEndpoints = childEndpointMap.get(targetName) ?? []; trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')')); - target.updateAddressList(targetEndpoints, targetConfig, attributes); + target.updateAddressList(targetEndpoints, targetConfig, options); } // Deactivate targets that are not in the new config diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts index fd2a9d887..c2558652d 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts @@ -40,6 +40,7 @@ import UnavailablePicker = experimental.UnavailablePicker; import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; import { ClusterConfig, XdsConfig } from "./xds-dependency-manager"; import { CdsUpdate } from "./xds-resource-type/cluster-resource-type"; +import { XDS_CLIENT_KEY, XDS_CONFIG_KEY } from "./resolver-xds"; const TRACER_NAME = 'xds_cluster_impl'; @@ -211,7 +212,7 @@ class XdsClusterImplBalancer implements LoadBalancer { private xdsClient: XdsClient | null = null; private latestClusterConfig: ClusterConfig | null = null; - constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) { + constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, { createSubchannel: (subchannelAddress, subchannelArgs) => { if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) { @@ -248,15 +249,15 @@ class XdsClusterImplBalancer implements LoadBalancer { channelControlHelper.updateState(connectivityState, picker); } } - }), options); + })); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); return; } trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); - const xdsConfig = attributes.xdsConfig as XdsConfig; + const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig; const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster()); if (!maybeClusterConfig) { trace('Received update with no config for cluster ' + lbConfig.getCluster()); @@ -281,7 +282,7 @@ class XdsClusterImplBalancer implements LoadBalancer { this.lastestEndpointList = endpointList; this.latestConfig = lbConfig; this.latestClusterConfig = clusterConfig; - this.xdsClient = attributes.xdsClient as XdsClient; + this.xdsClient = options[XDS_CLIENT_KEY] as XdsClient; if (clusterConfig.cluster.lrsLoadReportingServer) { this.clusterDropStats = this.xdsClient.addClusterDropStats( clusterConfig.cluster.lrsLoadReportingServer, @@ -290,7 +291,7 @@ class XdsClusterImplBalancer implements LoadBalancer { ); } - this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes); + this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options); } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index 870321cea..0843c4505 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -131,7 +131,7 @@ class XdsClusterManager implements LoadBalancer { updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.updateState(connectivityState, picker); }, - }), parent.options); + })); this.picker = new QueuePicker(this.childBalancer); } @@ -142,8 +142,8 @@ class XdsClusterManager implements LoadBalancer { this.picker = picker; this.parent.maybeUpdateState(); } - updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { - this.childBalancer.updateAddressList(endpointList, childConfig, attributes); + updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { + this.childBalancer.updateAddressList(endpointList, childConfig, options); } exitIdle(): void { this.childBalancer.exitIdle(); @@ -167,7 +167,7 @@ class XdsClusterManager implements LoadBalancer { // Shutdown is a placeholder value that will never appear in normal operation. private currentState: ConnectivityState = ConnectivityState.SHUTDOWN; private updatesPaused = false; - constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {} + constructor(private channelControlHelper: ChannelControlHelper) {} private maybeUpdateState() { if (!this.updatesPaused) { @@ -207,7 +207,7 @@ class XdsClusterManager implements LoadBalancer { this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap)); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); @@ -234,7 +234,7 @@ class XdsClusterManager implements LoadBalancer { child = new this.XdsClusterManagerChildImpl(this, name); this.children.set(name, child); } - child.updateAddressList(endpointList, childConfig, attributes); + child.updateAddressList(endpointList, childConfig, options); } this.updatesPaused = false; this.updateState(); diff --git a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts index babcb528e..be353d29d 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts @@ -73,10 +73,10 @@ class XdsWrrLocalityLoadBalancingConfig implements TypedLoadBalancingConfig { class XdsWrrLocalityLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) { - this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options); + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); return; @@ -99,7 +99,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { targets: targets } }; - this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), attributes); + this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options); } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 160d4f394..ce48948dd 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -80,6 +80,9 @@ const RETRY_CODES: {[key: string]: status} = { 'unavailable': status.UNAVAILABLE }; +export const XDS_CONFIG_KEY = `${experimental.SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX}.xds_config`; +export const XDS_CLIENT_KEY = 'grpc.internal.xds_client'; + class XdsResolver implements Resolver { private listenerResourceName: string | null = null; @@ -355,8 +358,8 @@ class XdsResolver implements Resolver { loadBalancingConfig: [lbPolicyConfig] } this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, { - xdsClient: this.xdsClient, - xdsConfig: xdsConfig + [XDS_CLIENT_KEY]: this.xdsClient, + [XDS_CONFIG_KEY]: xdsConfig }); } diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 2caf34e22..dad503974 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -1322,6 +1322,15 @@ export class XdsClient { } return this.certificateProviderRegistry.get(instanceName); } + + /** + * Returns a valid JSON-stringifiable object, to avoid causing a circular + * reference error when an object containing this object is stringified. + * @returns + */ + toJSON(): object { + return {}; + } } let singletonXdsClient: XdsClient | null = null; diff --git a/packages/grpc-js-xds/test/test-custom-lb-policies.ts b/packages/grpc-js-xds/test/test-custom-lb-policies.ts index c49ac4b06..8d91c2b5d 100644 --- a/packages/grpc-js-xds/test/test-custom-lb-policies.ts +++ b/packages/grpc-js-xds/test/test-custom-lb-policies.ts @@ -84,7 +84,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}}); class RpcBehaviorLoadBalancer implements LoadBalancer { private child: ChildLoadBalancerHandler; private latestConfig: RpcBehaviorLoadBalancingConfig | null = null; - constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions) { + constructor(channelControlHelper: ChannelControlHelper) { const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, { updateState: (state, picker) => { if (state === connectivityState.READY && this.latestConfig) { @@ -93,14 +93,14 @@ class RpcBehaviorLoadBalancer implements LoadBalancer { channelControlHelper.updateState(state, picker); } }); - this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options); + this.child = new ChildLoadBalancerHandler(childChannelControlHelper); } - updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { return; } this.latestConfig = lbConfig; - this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes); + this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options); } exitIdle(): void { this.child.exitIdle(); diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index 13dfe7463..a616066db 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -64,3 +64,4 @@ export { FileWatcherCertificateProviderConfig } from './certificate-provider'; export { createCertificateProviderChannelCredentials } from './channel-credentials'; +export { SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX } from './internal-channel'; diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 67c94d89c..0ca264905 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -159,6 +159,8 @@ class ShutdownPicker implements Picker { } } +export const SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX = 'grpc.internal.no_subchannel'; + export class InternalChannel { private readonly resolvingLoadBalancer: ResolvingLoadBalancer; private readonly subchannelPool: SubchannelPool; @@ -296,10 +298,16 @@ export class InternalChannel { subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions ) => { + const finalSubchannelArgs: ChannelOptions = {}; + for (const [key, value] of Object.entries(subchannelArgs)) { + if (!key.startsWith(SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX)) { + finalSubchannelArgs[key] = value; + } + } const subchannel = this.subchannelPool.getOrCreateSubchannel( this.target, subchannelAddress, - Object.assign({}, this.options, subchannelArgs), + finalSubchannelArgs, this.credentials ); subchannel.throttleKeepalive(this.keepaliveTime); diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index 352ea7b81..3d1baf1ba 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -30,7 +30,7 @@ import { SubchannelInterface } from './subchannel-interface'; const TYPE_NAME = 'child_load_balancer_helper'; -export class ChildLoadBalancerHandler implements LoadBalancer { +export class ChildLoadBalancerHandler { private currentChild: LoadBalancer | null = null; private pendingChild: LoadBalancer | null = null; private latestConfig: TypedLoadBalancingConfig | null = null; @@ -85,8 +85,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer { }; constructor( - private readonly channelControlHelper: ChannelControlHelper, - private readonly options: ChannelOptions + private readonly channelControlHelper: ChannelControlHelper ) {} protected configUpdateRequiresNewPolicyInstance( @@ -105,7 +104,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer { updateAddressList( endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + options: ChannelOptions ): void { let childToUpdate: LoadBalancer; if ( @@ -114,7 +113,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer { this.configUpdateRequiresNewPolicyInstance(this.latestConfig, lbConfig) ) { const newHelper = new this.ChildPolicyHelper(this); - const newChild = createLoadBalancer(lbConfig, newHelper, this.options)!; + const newChild = createLoadBalancer(lbConfig, newHelper)!; newHelper.setChild(newChild); if (this.currentChild === null) { this.currentChild = newChild; @@ -134,7 +133,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer { } } this.latestConfig = lbConfig; - childToUpdate.updateAddressList(endpointList, lbConfig, attributes); + childToUpdate.updateAddressList(endpointList, lbConfig, options); } exitIdle(): void { if (this.currentChild) { diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 9ee6b5f73..c5f7e179b 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -468,8 +468,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { private timerStartTime: Date | null = null; constructor( - channelControlHelper: ChannelControlHelper, - options: ChannelOptions + channelControlHelper: ChannelControlHelper ) { this.childBalancer = new ChildLoadBalancerHandler( createChildChannelControlHelper(channelControlHelper, { @@ -504,8 +503,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { channelControlHelper.updateState(connectivityState, picker); } }, - }), - options + }) ); this.ejectionTimer = setInterval(() => {}, 0); clearInterval(this.ejectionTimer); @@ -760,7 +758,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { updateAddressList( endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + options: ChannelOptions ): void { if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) { return; @@ -779,7 +777,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } this.entryMap.deleteMissing(endpointList); const childPolicy = lbConfig.getChildPolicy(); - this.childBalancer.updateAddressList(endpointList, childPolicy, attributes); + this.childBalancer.updateAddressList(endpointList, childPolicy, options); if ( lbConfig.getSuccessRateEjectionConfig() || diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 59876c121..bf3798e4b 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -224,7 +224,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { */ private stickyTransientFailureMode = false; - private reportHealthStatus: boolean; + private reportHealthStatus: boolean = false; /** * The most recent error reported by any subchannel as it transitioned to @@ -234,6 +234,8 @@ export class PickFirstLoadBalancer implements LoadBalancer { private latestAddressList: SubchannelAddress[] | null = null; + private latestOptions: ChannelOptions = {}; + /** * Load balancer that attempts to connect to each backend in the address list * in order, and picks the first one that connects, using it for every @@ -242,12 +244,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { * this load balancer's owner. */ constructor( - private readonly channelControlHelper: ChannelControlHelper, - options: ChannelOptions + private readonly channelControlHelper: ChannelControlHelper ) { this.connectionDelayTimeout = setTimeout(() => {}, 0); clearTimeout(this.connectionDelayTimeout); - this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME]; } private allChildrenHaveReportedTF(): boolean { @@ -461,10 +461,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.children = []; } - private connectToAddressList(addressList: SubchannelAddress[]) { + private connectToAddressList(addressList: SubchannelAddress[], options: ChannelOptions) { trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])'); const newChildrenList = addressList.map(address => ({ - subchannel: this.channelControlHelper.createSubchannel(address, {}), + subchannel: this.channelControlHelper.createSubchannel(address, options), hasReportedTransientFailure: false, })); for (const { subchannel } of newChildrenList) { @@ -499,11 +499,13 @@ export class PickFirstLoadBalancer implements LoadBalancer { updateAddressList( endpointList: Endpoint[], - lbConfig: TypedLoadBalancingConfig + lbConfig: TypedLoadBalancingConfig, + options: ChannelOptions ): void { if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) { return; } + this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME]; /* Previously, an update would be discarded if it was identical to the * previous update, to minimize churn. Now the DNS resolver is * rate-limited, so that is less of a concern. */ @@ -519,7 +521,8 @@ export class PickFirstLoadBalancer implements LoadBalancer { } const addressList = interleaveAddressFamilies(rawAddressList); this.latestAddressList = addressList; - this.connectToAddressList(addressList); + this.latestOptions = options; + this.connectToAddressList(addressList, options); } exitIdle() { @@ -527,7 +530,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.currentState === ConnectivityState.IDLE && this.latestAddressList ) { - this.connectToAddressList(this.latestAddressList); + this.connectToAddressList(this.latestAddressList, this.latestOptions); } } @@ -560,7 +563,7 @@ export class LeafLoadBalancer { constructor( private endpoint: Endpoint, channelControlHelper: ChannelControlHelper, - options: ChannelOptions + private options: ChannelOptions ) { const childChannelControlHelper = createChildChannelControlHelper( channelControlHelper, @@ -573,14 +576,17 @@ export class LeafLoadBalancer { } ); this.pickFirstBalancer = new PickFirstLoadBalancer( - childChannelControlHelper, - { ...options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true } + childChannelControlHelper ); this.latestPicker = new QueuePicker(this.pickFirstBalancer); } startConnecting() { - this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG); + this.pickFirstBalancer.updateAddressList( + [this.endpoint], + LEAF_CONFIG, + { ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true } + ); } /** @@ -589,7 +595,8 @@ export class LeafLoadBalancer { * attempt is not already in progress. * @param newEndpoint */ - updateEndpoint(newEndpoint: Endpoint) { + updateEndpoint(newEndpoint: Endpoint, newOptions: ChannelOptions) { + this.options = newOptions; this.endpoint = newEndpoint; if (this.latestState !== ConnectivityState.IDLE) { this.startConnecting(); diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 6e2b0b11d..4482b86b8 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -103,8 +103,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer { private lastError: string | null = null; constructor( - private readonly channelControlHelper: ChannelControlHelper, - private readonly options: ChannelOptions + private readonly channelControlHelper: ChannelControlHelper ) { this.childChannelControlHelper = createChildChannelControlHelper( channelControlHelper, @@ -204,7 +203,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer { updateAddressList( endpointList: Endpoint[], - lbConfig: TypedLoadBalancingConfig + lbConfig: TypedLoadBalancingConfig, + options: ChannelOptions ): void { this.resetSubchannelList(); trace('Connect to endpoint list ' + endpointList.map(endpointToString)); @@ -214,7 +214,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer { new LeafLoadBalancer( endpoint, this.childChannelControlHelper, - this.options + options ) ); for (const child of this.children) { diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index fb353a59a..1ea9c60c3 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -33,7 +33,7 @@ export interface ChannelControlHelper { /** * Returns a subchannel connected to the specified address. * @param subchannelAddress The address to connect to - * @param subchannelArgs Extra channel arguments specified by the load balancer + * @param subchannelArgs Channel arguments to use to construct the subchannel */ createSubchannel( subchannelAddress: SubchannelAddress, @@ -102,7 +102,7 @@ export interface LoadBalancer { updateAddressList( endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, - attributes: { [key: string]: unknown } + channelOptions: ChannelOptions ): void; /** * If the load balancer is currently in the IDLE state, start connecting. @@ -129,8 +129,7 @@ export interface LoadBalancer { export interface LoadBalancerConstructor { new ( - channelControlHelper: ChannelControlHelper, - options: ChannelOptions + channelControlHelper: ChannelControlHelper ): LoadBalancer; } @@ -172,14 +171,12 @@ export function registerDefaultLoadBalancerType(typeName: string) { export function createLoadBalancer( config: TypedLoadBalancingConfig, - channelControlHelper: ChannelControlHelper, - options: ChannelOptions + channelControlHelper: ChannelControlHelper ): LoadBalancer | null { const typeName = config.getLoadBalancerName(); if (typeName in registeredLoadBalancerTypes) { return new registeredLoadBalancerTypes[typeName].LoadBalancer( - channelControlHelper, - options + channelControlHelper ); } else { return null; diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 72aef0dfd..d18cf700e 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -198,7 +198,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { constructor( private readonly target: GrpcUri, private readonly channelControlHelper: ChannelControlHelper, - channelOptions: ChannelOptions, + private readonly channelOptions: ChannelOptions, private readonly onSuccessfulResolution: ResolutionCallback, private readonly onFailedResolution: ResolutionFailureCallback ) { @@ -242,8 +242,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { channelControlHelper.addChannelzChild.bind(channelControlHelper), removeChannelzChild: channelControlHelper.removeChannelzChild.bind(channelControlHelper), - }, - channelOptions + } ); this.innerResolver = createResolver( target, @@ -302,7 +301,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { this.childLoadBalancer.updateAddressList( endpointList, loadBalancingConfig, - attributes + {...this.channelOptions, ...attributes} ); const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig; diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts index dcaa97ee7..1de2e8d37 100644 --- a/packages/grpc-js/test/test-pick-first.ts +++ b/packages/grpc-js/test/test-pick-first.ts @@ -123,10 +123,11 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.READY); @@ -142,13 +143,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.READY); @@ -164,7 +166,7 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { @@ -174,7 +176,8 @@ describe('pick_first load balancing policy', () => { ], }, ], - config + config, + {} ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.READY); @@ -198,10 +201,11 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); }); it('Should stay CONNECTING if only some subchannels fail to connect', done => { @@ -214,13 +218,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); @@ -236,13 +241,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); @@ -261,13 +267,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); @@ -300,13 +307,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); }); it('Should enter READY if a subchannel connects after entering TRANSIENT_FAILURE mode', done => { @@ -327,13 +335,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.READY); @@ -358,13 +367,14 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { currentStartState = ConnectivityState.CONNECTING; @@ -373,7 +383,8 @@ describe('pick_first load balancing policy', () => { { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); }); }); @@ -396,19 +407,21 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ { addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 2 }] }, ], - config + config, + {} ); process.nextTick(() => { currentStartState = ConnectivityState.READY; pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 3 }] }], - config + config, + {} ); }); }); @@ -431,10 +444,11 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -459,16 +473,18 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { currentStartState = ConnectivityState.IDLE; pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 2 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -494,16 +510,18 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { currentStartState = ConnectivityState.TRANSIENT_FAILURE; pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 2 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -529,15 +547,17 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 2 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -575,24 +595,27 @@ describe('pick_first load balancing policy', () => { }, } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); process.nextTick(() => { pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 2 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE); process.nextTick(() => { pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 3 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[2].transitionToState( @@ -635,20 +658,23 @@ describe('pick_first load balancing policy', () => { }, } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 2 }] }], - config + config, + {} ); process.nextTick(() => { pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 2 }] }], - config + config, + {} ); }); }); @@ -676,10 +702,11 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [{ addresses: [{ host: 'localhost', port: 1 }] }], - config + config, + {} ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); @@ -698,8 +725,8 @@ describe('pick_first load balancing policy', () => { ), } ); - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); - pickFirst.updateAddressList([], config); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([], config, {}); }); describe('Address list randomization', () => { const shuffleConfig = new PickFirstLoadBalancingConfig(true); @@ -733,20 +760,20 @@ describe('pick_first load balancing policy', () => { for (let i = 0; i < 10; i++) { endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); } - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); /* Pick from 10 subchannels 5 times, with address randomization enabled, * and verify that at least two different subchannels are picked. The * probability choosing the same address every time is 1/10,000, which * I am considering an acceptable flake rate */ - pickFirst.updateAddressList(endpoints, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig, {}); process.nextTick(() => { assert(pickedSubchannels.size > 1); done(); @@ -789,16 +816,16 @@ describe('pick_first load balancing policy', () => { for (let i = 0; i < 10; i++) { endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); } - const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); - pickFirst.updateAddressList(endpoints, config); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList(endpoints, config, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config); + pickFirst.updateAddressList(endpoints, config, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config); + pickFirst.updateAddressList(endpoints, config, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config); + pickFirst.updateAddressList(endpoints, config, {}); process.nextTick(() => { - pickFirst.updateAddressList(endpoints, config); + pickFirst.updateAddressList(endpoints, config, {}); process.nextTick(() => { assert(pickedSubchannels.size === 1); done();