Skip to content

Commit

Permalink
Fix ElasticSearch scroll result is created even on no scroll CTR
Browse files Browse the repository at this point in the history
Fixes #2038

Signed-off-by: Oleksandr Porunov <[email protected]>
  • Loading branch information
porunov committed Mar 12, 2020
1 parent 4e09ad6 commit c01bfea
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,17 @@ public static <T> Stream<T> asStream(final Iterator<T> source) {
return StreamSupport.stream(iterable.spliterator(),false);
}

public JanusGraph getForceIndexGraph() throws BackendException {
final ModifiableConfiguration adjustedConfig = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, getConfiguration(), BasicConfiguration.Restriction.NONE);
public JanusGraph getForceIndexGraph() {
return getForceIndexGraph(getConfiguration());
}

public JanusGraph getForceIndexGraph(WriteConfiguration writeConfiguration) {
final ModifiableConfiguration adjustedConfig = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, writeConfiguration, BasicConfiguration.Restriction.NONE);
adjustedConfig.set(GraphDatabaseConfiguration.FORCE_INDEX_USAGE, true);
final WriteConfiguration writeConfig = adjustedConfig.getConfiguration();
TestGraphConfigs.applyOverrides(writeConfig);
Preconditions.checkNotNull(writeConfig);
return JanusGraphFactory.open(writeConfig);
final WriteConfiguration adjustedWriteConfig = adjustedConfig.getConfiguration();
TestGraphConfigs.applyOverrides(adjustedWriteConfig);
Preconditions.checkNotNull(adjustedWriteConfig);
return JanusGraphFactory.open(adjustedWriteConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2224,6 +2224,24 @@ public void testOrForceIndexPartialIndex() throws Exception {
}
}

@Test
public void testIndexDataRetrievalWithLimitLessThenBatch() throws Exception {
WriteConfiguration config = getConfiguration();
config.set("index.search.max-result-set-size", 10);
JanusGraph customGraph = getForceIndexGraph(config);
final JanusGraphManagement management = customGraph.openManagement();
final PropertyKey num = management.makePropertyKey("num").dataType(Integer.class).cardinality(Cardinality.SINGLE).make();
management.buildIndex("oridx", Vertex.class).addKey(num).buildMixedIndex(INDEX);
management.commit();
customGraph.tx().commit();
final GraphTraversalSource g = customGraph.traversal();
g.addV().property("num", 1).next();
g.addV().property("num", 2).next();
customGraph.tx().commit();
assertEquals(2, customGraph.traversal().V().has("num", P.lt(3)).limit(4).toList().size());
JanusGraphFactory.close(customGraph);
}

@Test
public void testOrForceIndexComposite() throws Exception {
JanusGraph customGraph = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever info
compat.createRequestBody(sr, useScroll? NULL_PARAMETERS : TRACK_TOTAL_HITS_DISABLED_PARAMETERS),
useScroll);
log.debug("First Executed query [{}] in {} ms", query.getCondition(), response.getTook());
final ElasticSearchScroll resultIterator = new ElasticSearchScroll(client, response, sr.getSize());
final Iterator<RawQuery.Result<String>> resultIterator = getResultsIterator(useScroll, response, sr.getSize());
final Stream<RawQuery.Result<String>> toReturn
= StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false);
return (query.hasLimit() ? toReturn.limit(query.getLimit()) : toReturn).map(RawQuery.Result::getResult);
Expand All @@ -1121,6 +1121,10 @@ public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever info
}
}

private Iterator<RawQuery.Result<String>> getResultsIterator(boolean useScroll, ElasticSearchResponse response, int windowSize){
return (useScroll)? new ElasticSearchScroll(client, response, windowSize) : response.getResults().iterator();
}

private String convertToEsDataType(Class<?> dataType, Mapping mapping) {
if(String.class.isAssignableFrom(dataType)) {
return "string";
Expand Down Expand Up @@ -1206,9 +1210,10 @@ private void addOrderToQuery(KeyInformation.IndexRetriever informations, Elastic
public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever information,
BaseTransaction tx) throws BackendException {
final int size = query.hasLimit() ? Math.min(query.getLimit() + query.getOffset(), batchSize) : batchSize;
final ElasticSearchResponse response = runCommonQuery(query, information, tx, size, size >= batchSize );
final boolean useScroll = size >= batchSize;
final ElasticSearchResponse response = runCommonQuery(query, information, tx, size, useScroll);
log.debug("First Executed query [{}] in {} ms", query.getQuery(), response.getTook());
final ElasticSearchScroll resultIterator = new ElasticSearchScroll(client, response, size);
final Iterator<RawQuery.Result<String>> resultIterator = getResultsIterator(useScroll, response, size);
final Stream<RawQuery.Result<String>> toReturn
= StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED),
false).skip(query.getOffset());
Expand Down

0 comments on commit c01bfea

Please sign in to comment.