Skip to content

Commit

Permalink
fix(bonsai-core): retry channels who fail initial subscription (#1416)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyleroooo authored Jan 8, 2025
1 parent dc4c816 commit 46c2ffe
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 35 deletions.
125 changes: 93 additions & 32 deletions src/abacus-ts/websocket/lib/indexerWebsocket.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { logAbacusTsError } from '@/abacus-ts/logs';
import typia from 'typia';

import { timeUnits } from '@/constants/time';

import { assertNever } from '@/lib/assertNever';
import { isTruthy } from '@/lib/isTruthy';

import { ReconnectingWebSocket } from './reconnectingWebsocket';

const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______';
const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute;

export class IndexerWebsocket {
private socket: ReconnectingWebSocket | null = null;
Expand All @@ -24,6 +27,8 @@ export class IndexerWebsocket {
};
} = {};

private lastRetryTimeMsByChannel: { [channel: string]: number } = {};

constructor(url: string) {
this.socket = new ReconnectingWebSocket({
url,
Expand Down Expand Up @@ -55,6 +60,25 @@ export class IndexerWebsocket {
handleBaseData: (data: any, fullMessage: any) => void;
handleUpdates: (data: any[], fullMessage: any) => void;
}): () => void {
this._addSub({ channel, id, batched, handleUpdates, handleBaseData });
return () => {
this._performUnsub({ channel, id });
};
}

private _addSub = ({
channel,
id,
batched = true,
handleUpdates,
handleBaseData,
}: {
channel: string;
id: string | undefined;
batched?: boolean;
handleBaseData: (data: any, fullMessage: any) => void;
handleUpdates: (data: any[], fullMessage: any) => void;
}) => {
this.subscriptions[channel] ??= {};
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) {
logAbacusTsError('IndexerWebsocket', 'this subscription already exists', `${channel}/${id}`);
Expand All @@ -77,46 +101,81 @@ export class IndexerWebsocket {
type: 'subscribe',
});
}
};

return () => {
if (this.subscriptions[channel] == null) {
logAbacusTsError(
'IndexerWebsocket',
'unsubbing from nonexistent or already unsubbed channel',
channel
);
return;
}
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) {
logAbacusTsError(
'IndexerWebsocket',
'unsubbing from nonexistent or already unsubbed channel',
channel,
id
);
private _performUnsub = ({ channel, id }: { channel: string; id: string | undefined }) => {
if (this.subscriptions[channel] == null) {
logAbacusTsError(
'IndexerWebsocket',
'unsubbing from nonexistent or already unsubbed channel',
channel
);
return;
}
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) {
logAbacusTsError(
'IndexerWebsocket',
'unsubbing from nonexistent or already unsubbed channel',
channel,
id
);
return;
}
if (
this.socket != null &&
this.socket.isActive() &&
this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage
) {
this.socket.send({
channel,
id,
type: 'unsubscribe',
});
}
delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID];
};

private _refreshChannelSubs = (channel: string) => {
const allSubs = Object.values(this.subscriptions[channel] ?? {});
allSubs.forEach((sub) => {
this._performUnsub(sub);
this._addSub(sub);
});
};

// if we get a "could not fetch data" error, we retry once as long as this channel is not on cooldown
// TODO: when backend adds the channel and id to the error message, use that to retry only one subscription
// TODO: remove this entirely when backend is more reliable
private _handleErrorReceived = (message: string) => {
if (message.startsWith('Internal error, could not fetch data for subscription: ')) {
const maybeChannel = message
.trim()
.split(/[\s,.]/)
.at(-2);
if (maybeChannel != null && maybeChannel.startsWith('v4_')) {
const lastRefresh = this.lastRetryTimeMsByChannel[maybeChannel] ?? 0;
if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) {
this.lastRetryTimeMsByChannel[maybeChannel] = Date.now();
this._refreshChannelSubs(maybeChannel);
logAbacusTsError(
'IndexerWebsocket',
'error fetching data for channel, refetching',
maybeChannel
);
return;
}
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel);
return;
}
if (
this.socket != null &&
this.socket.isActive() &&
this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage
) {
this.socket.send({
channel,
id,
type: 'unsubscribe',
});
}
delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID];
};
}
}
logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message);
};

private _handleMessage = (messagePre: any) => {
try {
const message = isWsMessage(messagePre);
if (message.type === 'error') {
// todo we should unsub and resub to the connection if we can
logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message.message);
this._handleErrorReceived(message.message);
} else if (message.type === 'connected' || message.type === 'unsubscribed') {
// do nothing
} else if (
Expand All @@ -128,6 +187,7 @@ export class IndexerWebsocket {
const channel = message.channel;
const id = message.id;
if (this.subscriptions[channel] == null) {
// hide error for channel we expect to see it on
if (channel !== 'v4_orderbook') {
logAbacusTsError(
'IndexerWebsocket',
Expand All @@ -139,6 +199,7 @@ export class IndexerWebsocket {
return;
}
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) {
// hide error for channel we expect to see it on
if (channel !== 'v4_orderbook') {
logAbacusTsError(
'IndexerWebsocket',
Expand Down
20 changes: 17 additions & 3 deletions src/abacus-ts/websocket/lib/reconnectingWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,26 @@ class WebSocketConnection {
}
};

this.ws.onerror = (error) => {
logAbacusTsError('WebSocketConnection', `socket ${this.id} error encountered`, error);
this.ws.onerror = () => {
this.close();
};

this.ws.onclose = () => {
this.ws.onclose = (close) => {
const allowedCodes = new Set([
// normal
1000,
// going away (nav or graceful server shutdown)
1001,
// normal but no code
1005,
]);
if (!allowedCodes.has(close.code)) {
logAbacusTsError('WebSocketConnection', `socket ${this.id} closed abnormally`, {
code: close.code,
reason: close.reason,
clean: close.wasClean,
});
}
this.close();
};
}
Expand Down

0 comments on commit 46c2ffe

Please sign in to comment.