Skip to content

Commit

Permalink
TF-3334 Add more concurrence test cases for WebSocketQueueHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
hoangdat authored and dab246 committed Dec 24, 2024
1 parent f7dd28a commit 2e47f18
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1465,10 +1465,6 @@ class MailboxDashBoardController extends ReloadableController with UserSettingPo
void dispatchRoute(DashboardRoutes route) {
log('MailboxDashBoardController::dispatchRoute(): $route');
dashboardRoute.value = route;

if (dashboardRoute.value == DashboardRoutes.searchEmail) {
searchController.activateSimpleSearch();
}
}

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ class WebSocketQueueHandler {
return;
}

if (queueSize >= _maxQueueSize) {
log('WebSocketQueueHandler::enqueue:Queue full, removing oldest message');
_messageQueue.removeFirst();
try {
if (queueSize >= _maxQueueSize) {
log('WebSocketQueueHandler::enqueue:Queue full, removing oldest message');
_messageQueue.removeFirst();
}
} catch (e) {
logError('WebSocketQueueHandler::enqueue:Exception = $e');
}

log('WebSocketQueueHandler::enqueue(): ${message.id}');
_messageQueue.add(message);
_queueController.add(message);
}
Expand All @@ -57,6 +62,7 @@ class WebSocketQueueHandler {
try {
while (queueSize > 0) {
final message = _messageQueue.removeFirst();
log('WebSocketQueueHandler::_processQueue(): processing message ${message.id}');

try {
await processMessageCallback(message);
Expand All @@ -78,27 +84,40 @@ class WebSocketQueueHandler {
}

void _addToProcessedMessages(String messageId) {
if (_processedMessageIds.length >= _maxProcessedIdsSize) {
_processedMessageIds.removeFirst();
log('WebSocketQueueHandler::_addToProcessedMessages(): adding message $messageId to processed messages');
try {
if (_processedMessageIds.length >= _maxProcessedIdsSize) {
_processedMessageIds.removeFirst();
}
} catch (e) {
logError('WebSocketQueueHandler::_addToProcessedMessages:Exception = $e');
}

_processedMessageIds.add(messageId);
}

void removeMessagesUpToCurrent(String messageId) {
final isCurrentStateExist = _messageQueue
.any((message) => message.id == messageId);
try {
log('WebSocketQueueHandler::removeMessagesUpToCurrent(): removing messages up to $messageId');
final isCurrentStateExist = _messageQueue
.any((message) => message.id == messageId);

if (!isCurrentStateExist) {
log('WebSocketQueueHandler::removeMessagesUpToCurrent:Current state $messageId not found in the queue.');
return;
}
while (queueSize > 0) {
final removedMessage = _messageQueue.removeFirst();
if (removedMessage.id == messageId) {
break;
if (!isCurrentStateExist) {
log('WebSocketQueueHandler::removeMessagesUpToCurrent:Current state $messageId not found in the queue.');
return;
}

while (queueSize > 0) {
final removedMessage = _messageQueue.removeFirst();
log('WebSocketQueueHandler::removeMessagesUpToCurrent(): removing message ${removedMessage.id} up to $messageId');
if (removedMessage.id == messageId) {
break;
}
}
log('WebSocketQueueHandler::removeMessagesUpToCurrent:Updated Queue: $queueSize');
} catch (e) {
logError('WebSocketQueueHandler::removeMessagesUpToCurrent:Exception = $e');
}
log('WebSocketQueueHandler::removeMessagesUpToCurrent:Updated Queue: $queueSize');
}

@visibleForTesting
Expand Down
2 changes: 0 additions & 2 deletions lib/features/thread/presentation/thread_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,6 @@ class ThreadController extends BaseController with EmailActionController {
),
beforeOption: const None(),
);
searchController.activateSimpleSearch();

final searchViewState = await _searchEmailInteractor.execute(
_session!,
_accountId!,
Expand Down
Loading

0 comments on commit 2e47f18

Please sign in to comment.