Skip to content

Commit

Permalink
[FIX] fixed an issue with ordered consumer where resetting the consum…
Browse files Browse the repository at this point in the history
…er didn't send the UNSUB, creating a situation where if there was no disconnect from the client, two subscriptions would be active on the server.
  • Loading branch information
aricart committed May 2, 2024
1 parent 7fe57c3 commit f7c41a8
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
2 changes: 2 additions & 0 deletions jetstream/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
this.js._request(subj, req, { retries: -1 })
.then((v) => {
const ci = v as ConsumerInfo;
const jinfo = this.sub.info as JetStreamSubscriptionInfo;
jinfo.last = ci;
this.info!.config = ci.config;
this.info!.name = ci.name;
})
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
if (!s || this.isClosed()) {
return;
}
this.unsub(s);
s.subject = subject;
this.subscriptions.resub(s);
// we don't auto-unsub here because we don't
Expand Down
54 changes: 54 additions & 0 deletions tests/helpers/launcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,45 @@ export interface JSZ {
};
}

export interface SubDetails {
subject: string;
sid: string;
msgs: number;
cid: number;
}

export interface Conn {
cid: number;
kind: string;
type: string;
ip: string;
port: number;
start: string;
"last_activity": string;
"rtt": string;
uptime: string;
idle: string;
"pending_bytes": number;
"in_msgs": number;
"out_msgs": number;
subscriptions: number;
name: string;
lang: string;
version: string;
subscriptions_list?: string[];
subscriptions_list_detail?: SubDetails[];
}

export interface ConnZ {
"server_id": string;
now: string;
"num_connections": number;
"total": number;
"offset": number;
"limit": number;
"connections": Conn[];
}

function parseHostport(
s?: string,
): { hostname: string; port: number } | undefined {
Expand Down Expand Up @@ -316,6 +355,21 @@ export class NatsServer implements PortInfo {
return await resp.json();
}

async connz(cid?: number, subs: boolean | "detail" = true): Promise<ConnZ> {
if (!this.monitoring) {
return Promise.reject(new Error("server is not monitoring"));
}
const args = [];
args.push(`subs=${subs}`);
if (cid) {
args.push(`cid=${cid}`);
}

const qs = args.length ? args.join("&") : "";
const resp = await fetch(`http://127.0.0.1:${this.monitoring}/connz?${qs}`);
return await resp.json();
}

async dataDir(): Promise<string | null> {
const jsz = await this.jsz();
return jsz.config.store_dir;
Expand Down
75 changes: 73 additions & 2 deletions tests/resub_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@

import { cleanup, setup } from "./helpers/mod.ts";
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
import { assertEquals } from "https://deno.land/[email protected]/assert/mod.ts";
import { createInbox, Msg } from "../nats-base-client/core.ts";
import {
assertEquals,
assertExists,
fail,
} from "https://deno.land/[email protected]/assert/mod.ts";
import { createInbox, Msg, NatsConnection } from "../nats-base-client/core.ts";
import { NatsServer } from "./helpers/launcher.ts";

Deno.test("resub - iter", async () => {
const { ns, nc } = await setup();
Expand Down Expand Up @@ -75,3 +80,69 @@ Deno.test("resub - callback", async () => {
assertEquals(buf[1].subject, subjb);
await cleanup(ns, nc);
});

async function assertEqualSubs(
ns: NatsServer,
nc: NatsConnection,
): Promise<void> {
const nci = nc as NatsConnectionImpl;
const cid = nc.info?.client_id || -1;
if (cid === -1) {
fail("client_id not found");
}

const connz = await ns.connz(cid, "detail");

const conn = connz.connections.find((c) => {
return c.cid === cid;
});
assertExists(conn);
assertExists(conn.subscriptions_list_detail);

const subs = nci.protocol.subscriptions.all();
subs.forEach((sub) => {
const ssub = conn.subscriptions_list_detail?.find((d) => {
return d.sid === `${sub.sid}`;
});
assertExists(ssub);
assertEquals(ssub.subject, sub.subject);
});
}

Deno.test("resub - removes server interest", async () => {
const { ns, nc } = await setup();

nc.subscribe("a", {
callback() {
// nothing
},
});

const nci = nc as NatsConnectionImpl;
let sub = nci.protocol.subscriptions.all().find((s) => {
return s.subject === "a";
});
assertExists(sub);

// assert the server sees the same subscriptions
await assertEqualSubs(ns, nc);

// change it
nci._resub(sub, "b");

// make sure we don't find a
sub = nci.protocol.subscriptions.all().find((s) => {
return s.subject === "a";
});
assertEquals(sub, undefined);

// make sure we find b
sub = nci.protocol.subscriptions.all().find((s) => {
return s.subject === "b";
});
assertExists(sub);

// assert server thinks the same thing
await assertEqualSubs(ns, nc);
await cleanup(ns, nc);
});

0 comments on commit f7c41a8

Please sign in to comment.