Skip to content

Commit

Permalink
fix(desktop: optimize message rendering with dynamic buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ysfscream authored and Red-Asuka committed Dec 13, 2024
1 parent 11bc520 commit 2094d27
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 37 deletions.
3 changes: 3 additions & 0 deletions src/utils/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ export const MAX_MESSAGES_COUNT = 40

// Maximum scroll offset in pixels to trigger loading more messages
export const SCROLL_OFFSET_MAX_NUM = 100

// Messages buffer time in milliseconds
export const MESSAGES_BUFFER_TIME = 1000
87 changes: 50 additions & 37 deletions src/views/connections/ConnectionsDetail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ import { ipcRenderer } from 'electron'
import { MqttClient, IConnackPacket, IPublishPacket, IClientPublishOptions, IDisconnectPacket, Packet } from 'mqtt'
import _ from 'lodash'
import { Subject, fromEvent } from 'rxjs'
import { bufferTime, map, filter, takeUntil, shareReplay } from 'rxjs/operators'
import { bufferTime, map, filter, takeUntil, shareReplay, distinctUntilChanged } from 'rxjs/operators'
import cbor from 'cbor'
import { pack, unpack } from 'msgpackr'
Expand All @@ -339,7 +339,6 @@ import matchMultipleSearch from '@/utils/matchMultipleSearch'
import { matchTopicMethod } from '@/utils/topicMatch'
import { createClient, ignoreQoS0Message } from '@/utils/mqttUtils'
import validFormatJson from '@/utils/validFormatJson'
import delay from '@/utils/delay'
import MessageList from '@/components/MessageList.vue'
import MsgPublish from '@/components/MsgPublish.vue'
Expand Down Expand Up @@ -369,7 +368,13 @@ import { serializeAvroToBuffer, deserializeBufferToAvro } from '@/utils/avro'
import { globalEventBus } from '@/utils/globalEventBus'
import SyncTopicTreeDialog from '@/widgets/SyncTopicTreeDialog.vue'
import ConnectionSelect from '@/components/ConnectionSelect.vue'
import { MAX_MESSAGES_COUNT, SCROLL_BOTTOM_THRESHOLD, SCROLL_HEIGHT_COMPENSATION } from '@/utils/constant'
import {
MAX_MESSAGES_COUNT,
SCROLL_BOTTOM_THRESHOLD,
SCROLL_HEIGHT_COMPENSATION,
MESSAGE_RATE_THRESHOLD,
MESSAGES_BUFFER_TIME,
} from '@/utils/constant'
type CommandType =
| 'searchContent'
Expand Down Expand Up @@ -1509,8 +1514,10 @@ export default class ConnectionsDetail extends Vue {
// received message
private onMessageArrived(client: MqttClient, id: string) {
const unsubscribe$ = new Subject<void>()
const messageBuffer$ = new Subject<MessageModel>()
let isBufferEnabled = false
// Add close event handler if not already present
// Add close event handler
if (client.listenerCount('close') <= 1) {
fromEvent(client, 'close').subscribe(() => {
unsubscribe$.next()
Expand All @@ -1528,39 +1535,6 @@ export default class ConnectionsDetail extends Vue {
shareReplay(1),
)
// 消息速率检测流
// const rateCheck$ = messageSubject$.pipe(
// bufferTime(1000),
// map((messages) => messages.length),
// map((rate) => {
// console.log('Message Rate', rate)
// if (rate > 10) {
// this.$log.info(`Message rate: ${rate}/s, enabling buffer`)
// return true
// }
// return false
// }),
// )
// // 消息速率检测流
// rateCheck$.subscribe((enableBuffer) => {
// console.log('Buffer Enabled', enableBuffer)
// })
// Print message log
messageSubject$.subscribe((message: MessageModel) => {
this.printMessageLog(id, message)
this.renderMessage(id, message)
})
// Render messages
// 需要接收到的数据量非常大的时候,才动态开启
// messageSubject$.pipe(bufferTime(500)).subscribe((messages: MessageModel[]) => {
// if (messages.length) {
// this.renderMessage(id, messages)
// }
// })
// Save messages with QoS filtering
messageSubject$
.pipe(
Expand All @@ -1572,6 +1546,45 @@ export default class ConnectionsDetail extends Vue {
this.saveMessage(id, messages)
}
})
// Rate monitoring stream
const rateCheck$ = messageSubject$.pipe(
bufferTime(MESSAGES_BUFFER_TIME),
map((messages) => messages.length),
map((rate) => rate > MESSAGE_RATE_THRESHOLD),
distinctUntilChanged(),
)
// Toggle buffer mode based on message rate
rateCheck$.subscribe((shouldEnableBuffer) => {
isBufferEnabled = shouldEnableBuffer
if (isBufferEnabled) {
this.$log.info(`Message buffer mode enabled (> ${MESSAGE_RATE_THRESHOLD}/s)`)
}
})
// Handle message rendering with dynamic buffering
messageSubject$.subscribe((message) => {
if (!message) return
this.printMessageLog(id, message)
if (isBufferEnabled) {
messageBuffer$.next(message)
} else {
this.renderMessage(id, message)
}
})
// Buffer rendering stream
messageBuffer$
.pipe(
bufferTime(MESSAGES_BUFFER_TIME),
filter((messages) => messages.length > 0),
)
.subscribe((messages) => {
if (messages.length) {
this.renderMessage(id, messages)
}
})
}
// Set timed message success
Expand Down

0 comments on commit 2094d27

Please sign in to comment.