Skip to content

Commit

Permalink
Add records_processed Prom metrics (#1178)
Browse files Browse the repository at this point in the history
* update job-components from 0.74.1 to 0.74.2

* add record_processed promMetrics to ReadeAPIFetcher and ElasticsearcReader

* update elasticsearch-asset-apis

* update script from 0.77.1 to 0.77.2, update teraslice-state-storage from 0.53.1 to 0.53.2

* bump: (minor) @terascope/[email protected], [email protected]

bump: (minor) [email protected]

* change labels, rename things

* update yarn.lock
  • Loading branch information
busma13 authored May 21, 2024
1 parent ade89ab commit 11944c3
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 21 deletions.
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"name": "elasticsearch",
"version": "3.5.8"
"version": "3.6.0"
}
8 changes: 4 additions & 4 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "asset",
"displayName": "Asset",
"version": "3.5.8",
"version": "3.6.0",
"private": true,
"description": "",
"license": "MIT",
Expand All @@ -19,9 +19,9 @@
"dependencies": {
"@terascope/data-mate": "^0.56.2",
"@terascope/elasticsearch-api": "^3.20.2",
"@terascope/elasticsearch-asset-apis": "^0.11.10",
"@terascope/job-components": "^0.74.1",
"@terascope/teraslice-state-storage": "^0.53.1",
"@terascope/elasticsearch-asset-apis": "^0.12.0",
"@terascope/job-components": "^0.74.2",
"@terascope/teraslice-state-storage": "^0.53.2",
"@terascope/utils": "^0.59.1",
"datemath-parser": "^1.0.6",
"got": "^11.8.3",
Expand Down
14 changes: 14 additions & 0 deletions asset/src/__lib/ReaderAPIFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ export class ReaderAPIFetcher extends Fetcher<ESDateConfig> {
const apiManager = this.getAPI<ElasticReaderFactoryAPI>(apiName);
this.api = await apiManager.create(apiName, {});
await super.initialize();

const { context, api, opConfig } = this;
await this.context.apis.foundation.promMetrics.addGauge(
'elasticsearch_records_read',
'Number of records read from elasticsearch',
['op_name'],
async function collect() {
const recordsRead = api.getRecordsFetched();
const labels = {
op_name: opConfig._op,
...context.apis.foundation.promMetrics.getDefaultLabels()
};
this.set(labels, recordsRead);
});
}

async fetch(slice: ReaderSlice): Promise<DataEntity[] | DataFrame | Buffer> {
Expand Down
21 changes: 21 additions & 0 deletions asset/src/elasticsearch_bulk/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,26 @@ import { ElasticsearchBulkConfig } from './interfaces';
export default class ElasticsearchBulk extends BatchProcessor<ElasticsearchBulkConfig> {
client!: ElasticsearchBulkSender;
apiManager!: ElasticSenderAPI;
private recordsWritten = 0;

async initialize(): Promise<void> {
await super.initialize();
const apiManager = this.getAPI<ElasticSenderAPI>(this.opConfig.api_name);
this.client = await apiManager.create('bulkSender', this.opConfig);
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
this.context.apis.foundation.promMetrics.addGauge(
'elasticsearch_records_written',
'Number of records written to elasticsearch',
['op_name'],
async function collect() {
const labels = {
op_name: self.opConfig._op,
...self.context.apis.foundation.promMetrics.getDefaultLabels()
};
this.set(labels, self.getRecordsWritten());
}
);
}

async onBatch(data: DataEntity[]): Promise<DataEntity[]> {
Expand All @@ -27,6 +42,12 @@ export default class ElasticsearchBulk extends BatchProcessor<ElasticsearchBulkC
}
}

this.recordsWritten += data.length;

return data;
}

getRecordsWritten() {
return this.recordsWritten;
}
}
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticsearch-assets",
"displayName": "Elasticsearch Assets",
"version": "3.5.8",
"version": "3.6.0",
"private": true,
"description": "bundle of processors for teraslice",
"homepage": "https://github.com/terascope/elasticsearch-assets#readme",
Expand Down Expand Up @@ -44,11 +44,11 @@
"devDependencies": {
"@terascope/data-types": "^0.50.1",
"@terascope/elasticsearch-api": "^3.20.2",
"@terascope/elasticsearch-asset-apis": "^0.11.10",
"@terascope/elasticsearch-asset-apis": "^0.12.0",
"@terascope/eslint-config": "^0.8.0",
"@terascope/job-components": "^0.74.1",
"@terascope/job-components": "^0.74.2",
"@terascope/scripts": "0.77.2",
"@terascope/teraslice-state-storage": "^0.53.1",
"@terascope/teraslice-state-storage": "^0.53.2",
"@terascope/types": "^0.17.1",
"@types/bluebird": "^3.5.38",
"@types/elasticsearch": "^5.0.40",
Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-asset-apis/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/elasticsearch-asset-apis",
"displayName": "Elasticsearch Asset Apis",
"version": "0.11.11",
"version": "0.12.0",
"description": "Elasticsearch reader and sender apis",
"homepage": "https://github.com/terascope/elasticsearch-assets",
"repository": "[email protected]:terascope/elasticsearch-assets.git",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class ElasticsearchReaderAPI {
windowSize: number|undefined = undefined;
protected readonly dateFormat: string;
protected readonly emitter: EventEmitter;
recordsFetched = 0;

constructor(
config: ESReaderOptions, client: ReaderClient, emitter: EventEmitter, logger: Logger
Expand Down Expand Up @@ -149,6 +150,7 @@ export class ElasticsearchReaderAPI {
this.logger.debug(msg);
throw new Error(msg); // throw for pRetry
}
this.recordsFetched += resultSize;

return result;
};
Expand Down Expand Up @@ -741,6 +743,10 @@ export class ElasticsearchReaderAPI {
// this is method in api is badly named
return this.client.verify();
}

getRecordsFetched() {
return this.recordsFetched;
}
}

function isObject(val: unknown): val is AnyObject {
Expand Down
23 changes: 12 additions & 11 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@
lodash "^4.17.21"
yargs "^17.7.2"

"@terascope/elasticsearch-api@^3.20.1", "@terascope/elasticsearch-api@^3.20.2":
"@terascope/elasticsearch-api@^3.20.2":
version "3.20.2"
resolved "https://registry.yarnpkg.com/@terascope/elasticsearch-api/-/elasticsearch-api-3.20.2.tgz#f207d68d672bb8bd36fa8e25e6c2c7f21204a70b"
integrity sha512-Tg5EKfKuGurHo9RSl5HHhRUKGXNpflqKjq6nXwfm6A3sO3cO4nMK/TzSxX+/+Dhx8yjgU/0BKOKwCHsoU8R+tw==
Expand Down Expand Up @@ -803,16 +803,17 @@
progress "^2.0.3"
yargs "^17.2.1"

"@terascope/job-components@^0.74.1":
version "0.74.1"
resolved "https://registry.yarnpkg.com/@terascope/job-components/-/job-components-0.74.1.tgz#374fda9ba69a7ee035110ac13c5fe2a28a2f5bfe"
integrity sha512-DjDj2qGUIJVpiw8HGcdVCuEZsM8hi3d8GchNKFbherxyD7rtv3y+56nbT/rtHSoK8RhCuJTxfdaLwzpdLlpQcg==
"@terascope/job-components@^0.74.2":
version "0.74.2"
resolved "https://registry.yarnpkg.com/@terascope/job-components/-/job-components-0.74.2.tgz#de875f792644abf3324f74d98e6b7daa046d7fff"
integrity sha512-FEliIWkcK43Q+kiONkwutXgV/f9qQMGUjesmJq9IyqYswOd7ZHtlXCjBUziKqdUWZ01OZ3YDzFxeuufJoLflUg==
dependencies:
"@terascope/utils" "^0.59.1"
convict "^6.2.4"
convict-format-with-moment "^6.2.0"
convict-format-with-validator "^6.2.0"
datemath-parser "^1.0.6"
prom-client "^15.1.2"
uuid "^9.0.1"

"@terascope/[email protected]":
Expand Down Expand Up @@ -851,13 +852,13 @@
dependencies:
bluebird "^3.7.2"

"@terascope/teraslice-state-storage@^0.53.1":
version "0.53.1"
resolved "https://registry.yarnpkg.com/@terascope/teraslice-state-storage/-/teraslice-state-storage-0.53.1.tgz#b1c5c12e39202b42062b94e936f1b7466f69db27"
integrity sha512-U1Z75uekFBTjvbRT2CdMAED0HhZqsG1k37KS+YBXzHyQMFjCAQcAF9v5bCuVEQgA7lgFyfdyggSXAkp85C9MXg==
"@terascope/teraslice-state-storage@^0.53.2":
version "0.53.2"
resolved "https://registry.yarnpkg.com/@terascope/teraslice-state-storage/-/teraslice-state-storage-0.53.2.tgz#bc21d9b5fd9bade3e82121ae2c0d6c69a67ba1c0"
integrity sha512-6uG/kTodEYNyWopfTHA2zMqr488HmZlryEIPa2eBGxZO5vr46cjnd1EOcY5gDDfdSruVQMwq2z++SFTv4+RgTw==
dependencies:
"@terascope/elasticsearch-api" "^3.20.1"
"@terascope/utils" "^0.59.1"
"@terascope/elasticsearch-api" "^3.20.2"
"@terascope/utils" "^0.59.2"

"@terascope/types@^0.17.1", "@terascope/types@^0.17.2":
version "0.17.2"
Expand Down

0 comments on commit 11944c3

Please sign in to comment.