Skip to content

Commit

Permalink
[FEAT] [OS] expose timeout on put, for really large assets, it is pos…
Browse files Browse the repository at this point in the history
…sible for the publish request to timeout (#457)
  • Loading branch information
aricart authored Jan 12, 2023
1 parent c8d0410 commit 6420682
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
1 change: 1 addition & 0 deletions nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export type {
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
PeerInfo,
Placement,
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export type {
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
PeerInfo,
Perf,
Expand Down
19 changes: 14 additions & 5 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The NATS Authors
* Copyright 2022-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -26,6 +26,7 @@ import {
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
PubAck,
PurgeResponse,
Expand Down Expand Up @@ -306,8 +307,12 @@ export class ObjectStoreImpl implements ObjectStore {
async _put(
meta: ObjectStoreMeta,
rs: ReadableStream<Uint8Array> | null,
opts?: ObjectStorePutOpts,
): Promise<ObjectInfo> {
const jsi = this.js as JetStreamClientImpl;
opts = opts || { timeout: jsi.timeout };
opts.timeout = opts.timeout || 1000;
const { timeout } = opts;
const maxPayload = jsi.nc.info?.max_payload || 1024;
meta = meta || {} as ObjectStoreMeta;
meta.options = meta.options || {};
Expand Down Expand Up @@ -351,7 +356,7 @@ export class ObjectStoreImpl implements ObjectStore {
sha.update(payload);
info.chunks!++;
info.size! += payload.length;
proms.push(this.js.publish(chunkSubj, payload));
proms.push(this.js.publish(chunkSubj, payload, { timeout }));
}
info.mtime = new Date().toISOString();
const digest = sha.digest("base64");
Expand All @@ -364,7 +369,10 @@ export class ObjectStoreImpl implements ObjectStore {
const h = headers();
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
proms.push(
this.js.publish(metaSubj, JSONCodec().encode(info), { headers: h }),
this.js.publish(metaSubj, JSONCodec().encode(info), {
headers: h,
timeout,
}),
);

// if we had this object trim it out
Expand All @@ -389,7 +397,7 @@ export class ObjectStoreImpl implements ObjectStore {
const payload = db.drain(meta.options.max_chunk_size);
sha.update(payload);
proms.push(
this.js.publish(chunkSubj, payload),
this.js.publish(chunkSubj, payload, { timeout }),
);
}
}
Expand All @@ -406,13 +414,14 @@ export class ObjectStoreImpl implements ObjectStore {
put(
meta: ObjectStoreMeta,
rs: ReadableStream<Uint8Array> | null,
opts?: ObjectStorePutOpts,
): Promise<ObjectInfo> {
if (meta?.options?.link) {
return Promise.reject(
new Error("link cannot be set when putting the object in bucket"),
);
}
return this._put(meta, rs);
return this._put(meta, rs, opts);
}

async get(name: string): Promise<ObjectResult | null> {
Expand Down
10 changes: 9 additions & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 The NATS Authors
* Copyright 2020-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -2919,13 +2919,21 @@ export type ObjectResult = {
error: Promise<Error | null>;
};

export type ObjectStorePutOpts = {
/**
* maximum number of millis for the put requests to succeed
*/
timeout: number;
};

export interface ObjectStore {
info(name: string): Promise<ObjectInfo | null>;
list(): Promise<ObjectInfo[]>;
get(name: string): Promise<ObjectResult | null>;
put(
meta: ObjectStoreMeta,
rs: ReadableStream<Uint8Array>,
opts?: ObjectStorePutOpts,
): Promise<ObjectInfo>;
delete(name: string): Promise<PurgeResponse>;
link(name: string, meta: ObjectInfo): Promise<ObjectInfo>;
Expand Down

0 comments on commit 6420682

Please sign in to comment.