diff --git a/jetstream/jsclient.ts b/jetstream/jsclient.ts index fea4be6a..58b4487c 100644 --- a/jetstream/jsclient.ts +++ b/jetstream/jsclient.ts @@ -795,6 +795,8 @@ export class JetStreamSubscriptionImpl extends TypedSubscription this.js._request(subj, req, { retries: -1 }) .then((v) => { const ci = v as ConsumerInfo; + const jinfo = this.sub.info as JetStreamSubscriptionInfo; + jinfo.last = ci; this.info!.config = ci.config; this.info!.name = ci.name; }) diff --git a/nats-base-client/protocol.ts b/nats-base-client/protocol.ts index e0e49ea9..58bd4761 100644 --- a/nats-base-client/protocol.ts +++ b/nats-base-client/protocol.ts @@ -995,6 +995,7 @@ export class ProtocolHandler implements Dispatcher { if (!s || this.isClosed()) { return; } + this.unsub(s); s.subject = subject; this.subscriptions.resub(s); // we don't auto-unsub here because we don't diff --git a/tests/helpers/launcher.ts b/tests/helpers/launcher.ts index 63de5a64..8e0c646f 100644 --- a/tests/helpers/launcher.ts +++ b/tests/helpers/launcher.ts @@ -72,6 +72,45 @@ export interface JSZ { }; } +export interface SubDetails { + subject: string; + sid: string; + msgs: number; + cid: number; +} + +export interface Conn { + cid: number; + kind: string; + type: string; + ip: string; + port: number; + start: string; + "last_activity": string; + "rtt": string; + uptime: string; + idle: string; + "pending_bytes": number; + "in_msgs": number; + "out_msgs": number; + subscriptions: number; + name: string; + lang: string; + version: string; + subscriptions_list?: string[]; + subscriptions_list_detail?: SubDetails[]; +} + +export interface ConnZ { + "server_id": string; + now: string; + "num_connections": number; + "total": number; + "offset": number; + "limit": number; + "connections": Conn[]; +} + function parseHostport( s?: string, ): { hostname: string; port: number } | undefined { @@ -316,6 +355,21 @@ export class NatsServer implements PortInfo { return await resp.json(); } + async connz(cid?: number, subs: boolean | "detail" = true): Promise { + if (!this.monitoring) { + return Promise.reject(new Error("server is not monitoring")); + } + const args = []; + args.push(`subs=${subs}`); + if (cid) { + args.push(`cid=${cid}`); + } + + const qs = args.length ? args.join("&") : ""; + const resp = await fetch(`http://127.0.0.1:${this.monitoring}/connz?${qs}`); + return await resp.json(); + } + async dataDir(): Promise { const jsz = await this.jsz(); return jsz.config.store_dir; diff --git a/tests/resub_test.ts b/tests/resub_test.ts index 98153d69..6991d046 100644 --- a/tests/resub_test.ts +++ b/tests/resub_test.ts @@ -15,8 +15,13 @@ import { cleanup, setup } from "./helpers/mod.ts"; import { NatsConnectionImpl } from "../nats-base-client/nats.ts"; -import { assertEquals } from "https://deno.land/std@0.221.0/assert/mod.ts"; -import { createInbox, Msg } from "../nats-base-client/core.ts"; +import { + assertEquals, + assertExists, + fail, +} from "https://deno.land/std@0.221.0/assert/mod.ts"; +import { createInbox, Msg, NatsConnection } from "../nats-base-client/core.ts"; +import { NatsServer } from "./helpers/launcher.ts"; Deno.test("resub - iter", async () => { const { ns, nc } = await setup(); @@ -75,3 +80,69 @@ Deno.test("resub - callback", async () => { assertEquals(buf[1].subject, subjb); await cleanup(ns, nc); }); + +async function assertEqualSubs( + ns: NatsServer, + nc: NatsConnection, +): Promise { + const nci = nc as NatsConnectionImpl; + const cid = nc.info?.client_id || -1; + if (cid === -1) { + fail("client_id not found"); + } + + const connz = await ns.connz(cid, "detail"); + + const conn = connz.connections.find((c) => { + return c.cid === cid; + }); + assertExists(conn); + assertExists(conn.subscriptions_list_detail); + + const subs = nci.protocol.subscriptions.all(); + subs.forEach((sub) => { + const ssub = conn.subscriptions_list_detail?.find((d) => { + return d.sid === `${sub.sid}`; + }); + assertExists(ssub); + assertEquals(ssub.subject, sub.subject); + }); +} + +Deno.test("resub - removes server interest", async () => { + const { ns, nc } = await setup(); + + nc.subscribe("a", { + callback() { + // nothing + }, + }); + + const nci = nc as NatsConnectionImpl; + let sub = nci.protocol.subscriptions.all().find((s) => { + return s.subject === "a"; + }); + assertExists(sub); + + // assert the server sees the same subscriptions + await assertEqualSubs(ns, nc); + + // change it + nci._resub(sub, "b"); + + // make sure we don't find a + sub = nci.protocol.subscriptions.all().find((s) => { + return s.subject === "a"; + }); + assertEquals(sub, undefined); + + // make sure we find b + sub = nci.protocol.subscriptions.all().find((s) => { + return s.subject === "b"; + }); + assertExists(sub); + + // assert server thinks the same thing + await assertEqualSubs(ns, nc); + await cleanup(ns, nc); +});