Skip to content

Commit

Permalink
rework fetch to stop using max window size
Browse files Browse the repository at this point in the history
fetch now uses an expanded slice count as the query size

refs: #948
  • Loading branch information
godber committed Aug 23, 2022
1 parent 381fdab commit cc818e7
Showing 1 changed file with 73 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
isString,
isWildCardString,
matchWildcard,
pRetry,
toIntegerOrThrow
} from '@terascope/utils';
import { DataFrame } from '@terascope/data-mate';
Expand Down Expand Up @@ -111,21 +112,82 @@ export class ElasticsearchReaderAPI {
/**
 * Fetch a given slice, the data will be returned in the format
 * specified under `response_type` in the config.
 *
 * The query `size` is set to 1.5 times the `count` recorded in the slice
 * (10000 when the slice carries no count), capped at the index window
 * size when that is known, rather than always using the window size.
 *
 * If a search returns exactly as many records as were requested, the
 * result set was most likely truncated, so those results are thrown away,
 * the size is expanded by the same factor (still capped at the window
 * size) and the search is run again through `pRetry` with `retries: 5`.
 * This is meant to ensure all of the records are retrieved, but should be
 * unlikely to happen in most cases.
 *
 * When it previously used the indices max window size, it appeared to
 * slow ES queries down significantly, refs:
 * https://github.com/terascope/elasticsearch-assets/issues/948
 *
 * @param queryParams the slice to fetch; its `count` is used to size the query
 * @returns the search results in the configured `response_type` format
 */
async fetch(queryParams: ReaderSlice = {}): Promise<DataEntity[]|DataFrame|Buffer> {
    const countExpansionFactor = 1.5;
    const defaultQuerySize = 10000;
    const retryLimit = 5;

    // attempt to get window if not set (sets this.windowSize as side effect)
    if (!this.windowSize) await this.setWindowSize();

    // never request more than the index window allows, when it is known
    const capToWindow = (size: number): number => (
        this.windowSize ? Math.min(size, this.windowSize) : size
    );

    // start from the expanded slice count, falling back to the default
    // size when the slice does not record a count
    let querySize = capToWindow(
        queryParams.count
            ? Math.ceil(queryParams.count * countExpansionFactor)
            : defaultQuerySize
    );

    const searchOnce = async (): Promise<DataEntity[]|DataFrame|Buffer> => {
        // the query is rebuilt on every attempt so it picks up the
        // current (possibly expanded) querySize
        const query = buildQuery(this.config, {
            ...queryParams, count: querySize
        });

        const result = await this.client.search(
            query,
            this.config.response_type ?? FetchResponseType.data_entities,
            this.config.type_config
        );

        const resultSize = this._getResultSize(result);
        if (resultSize !== querySize) return result;

        // it's unlikely that this condition would be true without the
        // real result set being larger than querySize, so these results
        // are discarded, querySize is expanded and pRetry runs us again
        // NOTE(review): once querySize has hit the window cap the retries
        // repeat the same query — confirm failing the slice is intended
        const nextQuerySize = capToWindow(Math.ceil(querySize * countExpansionFactor));
        querySize = nextQuerySize;
        throw new Error(`The result set contained exactly ${resultSize} records, searching again with size: ${nextQuerySize}`);
    };

    return pRetry(() => searchOnce(), { retries: retryLimit });
}

/**
 * Works out how many records a search returned, whichever of the
 * supported response types the result happens to be.
 * @param result the object returned that contains the search results
 * @returns the number of records returned by the search
 */
_getResultSize(result: DataEntity[]|DataFrame|Buffer): number {
    if (Buffer.isBuffer(result)) {
        // NOTE(review): Buffer#toJSON().data is the buffer's byte array, so
        // this is the byte length rather than a record count — confirm
        const { data } = result.toJSON();
        return data.length;
    }

    if (DataEntity.isDataEntityArray(result)) {
        return result.length;
    }

    // anything left is a DataFrame; count its materialized rows
    return result.toArray().length;
}

_searchRequest(query: SearchParams, fullResponse?: false): Promise<DataEntity[]>;
Expand Down

0 comments on commit cc818e7

Please sign in to comment.