Skip to content

Commit

Permalink
[lib] Create a queue and use it to process queued operations
Browse files Browse the repository at this point in the history
Summary:
It is possible that we need an updated state when processing the next operation, so we need to make sure that the new state is propagated.

https://linear.app/comm/issue/ENG-9470/mitigate-risks-of-effects-running-on-outdated-data

Test Plan:
Tested that this diff doesn't introduce a regression:
1. Receiving an edit entry operation before create entry
2. Receiving a change thread settings operation before create thread
3. Receiving a change thread subscription before adding as a member
Each of these produces the correct state and clears the Redux queue.

Also tested that we have a bug that this diff fixes. Created three operations that were processed in the following order:
1. Change thread name at timestamp T+1
2. Change thread description at T+1
3. Create thread at T

Before this diff, there were robotexts displayed for each of these, but the thread name wasn't changed. After this diff both thread name and description are updated.

Reviewers: kamil, ashoat

Reviewed By: ashoat

Differential Revision: https://phab.comm.dev/D13572
  • Loading branch information
palys-swm committed Oct 2, 2024
1 parent 85789c8 commit 642f52d
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 43 deletions.
44 changes: 44 additions & 0 deletions lib/hooks/actions-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// @flow

import * as React from 'react';

type MessageQueueHook<T> = {
+enqueue: (items: $ReadOnlyArray<T>) => void,
};

function useActionsQueue<T>(
performAction: (item: T) => mixed | Promise<mixed>,
): MessageQueueHook<T> {
const [queue, setQueue] = React.useState<$ReadOnlyArray<T>>([]);
const isProcessing = React.useRef(false);

const process = React.useCallback(
async (action: T) => {
isProcessing.current = true;
try {
await performAction(action);
} finally {
isProcessing.current = false;
setQueue(currentQueue => currentQueue.slice(1));
}
},
[performAction],
);

const enqueue = React.useCallback(
(items: $ReadOnlyArray<T>) =>
setQueue(prevQueue => [...prevQueue, ...items]),
[],
);

React.useEffect(() => {
if (isProcessing.current || queue.length === 0) {
return;
}
void process(queue[0]);
}, [process, queue]);

return { enqueue };
}

export { useActionsQueue };
141 changes: 98 additions & 43 deletions lib/shared/dm-ops/dm-ops-queue-handler.react.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as React from 'react';

import { dmOperationSpecificationTypes } from './dm-op-utils.js';
import { useProcessDMOperation } from './process-dm-ops.js';
import { useActionsQueue } from '../../hooks/actions-queue.js';
import { messageInfoSelector } from '../../selectors/chat-selectors.js';
import {
entryInfoSelector,
Expand All @@ -16,13 +17,28 @@ import {
clearQueuedThreadDMOpsActionType,
pruneDMOpsQueueActionType,
} from '../../types/dm-ops.js';
import type { OperationsQueue } from '../../types/dm-ops.js';
import type { DMOperation } from '../../types/dm-ops.js';
import type { BaseAction } from '../../types/redux-types.js';
import { useDispatch, useSelector } from '../../utils/redux-utils.js';

const PRUNING_FREQUENCY = 60 * 60 * 1000;
const FIRST_PRUNING_DELAY = 10 * 60 * 1000;
const QUEUED_OPERATION_TTL = 3 * 24 * 60 * 60 * 1000;

type QueueItem =
| {
+type: 'operation',
+operation: DMOperation,
}
| {
+type: 'action',
+action: BaseAction,
}
| {
+type: 'function',
+itemFunction: () => mixed,
};

function DMOpsQueueHandler(): React.Node {
const dispatch = useDispatch();

Expand Down Expand Up @@ -54,24 +70,31 @@ function DMOpsQueueHandler(): React.Node {
);

const processDMOperation = useProcessDMOperation();
const processOperationsQueue = React.useCallback(
(queue: OperationsQueue) => {
for (const dmOp of queue) {
void processDMOperation({

const processItem = React.useCallback(
async (item: QueueItem) => {
if (item.type === 'operation') {
await processDMOperation({
// This is `INBOUND` because we assume that when generating
// `dmOperationSpecificationTypes.OUBOUND` it should be possible
// to be processed and never queued up.
type: dmOperationSpecificationTypes.INBOUND,
op: dmOp.operation,
op: item.operation,
// There is no metadata, because messages were confirmed when
// adding to the queue.
metadata: null,
});
} else if (item.type === 'action') {
dispatch(item.action);
} else {
item.itemFunction();
}
},
[processDMOperation],
[dispatch, processDMOperation],
);

const { enqueue } = useActionsQueue(processItem);

React.useEffect(() => {
const prevThreadInfos = prevThreadInfosRef.current;
prevThreadInfosRef.current = threadInfos;
Expand All @@ -80,15 +103,23 @@ function DMOpsQueueHandler(): React.Node {
if (!threadInfos[threadID] || prevThreadInfos[threadID]) {
continue;
}
processOperationsQueue(queuedThreadOperations[threadID]);
dispatch({
type: clearQueuedThreadDMOpsActionType,
payload: {
threadID,
enqueue([
...queuedThreadOperations[threadID].map(item => ({
type: 'operation',
operation: item.operation,
})),
{
type: 'action',
action: {
type: clearQueuedThreadDMOpsActionType,
payload: {
threadID,
},
},
},
});
]);
}
}, [dispatch, processOperationsQueue, queuedThreadOperations, threadInfos]);
}, [dispatch, enqueue, queuedThreadOperations, threadInfos]);

const messageInfos = useSelector(messageInfoSelector);
const prevMessageInfosRef = React.useRef({});
Expand All @@ -105,15 +136,23 @@ function DMOpsQueueHandler(): React.Node {
if (!messageInfos[messageID] || prevMessageInfos[messageID]) {
continue;
}
processOperationsQueue(queuedMessageOperations[messageID]);
dispatch({
type: clearQueuedMessageDMOpsActionType,
payload: {
messageID,
enqueue([
...queuedMessageOperations[messageID].map(item => ({
type: 'operation',
operation: item.operation,
})),
{
type: 'action',
action: {
type: clearQueuedMessageDMOpsActionType,
payload: {
messageID,
},
},
},
});
]);
}
}, [dispatch, messageInfos, processOperationsQueue, queuedMessageOperations]);
}, [dispatch, enqueue, messageInfos, queuedMessageOperations]);

const entryInfos = useSelector(entryInfoSelector);
const prevEntryInfosRef = React.useRef({});
Expand All @@ -130,15 +169,23 @@ function DMOpsQueueHandler(): React.Node {
if (!entryInfos[entryID] || prevEntryInfos[entryID]) {
continue;
}
processOperationsQueue(queuedEntryOperations[entryID]);
dispatch({
type: clearQueuedEntryDMOpsActionType,
payload: {
entryID,
enqueue([
...queuedEntryOperations[entryID].map(item => ({
type: 'operation',
operation: item.operation,
})),
{
type: 'action',
action: {
type: clearQueuedEntryDMOpsActionType,
payload: {
entryID,
},
},
},
});
]);
}
}, [dispatch, entryInfos, processOperationsQueue, queuedEntryOperations]);
}, [dispatch, enqueue, entryInfos, queuedEntryOperations]);

const queuedMembershipOperations = useSelector(
state => state.queuedDMOperations.membershipQueue,
Expand Down Expand Up @@ -169,24 +216,32 @@ function DMOpsQueueHandler(): React.Node {

runningMembershipOperations.current.get(threadID)?.add(member.id);

processOperationsQueue(queuedMembershipOperations[threadID][member.id]);

dispatch({
type: clearQueuedMembershipDMOpsActionType,
payload: {
threadID,
userID: member.id,
enqueue([
...queuedMembershipOperations[threadID][member.id].map(item => ({
type: 'operation',
operation: item.operation,
})),
{
type: 'action',
action: {
type: clearQueuedMembershipDMOpsActionType,
payload: {
threadID,
userID: member.id,
},
},
},
});
runningMembershipOperations.current.get(threadID)?.delete(member.id);
{
type: 'function',
itemFunction: () =>
runningMembershipOperations.current
.get(threadID)
?.delete(member.id),
},
]);
}
}
}, [
dispatch,
processOperationsQueue,
queuedMembershipOperations,
threadInfos,
]);
}, [dispatch, enqueue, queuedMembershipOperations, threadInfos]);

return null;
}
Expand Down

0 comments on commit 642f52d

Please sign in to comment.