Skip to content

Commit

Permalink
feat: adds unit tests for sync and async reads of a pipeline transfor…
Browse files Browse the repository at this point in the history
…mation (#1663)

* add unit test for pipeline transformations

* add a unit test for a pipeline transformation

* Update gax/test/unit/streaming.ts

Co-authored-by: Leah E. Cole <[email protected]>

* fix unit test

* incorporate review suggestions

* louder error message

* added an async test

* Update gax/test/unit/streaming.ts

Co-authored-by: Leah E. Cole <[email protected]>

* Update gax/test/unit/streaming.ts

Co-authored-by: Leah E. Cole <[email protected]>

* update node version for CI

* change ci to only node 18

* unrevert node changes

* fix node engine

* fix typo

* change engine

* fix package json

* change node engine to 16

* fix engine

* fix engine

* change compodoc version to be supported by node 16

* override compodoc dependency issue for node 16

* fix package json issues

* revert node engine

---------

Co-authored-by: Leah E. Cole <[email protected]>
  • Loading branch information
kevkim-codes and leahecole authored Oct 25, 2024
1 parent f57a58e commit c7995c4
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 2 deletions.
1 change: 0 additions & 1 deletion gax/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
"webpack": "^4.0.0",
"webpack-cli": "^4.0.0"
},

"scripts": {
"docs": "jsdoc -c .jsdoc.js",
"pretest": "npm run prepare",
Expand Down
155 changes: 154 additions & 1 deletion gax/test/unit/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import * as assert from 'assert';
import * as sinon from 'sinon';
import {afterEach, describe, it} from 'mocha';
import {PassThrough} from 'stream';
import {PassThrough, Transform, pipeline} from 'stream';

import {
GaxCallStream,
Expand Down Expand Up @@ -297,6 +297,7 @@ describe('streaming', () => {
done();
});
});

it('cancels in the middle when new retries are enabled', done => {
function schedulePush(s: CancellableStream, c: number) {
const intervalId = setInterval(() => {
Expand Down Expand Up @@ -441,6 +442,7 @@ describe('streaming', () => {
assert.strictEqual(responseCallback.callCount, 1);
});
});

it('emit response when stream received metadata event and new gax retries is enabled', done => {
const responseMetadata = {metadata: true};
const expectedStatus = {code: 0, metadata: responseMetadata};
Expand Down Expand Up @@ -734,6 +736,7 @@ describe('streaming', () => {
done();
});
});

it('emit parsed GoogleError when new retries are enabled', done => {
const errorInfoObj = {
reason: 'SERVICE_DISABLED',
Expand Down Expand Up @@ -898,6 +901,7 @@ describe('streaming', () => {
done();
});
});

it('emit transient error message if neither maxRetries nor totaltimeout are defined when new retries are enabled', done => {
const errorInfoObj = {
reason: 'SERVICE_DISABLED',
Expand Down Expand Up @@ -982,6 +986,7 @@ describe('streaming', () => {
done();
});
});

it('emit transient error on second or later error when new retries are enabled', done => {
const errorInfoObj = {
reason: 'SERVICE_DISABLED',
Expand Down Expand Up @@ -1103,6 +1108,7 @@ describe('streaming', () => {
done();
});
});

it('emit error and retry once', done => {
const firstError = Object.assign(new GoogleError('UNAVAILABLE'), {
code: 14,
Expand Down Expand Up @@ -1258,6 +1264,7 @@ describe('streaming', () => {
done();
});
});

it('retries using resumption request function ', done => {
const receivedData: string[] = [];
const error = Object.assign(new GoogleError('test error'), {
Expand Down Expand Up @@ -1371,6 +1378,7 @@ describe('streaming', () => {
done();
});
});

it('errors when there is a resumption request function an gaxStreamingRetries is not enabled', done => {
const error = Object.assign(new GoogleError('test error'), {
code: 14,
Expand Down Expand Up @@ -1466,6 +1474,138 @@ describe('streaming', () => {
done();
});
});

it('properly emits the end event at the end of a pipeline transformation synchronously', done => {
const spy = sinon.spy((...args: Array<{}>) => {
assert.strictEqual(args.length, 3);
const s = new PassThrough({
objectMode: true,
});
s.push({resources: [1, 2]});
s.push(null);
setImmediate(() => {
s.emit('metadata');
});
setImmediate(() => {
s.emit('status');
});

return s;
});

// Initial stream.
const apiCall = createApiCallStreaming(
spy,
streaming.StreamType.SERVER_STREAMING,
false,
true // new retry behavior enabled
);
const s1 = apiCall({}, undefined);

// Transform stream.
const transform = new Transform({
objectMode: true,
transform: (data, _encoding, callback) => {
callback(
null,
data.resources.map((element: number) => element + 1)
);
},
});

// Final stream.
const s2 = new PassThrough({
objectMode: true,
});

const finalResults: Array<{resources: Array<number>}> = [];

s2.on('data', data => {
finalResults.push(data);
});
s2.on('end', () => {
assert.strictEqual(
JSON.stringify(finalResults),
JSON.stringify([[2, 3]])
);
done();
});

pipeline(s1, transform, s2, err => {
if (err) {
throw new Error(
'pipeline in properly emits the end event at the end of a pipeline transformation test failed'
);
}
});
});

it('properly emits the end event at the end of a pipeline transformation asynchronously', done => {
const spy = sinon.spy((...args: Array<{}>) => {
assert.strictEqual(args.length, 3);
const s = new PassThrough({
objectMode: true,
});
s.push({resources: [1, 2]});
s.push(null);
setImmediate(() => {
s.emit('metadata');
});
setImmediate(() => {
s.emit('status');
});

return s;
});

// Initial stream.
const apiCall = createApiCallStreaming(
spy,
streaming.StreamType.SERVER_STREAMING,
false,
true // new retry behavior enabled
);
const s1 = apiCall({}, undefined);

// Transform stream.
const transform = new Transform({
objectMode: true,
transform: (data, _encoding, callback) => {
setTimeout(() => {
callback(
null,
data.resources.map((element: number) => element + 1)
);
}, 10);
},
});

// Final stream.
const s2 = new PassThrough({
objectMode: true,
});

const finalResults: Array<{resources: Array<number>}> = [];

s2.on('data', data => {
finalResults.push(data);
});
s2.on('end', () => {
assert.strictEqual(
JSON.stringify(finalResults),
JSON.stringify([[2, 3]])
);
done();
});

pipeline(s1, transform, s2, err => {
if (err) {
throw new Error(
'pipeline in properly emits the end event at the end of a pipeline transformation test failed'
);
}
});
});
});

describe('handles server streaming retries in gax when gaxStreamingRetries is enabled', () => {
Expand Down Expand Up @@ -1536,6 +1676,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
});
});

it('server streaming call retries until exceeding max retries and surfaces underlying error in note', done => {
const retrySpy = sinon.spy(
streaming.StreamProxy.prototype,
Expand Down Expand Up @@ -1600,6 +1741,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
});
});

it('does not retry when there is no shouldRetryFn and retryCodes is an empty array', done => {
// we don't call the timeout/max retry check on non retryable error codes
const retrySpy = sinon.spy(
Expand Down Expand Up @@ -1715,6 +1857,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
});
});

it('allows custom CallOptions.retry settings with shouldRetryFn instead of retryCodes and new retry behavior', done => {
sinon
.stub(streaming.StreamProxy.prototype, 'eventForwardHelper')
Expand Down Expand Up @@ -1758,6 +1901,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
);
});

it('allows custom CallOptions.retry settings with retryCodes and new retry behavior', done => {
sinon
.stub(streaming.StreamProxy.prototype, 'eventForwardHelper')
Expand Down Expand Up @@ -1794,6 +1938,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
);
});

it('allows the user to pass a custom resumption strategy', done => {
sinon
// typecasting to any is a workaround for stubbing private functions in sinon
Expand Down Expand Up @@ -1849,6 +1994,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
);
});

it('throws an error when both totalTimeoutMillis and maxRetries are passed at call time when new retry behavior is enabled', done => {
const status = {code: 4, message: 'test'};
const error = Object.assign(new GoogleError('test error'), {
Expand Down Expand Up @@ -1909,6 +2055,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
}
});
});

it('throws an error when both retryRequestoptions and retryOptions are passed at call time when new retry behavior is enabled', done => {
//if this is reached, it means the settings merge in createAPICall did not fail properly
sinon.stub(StreamingApiCaller.prototype, 'call').callsFake(() => {
Expand Down Expand Up @@ -1967,6 +2114,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
done();
}
});

it('throws a warning and converts retryRequestOptions for new retry behavior', done => {
const warnStub = sinon.stub(warnings, 'warn');
sinon
Expand Down Expand Up @@ -2065,6 +2213,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
)
);
});

it('throws a warning and converts retryRequestOptions for new retry behavior - zero/falsiness check', done => {
const warnStub = sinon.stub(warnings, 'warn');
sinon
Expand Down Expand Up @@ -2163,6 +2312,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
)
);
});

it('throws a warning and converts retryRequestOptions for new retry behavior - no maxRetries', done => {
const warnStub = sinon.stub(warnings, 'warn');
sinon
Expand Down Expand Up @@ -2260,6 +2410,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
)
);
});

it('throws a warning and converts retryRequestOptions for new retry behavior - no maxRetries zero/falsiness check', done => {
const warnStub = sinon.stub(warnings, 'warn');
sinon
Expand Down Expand Up @@ -2358,6 +2509,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
);
});
});

describe('warns/errors about server streaming retry behavior when gaxStreamingRetries is disabled', () => {
afterEach(() => {
// restore 'call' stubs and 'warn' stubs
Expand Down Expand Up @@ -2398,6 +2550,7 @@ describe('REST streaming apiCall return StreamArrayParser', () => {
const UserService = root.lookupService('UserService');
UserService.resolveAll();
const streamMethod = UserService.methods['RunQuery'];

it('forwards data, end event', done => {
const spy = sinon.spy((...args: Array<{}>) => {
assert.strictEqual(args.length, 3);
Expand Down

0 comments on commit c7995c4

Please sign in to comment.