Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LWT support #14

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 55 additions & 52 deletions client/base_client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { QoS } from "../lib/mod.ts";
import { encode as connectEncoder } from "../packets/connect.ts";
import { ConnectPacket, encode as connectEncoder } from "../packets/connect.ts";
import { encode as disconnectEncoder } from "../packets/disconnect.ts";
import type {
AnyPacket,
Expand Down Expand Up @@ -46,6 +46,7 @@ export type ClientOptions = {
reconnect?: boolean | RetryOptions;
incomingStore?: IncomingStore;
outgoingStore?: OutgoingStore;
will?: ConnectPacket["will"];
logger?: (msg: string, ...args: unknown[]) => void;
};

Expand Down Expand Up @@ -273,15 +274,14 @@ export abstract class Client {
protected constructor(options?: ClientOptions) {
this.options = options || {};
this.clientId = this.generateClientId();
this.keepAlive =
typeof this.options.keepAlive === "number"
? this.options.keepAlive
: defaultKeepAlive;
this.keepAlive = typeof this.options.keepAlive === "number"
? this.options.keepAlive
: defaultKeepAlive;

this.incomingStore =
this.options.incomingStore || new IncomingMemoryStore();
this.outgoingStore =
this.options.outgoingStore || new OutgoingMemoryStore();
this.incomingStore = this.options.incomingStore ||
new IncomingMemoryStore();
this.outgoingStore = this.options.outgoingStore ||
new OutgoingMemoryStore();

this.log = this.options.logger || (() => {});

Expand All @@ -296,7 +296,9 @@ export abstract class Client {
break;
default:
return Promise.reject(
new Error(`should not be connecting in ${this.connectionState} state`)
new Error(
`should not be connecting in ${this.connectionState} state`,
),
);
}

Expand All @@ -314,7 +316,7 @@ export abstract class Client {
public publish(
topic: string,
payload: PublishPayload,
options?: PublishOptions
options?: PublishOptions,
): Promise<void> {
const dup = (options && options.dup) || false;
const qos = (options && options.qos) || 0;
Expand Down Expand Up @@ -379,44 +381,44 @@ export abstract class Client {

public async subscribe(
topicFilter: string,
qos?: QoS
qos?: QoS,
): Promise<Subscription[]>;

public async subscribe(
topicFilters: string[],
qos?: QoS
qos?: QoS,
): Promise<Subscription[]>;

public async subscribe(
subscription: SubscriptionOption,
qos?: QoS
qos?: QoS,
): Promise<Subscription[]>;

public async subscribe(
subscriptions: SubscriptionOption[],
qos?: QoS
qos?: QoS,
): Promise<Subscription[]>;

public async subscribe(
input: SubscriptionOption | string | (SubscriptionOption | string)[],
qos?: QoS
qos?: QoS,
): Promise<Subscription[]> {
switch (this.connectionState) {
case "disconnecting":
case "disconnected":
throw new Error(
`should not be subscribing in ${this.connectionState} state`
`should not be subscribing in ${this.connectionState} state`,
);
}

const arr = Array.isArray(input) ? input : [input];
const subs = arr.map<Subscription>((sub) => {
return typeof sub === "object"
? {
topicFilter: sub.topicFilter,
qos: sub.qos || qos || 0,
state: "pending",
}
topicFilter: sub.topicFilter,
qos: sub.qos || qos || 0,
state: "pending",
}
: { topicFilter: sub, qos: qos || 0, state: "pending" };
});
const promises = [];
Expand All @@ -427,7 +429,7 @@ export abstract class Client {
// to do when it receives a subscribe packet containing a topic filter
// matching an existing subscription.
this.subscriptions = this.subscriptions.filter(
(old) => old.topicFilter !== sub.topicFilter
(old) => old.topicFilter !== sub.topicFilter,
);

this.subscriptions.push(sub);
Expand Down Expand Up @@ -482,7 +484,7 @@ export abstract class Client {
case "disconnecting":
case "disconnected":
throw new Error(
`should not be unsubscribing in ${this.connectionState} state`
`should not be unsubscribing in ${this.connectionState} state`,
);
}

Expand All @@ -491,7 +493,7 @@ export abstract class Client {

for (const topicFilter of arr) {
const sub = this.subscriptions.find(
(sub) => sub.topicFilter === topicFilter
(sub) => sub.topicFilter === topicFilter,
) || { topicFilter, qos: 0, state: "unknown" };
const deferred = new Deferred<UnsubackPacket | null>();
const promise = deferred.promise.then(() => sub);
Expand Down Expand Up @@ -539,7 +541,7 @@ export abstract class Client {
for (const sub of this.subscriptions) {
if (sub.state === "removed") {
const unresolvedSubscribe = this.unresolvedSubscribes.get(
sub.topicFilter
sub.topicFilter,
);

if (unresolvedSubscribe) {
Expand All @@ -549,7 +551,7 @@ export abstract class Client {
}

const unresolvedUnsubscribe = this.unresolvedUnsubscribes.get(
sub.topicFilter
sub.topicFilter,
);

if (unresolvedUnsubscribe) {
Expand All @@ -565,7 +567,7 @@ export abstract class Client {
}

this.subscriptions = this.subscriptions.filter(
(sub) => sub.state !== "removed"
(sub) => sub.state !== "removed",
);

if (subs.length > 0 && this.connectionState === "connected") {
Expand Down Expand Up @@ -605,7 +607,7 @@ export abstract class Client {
break;
default:
throw new Error(
`should not be disconnecting in ${this.connectionState} state`
`should not be disconnecting in ${this.connectionState} state`,
);
}
}
Expand Down Expand Up @@ -651,9 +653,10 @@ export abstract class Client {
username: this.options.username,
password: this.options.password,
clean: this.options.clean !== false,
will: this.options.will,
keepAlive: this.keepAlive,
},
connectEncoder
connectEncoder,
);

this.startConnectTimer();
Expand Down Expand Up @@ -803,7 +806,7 @@ export abstract class Client {
break;
default:
throw new Error(
`should not be receiving connack packets in ${this.connectionState} state`
`should not be receiving connack packets in ${this.connectionState} state`,
);
}

Expand All @@ -822,7 +825,7 @@ export abstract class Client {
} else if (packet.qos === 1) {
if (typeof packet.id !== "number" || packet.id < 1) {
return this.protocolViolation(
"publish packet with qos 1 is missing id"
"publish packet with qos 1 is missing id",
);
}

Expand All @@ -833,17 +836,17 @@ export abstract class Client {
type: "puback",
id: packet.id,
},
pubackEncoder
pubackEncoder,
);
} else if (packet.qos === 2) {
if (typeof packet.id !== "number" || packet.id < 1) {
return this.protocolViolation(
"publish packet with qos 2 is missing id"
"publish packet with qos 2 is missing id",
);
}

const emitMessage =
!packet.dup || !(await this.incomingStore.has(packet.id));
const emitMessage = !packet.dup ||
!(await this.incomingStore.has(packet.id));

if (emitMessage) {
this.incomingStore.store(packet.id);
Expand All @@ -856,7 +859,7 @@ export abstract class Client {
type: "pubrec",
id: packet.id,
},
pubrecEncoder
pubrecEncoder,
);
}
}
Expand Down Expand Up @@ -893,7 +896,7 @@ export abstract class Client {
type: "pubcomp",
id: packet.id,
},
pubcompEncoder
pubcompEncoder,
);
}

Expand All @@ -912,7 +915,7 @@ export abstract class Client {

protected handleSuback(packet: SubackPacket) {
const unacknowledgedSubscribe = this.unacknowledgedSubscribes.get(
packet.id
packet.id,
);

// TODO: verify returnCodes length matches subscriptions.length
Expand All @@ -936,14 +939,14 @@ export abstract class Client {
}
} else {
throw new Error(
`received suback packet with unrecognized id ${packet.id}`
`received suback packet with unrecognized id ${packet.id}`,
);
}
}

protected handleUnsuback(packet: UnsubackPacket) {
const unacknowledgedUnsubscribe = this.unacknowledgedUnsubscribes.get(
packet.id
packet.id,
);

if (unacknowledgedUnsubscribe) {
Expand All @@ -968,7 +971,7 @@ export abstract class Client {
}
} else {
throw new Error(
`received unsuback packet with unrecognized id ${packet.id}`
`received unsuback packet with unrecognized id ${packet.id}`,
);
}
}
Expand All @@ -979,7 +982,7 @@ export abstract class Client {
() => {
this.connectTimedOut();
},
this.options.connectTimeout || defaultConnectTimeout
this.options.connectTimeout || defaultConnectTimeout,
);
}

Expand All @@ -989,7 +992,7 @@ export abstract class Client {
break;
default:
throw new Error(
`connect timer should not be timing out in ${this.connectionState} state`
`connect timer should not be timing out in ${this.connectionState} state`,
);
}

Expand Down Expand Up @@ -1032,6 +1035,7 @@ export abstract class Client {
defaultOptions = defaultReconnectOptions;
}

//@ts-expect-error Cannot be false
if (reconnectOptions === false) {
return;
} else if (reconnectOptions === true) {
Expand Down Expand Up @@ -1073,7 +1077,7 @@ export abstract class Client {
this.reconnectAttempt++;
this.openConnection();
},
delay
delay,
);

return true;
Expand Down Expand Up @@ -1114,7 +1118,7 @@ export abstract class Client {
{
type: "pingreq",
},
pingreqEncoder
pingreqEncoder,
);

// TODO: need a timer here to disconnect if we don't receive the pingres
Expand All @@ -1135,7 +1139,7 @@ export abstract class Client {
protected startTimer(
name: string,
cb: (...args: unknown[]) => void,
delay: number
delay: number,
) {
if (this.timerExists(name)) {
this.log(`timer ${name} already exists`);
Expand Down Expand Up @@ -1208,10 +1212,9 @@ export abstract class Client {
}

private getURL(): URL {
let url: URL | string | void =
typeof this.options.url === "function"
? this.options.url()
: this.options.url;
let url: URL | string | void = typeof this.options.url === "function"
? this.options.url()
: this.options.url;

if (!url) {
url = this.getDefaultURL();
Expand Down Expand Up @@ -1261,7 +1264,7 @@ export abstract class Client {

protected async send<T extends AnyPacket>(
packet: T,
encoder: PacketEncoder<T>
encoder: PacketEncoder<T>,
) {
this.log(`sending ${packet.type} packet`, packet);

Expand Down Expand Up @@ -1293,7 +1296,7 @@ export abstract class Client {
if (listeners) {
this.eventListeners.set(
eventName,
listeners.filter((l) => l !== listener)
listeners.filter((l) => l !== listener),
);
}
}
Expand Down
Loading