Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(presence): Add support for signal batching #23075

Draft
wants to merge 61 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
f7b7b1f
wip
tylerbutler Oct 23, 2024
c334545
wip
tylerbutler Oct 28, 2024
dbf7f39
wip
tylerbutler Oct 30, 2024
54a6226
Merge branch 'main' into presence-notification-events
tylerbutler Oct 31, 2024
93b12e9
compiles and test passes
tylerbutler Oct 31, 2024
e882b2a
process incoming signal
tylerbutler Oct 31, 2024
b44725a
sort of works
tylerbutler Oct 31, 2024
0f6b6f3
cleanup
tylerbutler Nov 1, 2024
f7e7d30
updates
tylerbutler Nov 1, 2024
a0b348d
Merge branch 'main' into presence-notification-events
tylerbutler Nov 5, 2024
b3dccb0
Merge branch 'main' into presence-notification-events
tylerbutler Nov 11, 2024
321ae56
Apply suggestions from code review
tylerbutler Nov 11, 2024
f396549
Apply suggestions from code review
tylerbutler Nov 11, 2024
f819b42
revert changes in example
tylerbutler Nov 11, 2024
47b8f87
cleanup
tylerbutler Nov 11, 2024
5e9022c
simplify and correct tests
tylerbutler Nov 12, 2024
af8f94d
Merge branch 'main' into presence-notification-events
tylerbutler Nov 12, 2024
01df10c
feedback
tylerbutler Nov 12, 2024
b378f62
build
tylerbutler Nov 12, 2024
850afed
Merge branch 'main' into presence-notification-events
tylerbutler Nov 12, 2024
15d2457
refactor(presence): Convert localUpdate to accept an options object
tylerbutler Nov 12, 2024
b0b0bd4
Merge branch 'main' into presence-batching
tylerbutler Nov 12, 2024
538992e
test(client-presence): rework Notifications tests
jason-ha Nov 13, 2024
eb731f6
test cases for signals in latestValueManager
tylerbutler Nov 13, 2024
24d43da
Merge pull request #30 from microsoft/presence/notification-events-tests
tylerbutler Nov 13, 2024
e78e7ee
initial implemtnation
tylerbutler Nov 13, 2024
7c7d9bd
Merge branch 'main' into presence-notification-events
tylerbutler Nov 13, 2024
f10c6fc
Merge branch 'main' into presence-notification-events
tylerbutler Nov 13, 2024
2575489
fix initial subscriptions not being triggered
tylerbutler Nov 13, 2024
9ff6a36
Merge branch 'presence-notification-events' into presence-batching
tylerbutler Nov 13, 2024
b4ad324
Remove outdated comments
tylerbutler Nov 13, 2024
e27b44f
Remove outdated comments
tylerbutler Nov 13, 2024
9be169e
Merge branch 'main' into presence-notification-events
tylerbutler Nov 13, 2024
39aeb75
wip
tylerbutler Nov 13, 2024
eac6314
restore datastore manager
tylerbutler Nov 13, 2024
57c077c
Revert "restore datastore manager"
tylerbutler Nov 13, 2024
b935bf6
wip
tylerbutler Nov 13, 2024
3204642
vitest
tylerbutler Nov 13, 2024
d445834
snapshot tests
tylerbutler Nov 13, 2024
e92b727
updates
tylerbutler Nov 14, 2024
58eaf28
Merge branch 'main' into presence-batching
tylerbutler Nov 14, 2024
68c91f5
test updates
tylerbutler Nov 14, 2024
138fe43
Update presenceDatastoreManager.ts
tylerbutler Nov 14, 2024
018f3eb
Update presenceDatastoreManager.ts
tylerbutler Nov 14, 2024
d76623e
Update presenceDatastoreManager.ts
tylerbutler Nov 14, 2024
5f58a86
Update presenceDatastoreManager.ts
tylerbutler Nov 14, 2024
7ebe231
Update presenceDatastoreManager.ts
tylerbutler Nov 14, 2024
5a0e88b
clean up typing
tylerbutler Nov 14, 2024
81a3679
Merge branch 'main' into presence-notification-events
tylerbutler Nov 14, 2024
2e06968
Merge branch 'presence-notification-events' into presence-batching
tylerbutler Nov 14, 2024
5627b10
Merge branch 'presence-notification-events' into presence-batching
tylerbutler Nov 14, 2024
d3a010e
update tests
tylerbutler Nov 14, 2024
902671c
disable object sorting in snapshots
tylerbutler Nov 14, 2024
9257e8c
Merge branch 'main' into presence-batching
tylerbutler Nov 14, 2024
944fc73
use ts-deepmerge
tylerbutler Nov 14, 2024
f22a9ae
stringify snapshot since order controls don't seem to work
tylerbutler Nov 14, 2024
77c5407
clarify that snapshot sort config doesn't work
tylerbutler Nov 14, 2024
8bd0558
Merge branch 'main' into presence-batching
tylerbutler Nov 14, 2024
2dde9ea
add code coverage support
tylerbutler Nov 14, 2024
50bcb3e
ignore test setup signals in snapshots
tylerbutler Nov 14, 2024
7114a8d
placeholder test
tylerbutler Nov 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions packages/framework/presence/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@
"format:prettier": "prettier --write . --cache --ignore-path ../../../.prettierignore",
"lint": "fluid-build . --task lint",
"lint:fix": "fluid-build . --task eslint:fix --task format",
"test": "npm run test:mocha",
"test:coverage": "c8 npm test",
"test": "npm run test:mocha && npm run test:vitest",
"test:coverage": "c8 npm run test:mocha && npm run test:vitest:coverage",
"test:mocha": "npm run test:mocha:esm && npm run test:mocha:cjs",
"test:mocha:cjs": "mocha --recursive \"dist/test/**/*.spec.*js\" --exit",
"test:mocha:esm": "mocha --recursive \"lib/test/**/*.spec.*js\" --exit",
"test:mocha:verbose": "cross-env FLUID_TEST_VERBOSE=1 npm run test:mocha",
"test:vitest": "vitest run",
"test:vitest:coverage": "npm run test:vitest -- --coverage",
"tsc": "fluid-tsc commonjs --project ./tsconfig.cjs.json && copyfiles -f ./src/cjs/package.json ./dist"
},
"c8": {
Expand Down Expand Up @@ -129,7 +131,8 @@
"@fluidframework/runtime-definitions": "workspace:~",
"@fluidframework/runtime-utils": "workspace:~",
"@fluidframework/shared-object-base": "workspace:~",
"@fluidframework/telemetry-utils": "workspace:~"
"@fluidframework/telemetry-utils": "workspace:~",
"ts-deepmerge": "^7.0.1"
},
"devDependencies": {
"@arethetypeswrong/cli": "^0.16.4",
Expand All @@ -145,6 +148,7 @@
"@types/mocha": "^9.1.1",
"@types/node": "^18.19.0",
"@types/sinon": "^17.0.3",
"@vitest/coverage-v8": "2.1.4",
"c8": "^8.0.1",
"concurrently": "^8.2.1",
"copyfiles": "^2.4.1",
Expand All @@ -155,7 +159,8 @@
"prettier": "~3.0.3",
"rimraf": "^4.4.0",
"sinon": "^18.0.1",
"typescript": "~5.4.5"
"typescript": "~5.4.5",
"vitest": "^2.1.4"
},
"fluidBuild": {
"tasks": {
Expand Down
9 changes: 5 additions & 4 deletions packages/framework/presence/src/latestMapValueManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
LatestValueMetadata,
} from "./latestValueTypes.js";
import type { ClientSessionId, ISessionClient, SpecificSessionClient } from "./presence.js";
import type { LocalUpdateOptions } from "./presenceStates.js";
import { datastoreFromHandle, type StateDatastore } from "./stateDatastore.js";
import { brandIVM } from "./valueManager.js";

Expand Down Expand Up @@ -191,7 +192,7 @@ class ValueMapImpl<T, K extends string | number> implements ValueMap<K, T> {
private readonly value: InternalTypes.MapValueState<T>,
private readonly localUpdate: (
updates: InternalTypes.MapValueState<T>,
forceUpdate: boolean,
options: LocalUpdateOptions,
) => void,
) {
// All initial items are expected to be defined.
Expand All @@ -210,7 +211,7 @@ class ValueMapImpl<T, K extends string | number> implements ValueMap<K, T> {
item.value = value;
}
const update = { rev: this.value.rev, items: { [key]: item } };
this.localUpdate(update, /* forceUpdate */ false);
this.localUpdate(update, { forceBroadcast: false });
}

public clear(): void {
Expand Down Expand Up @@ -332,8 +333,8 @@ class LatestMapValueManagerImpl<

this.local = new ValueMapImpl<T, Keys>(
value,
(updates: InternalTypes.MapValueState<T>, forceUpdate: boolean) => {
datastore.localUpdate(key, updates, forceUpdate);
(updates: InternalTypes.MapValueState<T>, options: LocalUpdateOptions) => {
datastore.localUpdate(key, updates, options);
},
);
}
Expand Down
5 changes: 4 additions & 1 deletion packages/framework/presence/src/latestValueManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ class LatestValueManagerImpl<T, Key extends string>
this.value.rev += 1;
this.value.timestamp = Date.now();
this.value.value = value;
this.datastore.localUpdate(this.key, this.value, /* forceUpdate */ false);
this.datastore.localUpdate(this.key, this.value, {
forceBroadcast: false,
allowableUpdateLatency: this.controls.allowableUpdateLatency,
});
}

public *clientValues(): IterableIterator<LatestValueClientData<T>> {
Expand Down
2 changes: 1 addition & 1 deletion packages/framework/presence/src/notificationsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class NotificationsManagerImpl<
this.key,
// @ts-expect-error TODO
{ rev: 0, timestamp: 0, value: { name, args: [...args] }, ignoreUnmonitored: true },
true,
{ forceBroadcast: true },
);
},
unicast: (name, targetClient, ...args) => {
Expand Down
125 changes: 118 additions & 7 deletions packages/framework/presence/src/presenceDatastoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
import { assert } from "@fluidframework/core-utils/internal";
import type { IInboundSignalMessage } from "@fluidframework/runtime-definitions/internal";
import type { ITelemetryLoggerExt } from "@fluidframework/telemetry-utils/internal";
// TODO: This lib weighs 585 bytes minified and gzipped; 1.1 kb minified only
// https://bundlephobia.com/package/[email protected]
// If this is too heavy it should be possible to write a bespoke merger
import { merge } from "ts-deepmerge";

import type { ClientConnectionId } from "./baseTypes.js";
import type { IEphemeralRuntime } from "./internalTypes.js";
import type { ClientSessionId, ISessionClient } from "./presence.js";
import type {
ClientUpdateEntry,
LocalUpdateOptions,
PresenceStatesInternal,
ValueElementMap,
} from "./presenceStates.js";
Expand Down Expand Up @@ -101,6 +106,8 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {

private readonly workspaces = new Map<string, PresenceStatesEntry<PresenceStatesSchema>>();

private readonly timer = new TimerManager();

public constructor(
private readonly clientSessionId: ClientSessionId,
private readonly runtime: IEphemeralRuntime,
Expand Down Expand Up @@ -148,7 +155,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {

const localUpdate = (
states: { [key: string]: ClientUpdateEntry },
forceBroadcast: boolean,
options: LocalUpdateOptions,
): void => {
// Check for connectivity before sending updates.
if (!this.runtime.connected) {
Expand Down Expand Up @@ -177,7 +184,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
},
[internalWorkspaceAddress]: updates,
},
forceBroadcast,
options,
);
};

Expand All @@ -195,14 +202,63 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
return entry.public;
}

private localUpdate(data: DatastoreMessageContent, _forceBroadcast: boolean): void {
const content = {
sendTimestamp: Date.now(),
private queuedMessage: DatastoreUpdateMessage["content"] | undefined;

private sendMessageDeadline: number = 0;

private localUpdate(data: DatastoreMessageContent, options: LocalUpdateOptions): void {
const allowableUpdateLatency = options.allowableUpdateLatency ?? 0;

const now = Date.now();
const updateDeadline = now + allowableUpdateLatency;
if (this.queuedMessage === undefined) {
// no queued message, so set the deadline based on the current message's allowable latency
this.sendMessageDeadline = updateDeadline;
}

const queuedMessageData = this.queuedMessage?.data;
const currentMessageData: DatastoreMessageContent = data;

// Merge the queued data with the next update.
const newData =
queuedMessageData === undefined
? currentMessageData
: merge(queuedMessageData, currentMessageData);

const newContent = {
sendTimestamp: now,
avgLatency: this.averageLatency,
// isComplete: false,
data,
// @ts-expect-error TODO
data: newData,
} satisfies DatastoreUpdateMessage["content"];
this.runtime.submitSignal(datastoreUpdateMessageType, content);

if (updateDeadline >= this.sendMessageDeadline) {
// Queue the update
// @ts-expect-error TODO
this.queuedMessage = newContent;
// if the timer has not expired, we can short-circuit because the timer will fire
// and cover this update. in other words, queuing this will be fast enough to
// meet its deadline, because a timer is already scheduled to fire before its deadline.
if (!this.timer.hasExpired()) {
return;
}
}

const timeoutInMs = updateDeadline - now;
if (timeoutInMs > 0) {
this.timer.setTimeout(this.sendQueuedMessage.bind(this), timeoutInMs);
} else {
this.sendQueuedMessage();
}
}

private sendQueuedMessage(): void {
if (this.queuedMessage === undefined) {
return;
}
this.runtime.submitSignal(datastoreUpdateMessageType, this.queuedMessage);
this.queuedMessage = undefined;
}

private broadcastAllKnownState(): void {
Expand Down Expand Up @@ -366,3 +422,58 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
}
}
}

/**
* Wrapper around setTimeout to track whether the timeout has expired or not.
*/
class TimerManager {
private timeoutId: number | undefined;
public startTime: number = 0;
public delay: number = 0;
private expired: boolean = true;

public setTimeout(callback: () => void, delay: number): void {
this.clearTimeout(); // Clear any existing timeout
this.startTime = Date.now();
this.delay = delay;
this.expired = false;
this.timeoutId = setTimeout(() => {
this.expired = true;
callback();
}, delay);
}

public clearTimeout(): void {
if (this.timeoutId !== undefined) {
clearTimeout(this.timeoutId);
this.timeoutId = undefined;
}
}

public hasExpired(): boolean {
return this.expired;
}
}

// function deepMerge<T extends object>(target: T, source: T): T {
// // eslint-disable-next-line no-restricted-syntax
// for (const key in source) {
// if (Object.prototype.hasOwnProperty.call(source, key)) {
// if (isObject(source[key])) {
// if (target[key] === undefined) {
// // @ts-expect-error TODO
// target[key] = {};
// }
// deepMerge(target[key] as T, source[key] as T);
// } else {
// target[key] = source[key];
// }
// }
// }
// return target;
// }

// function isObject(item: any): item is object {
// // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/strict-boolean-expressions
// return item && typeof item === "object" && !Array.isArray(item);
// }
18 changes: 13 additions & 5 deletions packages/framework/presence/src/presenceStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,21 @@ export type MapSchemaElement<
Keys extends keyof TSchema = keyof TSchema,
> = ReturnType<TSchema[Keys]>[Part];

/**
* @internal
*/
export interface LocalUpdateOptions {
forceBroadcast: boolean;
allowableUpdateLatency?: number;
}

/**
* @internal
*/
export interface PresenceRuntime {
readonly clientSessionId: ClientSessionId;
lookupClient(clientId: ClientConnectionId): ISessionClient;
localUpdate(states: { [key: string]: ClientUpdateEntry }, forceBroadcast: boolean): void;
localUpdate(states: { [key: string]: ClientUpdateEntry }, options: LocalUpdateOptions): void;
}

type PresenceSubSchemaFromWorkspaceSchema<
Expand Down Expand Up @@ -243,7 +251,7 @@ class PresenceStatesImpl<TSchema extends PresenceStatesSchema>
this.props = this.nodes as unknown as PresenceStates<TSchema>["props"];

if (anyInitialValues) {
this.runtime.localUpdate(initial.newValues, false);
this.runtime.localUpdate(initial.newValues, { forceBroadcast: false });
}
}
}
Expand All @@ -263,9 +271,9 @@ class PresenceStatesImpl<TSchema extends PresenceStatesSchema>
public localUpdate<Key extends keyof TSchema & string>(
key: Key,
value: MapSchemaElement<TSchema, "value", Key> & ClientUpdateEntry,
forceBroadcast: boolean,
options: LocalUpdateOptions,
): void {
this.runtime.localUpdate({ [key]: value }, forceBroadcast);
this.runtime.localUpdate({ [key]: value }, options);
}

public update<Key extends keyof TSchema & string>(
Expand Down Expand Up @@ -302,7 +310,7 @@ class PresenceStatesImpl<TSchema extends PresenceStatesSchema>
this.datastore[key] = {};
}
this.datastore[key][this.runtime.clientSessionId] = nodeData.value;
this.runtime.localUpdate({ [key]: nodeData.value }, false);
this.runtime.localUpdate({ [key]: nodeData.value }, { forceBroadcast: false });
}
}

Expand Down
3 changes: 2 additions & 1 deletion packages/framework/presence/src/stateDatastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { ClientConnectionId } from "./baseTypes.js";
import type { InternalTypes } from "./exposedInternalTypes.js";
import type { ClientRecord } from "./internalTypes.js";
import type { ClientSessionId, ISessionClient } from "./presence.js";
import type { LocalUpdateOptions } from "./presenceStates.js";

// type StateDatastoreSchemaNode<
// TValue extends InternalTypes.ValueDirectoryOrState<any> = InternalTypes.ValueDirectoryOrState<unknown>,
Expand Down Expand Up @@ -35,7 +36,7 @@ export interface StateDatastore<
value: TValue & {
ignoreUnmonitored?: true;
},
forceBroadcast: boolean,
options: LocalUpdateOptions,
): void;
update(key: TKey, clientSessionId: ClientSessionId, value: TValue): void;
knownValues(key: TKey): {
Expand Down
Loading
Loading