$batch #675
base: master
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -101,6 +101,8 @@ import {
	setExecutedMigrations,
} from '../migrator/utils';

const validBatchMethods = new Set(['PUT', 'POST', 'PATCH', 'DELETE', 'GET']);

const LF2AbstractSQLTranslator = LF2AbstractSQL.createTranslator(sbvrTypes);
const LF2AbstractSQLTranslatorVersion = `${LF2AbstractSQLVersion}+${sbvrTypesVersion}`;
@@ -140,6 +142,7 @@ export interface ApiKey extends Actor {
}

export interface Response {
	id?: string;
	statusCode: number;
	headers?:
		| {
@@ -1143,6 +1146,7 @@ const $getAffectedIds = async ({
	const parsedRequest: uriParser.ParsedODataRequest &
		Partial<Pick<uriParser.ODataRequest, 'engine' | 'translateVersions'>> =
		await uriParser.parseOData({
			id: request.batchRequestId,
			method: request.method,
			url: `/${request.vocabulary}${request.url}`,
		});
@@ -1192,11 +1196,103 @@ export const getModel = (vocabulary: string) => {
	return models[vocabulary];
};

const validateBatch = (req: Express.Request) => {
	const { requests } = req.body as { requests: uriParser.UnparsedRequest[] };
	if (!Array.isArray(requests)) {
		throw new BadRequestError(
			'Batch requests must include an array of requests in the body via the "requests" property',
		);
	}
	if (req.headers != null && req.headers['content-type'] == null) {
		throw new BadRequestError(
			'Headers in a batch request must include a "content-type" header if they are provided',
		);
	}
	if (
		requests.find(
			(request) =>
				request.headers?.authorization != null ||
				request.url?.includes('apikey='),
		) != null
	) {
		throw new BadRequestError(
			'Authorization may only be passed to the main batch request',
		);
	}
	const ids = new Set<string>(
		requests
			.map((request) => request.id)
			.filter((id) => typeof id === 'string') as string[],
	);
	if (ids.size !== requests.length) {
		throw new BadRequestError(
			'All requests in a batch request must have unique string ids',
		);
	}

	for (const request of requests) {
		if (
			request.headers != null &&
			request.headers['content-type'] == null &&
			(req.headers == null || req.headers['content-type'] == null)
		) {
			throw new BadRequestError(
				'Requests of a batch request that have headers must include a "content-type" header',
			);
		}
		if (request.method == null) {
			throw new BadRequestError(
				'Requests of a batch request must have a "method"',
			);
		}
		const upperCaseMethod = request.method.toUpperCase();
		if (!validBatchMethods.has(upperCaseMethod)) {
			throw new BadRequestError(
				`Requests of a batch request must have a method matching one of the following: ${Array.from(
					validBatchMethods,
				).join(', ')}`,
			);
		}
		if (
			request.body !== undefined &&
			(upperCaseMethod === 'GET' || upperCaseMethod === 'DELETE')
		) {
			throw new BadRequestError(
				'GET and DELETE requests of a batch request must not have a body',
			);
		}
	}

	const urls = new Set<string | undefined>(
		requests.map((request) => request.url),
	);
	if (urls.has(undefined)) {
Review comment: Should this also check for …
		throw new BadRequestError('Requests of a batch request must have a "url"');
	}
	const containsBatch =
		Array.from(urls).filter((url) => !!url?.includes('/$batch')).length > 0;
	if (containsBatch) {
		throw new BadRequestError('Batch requests cannot contain batch requests');
	}
	const urlModels = new Set(
		Array.from(urls.values()).map((url: string) => url.split('/')[1]),
Review comment: You can limit the splitting to stop on the first match.
	);
	if (urlModels.size > 1) {
		throw new BadRequestError(
			'Batch requests must consist of requests for only one model',
		);
	}
};
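For reference, a minimal sketch of a request body that satisfies these checks — the model name, ids, and payloads are invented for illustration, not taken from this PR:

```ts
// Hypothetical body for a POST to /university/$batch (model name made up).
// It passes validateBatch: unique string ids, allowed methods, a content-type
// header wherever per-request headers are given, no body on GET/DELETE,
// no per-request authorization, and every url targeting the same model.
const batchBody = {
	requests: [
		{
			id: 'create-student',
			method: 'POST',
			url: '/university/student',
			headers: { 'content-type': 'application/json' },
			body: { name: 'Ada' },
		},
		{
			id: 'list-students',
			method: 'GET',
			url: '/university/student',
		},
	],
};
```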
const runODataRequest = (req: Express.Request, vocabulary: string) => {
	if (env.DEBUG) {
		api[vocabulary].logger.log('Parsing', req.method, req.url);
	}

	if (req.url.startsWith(`/${vocabulary}/$batch`)) {
Review comment: I think it'd be good to store this check somewhere, as I've seen equivalent …
		validateBatch(req);
	}
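A possible way to centralise that check, as the comment above suggests — the helper name is hypothetical and not part of this PR:

```ts
// Hypothetical helper so the `$batch` URL check lives in one place
// instead of being repeated as a string comparison.
const isBatchRequest = (url: string, vocabulary: string): boolean =>
	url.startsWith(`/${vocabulary}/$batch`);

// Usage sketch: if (isBatchRequest(req.url, vocabulary)) { validateBatch(req); }
```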
	// Get the hooks for the current method/vocabulary as we know it,
	// in order to run PREPARSE hooks, before parsing gets us more info
	const { versions } = models[vocabulary];
@@ -1244,11 +1340,20 @@ const runODataRequest = (req: Express.Request, vocabulary: string) => {
		await runHooks('PREPARSE', reqHooks, { req, tx: req.tx });
		let requests: uriParser.UnparsedRequest[];
		// Check if it is a single request or a batch
		if (req.batch != null && req.batch.length > 0) {
			requests = req.batch;
		if (req.url.startsWith(`/${vocabulary}/$batch`)) {
			await Promise.all(
				req.body.requests.map(
					async (request: HookReq) =>
						await runHooks('PREPARSE', reqHooks, {
							req: request,
							tx: req.tx,
						}),
				),
			);
			requests = req.body.requests;
Review comment: This will execute all requests in one database transaction. Is this desired, or should every request run in its own database transaction? @Page- What do you think, should every request in a batch run in its own db tx?

Reply: Yes, that is part of the point/idea of implementing $batch @fisehara, to run all requests in a batch in one transaction. Part of the following roadmap item: https://roadmap.balena.io/posts/19/allow-changes-to-variables-to-be-batched-and-submitted-all-at-once
		} else {
			const { method, url, body } = req;
			requests = [{ method, url, data: body }];
			requests = [{ method, url, body }];
		}

		const prepareRequest = async (
@@ -1312,7 +1417,13 @@ const runODataRequest = (req: Express.Request, vocabulary: string) => {

		// Parse the OData requests
		const results = await mappingFn(requests, async (requestPart) => {
			const parsedRequest = await uriParser.parseOData(requestPart);
			const parsedRequest = await uriParser.parseOData(
Review comment: If the requests are independent, should they be executed in parallel on the DB, or should pinejs limit the requests to be executed sequentially to do some kind of rate limiting?

Reply: Requests should be executed sequentially. If a request along the way fails, the batch request should fail. However, there will be no rollback if the requests are not atomic. I don't know enough about pine and the API to answer this well; your opinion based on what I stated above would be better.
||||||
requestPart, | ||||||
req.url.startsWith(`/${vocabulary}/$batch`) && | ||||||
!requestPart.url.includes(`/${vocabulary}/$batch`) | ||||||
? req.headers | ||||||
: undefined, | ||||||
); | ||||||
|
||||||
let request: uriParser.ODataRequest | uriParser.ODataRequest[]; | ||||||
if (Array.isArray(parsedRequest)) { | ||||||
|
@@ -1392,7 +1503,10 @@ export const handleODataRequest: Express.Handler = async (req, res, next) => {

		res.set('Cache-Control', 'no-cache');
		// If we are dealing with a single request unpack the response and respond normally
		if (req.batch == null || req.batch.length === 0) {
		if (
			!req.url.startsWith(`/${apiRoot}/$batch`) ||
			req.body.requests?.length === 0
		) {
			let [response] = responses;
			if (response instanceof HttpError) {
				response = httpErrorToResponse(response);

@@ -1401,15 +1515,15 @@ export const handleODataRequest: Express.Handler = async (req, res, next) => {

			// Otherwise its a multipart request and we reply with the appropriate multipart response
		} else {
			(res.status(200) as any).sendMulti(
				responses.map((response) => {
			res.status(200).json({
				responses: responses.map((response) => {
					if (response instanceof HttpError) {
						return httpErrorToResponse(response);
					} else {
						return response;
					}
				}),
			);
			});
		}
	} catch (e: any) {
		if (handleHttpErrors(req, res, e)) {
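With this change a batch call no longer gets a multipart reply; the individual results come back in one 200 JSON envelope via `res.status(200).json({ responses: ... })`. A rough sketch of that shape for the two-request example earlier — the status codes and bodies are illustrative only, not output captured from this PR:

```ts
// Illustrative response envelope; each entry carries the id of the
// originating batch request plus its own statusCode and body.
const exampleBatchResponse = {
	responses: [
		{ id: 'create-student', statusCode: 201, body: { id: 1, name: 'Ada' } },
		{ id: 'list-students', statusCode: 200, body: { d: [{ id: 1, name: 'Ada' }] } },
	],
};
```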
@@ -1436,7 +1550,7 @@ export const handleHttpErrors = (
		for (const handleErrorFn of handleErrorFns) {
			handleErrorFn(req, err);
		}
		const response = httpErrorToResponse(err);
		const response = httpErrorToResponse(err, req);
		handleResponse(res, response);
		return true;
	}
@@ -1455,10 +1569,12 @@ const handleResponse = (res: Express.Response, response: Response): void => {

const httpErrorToResponse = (
	err: HttpError,
	req?: Express.Request,
): RequiredField<Response, 'statusCode'> => {
	const message = err.getResponseBody();
	return {
		statusCode: err.status,
		body: err.getResponseBody(),
		body: req != null && 'batch' in req ? { responses: [], message } : message,
		headers: err.headers,
	};
};
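A small sketch of the two body shapes this now produces, depending on whether the failing request was part of a batch (reusing one of the validation messages from above):

```ts
// Illustrative only: httpErrorToResponse keeps a plain body for ordinary
// requests, but wraps batch errors so clients still receive a `responses` key.
const message = 'Batch requests must consist of requests for only one model';
const singleRequestError = { statusCode: 400, body: message };
const batchRequestError = { statusCode: 400, body: { responses: [], message } };
```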
@@ -1572,7 +1688,8 @@ const runChangeSet =
			throw new Error('No request id');
		}
		result.headers ??= {};
		result.headers['content-id'] = request.id;
		result.headers['content-id'] = request.batchRequestId;
		result.id = request.batchRequestId;
		changeSetResults.set(request.id, result);
	};
@@ -1611,22 +1728,32 @@ const prepareResponse = async (
	result: any,
	tx: Db.Tx,
): Promise<Response> => {
	let response: Response;
	switch (request.method) {
		case 'GET':
			return await respondGet(req, request, result, tx);
			response = await respondGet(req, request, result, tx);
			break;
		case 'POST':
			return await respondPost(req, request, result, tx);
			response = await respondPost(req, request, result, tx);
			break;
		case 'PUT':
		case 'PATCH':
		case 'MERGE':
			return await respondPut(req, request, result, tx);
			response = await respondPut(req, request, result, tx);
			break;
		case 'DELETE':
			return await respondDelete(req, request, result, tx);
			response = await respondDelete(req, request, result, tx);
			break;
		case 'OPTIONS':
			return await respondOptions(req, request, result, tx);
			response = await respondOptions(req, request, result, tx);
			break;
		default:
			throw new MethodNotAllowedError();
	}
	if (request.batchRequestId != null) {
		response['id'] = request.batchRequestId;
	}
	return response;
};

const checkReadOnlyRequests = (request: uriParser.ODataRequest) => {
@@ -25,19 +25,22 @@ import {
	TranslationError,
} from './errors';
import * as sbvrUtils from './sbvr-utils';
import { IncomingHttpHeaders } from 'http';

export type OdataBinds = ODataBinds;

export interface UnparsedRequest {
	id?: string;
	method: string;
	url: string;
	data?: any;
	headers?: { [header: string]: string };
	body?: any;
Review comment: What is the reasoning behind renaming data to body?

Reply: I was following the spec, which specifies that there is a … If we want to keep it as …

Reply: This predates the OData JSON $batch, but now that it exists I think it makes sense to match that format for our internal data structure, as it should simplify a whole bunch of things.
	headers?: IncomingHttpHeaders;
	changeSet?: UnparsedRequest[];
	_isChangeSet?: boolean;
}
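A sketch of an UnparsedRequest under the renamed field — `body` where the old shape used `data`, matching the OData JSON $batch format discussed above; the url and payload are invented for illustration:

```ts
// Illustrative UnparsedRequest (shape only); note `body` instead of `data`.
const unparsedRequest = {
	id: 'update-student',
	method: 'PATCH',
	url: '/university/student(1)',
	headers: { 'content-type': 'application/json' },
	body: { name: 'Grace' },
};
```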
export interface ParsedODataRequest {
	headers?: IncomingHttpHeaders;
	method: SupportedMethod;
	url: string;
	vocabulary: string;
@@ -48,6 +51,7 @@ export interface ParsedODataRequest {
	odataBinds: OdataBinds;
	custom: AnyObject;
	id?: number | undefined;
	batchRequestId?: string;
	_defer?: boolean;
}
export interface ODataRequest extends ParsedODataRequest {
@@ -263,15 +267,19 @@ export const metadataEndpoints = ['$metadata', '$serviceroot'];

export async function parseOData(
	b: UnparsedRequest & { _isChangeSet?: false },
	headers?: IncomingHttpHeaders,
): Promise<ParsedODataRequest>;
export async function parseOData(
	b: UnparsedRequest & { _isChangeSet: true },
	headers?: IncomingHttpHeaders,
): Promise<ParsedODataRequest[]>;
export async function parseOData(
	b: UnparsedRequest,
	headers?: IncomingHttpHeaders,
): Promise<ParsedODataRequest | ParsedODataRequest[]>;
export async function parseOData(
	b: UnparsedRequest,
	batchHeaders?: IncomingHttpHeaders,
): Promise<ParsedODataRequest | ParsedODataRequest[]> {
	try {
		if (b._isChangeSet && b.changeSet != null) {
@@ -292,12 +300,14 @@ export async function parseOData(
		const odata = memoizedParseOdata(url);

		return {
			batchRequestId: b.id,
			headers: { ...batchHeaders, ...b.headers },
			method: b.method as SupportedMethod,
			url,
			vocabulary: apiRoot,
			resourceName: odata.tree.resource,
			originalResourceName: odata.tree.resource,
			values: b.data ?? {},
			values: b.body ?? {},
			odataQuery: odata.tree,
			odataBinds: odata.binds,
			custom: {},
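Since the spread puts batchHeaders first, a header set on an individual request overrides the batch-level value; a quick sketch of that precedence (header names and values invented):

```ts
// `{ ...batchHeaders, ...b.headers }` — the per-request header wins on conflict.
const batchHeaders = { 'content-type': 'application/json', prefer: 'return=minimal' };
const requestHeaders = { prefer: 'return=representation' };
const merged = { ...batchHeaders, ...requestHeaders };
// merged.prefer === 'return=representation'
// merged['content-type'] === 'application/json'
```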
@@ -362,7 +372,7 @@ const parseODataChangeset = (
		originalResourceName: odata.tree.resource,
		odataBinds: odata.binds,
		odataQuery: odata.tree,
		values: b.data ?? {},
		values: b.body ?? {},
		custom: {},
		id: contentId,
		_defer: defer,
@@ -379,7 +389,7 @@ const splitApiRoot = (url: string) => {
};

const mustExtractHeader = (
	body: { headers?: { [header: string]: string } },
	body: { headers?: IncomingHttpHeaders },
	header: string,
) => {
	const h: any = body.headers?.[header]?.[0];
Review comment: Technically, params for authentication can be customized, so this isn't perfect. I'm not sure if it's something we can account for, but it's worth being aware of.