Skip to content

Commit

Permalink
DBSQLOperation Refactoring (3 of 3) (#198)
Browse files Browse the repository at this point in the history
* Refactoring: Introduce concept of results provider; convert FetchResultsHelper into provider of TRowSet

Signed-off-by: Levko Kravets <[email protected]>

* Convert Json/Arrow/CloudFetch result handlers to implement result provider interface

Signed-off-by: Levko Kravets <[email protected]>

* Refine the code and update tests

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Nov 14, 2023
1 parent e169f69 commit 57c21d7
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 197 deletions.
39 changes: 16 additions & 23 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import {
TOperationState,
} from '../../thrift/TCLIService_types';
import Status from '../dto/Status';
import FetchResultsHelper from './FetchResultsHelper';
import { LogLevel } from '../contracts/IDBSQLLogger';
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
import IOperationResult from '../result/IOperationResult';
import JsonResult from '../result/JsonResult';
import ArrowResult from '../result/ArrowResult';
import CloudFetchResult from '../result/CloudFetchResult';
import IResultsProvider from '../result/IResultsProvider';
import RowSetProvider from '../result/RowSetProvider';
import JsonResultHandler from '../result/JsonResultHandler';
import ArrowResultHandler from '../result/ArrowResultHandler';
import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
import { definedOrError } from '../utils';
import HiveDriverError from '../errors/HiveDriverError';
import IClientContext from '../contracts/IClientContext';
Expand Down Expand Up @@ -50,7 +50,7 @@ export default class DBSQLOperation implements IOperation {

public onClose?: () => void;

private readonly _data: FetchResultsHelper;
private readonly _data: RowSetProvider;

private readonly closeOperation?: TCloseOperationResp;

Expand All @@ -68,7 +68,7 @@ export default class DBSQLOperation implements IOperation {

private hasResultSet: boolean = false;

private resultHandler?: IOperationResult;
private resultHandler?: IResultsProvider<Array<any>>;

constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) {
this.operationHandle = handle;
Expand All @@ -82,7 +82,7 @@ export default class DBSQLOperation implements IOperation {
}

this.metadata = directResults?.resultSetMetadata;
this._data = new FetchResultsHelper(
this._data = new RowSetProvider(
this.context,
this.operationHandle,
[directResults?.resultSet],
Expand Down Expand Up @@ -135,14 +135,12 @@ export default class DBSQLOperation implements IOperation {

await this.waitUntilReady(options);

const [resultHandler, data] = await Promise.all([
this.getResultHandler(),
this._data.fetch(options?.maxRows || defaultMaxRows),
]);
const resultHandler = await this.getResultHandler();
await this.failIfClosed();

const result = resultHandler.fetchNext({ limit: options?.maxRows || defaultMaxRows });
await this.failIfClosed();

const result = await resultHandler.getValue(data ? [data] : []);
this.context
.getLogger()
.log(
Expand Down Expand Up @@ -234,14 +232,9 @@ export default class DBSQLOperation implements IOperation {
return false;
}

// Return early if there are still data available for fetching
if (this._data.hasMoreRows) {
return true;
}

// If we fetched all the data from server - check if there's anything buffered in result handler
const resultHandler = await this.getResultHandler();
return resultHandler.hasPendingData();
return resultHandler.hasMore();
}

public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
Expand Down Expand Up @@ -342,20 +335,20 @@ export default class DBSQLOperation implements IOperation {
return this.metadata;
}

private async getResultHandler(): Promise<IOperationResult> {
private async getResultHandler(): Promise<IResultsProvider<Array<any>>> {
const metadata = await this.fetchMetadata();
const resultFormat = definedOrError(metadata.resultFormat);

if (!this.resultHandler) {
switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
this.resultHandler = new JsonResult(this.context, metadata.schema);
this.resultHandler = new JsonResultHandler(this.context, this._data, metadata.schema);
break;
case TSparkRowSetType.ARROW_BASED_SET:
this.resultHandler = new ArrowResult(this.context, metadata.schema, metadata.arrowSchema);
this.resultHandler = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
break;
case TSparkRowSetType.URL_BASED_SET:
this.resultHandler = new CloudFetchResult(this.context, metadata.schema);
this.resultHandler = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
break;
default:
this.resultHandler = undefined;
Expand Down
36 changes: 22 additions & 14 deletions lib/result/ArrowResult.ts → lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,46 @@ import {
} from 'apache-arrow';
import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IOperationResult from './IOperationResult';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';

const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;

type ArrowSchema = Schema<TypeMap>;
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;

export default class ArrowResult implements IOperationResult {
export default class ArrowResultHandler implements IResultsProvider<Array<any>> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;

private readonly schema: Array<TColumnDesc>;

private readonly arrowSchema?: Buffer;

constructor(context: IClientContext, schema?: TTableSchema, arrowSchema?: Buffer) {
constructor(
context: IClientContext,
source: IResultsProvider<TRowSet | undefined>,
schema?: TTableSchema,
arrowSchema?: Buffer,
) {
this.context = context;
this.source = source;
this.schema = getSchemaColumns(schema);
this.arrowSchema = arrowSchema;
}

async hasPendingData() {
return false;
public async hasMore() {
return this.source.hasMore();
}

async getValue(data?: Array<TRowSet>) {
if (this.schema.length === 0 || !this.arrowSchema || !data) {
public async fetchNext(options: ResultsProviderFetchNextOptions) {
if (this.schema.length === 0 || !this.arrowSchema) {
return [];
}

const data = await this.source.fetchNext(options);

const batches = await this.getBatches(data);
if (batches.length === 0) {
return [];
Expand All @@ -52,15 +62,13 @@ export default class ArrowResult implements IOperationResult {
return this.getRows(table.schema, table.toArray());
}

protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
protected async getBatches(rowSet?: TRowSet): Promise<Array<Buffer>> {
const result: Array<Buffer> = [];

data.forEach((rowSet) => {
rowSet.arrowBatches?.forEach((arrowBatch) => {
if (arrowBatch.batch) {
result.push(arrowBatch.batch);
}
});
rowSet?.arrowBatches?.forEach((arrowBatch) => {
if (arrowBatch.batch) {
result.push(arrowBatch.batch);
}
});

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,31 @@ import { Buffer } from 'buffer';
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
import { TRowSet, TSparkArrowResultLink, TTableSchema } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import ArrowResult from './ArrowResult';
import IResultsProvider from './IResultsProvider';
import ArrowResultHandler from './ArrowResultHandler';
import globalConfig from '../globalConfig';

export default class CloudFetchResult extends ArrowResult {
export default class CloudFetchResultHandler extends ArrowResultHandler {
private pendingLinks: Array<TSparkArrowResultLink> = [];

private downloadedBatches: Array<Buffer> = [];

constructor(context: IClientContext, schema?: TTableSchema) {
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, schema?: TTableSchema) {
// Arrow schema returned in metadata is not needed for CloudFetch results:
// each batch already contains schema and could be decoded as is
super(context, schema, Buffer.alloc(0));
super(context, source, schema, Buffer.alloc(0));
}

async hasPendingData() {
return this.pendingLinks.length > 0 || this.downloadedBatches.length > 0;
public async hasMore() {
if (this.pendingLinks.length > 0 || this.downloadedBatches.length > 0) {
return true;
}
return super.hasMore();
}

protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
data.forEach((item) => {
item.resultLinks?.forEach((link) => {
this.pendingLinks.push(link);
});
protected async getBatches(data?: TRowSet): Promise<Array<Buffer>> {
data?.resultLinks?.forEach((link) => {
this.pendingLinks.push(link);
});

if (this.downloadedBatches.length === 0) {
Expand Down
7 changes: 0 additions & 7 deletions lib/result/IOperationResult.ts

This file was deleted.

9 changes: 9 additions & 0 deletions lib/result/IResultsProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export interface ResultsProviderFetchNextOptions {
limit: number;
}

export default interface IResultsProvider<T> {
fetchNext(options: ResultsProviderFetchNextOptions): Promise<T>;

hasMore(): Promise<boolean>;
}
28 changes: 16 additions & 12 deletions lib/result/JsonResult.ts → lib/result/JsonResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
import { ColumnCode } from '../hive/Types';
import { TRowSet, TTableSchema, TColumn, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IOperationResult from './IOperationResult';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';

export default class JsonResult implements IOperationResult {
export default class JsonResultHandler implements IResultsProvider<Array<any>> {
private readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;

private readonly schema: Array<TColumnDesc>;

constructor(context: IClientContext, schema?: TTableSchema) {
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, schema?: TTableSchema) {
this.context = context;
this.source = source;
this.schema = getSchemaColumns(schema);
}

async hasPendingData() {
return false;
public async hasMore() {
return this.source.hasMore();
}

async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
if (this.schema.length === 0 || !data) {
public async fetchNext(options: ResultsProviderFetchNextOptions) {
if (this.schema.length === 0) {
return [];
}

return data.reduce((result: Array<any>, rowSet: TRowSet) => {
const columns = rowSet.columns || [];
const rows = this.getRows(columns, this.schema);
const data = await this.source.fetchNext(options);
if (!data) {
return [];
}

return result.concat(rows);
}, []);
const columns = data.columns || [];
return this.getRows(columns, this.schema);
}

private getRows(columns: Array<TColumn>, descriptors: Array<TColumnDesc>): Array<any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { ColumnCode, FetchType, Int64 } from '../hive/Types';
import Status from '../dto/Status';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';

function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean {
if (response.hasMoreRows) {
Expand Down Expand Up @@ -35,7 +36,7 @@ function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean {
return (columnValue?.values?.length || 0) > 0;
}

export default class FetchResultsHelper {
export default class RowSetProvider implements IResultsProvider<TRowSet | undefined> {
private readonly context: IClientContext;

private readonly operationHandle: TOperationHandle;
Expand Down Expand Up @@ -79,7 +80,7 @@ export default class FetchResultsHelper {
return response.results;
}

public async fetch(maxRows: number) {
public async fetchNext({ limit }: ResultsProviderFetchNextOptions) {
const prefetchedResponse = this.prefetchedResults.shift();
if (prefetchedResponse) {
return this.processFetchResponse(prefetchedResponse);
Expand All @@ -89,10 +90,14 @@ export default class FetchResultsHelper {
const response = await driver.fetchResults({
operationHandle: this.operationHandle,
orientation: this.fetchOrientation,
maxRows: new Int64(maxRows),
maxRows: new Int64(limit),
fetchType: FetchType.Data,
});

return this.processFetchResponse(response);
}

public async hasMore() {
return this.hasMoreRows;
}
}
19 changes: 12 additions & 7 deletions tests/e2e/arrow.test.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const { expect } = require('chai');
const sinon = require('sinon');
const config = require('./utils/config');
const logger = require('./utils/logger')(config.logger);
const { DBSQLClient } = require('../..');
const ArrowResult = require('../../dist/result/ArrowResult').default;
const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
const globalConfig = require('../../dist/globalConfig').default;

const fixtures = require('../fixtures/compatibility');
Expand Down Expand Up @@ -76,7 +77,7 @@ describe('Arrow support', () => {
expect(result).to.deep.equal(expectedColumn);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.not.instanceof(ArrowResult);
expect(resultHandler).to.be.not.instanceof(ArrowResultHandler);

await operation.close();
}),
Expand All @@ -93,7 +94,7 @@ describe('Arrow support', () => {
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);
expect(resultHandler).to.be.instanceof(ArrowResultHandler);

await operation.close();
}),
Expand All @@ -110,7 +111,7 @@ describe('Arrow support', () => {
expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);
expect(resultHandler).to.be.instanceof(ArrowResultHandler);

await operation.close();
}),
Expand All @@ -130,14 +131,18 @@ describe('Arrow support', () => {

// We use some internals here to check that server returned response with multiple batches
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);
expect(resultHandler).to.be.instanceof(ArrowResultHandler);

const rawData = await operation._data.fetch(rowsCount);
sinon.spy(operation._data, 'fetchNext');

const result = await resultHandler.fetchNext({ limit: rowsCount });

expect(operation._data.fetchNext.callCount).to.be.eq(1);
const rawData = await operation._data.fetchNext.firstCall.returnValue;
// We don't know exact count of batches returned, it depends on server's configuration,
// but with much enough rows there should be more than one result batch
expect(rawData.arrowBatches?.length).to.be.gt(1);

const result = await resultHandler.getValue([rawData]);
expect(result.length).to.be.eq(rowsCount);
});
});
Loading

0 comments on commit 57c21d7

Please sign in to comment.