Skip to content

Commit

Permalink
Poll for results from frontend for async search (opensearch-project#8481
Browse files Browse the repository at this point in the history
)

* poll for results from frontend for async search

Signed-off-by: Amardeepsingh Siglani <[email protected]>

* Changeset file for PR opensearch-project#8481 created/updated

* fixed async sql strategy; added elapsed time to results

Signed-off-by: Amardeepsingh Siglani <[email protected]>

* removed unwanted toast notifications

Signed-off-by: Amardeepsingh Siglani <[email protected]>

---------

Signed-off-by: Amardeepsingh Siglani <[email protected]>
Co-authored-by: opensearch-changeset-bot[bot] <154024398+opensearch-changeset-bot[bot]@users.noreply.github.com>
  • Loading branch information
amsiglan and opensearch-changeset-bot[bot] authored Oct 7, 2024
1 parent 60df427 commit 58e1645
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 56 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/8481.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feat:
- Add logic to poll for async query result ([#8481](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/8481))
50 changes: 46 additions & 4 deletions src/plugins/data/common/data_frames/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,54 @@ export interface IDataFrameWithAggs extends IDataFrame {
aggs: Record<string, DataFrameAgg | DataFrameBucketAgg | DataFrameBucketAgg[]>;
}

export interface IDataFrameResponse extends SearchResponse<any> {
type: DATA_FRAME_TYPES;
body: IDataFrame | IDataFrameWithAggs | IDataFrameError;
export interface IDataFrameDefaultResponse {
type: DATA_FRAME_TYPES.DEFAULT;
body: IDataFrame | IDataFrameWithAggs;
took: number;
}

export type IDataFramePollingResponse = {
type: DATA_FRAME_TYPES.POLLING;
} & (FetchStatusResponse | QueryStartedResponse);

export interface IDataFrameErrorResponse {
type: DATA_FRAME_TYPES.ERROR;
body: IDataFrameError;
took: number;
}

export interface IDataFrameError extends IDataFrameResponse {
export type IDataFrameResponse = SearchResponse<any> &
(IDataFrameDefaultResponse | IDataFramePollingResponse | IDataFrameErrorResponse);

export interface IDataFrameError extends SearchResponse<any> {
error: Error;
}

export interface PollQueryResultsParams {
queryId?: string;
sessionId?: string;
}

export type QueryStatusConfig = PollQueryResultsParams;

export interface QuerySuccessStatusResponse {
status: 'success';
body: IDataFrame | IDataFrameWithAggs | IDataFrameError;
}

export interface QueryStartedResponse {
status: 'started';
body: { queryStatusConfig: QueryStatusConfig };
}

export interface QueryFailedStatusResponse {
status: 'failed';
body: IDataFrameError;
}

export type FetchStatusResponse =
| QueryFailedStatusResponse
| QuerySuccessStatusResponse
| { status?: string };

export type PollQueryResultsHandler = () => Promise<FetchStatusResponse>;
42 changes: 38 additions & 4 deletions src/plugins/data/common/search/search_source/search_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ import { fieldWildcardFilter } from '../../../../opensearch_dashboards_utils/com
import { IIndexPattern } from '../../index_patterns';
import {
DATA_FRAME_TYPES,
FetchStatusResponse,
IDataFrame,
IDataFrameDefaultResponse,
IDataFrameError,
IDataFramePollingResponse,
IDataFrameResponse,
QueryStartedResponse,
QuerySuccessStatusResponse,
convertResult,
createDataFrame,
} from '../../data_frames';
Expand All @@ -115,6 +120,7 @@ import {
import { getHighlightRequest } from '../../../common/field_formats';
import { fetchSoon } from './legacy';
import { extractReferences } from './extract_references';
import { handleQueryResults } from '../../utils/helpers';

/** @internal */
export const searchSourceRequiredUiSettings = [
Expand Down Expand Up @@ -436,14 +442,42 @@ export class SearchSource {
return search({ params }, options).then(async (response: any) => {
if (response.hasOwnProperty('type')) {
if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.DEFAULT) {
const dataFrameResponse = response as IDataFrameResponse;
const dataFrameResponse = response as IDataFrameDefaultResponse;
await this.setDataFrame(dataFrameResponse.body as IDataFrame);
return onResponse(searchRequest, convertResult(response as IDataFrameResponse));
}
if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.POLLING) {
const dataFrameResponse = response as IDataFrameResponse;
await this.setDataFrame(dataFrameResponse.body as IDataFrame);
return onResponse(searchRequest, convertResult(response as IDataFrameResponse));
const startTime = Date.now();
const { status } = response as IDataFramePollingResponse;
let results;
if (status === 'success') {
results = response as QuerySuccessStatusResponse;
} else if (status === 'started') {
const {
body: { queryStatusConfig },
} = response as QueryStartedResponse;

if (!queryStatusConfig) {
throw new Error('Cannot poll results for undefined query status config');
}

results = await handleQueryResults({
pollQueryResults: async () =>
search(
{ params: { ...params, pollQueryResultsParams: { ...queryStatusConfig } } },
options
) as Promise<FetchStatusResponse>,
queryId: queryStatusConfig.queryId,
});
} else {
throw new Error('Invalid query state');
}

const elapsedMs = Date.now() - startTime;
(results as any).took = elapsedMs;

await this.setDataFrame((results as QuerySuccessStatusResponse).body as IDataFrame);
return onResponse(searchRequest, convertResult(results as IDataFrameResponse));
}
if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.ERROR) {
const dataFrameError = response as IDataFrameError;
Expand Down
70 changes: 70 additions & 0 deletions src/plugins/data/common/utils/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Any modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { timer } from 'rxjs';
import { filter, mergeMap, take, takeWhile } from 'rxjs/operators';
import {
PollQueryResultsHandler,
FetchStatusResponse,
QueryFailedStatusResponse,
} from '../data_frames';

export interface QueryStatusOptions {
pollQueryResults: PollQueryResultsHandler;
queryId?: string;
interval?: number;
}

export const handleQueryResults = <T>(
options: QueryStatusOptions
): Promise<FetchStatusResponse> => {
const { pollQueryResults, interval = 5000, queryId } = options;

return timer(0, interval)
.pipe(
mergeMap(() => pollQueryResults()),
takeWhile((response: FetchStatusResponse) => {
const status = response?.status?.toUpperCase();
return status !== 'SUCCESS' && status !== 'FAILED';
}, true),
filter((response: FetchStatusResponse) => {
const status = response?.status?.toUpperCase();
if (status === 'FAILED') {
throw (
(response as QueryFailedStatusResponse).body.error ??
new Error(`Failed to fetch results ${queryId ?? ''}`)
);
}
return status === 'SUCCESS';
}),
take(1)
)
.toPromise();
};
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { BehaviorSubject, Subject, merge } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
import { i18n } from '@osd/i18n';
import { useEffect } from 'react';
import { cloneDeep, isEqual } from 'lodash';
import { cloneDeep } from 'lodash';
import { useLocation } from 'react-router-dom';
import { RequestAdapter } from '../../../../../inspector/public';
import { DiscoverViewServices } from '../../../build_services';
Expand Down Expand Up @@ -243,7 +243,7 @@ export const useSearch = (services: DiscoverViewServices) => {
elapsedMs,
},
});
} catch (error) {
} catch (error: any) {
// If the request was aborted then no need to surface this error in the UI
if (error instanceof Error && error.name === 'AbortError') return;

Expand All @@ -259,9 +259,9 @@ export const useSearch = (services: DiscoverViewServices) => {
}
let errorBody;
try {
errorBody = JSON.parse(error.body.message);
errorBody = JSON.parse(error.message);
} catch (e) {
errorBody = error.body.message;
errorBody = error.message;
}

data$.next({
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/query_enhancements/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import { CoreSetup } from 'opensearch-dashboards/public';
import { PollQueryResultsParams } from '../../data/common';

export interface QueryAggConfig {
[key: string]: {
Expand All @@ -25,6 +26,7 @@ export interface EnhancedFetchContext {
http: CoreSetup['http'];
path: string;
signal?: AbortSignal;
body?: { pollQueryResultsParams: PollQueryResultsParams };
}

export interface QueryStatusOptions<T> {
Expand Down
9 changes: 7 additions & 2 deletions src/plugins/query_enhancements/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
*/

import { Query } from 'src/plugins/data/common';
import { from, throwError, timer } from 'rxjs';
import { from, timer } from 'rxjs';
import { filter, mergeMap, take, takeWhile } from 'rxjs/operators';
import { stringify } from '@osd/std';
import {
EnhancedFetchContext,
QueryAggConfig,
Expand Down Expand Up @@ -49,7 +50,11 @@ export const handleFacetError = (response: any) => {

export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: QueryAggConfig) => {
const { http, path, signal } = context;
const body = JSON.stringify({ query: { ...query, format: 'jdbc' }, aggConfig });
const body = stringify({
query: { ...query, format: 'jdbc' },
aggConfig,
pollQueryResultsParams: context.body?.pollQueryResultsParams,
});
return from(
http.fetch({
method: 'POST',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { trimEnd } from 'lodash';
import { Observable, throwError } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';
import { catchError } from 'rxjs/operators';
import { CoreStart } from 'opensearch-dashboards/public';
import {
DataPublicPluginStart,
Expand Down Expand Up @@ -36,16 +36,16 @@ export class SQLSearchInterceptor extends SearchInterceptor {
signal?: AbortSignal,
strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const isAsync = strategy === SEARCH_STRATEGY.SQL_ASYNC;
const context: EnhancedFetchContext = {
http: this.deps.http,
path: trimEnd(`${API.SEARCH}/${strategy}`),
signal,
body: {
pollQueryResultsParams: request.params?.pollQueryResultsParams,
},
};

if (isAsync) this.notifications.toasts.add('Fetching data...');
return fetch(context, this.queryService.queryString.getQuery()).pipe(
tap(() => isAsync && this.notifications.toasts.addSuccess('Fetch complete...')),
catchError((error) => {
return throwError(error);
})
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/query_enhancements/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ export function defineSearchStrategyRouteProvider(logger: Logger, router: IRoute
format: schema.string(),
}),
aggConfig: schema.nullable(schema.object({}, { unknowns: 'allow' })),
pollQueryResultsParams: schema.maybe(
schema.object({
queryId: schema.maybe(schema.string()),
sessionId: schema.maybe(schema.string()),
})
),
}),
},
},
Expand Down
Loading

0 comments on commit 58e1645

Please sign in to comment.