Skip to content

Commit

Permalink
Refactoring: Convert global config to client config and make it avail…
Browse files Browse the repository at this point in the history
…able through client context (#202)

Convert global config to client config and make it available through client context

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Nov 14, 2023
1 parent 57c21d7 commit 80c660e
Show file tree
Hide file tree
Showing 18 changed files with 397 additions and 266 deletions.
27 changes: 25 additions & 2 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import IDriver from './contracts/IDriver';
import IClientContext from './contracts/IClientContext';
import IClientContext, { ClientConfig } from './contracts/IClientContext';
import HiveDriver from './hive/HiveDriver';
import { Int64 } from './hive/Types';
import DBSQLSession from './DBSQLSession';
Expand Down Expand Up @@ -46,6 +46,8 @@ function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
private static defaultLogger?: IDBSQLLogger;

private readonly config: ClientConfig;

private connectionProvider?: IConnectionProvider;

private authProvider?: IAuthentication;
Expand All @@ -69,8 +71,25 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
return this.defaultLogger;
}

private static getDefaultConfig(): ClientConfig {
return {
arrowEnabled: true,
useArrowNativeTypes: true,
socketTimeout: 15 * 60 * 1000, // 15 minutes

retryMaxAttempts: 30,
retriesTimeout: 900 * 1000,
retryDelayMin: 1 * 1000,
retryDelayMax: 60 * 1000,

useCloudFetch: false,
cloudFetchConcurrentDownloads: 10,
};
}

constructor(options?: ClientOptions) {
super();
this.config = DBSQLClient.getDefaultConfig();
this.logger = options?.logger ?? DBSQLClient.getDefaultLogger();
this.logger.log(LogLevel.info, 'Created DBSQLClient');
}
Expand Down Expand Up @@ -129,7 +148,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
public async connect(options: ConnectionOptions, authProvider?: IAuthentication): Promise<IDBSQLClient> {
this.authProvider = this.initAuthProvider(options, authProvider);

this.connectionProvider = new HttpConnection(this.getConnectionOptions(options));
this.connectionProvider = new HttpConnection(this.getConnectionOptions(options), this);

const thriftConnection = await this.connectionProvider.getThriftConnection();

Expand Down Expand Up @@ -196,6 +215,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
this.authProvider = undefined;
}

public getConfig(): ClientConfig {
return this.config;
}

public getLogger(): IDBSQLLogger {
return this.logger;
}
Expand Down
12 changes: 6 additions & 6 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ import { definedOrError } from './utils';
import CloseableCollection from './utils/CloseableCollection';
import { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
import globalConfig from './globalConfig';
import StagingError from './errors/StagingError';
import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
import ParameterError from './errors/ParameterError';
import IClientContext from './contracts/IClientContext';
import IClientContext, { ClientConfig } from './contracts/IClientContext';

const defaultMaxRows = 100000;

Expand All @@ -59,11 +58,11 @@ function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) {
};
}

function getArrowOptions(): {
function getArrowOptions(config: ClientConfig): {
canReadArrowResult: boolean;
useArrowNativeTypes?: TSparkArrowTypes;
} {
const { arrowEnabled = true, useArrowNativeTypes = true } = globalConfig;
const { arrowEnabled = true, useArrowNativeTypes = true } = config;

if (!arrowEnabled) {
return {
Expand Down Expand Up @@ -187,14 +186,15 @@ export default class DBSQLSession implements IDBSQLSession {
public async executeStatement(statement: string, options: ExecuteStatementOptions = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.executeStatement({
sessionHandle: this.sessionHandle,
statement,
queryTimeout: options.queryTimeout,
runAsync: true,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
...getArrowOptions(clientConfig),
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
});
const response = await this.handleResponse(operationPromise);
Expand Down
13 changes: 9 additions & 4 deletions lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@ import { ProxyAgent } from 'proxy-agent';

import IConnectionProvider from '../contracts/IConnectionProvider';
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
import globalConfig from '../../globalConfig';
import IClientContext from '../../contracts/IClientContext';

import ThriftHttpConnection from './ThriftHttpConnection';

export default class HttpConnection implements IConnectionProvider {
private readonly options: IConnectionOptions;

private readonly context: IClientContext;

private headers: HeadersInit = {};

private connection?: ThriftHttpConnection;

private agent?: http.Agent;

constructor(options: IConnectionOptions) {
constructor(options: IConnectionOptions, context: IClientContext) {
this.options = options;
this.context = context;
}

public setHeaders(headers: HeadersInit) {
Expand All @@ -44,11 +47,12 @@ export default class HttpConnection implements IConnectionProvider {
}

private getAgentDefaultOptions(): http.AgentOptions {
const clientConfig = this.context.getConfig();
return {
keepAlive: true,
maxSockets: 5,
keepAliveMsecs: 10000,
timeout: this.options.socketTimeout ?? globalConfig.socketTimeout,
timeout: this.options.socketTimeout ?? clientConfig.socketTimeout,
};
}

Expand Down Expand Up @@ -89,6 +93,7 @@ export default class HttpConnection implements IConnectionProvider {
public async getThriftConnection(): Promise<any> {
if (!this.connection) {
const { options } = this;
const clientConfig = this.context.getConfig();
const agent = await this.getAgent();

this.connection = new ThriftHttpConnection(
Expand All @@ -99,7 +104,7 @@ export default class HttpConnection implements IConnectionProvider {
},
{
agent,
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
timeout: options.socketTimeout ?? clientConfig.socketTimeout,
headers: {
...options.headers,
...this.headers,
Expand Down
16 changes: 16 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,23 @@ import IDriver from './IDriver';
import IConnectionProvider from '../connection/contracts/IConnectionProvider';
import TCLIService from '../../thrift/TCLIService';

export interface ClientConfig {
arrowEnabled?: boolean;
useArrowNativeTypes?: boolean;
socketTimeout: number;

retryMaxAttempts: number;
retriesTimeout: number; // in milliseconds
retryDelayMin: number; // in milliseconds
retryDelayMax: number; // in milliseconds

useCloudFetch: boolean;
cloudFetchConcurrentDownloads: number;
}

export default interface IClientContext {
getConfig(): ClientConfig;

getLogger(): IDBSQLLogger;

getConnectionProvider(): Promise<IConnectionProvider>;
Expand Down
27 changes: 0 additions & 27 deletions lib/globalConfig.ts

This file was deleted.

19 changes: 12 additions & 7 deletions lib/hive/Commands/BaseCommand.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Thrift } from 'thrift';
import TCLIService from '../../../thrift/TCLIService';
import HiveDriverError from '../../errors/HiveDriverError';
import globalConfig from '../../globalConfig';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';

interface CommandExecutionInfo {
startTime: number; // in milliseconds
attempt: number;
}

function getRetryDelay(attempt: number): number {
function getRetryDelay(attempt: number, config: ClientConfig): number {
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
return Math.min(globalConfig.retryDelayMin * scale, globalConfig.retryDelayMax);
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
}

function delay(milliseconds: number): Promise<void> {
Expand All @@ -22,8 +22,11 @@ function delay(milliseconds: number): Promise<void> {
export default abstract class BaseCommand {
protected client: TCLIService.Client;

constructor(client: TCLIService.Client) {
protected context: IClientContext;

constructor(client: TCLIService.Client, context: IClientContext) {
this.client = client;
this.context = context;
}

protected executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
Expand All @@ -49,19 +52,21 @@ export default abstract class BaseCommand {
case 503: // Service Unavailable
info.attempt += 1;

const clientConfig = this.context.getConfig();

// Delay interval depends on current attempt - the more attempts we do
// the longer the interval will be
// TODO: Respect `Retry-After` header (PECO-729)
const retryDelay = getRetryDelay(info.attempt);
const retryDelay = getRetryDelay(info.attempt, clientConfig);

const attemptsExceeded = info.attempt >= globalConfig.retryMaxAttempts;
const attemptsExceeded = info.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new HiveDriverError(
`Hive driver: ${error.statusCode} when connecting to resource. Max retry count exceeded.`,
);
}

const timeoutExceeded = Date.now() - info.startTime + retryDelay >= globalConfig.retriesTimeout;
const timeoutExceeded = Date.now() - info.startTime + retryDelay >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new HiveDriverError(
`Hive driver: ${error.statusCode} when connecting to resource. Retry timeout exceeded.`,
Expand Down
Loading

0 comments on commit 80c660e

Please sign in to comment.