Skip to content

Commit

Permalink
fix(search): Apply SearchFlags passed in through to scroll queries (d…
Browse files Browse the repository at this point in the history
…atahub-project#9041)

Co-authored-by: Indy Prentice <[email protected]>
  • Loading branch information
iprentic and Indy Prentice authored Oct 18, 2023
1 parent 7855fb6 commit 409f981
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ public ScrollResult getCachedScrollResults(
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "scroll_cache_miss").time();
result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText);
result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText, flags);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "scroll_cache_miss_count").inc();
}
} else {
result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText);
result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText, flags);
}
return result;
}
Expand Down Expand Up @@ -328,7 +328,8 @@ private ScrollResult getRawScrollResults(
@Nullable final String scrollId,
@Nullable final String keepAlive,
final int count,
final boolean fulltext) {
final boolean fulltext,
@Nullable final SearchFlags searchFlags) {
if (fulltext) {
return entitySearchService.fullTextScroll(
entities,
Expand All @@ -337,15 +338,17 @@ private ScrollResult getRawScrollResults(
sortCriterion,
scrollId,
keepAlive,
count);
count,
searchFlags);
} else {
return entitySearchService.structuredScroll(entities,
input,
filters,
sortCriterion,
scrollId,
keepAlive,
count);
count,
searchFlags);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,26 @@ public List<String> getBrowsePaths(@Nonnull String entityName, @Nonnull Urn urn)
@Nonnull
@Override
public ScrollResult fullTextScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size) {
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags) {
log.debug(String.format(
"Scrolling Structured Search documents entities: %s, input: %s, postFilters: %s, sortCriterion: %s, scrollId: %s, size: %s",
entities, input, postFilters, sortCriterion, scrollId, size));
SearchFlags flags = Optional.ofNullable(searchFlags).orElse(new SearchFlags());
flags.setFulltext(true);
return esSearchDAO.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size,
new SearchFlags().setFulltext(true));
flags);
}

@Nonnull
@Override
public ScrollResult structuredScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size) {
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags) {
log.debug(String.format(
"Scrolling FullText Search documents entities: %s, input: %s, postFilters: %s, sortCriterion: %s, scrollId: %s, size: %s",
entities, input, postFilters, sortCriterion, scrollId, size));
return esSearchDAO.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size,
new SearchFlags().setFulltext(false));
SearchFlags flags = Optional.ofNullable(searchFlags).orElse(new SearchFlags());
flags.setFulltext(false);
return esSearchDAO.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size, flags);
}

public Optional<SearchResponse> raw(@Nonnull String indexName, @Nullable String jsonQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public SearchRequest getSearchRequest(@Nonnull String input, @Nullable Filter fi
BoolQueryBuilder filterQuery = getFilterQuery(filter);
searchSourceBuilder.query(QueryBuilders.boolQuery().must(getQuery(input, finalSearchFlags.isFulltext())).filter(filterQuery));
_aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation);
searchSourceBuilder.highlighter(getHighlights());
if (!finalSearchFlags.isSkipHighlighting()) {
searchSourceBuilder.highlighter(_highlights);
}
ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, _entitySpecs);
searchRequest.source(searchSourceBuilder);
log.debug("Search request is: " + searchRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import org.junit.Assert;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.action.search.SearchRequest;
import org.springframework.cache.CacheManager;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
Expand Down Expand Up @@ -108,6 +110,7 @@ abstract public class LineageServiceTestBase extends AbstractTestNGSpringContext
private GraphService _graphService;
private CacheManager _cacheManager;
private LineageSearchService _lineageSearchService;
private RestHighLevelClient _searchClientSpy;

private static final String ENTITY_NAME = "testEntity";
private static final Urn TEST_URN = TestEntityUtil.getTestEntityUrn();
Expand Down Expand Up @@ -162,10 +165,11 @@ private ElasticSearchService buildEntitySearchService() {
EntityIndexBuilders indexBuilders =
new EntityIndexBuilders(getIndexBuilder(), _entityRegistry,
_indexConvention, _settingsBuilder);
ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, getSearchClient(), _indexConvention, false,
_searchClientSpy = spy(getSearchClient());
ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, _searchClientSpy, _indexConvention, false,
ELASTICSEARCH_IMPLEMENTATION_ELASTICSEARCH, getSearchConfiguration(), null);
ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, getSearchClient(), _indexConvention, getSearchConfiguration(), getCustomSearchConfiguration());
ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, getSearchClient(), _indexConvention, getBulkProcessor(), 1);
ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, _searchClientSpy, _indexConvention, getSearchConfiguration(), getCustomSearchConfiguration());
ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, _searchClientSpy, _indexConvention, getBulkProcessor(), 1);
return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO);
}

Expand Down Expand Up @@ -246,9 +250,15 @@ public void testSearchService() throws Exception {
_elasticSearchService.upsertDocument(ENTITY_NAME, document2.toString(), urn2.toString());
syncAfterWrite(getBulkProcessor());

Mockito.reset(_searchClientSpy);
searchResult = searchAcrossLineage(null, TEST1);
assertEquals(searchResult.getNumEntities().intValue(), 1);
assertEquals(searchResult.getEntities().get(0).getEntity(), urn);
// Verify that highlighting was turned off in the query
ArgumentCaptor<SearchRequest> searchRequestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
Mockito.verify(_searchClientSpy, times(1)).search(searchRequestCaptor.capture(), any());
SearchRequest capturedRequest = searchRequestCaptor.getValue();
assertNull(capturedRequest.source().highlighter());
clearCache(false);

when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,30 @@ public void testDatasetFieldsAndHighlights() {
), "unexpected lineage fields in highlights: " + highlightFields);
}

@Test
public void testSearchRequestHandlerHighlightingTurnedOff() {
SearchRequestHandler requestHandler = SearchRequestHandler.getBuilder(TestEntitySpecBuilder.getSpec(), testQueryConfig, null);
SearchRequest searchRequest = requestHandler.getSearchRequest("testQuery", null, null, 0,
10, new SearchFlags().setFulltext(false).setSkipHighlighting(true), null);
SearchSourceBuilder sourceBuilder = searchRequest.source();
assertEquals(sourceBuilder.from(), 0);
assertEquals(sourceBuilder.size(), 10);
// Filters
Collection<AggregationBuilder> aggBuilders = sourceBuilder.aggregations().getAggregatorFactories();
// Expect 2 aggregations: textFieldOverride and _index
assertEquals(aggBuilders.size(), 2);
for (AggregationBuilder aggBuilder : aggBuilders) {
if (aggBuilder.getName().equals("textFieldOverride")) {
TermsAggregationBuilder filterPanelBuilder = (TermsAggregationBuilder) aggBuilder;
assertEquals(filterPanelBuilder.field(), "textFieldOverride.keyword");
} else if (!aggBuilder.getName().equals("_entityType")) {
fail("Found unexepected aggregation: " + aggBuilder.getName());
}
}
// Highlights should not be present
assertNull(sourceBuilder.highlighter());
}

@Test
public void testSearchRequestHandler() {
SearchRequestHandler requestHandler = SearchRequestHandler.getBuilder(TestEntitySpecBuilder.getSpec(), testQueryConfig, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,12 @@ BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable
* @param sortCriterion {@link SortCriterion} to be applied to search results
* @param scrollId opaque scroll identifier to pass to search service
* @param size the number of search hits to return
* @param searchFlags flags controlling search options
* @return a {@link ScrollResult} that contains a list of matched documents and related search result metadata
*/
@Nonnull
ScrollResult fullTextScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size);
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags);

/**
* Gets a list of documents that match given search request. The results are aggregated and filters are applied to the
Expand All @@ -204,11 +205,12 @@ ScrollResult fullTextScroll(@Nonnull List<String> entities, @Nonnull String inpu
* @param sortCriterion {@link SortCriterion} to be applied to search results
* @param scrollId opaque scroll identifier to pass to search service
* @param size the number of search hits to return
* @param searchFlags flags controlling search options
* @return a {@link ScrollResult} that contains a list of matched documents and related search result metadata
*/
@Nonnull
ScrollResult structuredScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size);
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags);

/**
* Max result size returned by the underlying search backend
Expand Down

0 comments on commit 409f981

Please sign in to comment.