Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition while establishing connection due to multiplexing in ODSP Driver. #23085

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/drivers/driver-base/src/documentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ export class DocumentDeltaConnection
this.removeTrackedListeners();

// Clear the connection/socket before letting the deltaManager/connection manager know about the disconnect.
this.disconnectCore();
this.disconnectCore(err);

// Let user of connection object know about disconnect.
this.emit("disconnect", err);
Expand All @@ -455,7 +455,7 @@ export class DocumentDeltaConnection
* Disconnect from the websocket.
* @param err - error that caused the disconnect
*/
protected disconnectCore() {
protected disconnectCore(err: IAnyDriverError) {
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
this.socket.disconnect();
}

Expand Down
26 changes: 23 additions & 3 deletions packages/drivers/odsp-driver/src/odspDocumentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
private flushOpNonce: string | undefined;
private flushDeferred: Deferred<FlushResult> | undefined;
private connectionNotYetDisposedTimeout: ReturnType<typeof setTimeout> | undefined;
// Due to socket reuse (multiplexing), we can get a "disconnect" event on the socket reference from other clients sharing it.
// This can cause a race condition: this client is still establishing its connection and listening for "connect_document_success"
// (among other events) on the socket when a "disconnect" event arrives on the socket reference from another client. In that case
// we dispose the connection object and stop listening to further events on the socket. Because the connection is not yet
// established, no connection object is ever returned to the caller (the connection manager), so the caller remains stuck.
// To handle this, we use this deferred promise to keep track of connection initialization and reject it with the
// error in disconnectCore, so that the caller is notified and can handle the error.
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
private connectionInitializeDeferredP: Deferred<void> | undefined;

/**
* Error raising for socket.io issues
Expand Down Expand Up @@ -634,15 +642,21 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
}
});

await super.initialize(connectMessage, timeout).finally(() => {
this.connectionInitializeDeferredP = new Deferred<void>();

super
.initialize(connectMessage, timeout)
.then(() => this.connectionInitializeDeferredP?.resolve())
.catch((error) => this.connectionInitializeDeferredP?.reject(error));

await this.connectionInitializeDeferredP.promise.finally(() => {
this.logger.sendTelemetryEvent({
eventName: "ConnectionAttemptInfo",
...this.getConnectionDetailsProps(),
});
});
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected addTrackedListener(event: string, listener: (...args: any[]) => void): void {
// override some event listeners in order to support multiple documents/clients over the same websocket
switch (event) {
Expand Down Expand Up @@ -809,7 +823,7 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
/**
* Disconnect from the websocket
*/
protected disconnectCore(): void {
protected disconnectCore(err: IAnyDriverError): void {
const socket = this.socketReference;
assert(socket !== undefined, 0x0a2 /* "reentrancy not supported!" */);
this.socketReference = undefined;
Expand All @@ -821,5 +835,11 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
}

socket.removeSocketIoReference();
if (
this.connectionInitializeDeferredP !== undefined &&
!this.connectionInitializeDeferredP.isCompleted
) {
this.connectionInitializeDeferredP.reject(err);
}
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
}
}
15 changes: 14 additions & 1 deletion packages/drivers/odsp-driver/src/test/socketTests/socketMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ export interface IMockSocketConnectResponse {
}
| {
eventToEmit: "connect_timeout";
}
| {
eventToEmit: undefined;
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
};
}

Expand All @@ -52,7 +55,7 @@ export interface IMockSocketConnectResponse {
export class ClientSocketMock extends TypedEventEmitter<SocketMockEvents> {
public disconnected = false;
constructor(
private readonly mockSocketConnectResponse: IMockSocketConnectResponse = {
private mockSocketConnectResponse: IMockSocketConnectResponse = {
connect_document: { eventToEmit: "connect_document_success" },
},
) {
Expand Down Expand Up @@ -104,6 +107,16 @@ export class ClientSocketMock extends TypedEventEmitter<SocketMockEvents> {
this.emit("server_disconnect", socketError, clientId);
}

/**
* Use this to set connect response when the socket is reused.
* @param connectResponse - response to be sent on the connect event.
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
*/
public setMockSocketConnectResponseForReuse(
connectResponse: IMockSocketConnectResponse,
): void {
// Overwrites the response that was configured through the constructor.
this.mockSocketConnectResponse = connectResponse;
}

public emit(eventName: string, ...args: unknown[]): boolean {
switch (eventName) {
case "connect_document": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,61 @@ describe("OdspDocumentDeltaConnection tests", () => {
"1 get_ops_response listener should exiist",
);
});

// Regression test for the socket-reuse race: a second connection that is still waiting for its
// connect response must fail (rather than hang) when a server disconnect arrives on the shared socket.
it("Multiple connection objects should handle server_disconnect event when the second client is still waiting for connection to complete", async () => {
socket = new ClientSocketMock();
// First connection: awaited to completion, using the mock's default connect response.
const connection1 = await mockSocket(socket as unknown as Socket, async () =>
OdspDocumentDeltaConnection.create(
tenantId,
documentId,
token,
client,
webSocketUrl,
logger,
60000,
epochTracker,
socketReferenceKeyPrefix,
),
);

// Reconfigure the mock so the next connect_document has no event to emit.
// NOTE(review): presumably this leaves the second connection pending; the mock's emit handling is not visible here.
socket.setMockSocketConnectResponseForReuse({
connect_document: { eventToEmit: undefined },
});
let connection2Fails = false;
let errorReceived: IAnyDriverError | undefined;
// Second connection: started with the same arguments but deliberately not awaited yet.
// A rejection is recorded in the two variables above instead of propagating.
const connection2 = mockSocket(socket as unknown as Socket, async () =>
OdspDocumentDeltaConnection.create(
tenantId,
documentId,
token,
client,
webSocketUrl,
logger,
60000,
epochTracker,
socketReferenceKeyPrefix,
),
).catch((error: IAnyDriverError) => {
connection2Fails = true;
errorReceived = error;
});
let disconnectedEvent1 = false;

const errorToThrow = { message: "OdspSocketError", code: 400 };
connection1.on("disconnect", (reason: IAnyDriverError) => {
disconnectedEvent1 = true;
});

// Have the mock raise a server disconnect carrying errorToThrow.
socket.sendServerDisconnectEvent(errorToThrow);

// Checked without any await in between, so connection1's listener must have fired synchronously.
assert(disconnectedEvent1, "disconnect event should happen on first object");

assert(socket !== undefined && !socket.connected, "socket should be disconnected");
checkListenerCount(socket);
// Only now wait for connection2 to settle; the .catch above converts a rejection into a
// resolved promise, so this await itself does not throw.
await connection2;
assert(connection2Fails, "connection2 should fail");
assert(errorReceived?.message.includes("server_disconnect"), "message should be correct");
// NOTE(review): statusCode is presumably derived from errorToThrow.code (400) — confirm against the error mapping.
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any
assert((errorReceived as any).statusCode === 400, "status code should be correct");
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
});
});
Loading