Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Query: Removes Ode changes which rejected certain types of queries which were previously supported on 3.36.0-preview #4142

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using global::Azure;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
Expand All @@ -33,12 +33,10 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
internal static class CosmosQueryExecutionContextFactory
{
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string QueryInspectionPattern = @"\s+(GROUP\s+BY\s+|COUNT\s*\(|MIN\s*\(|MAX\s*\(|AVG\s*\(|SUM\s*\(|DISTINCT\s+)";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
private static readonly Regex QueryInspectionRegex = new Regex(QueryInspectionPattern, RegexOptions.IgnoreCase | RegexOptions.Compiled);

public static IQueryPipelineStage Create(
DocumentContainer documentContainer,
Expand Down Expand Up @@ -149,14 +147,14 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy

if (targetRange != null)
{
return await TryCreateSinglePartitionExecutionContextAsync(
return await TryCreateExecutionContextAsync(
documentContainer,
partitionedQueryExecutionInfo: null,
cosmosQueryContext,
containerQueryProperties,
inputParameters,
targetRange,
createQueryPipelineTrace,
trace,
cancellationToken);
}

Expand Down Expand Up @@ -299,7 +297,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione

if (targetRange != null)
{
tryCreatePipelineStage = await TryCreateSinglePartitionExecutionContextAsync(
tryCreatePipelineStage = await TryCreateExecutionContextAsync(
documentContainer,
partitionedQueryExecutionInfo,
cosmosQueryContext,
Expand Down Expand Up @@ -330,7 +328,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
return tryCreatePipelineStage;
}

private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSinglePartitionExecutionContextAsync(
private static async Task<TryCatch<IQueryPipelineStage>> TryCreateExecutionContextAsync(
DocumentContainer documentContainer,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
CosmosQueryContext cosmosQueryContext,
Expand All @@ -340,17 +338,6 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSinglePartitio
ITrace trace,
CancellationToken cancellationToken)
{
// Retrieve the query plan in a subset of cases to ensure the query is valid before creating the Ode pipeline
if (partitionedQueryExecutionInfo == null && QueryInspectionRegex.IsMatch(inputParameters.SqlQuerySpec.QueryText))
{
partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync(
cosmosQueryContext,
inputParameters,
containerQueryProperties,
trace,
cancellationToken);
}

// Test code added to confirm the correct pipeline is being utilized
SetTestInjectionPipelineType(inputParameters, OptimisticDirectExecution);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public void PositiveOptimisticDirectExecutionOutput()
partitionKeyPath: @"/pk",
partitionKeyValue: null),
};

this.ExecuteTestSuite(testVariations);
}

Expand Down Expand Up @@ -380,11 +379,11 @@ public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync()
{
int numItems = 100;
OptimisticDirectExecutionTestInput input = CreateInput(
description: @"Single Partition Key and Value Field",
description: @"Single Partition Key and Value Field",
query: "SELECT * FROM c",
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");

int result = await this.GetPipelineAndDrainAsync(
input,
Expand All @@ -395,63 +394,6 @@ public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync()
Assert.AreEqual(numItems, result);
}

// test checks that the Ode code path ensures that a query is valid before sending it to the backend
// these queries with previous ODE implementation would have succeeded. However, with the new query validity check, they should all throw an exception
[TestMethod]
public async Task TestQueryValidityCheckWithODEAsync()
{
const string UnsupportedSelectStarInGroupBy = "'SELECT *' is not allowed with GROUP BY";
const string UnsupportedCompositeAggregate = "Compositions of aggregates and other expressions are not allowed.";
const string UnsupportedNestedAggregateExpression = "Cannot perform an aggregate function on an expression containing an aggregate or a subquery.";
const string UnsupportedSelectLisWithAggregateOrGroupByExpression = "invalid in the select list because it is not contained in either an aggregate function or the GROUP BY clause";

List<(string Query, string ExpectedMessage)> testVariations = new List<(string Query, string ExpectedMessage)>
{
("SELECT COUNT (1) + 5 FROM c", UnsupportedCompositeAggregate),
("SELECT MIN(c.price) + 10 FROM c", UnsupportedCompositeAggregate),
("SELECT MAX(c.price) - 4 FROM c", UnsupportedCompositeAggregate),
("SELECT SUM (c.price) + 20 FROM c",UnsupportedCompositeAggregate),
("SELECT AVG(c.price) * 50 FROM c", UnsupportedCompositeAggregate),
("SELECT * from c GROUP BY c.name", UnsupportedSelectStarInGroupBy),
("SELECT SUM(c.sales) AS totalSales, AVG(SUM(c.salesAmount)) AS averageTotalSales\n\n\nFROM c", UnsupportedNestedAggregateExpression),
("SELECT c.category, c.price, COUNT(c) FROM c GROUP BY c.category\r\n", UnsupportedSelectLisWithAggregateOrGroupByExpression)
};

List<(string, string)> testVariationsWithCaseSensitivity = new List<(string, string)>();
foreach ((string Query, string ExpectedMessage) testCase in testVariations)
{
testVariationsWithCaseSensitivity.Add((testCase.Query, testCase.ExpectedMessage));
testVariationsWithCaseSensitivity.Add((testCase.Query.ToLower(), testCase.ExpectedMessage));
testVariationsWithCaseSensitivity.Add((testCase.Query.ToUpper(), testCase.ExpectedMessage));
}

foreach ((string Query, string ExpectedMessage) testCase in testVariationsWithCaseSensitivity)
{
OptimisticDirectExecutionTestInput input = CreateInput(
description: @"Unsupported queries in CosmosDB that were previously supported by Ode pipeline and returning wrong results",
query: testCase.Query,
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");

try
{
int result = await this.GetPipelineAndDrainAsync(
input,
numItems: 100,
isMultiPartition: false,
expectedContinuationTokenCount: 0,
requiresDist: true);
Assert.Fail("Invalid query being executed did not result in an exception");
}
catch (Exception ex)
{
Assert.IsTrue(ex.InnerException.Message.Contains(testCase.ExpectedMessage));
continue;
}
}
}

// test to check if pipeline handles a 410 exception properly and returns all the documents.
[TestMethod]
public async Task TestPipelineForGoneExceptionOnSingleAndMultiplePartitionAsync()
Expand Down Expand Up @@ -628,7 +570,7 @@ private async Task<int> GetPipelineAndDrainAsync(OptimisticDirectExecutionTestIn
return documents.Count;
}

internal static TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition)
internal static PartitionedQueryExecutionInfo GetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition)
{
TryCatch<PartitionedQueryExecutionInfo> tryGetQueryPlan = QueryPartitionProviderTestInstance.Object.TryGetPartitionedQueryExecutionInfo(
querySpecJsonString: querySpecJsonString,
Expand All @@ -641,7 +583,7 @@ internal static TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryEx
useSystemPrefix: false,
geospatialType: Cosmos.GeospatialType.Geography);

return tryGetQueryPlan;
return tryGetQueryPlan.Result;
}

private static async Task<IQueryPipelineStage> GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions)
Expand Down Expand Up @@ -789,6 +731,7 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect
using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(new SqlQuerySpec(input.Query), Documents.ResourceType.Document));
string sqlQuerySpecJsonString = streamReader.ReadToEnd();

PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = GetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, input.PartitionKeyDefinition);
CosmosQueryExecutionContextFactory.InputParameters inputParameters = new CosmosQueryExecutionContextFactory.InputParameters(
sqlQuerySpec: new SqlQuerySpec(input.Query),
initialUserContinuationToken: input.ContinuationToken,
Expand All @@ -797,8 +740,8 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect
maxItemCount: queryRequestOptions.MaxItemCount,
maxBufferedItemCount: queryRequestOptions.MaxBufferedItemCount,
partitionKey: input.PartitionKeyValue,
properties: new Dictionary<string, object>() { { "x-ms-query-partitionkey-definition", input.PartitionKeyDefinition } },
partitionedQueryExecutionInfo: null,
properties: queryRequestOptions.Properties,
partitionedQueryExecutionInfo: partitionedQueryExecutionInfo,
executionEnvironment: null,
returnResultsInDeterministicOrder: null,
forcePassthrough: false,
Expand Down Expand Up @@ -1100,8 +1043,7 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit
using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document));
string sqlQuerySpecJsonString = streamReader.ReadToEnd();

TryCatch<PartitionedQueryExecutionInfo> queryPlan = OptimisticDirectExecutionQueryBaselineTests.TryGetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition);
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlan.Succeeded ? queryPlan.Result : throw queryPlan.Exception;
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = OptimisticDirectExecutionQueryBaselineTests.GetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition);
return TryCatch<PartitionedQueryExecutionInfo>.FromResult(partitionedQueryExecutionInfo);
}
}
Expand Down
Loading