-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathk8s_lock.js
121 lines (113 loc) · 4.37 KB
/
k8s_lock.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
const {KubeConfig, V1MicroTime} = require("@kubernetes/client-node");
const k8s = require("@kubernetes/client-node");
/**
 * Cooperative lock backed by a Kubernetes Lease object, accessed through
 * `CoordinationV1Api`.
 *
 * Protocol used by this class (all contenders must use the same one):
 *  - `spec.holderIdentity` names the current owner.
 *  - `spec.renewTime` is written as "now + lease duration", i.e. it is used as
 *    the instant at which the lock expires. A lease whose `renewTime` is in
 *    the past (or missing) is considered free.
 *  - Every write sends the `resourceVersion` that was just read; an HTTP 409
 *    answer is treated as "somebody else won".
 */
class K8SLock {
    /**
     * @param {object} [options]
     * @param {object} [options.kubeConfig] KubeConfig to use; when omitted a new
     *   one is created and loaded with `loadFromDefault()`.
     * @param {string} options.leaseName Name of the Lease object.
     * @param {string} options.namespace Namespace of the Lease object.
     * @param {boolean} [options.createLeaseIfNotExist=true] Create the Lease when
     *   reading it answers 404.
     * @param {object} [options.labels={}] Labels written on the Lease.
     * @param {string} options.lockLeaserId Identity written to `holderIdentity`.
     *   NOTE(review): it should be unique per contender, because ownership is
     *   decided by comparing this value with the stored `holderIdentity`.
     * @param {number} [options.leaseDurationInSeconds=30] Lease duration.
     * @param {number} [options.refreshLockInterval] Milliseconds between
     *   background renewals; defaults to half of the lease duration.
     * @param {number} [options.lockTryInterval] Milliseconds between acquisition
     *   attempts in `getLock(true)`; defaults to the lease duration.
     * @param {function(Error): void} [options.onError] Called when the background
     *   renewal started by `startLocking()` fails with an error. Defaults to
     *   logging the error with `console.error`.
     */
    constructor({kubeConfig, leaseName, namespace, createLeaseIfNotExist = true, labels = {}, lockLeaserId, leaseDurationInSeconds = 30, refreshLockInterval, lockTryInterval, onError} = {}) {
        this.kubeConfig = kubeConfig;
        if (!this.kubeConfig) {
            this.kubeConfig = new KubeConfig();
            this.kubeConfig.loadFromDefault();
        }
        this.leaseName = leaseName;
        this.namespace = namespace;
        this.createLeaseIfNotExist = createLeaseIfNotExist;
        this.labels = labels;
        this.lockLeaserId = lockLeaserId;
        this.leaseDurationInSeconds = leaseDurationInSeconds;
        // Renewing at half of the lease duration leaves room for one delayed renewal.
        this.refreshLock = refreshLockInterval || this.leaseDurationInSeconds * 1000 / 2;
        this.lockTryInterval = lockTryInterval || this.leaseDurationInSeconds * 1000;
        this.onError = typeof onError === "function"
            ? onError
            : (err) => console.error("K8SLock: background lease renewal failed", err);
        this.isLocking = false;
        this.keepLocking = false;
    }

    /**
     * Performs one acquire-or-renew attempt and updates `isLocking`.
     *
     * @returns {Promise<boolean>} `true` when the lease is now held by
     *   `lockLeaserId`, `false` when it is held by someone else or a concurrent
     *   writer won (HTTP 409).
     * @throws Any other API error is rethrown unchanged.
     */
    async _lock() {
        const k8sApi = this.kubeConfig.makeApiClient(k8s.CoordinationV1Api);
        let lease;
        try {
            lease = await k8sApi.readNamespacedLease(this.leaseName, this.namespace);
        } catch (e) {
            if (e?.statusCode !== 404 || !this.createLeaseIfNotExist) {
                throw e;
            }
            try {
                lease = await k8sApi.createNamespacedLease(this.namespace, {
                    metadata: {
                        name: this.leaseName,
                        labels: this.labels
                    },
                    spec: {
                    }
                });
            } catch (createError) {
                // Another contender created the lease between our read and our
                // create: that is a lost race, not a failure.
                if (createError?.statusCode === 409) {
                    this.isLocking = false;
                    return false;
                }
                throw createError;
            }
        }

        const spec = lease.body.spec ?? {};
        const heldByUs = spec.holderIdentity === this.lockLeaserId;
        const expired = new Date(spec.renewTime || 0) < new Date();
        if (!expired && !heldByUs) {
            this.isLocking = false;
            return false;
        }

        const currentDate = new V1MicroTime();
        const body = {
            metadata: {
                labels: this.labels,
                // Sent back so the API server rejects the write (409) if the
                // lease changed since it was read.
                resourceVersion: lease.body.metadata.resourceVersion
            },
            spec: {
                leaseDurationSeconds: this.leaseDurationInSeconds,
                holderIdentity: this.lockLeaserId,
                renewTime: new V1MicroTime(currentDate.getTime() + this.leaseDurationInSeconds * 1000)
            }
        };
        if (!heldByUs) {
            // Taking over from another (or no) holder.
            body.spec.leaseTransitions = (spec.leaseTransitions || 0) + 1;
            body.spec.acquireTime = currentDate;
        }
        try {
            await k8sApi.patchNamespacedLease(this.leaseName, this.namespace, body, undefined, undefined, undefined, undefined, undefined, {
                headers: {
                    "Content-Type": "application/strategic-merge-patch+json"
                }
            });
        } catch (e) {
            // Whatever went wrong, ownership can no longer be asserted.
            this.isLocking = false;
            if (e?.statusCode === 409) {
                return false;
            }
            throw e;
        }
        this.isLocking = true;
        return true;
    }

    /**
     * Renews the lease every `refreshLock` milliseconds while `keepLocking` is
     * set. Stops as soon as a renewal reports the lock as lost.
     *
     * @returns {Promise<void>} Rejects with the API error when a renewal throws.
     */
    async _keepLocking() {
        while (this.keepLocking) {
            const renewed = await this._lock();
            if (!renewed) {
                this.keepLocking = false;
                return;
            }
            await new Promise((accept) => setTimeout(accept, this.refreshLock));
        }
    }

    /**
     * Waits until the lock is acquired, then keeps renewing it in the background
     * until `stopLocking()` is called, the lock is lost, or a renewal fails.
     * A failing renewal clears `isLocking` and `keepLocking` and is reported
     * through `onError`.
     *
     * @returns {Promise<{isLocking: boolean}>}
     * @throws API errors raised while acquiring the lock.
     */
    async startLocking() {
        await this.getLock(true);
        if (this.isLocking) {
            this.keepLocking = true;
            // The first renewal is scheduled one refresh interval after acquisition.
            setTimeout(() => {
                this._keepLocking().catch((err) => {
                    this.keepLocking = false;
                    this.isLocking = false;
                    this.onError(err);
                });
            }, this.refreshLock);
        }
        return {isLocking: this.isLocking};
    }

    /**
     * Asks the background renewal loop to stop. The Lease object is not
     * modified and `isLocking` is not reset by this call.
     *
     * @returns {Promise<void>}
     */
    async stopLocking() {
        this.keepLocking = false;
    }

    /**
     * Tries to acquire (or renew) the lock.
     *
     * @param {boolean} [waitUntilLock] When truthy, retries every
     *   `lockTryInterval` milliseconds until the lock is acquired.
     * @returns {Promise<boolean>} Whether the lock is held.
     * @throws API errors other than the handled 404/409 cases.
     */
    async getLock(waitUntilLock) {
        let locked = await this._lock();
        if (waitUntilLock) {
            while (!locked) {
                await new Promise((accept) => setTimeout(accept, this.lockTryInterval));
                locked = await this._lock();
            }
        }
        return locked;
    }
}
module.exports = {K8SLock};