-
Notifications
You must be signed in to change notification settings - Fork 5
/
stampede.js
216 lines (177 loc) · 6.58 KB
/
stampede.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
const Promise = require('bluebird');
const crypto = require('crypto');
const zlib = require('zlib');
// `Promise` in this module is bluebird (see the require above); turn off its runtime warnings.
Promise.config({ warnings: false});
// Add promise-returning `*Async` variants to zlib; the Stampede class relies on
// `zlib.inflateAsync` and `zlib.deflateAsync` for optional payload compression.
Promise.promisifyAll(zlib);
/**
 * Signals a cache miss by throwing an Error whose message is 'KEY_NOT_FOUND'.
 * Stampede#get calls this when no usable entry exists, and Stampede#cached
 * matches on that message to decide that the value has to be computed.
 *
 * @throws {Error} always, with message 'KEY_NOT_FOUND'
 */
const keyNotFound = () => {
  const miss = new Error('KEY_NOT_FOUND');
  throw miss;
};
/**
 * Cache front-end that lets a single caller compute a value while other
 * callers asking for the same key wait for the result.
 *
 * `set` first writes a placeholder record flagged `__caching__`, runs the
 * producer, then replaces the placeholder with the final record. `get` polls
 * (up to `maxRetries`, `retryDelay` ms apart) while it sees the placeholder.
 *
 * Storage is delegated to an adapter. The calls made by this class are:
 *   adapter.get(key, options), adapter.insert(key, payload, expiry),
 *   adapter.update(key, payload, expiry), adapter.remove(key[, record]).
 * `cached` expects a lost insert race to surface as an error whose message
 * contains 'KEY_EXISTS' (presumably raised by adapter.insert - confirm
 * against the adapter in use).
 */
class Stampede {
  /**
   * @param {Object} options
   * @param {Object|Promise<Object>} options.adapter storage adapter (or a promise for one); required
   * @param {Function} [options.whenFnExecuted] called as (key, payload) once the value has been produced
   * @param {number} [options.retryDelay=100] ms between polls while a key is being computed
   * @param {number} [options.maxRetries=5] retry budget used by `get`
   * @param {number} [options.expiry] default lifetime in ms added to the current time at `set`
   * @param {string} [options.passphrase] default passphrase; when set, stored data is encrypted
   * @param {boolean} [options.compressed] default for compressing stored data
   * @param {string} [options.algo='aes192'] cipher name used by encrypt/decrypt
   * @param {Function} [options.Promise] stored on the instance as `this.Promise`
   * @throws {string} 'Missing adapter' when no adapter is supplied
   */
  constructor(options) {
    options = this.options = options || {};
    this.whenFnExecuted = options.whenFnExecuted;
    this.retryDelay = (options.retryDelay !== undefined) ? options.retryDelay : 100;
    this.maxRetries = (options.maxRetries !== undefined) ? options.maxRetries : 5;
    this.expiry = options.expiry;
    this.passphrase = options.passphrase;
    // `set` falls back to this.compressed; it has to be populated here or the
    // instance-level option is silently ignored.
    this.compressed = options.compressed;
    this.algo = options.algo || 'aes192';
    if (!options.adapter) throw 'Missing adapter';
    this.adapter = Promise.resolve(options.adapter);
    // Bind the cached function to make it passable directly to consumers of the cache
    this.cached = this.cached.bind(this);
    this.Promise = options.Promise || Promise;
  }

  /**
   * Resolves the (possibly asynchronous) adapter once and remembers it.
   * Called lazily by the public methods.
   */
  async setup() {
    this._adapter = await this.adapter;
    this._hasSetup = true;
  }

  /**
   * Reads a record, waiting while another caller is still computing it.
   *
   * @param {*} key
   * @param {Object} [options]
   * @param {Object} [options.preCache] map of key -> record (or promise of one) consulted before the adapter
   * @param {number} [options.maxAge] records updated more than this many ms ago count as missing
   * @param {number} [options.retryExpiry] placeholders older than this many ms count as missing
   * @param {boolean} [options.readOnly] when true, stale records are not removed from the adapter
   * @param {number} [options.maxRetries] overrides the instance setting
   * @param {number} [options.retryDelay] overrides the instance setting
   * @param {string} [options.passphrase] overrides the instance passphrase
   * @param {number} [retry] internal retry counter
   * @returns {Promise<Object>} the record, with `data` decoded and `updated` as a Date
   * @throws {Error} 'KEY_NOT_FOUND' when missing or stale, 'MAXIMUM_RETRIES' when polling gives up
   * @throws {*} the cached error data when the record is flagged `error`
   */
  async get(key, options, retry) {
    if (!this._hasSetup) await this.setup();
    let value;
    retry = retry || 0;
    options = options || {};
    const maxRetries = (options.maxRetries !== undefined) ? options.maxRetries : this.maxRetries;
    const retryDelay = (options.retryDelay !== undefined) ? options.retryDelay : this.retryDelay;

    // If we already have the value pre-cached we use it
    if (options.preCache && options.preCache[key] !== undefined) {
      value = await options.preCache[key];
    }

    const d = (value || await this._adapter.get(key, options));
    if (!d) keyNotFound();

    const updated = +(new Date(d.updated));
    const now = new Date();
    const expired = d.expiryTime && (now > +d.expiryTime);
    const aged = (!isNaN(options.maxAge) && (options.maxAge || options.maxAge === 0) && (updated + (+options.maxAge)) < now);
    const retryExpiry = (d.__caching__ && options.retryExpiry && !isNaN(options.retryExpiry) && (updated + (+options.retryExpiry)) < now);

    if (expired || aged || retryExpiry) {
      if (!options.readOnly) {
        await this._adapter.remove(key, d);
      }
      keyNotFound();
    }

    // Placeholder still present: someone else is computing the value, poll again.
    if (d.__caching__) {
      // Post-increment: the comparison uses the count before this attempt.
      if (retry++ > maxRetries) {
        throw new Error('MAXIMUM_RETRIES');
      }
      await Promise.delay(retryDelay);
      return this.get(key, options, retry);
    }

    // Decode in the reverse order of `set` (which encrypts, then compresses).
    if (d.compressed) {
      const inflated = await zlib.inflateAsync(d.data);
      d.data = JSON.parse(inflated);
      // Mark as decoded (as done for `encrypted` below) so a record object that
      // is handed back again (preCache, by-reference adapters) is not inflated twice.
      d.compressed = false;
    }

    if (d.encrypted) {
      const passphrase = options.passphrase !== undefined ? options.passphrase : this.passphrase;
      d.data = this.decrypt(d.data, passphrase);
      d.encrypted = false;
    }

    d.updated = new Date(d.updated);
    if (d.error) throw d.data;
    return d;
  }

  /**
   * Reserves `key` with a placeholder, produces the value and stores it.
   *
   * @param {*} key must not be undefined
   * @param {Function|*} fn producer function, or the value itself
   * @param {Object} [options]
   * @param {*} [options.info] stored alongside the record and readable through `info`
   * @param {number} [options.expiry] lifetime in ms; falls back to the instance setting
   * @param {boolean} [options.upsert] write the placeholder with adapter.update instead of adapter.insert
   * @param {string} [options.passphrase] overrides the instance passphrase
   * @param {boolean} [options.compressed] overrides the instance compression setting
   * @param {boolean} [options.payload] resolve with the whole record instead of just the data
   * @param {boolean} [options.clues] resolve with a [dependency, handler] function pair instead of running `fn`
   * @returns {Promise<*>} the produced data, the record (options.payload) or the function pair (options.clues)
   * @throws {Error} 'Key cannot be undefined'
   * @throws {*} whatever `fn` threw; errors carrying a truthy `cache` property are stored before being rethrown
   */
  async set(key, fn, options) {
    if (key === undefined) {
      throw new Error('Key cannot be undefined');
    }
    if (!this._hasSetup) await this.setup();
    options = options || {};

    const payload = {
      info: options.info,
      __caching__: true,
      updated: new Date(),
    };

    const expiry = options.expiry || this.expiry;
    if (expiry) payload.expiryTime = new Date().valueOf() + expiry;

    await (options.upsert ? this._adapter.update(key, payload, expiry) : this._adapter.insert(key, payload, expiry));

    const finalize = async (err, d) => {
      if (err) {
        if (typeof err === 'string') err = { message: err };
        // If the error is to be cached we transform into a JSON object
        if (err && err.cache) {
          d = Object.assign({}, err, { error: true });
          // `message` on Error instances is non-enumerable, so Object.assign skips it.
          if (d.message === undefined && err.message !== undefined) d.message = err.message;
        }
        // Otherwise we remove the key and throw directly
        else {
          await this._adapter.remove(key); // no need to pass the payload to what is being removed since it will never be actual data
          throw err;
        }
      }

      const raw_data = d;

      // Optional encryption
      const passphrase = options.passphrase !== undefined ? options.passphrase : this.passphrase;
      if (passphrase) {
        payload.encrypted = true;
        d = this.encrypt(d, passphrase);
      }

      // Optional compression
      const compressed = options.compressed !== undefined ? options.compressed : this.compressed;
      if (compressed) {
        payload.compressed = true;
        d = await zlib.deflateAsync(JSON.stringify(d));
      }

      payload.data = d;
      payload.__caching__ = false;
      // Inspect the raw value: after encryption/compression `d` is a string or
      // buffer, so the error flag would otherwise never be recorded.
      if (raw_data && raw_data.error) payload.error = true;

      const updatePromise = this._adapter.update(key, payload, expiry);
      if (this.whenFnExecuted) {
        await this.whenFnExecuted(key, payload);
      }
      await updatePromise;

      // Hand back a copy carrying the raw data; mutating `payload` itself would
      // put plaintext into records that an adapter may be holding by reference.
      const result = Object.assign({}, payload, { data: raw_data });
      if (result.error) throw result.data;
      return options.payload ? result : result.data;
    };

    if (options.clues) {
      // Function pair consumed by the caller; names and parameters are kept exactly as they were.
      return [ function $noThrow(_) { return fn; }, function(value) {
        return value.error ? finalize(value) : finalize(null, value);
      }];
    }

    try {
      const data = (typeof fn === 'function') ? await Promise.try(fn) : fn;
      return finalize(null, data);
    } catch (e) {
      return finalize(e);
    }
  }

  /**
   * Returns the `info` field stored with a record, without decoding its data.
   *
   * @param {*} key
   * @param {Object} [options] passed through to adapter.get
   * @returns {Promise<*>}
   * @throws {Error} 'KEY_NOT_FOUND' when the adapter has no record for `key`
   */
  async info(key, options) {
    if (!this._hasSetup) await this.setup();
    const d = await this._adapter.get(key, options);
    if (!d) keyNotFound();
    return d.info;
  }

  /**
   * Returns the cached value for `key`, computing and storing it through `set`
   * when `get` reports 'KEY_NOT_FOUND'.
   *
   * @param {*} key
   * @param {Function|*} fn producer handed to `set` on a miss
   * @param {Object} [options] shared by `get` and `set`
   * @returns {Promise<*>} the data, or the whole record when options.payload is set
   */
  async cached(key, fn, options) {
    if (!this._hasSetup) await this.setup();
    options = options || {};
    try {
      const d = await this.get(key, options, 0);
      return options.payload ? d : d.data;
    } catch (e) {
      if (!(e && e.message === 'KEY_NOT_FOUND')) throw e;
      try {
        return await this.set(key, fn, options);
      } catch (err) {
        // If we experienced a race situation we try to get the results
        if (err && err.message && String(err.message).indexOf('KEY_EXISTS') !== -1) {
          return this.cached(key, fn, options);
        }
        throw err;
      }
    }
  }

  /**
   * Builds a cipher or decipher for `this.algo` from a passphrase.
   *
   * crypto.createCipher/createDecipher are deprecated and absent from current
   * Node releases, so the key and IV are derived here the same way those
   * functions did (OpenSSL EVP_BytesToKey: MD5, one iteration, no salt) and fed
   * to the *iv variants. Data written through the legacy API remains readable.
   *
   * @param {string|Buffer} passphrase
   * @param {boolean} decrypting true for a decipher, false for a cipher
   * @returns {Object} a crypto Cipher or Decipher instance
   */
  _createCipher(passphrase, decrypting) {
    if (typeof crypto.getCipherInfo !== 'function') {
      // Older runtimes without getCipherInfo: use the legacy API directly.
      return decrypting
        ? crypto.createDecipher(this.algo, passphrase)
        : crypto.createCipher(this.algo, passphrase);
    }

    const cipherInfo = crypto.getCipherInfo(this.algo);
    if (!cipherInfo) throw new Error('Unknown cipher');
    const keyLength = cipherInfo.keyLength;
    const ivLength = cipherInfo.ivLength || 0;
    const needed = keyLength + ivLength;

    const secret = Buffer.from(passphrase);
    let material = Buffer.alloc(0);
    let block = Buffer.alloc(0);
    // D_i = MD5(D_{i-1} || passphrase); key and IV are cut from D_1 || D_2 || ...
    while (material.length < needed) {
      block = crypto.createHash('md5').update(block).update(secret).digest();
      material = Buffer.concat([material, block]);
    }

    const derivedKey = material.subarray(0, keyLength);
    const iv = ivLength ? material.subarray(keyLength, needed) : null;
    return decrypting
      ? crypto.createDecipheriv(this.algo, derivedKey, iv)
      : crypto.createCipheriv(this.algo, derivedKey, iv);
  }

  /**
   * Serialises `data` as JSON and encrypts it with `this.algo`.
   *
   * @param {*} data JSON-serialisable value
   * @param {string|Buffer} passphrase
   * @returns {string} base64 ciphertext
   * @throws {string} 'MISSING_PASSPHRASE' when no passphrase is given
   */
  encrypt(data, passphrase) {
    if (!passphrase) throw 'MISSING_PASSPHRASE';
    const cipher = this._createCipher(passphrase, false);
    return cipher.update(JSON.stringify(data), 'utf-8', 'base64') + cipher.final('base64');
  }

  /**
   * Reverses `encrypt`: decrypts base64 ciphertext and parses the JSON inside.
   *
   * @param {string} data base64 ciphertext
   * @param {string|Buffer} passphrase
   * @returns {*} the decoded value
   * @throws {string} 'MISSING_PASSPHRASE' when no passphrase is given
   * @throws {Error} 'BAD_PASSPHRASE' on a TypeError or an OpenSSL 'bad decrypt' failure
   */
  decrypt(data, passphrase) {
    if (!passphrase) throw 'MISSING_PASSPHRASE';
    const decipher = this._createCipher(passphrase, true);
    try {
      return JSON.parse(decipher.update(data, 'base64', 'utf-8') + decipher.final('utf-8'));
    } catch (e) {
      if (e instanceof TypeError || (e.message && e.message.indexOf('bad decrypt') !== -1)) {
        throw new Error('BAD_PASSPHRASE');
      }
      throw e;
    }
  }
}
module.exports = Stampede;