Skip to content

Commit

Permalink
Merge branch 'main' into beta482
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoshinonyaruko authored Aug 18, 2024
2 parents a3aa6c1 + 09741aa commit c49f871
Showing 1 changed file with 111 additions and 1 deletion.
112 changes: 111 additions & 1 deletion botgo/websocket/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -26,6 +27,18 @@ const DefaultQueueSize = 10000
// 定义全局变量
var global_s int64

// PayloadWithTimestamp 存储带时间戳的 WSPayload
type PayloadWithTimestamp struct {
Payload *dto.WSPayload
Timestamp time.Time
}

var dataMap sync.Map

func init() {
StartCleanupRoutine()
}

// Setup 依赖注册
func Setup() {
websocket.Register(&Client{})
Expand Down Expand Up @@ -187,6 +200,33 @@ func (c *Client) Session() *dto.Session {
return c.session
}

// func (c *Client) readMessageToQueue() {
// for {
// _, message, err := c.conn.ReadMessage()
// if err != nil {
// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message))
// close(c.messageQueue)
// c.closeChan <- err
// return
// }
// payload := &dto.WSPayload{}
// if err := json.Unmarshal(message, payload); err != nil {
// log.Errorf("%s json failed, %v", c.session, err)
// continue
// }
// // 更新 global_s 的值
// atomic.StoreInt64(&global_s, payload.S)

// payload.RawMessage = message
// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))
// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
// if c.isHandleBuildIn(payload) {
// continue
// }
// c.messageQueue <- payload
// }
// }

func (c *Client) readMessageToQueue() {
for {
_, message, err := c.conn.ReadMessage()
Expand All @@ -201,19 +241,61 @@ func (c *Client) readMessageToQueue() {
log.Errorf("%s json failed, %v", c.session, err)
continue
}
// 更新 global_s 的值
atomic.StoreInt64(&global_s, payload.S)

payload.RawMessage = message
log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))

// 不过滤心跳事件
if payload.OPCode != 11 {
// 计算数据的哈希值
dataHash := calculateDataHash(payload.Data)

// 检查是否已存在相同的 Data
if existingPayload, ok := getDataFromSyncMap(dataHash); ok {
// 如果已存在相同的 Data,则丢弃当前消息
log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload)
continue
}

// 将新的 payload 存入 sync.Map
storeDataToSyncMap(dataHash, payload)
}

// 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
if c.isHandleBuildIn(payload) {
continue
}

c.messageQueue <- payload
}
}

func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) {
value, ok := dataMap.Load(dataHash)
if !ok {
return nil, false
}
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return nil, false
}
return payloadWithTimestamp.Payload, true
}

func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) {
payloadWithTimestamp := &PayloadWithTimestamp{
Payload: payload,
Timestamp: time.Now(),
}
dataMap.Store(dataHash, payloadWithTimestamp)
}

func calculateDataHash(data interface{}) string {
dataBytes, _ := json.Marshal(data)
return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法
}

// 在全局范围通过atomic访问s值与message_id的映射
func GetGlobalS() int64 {
return atomic.LoadInt64(&global_s)
Expand Down Expand Up @@ -301,3 +383,31 @@ func (c *Client) readyHandler(payload *dto.WSPayload) {
event.DefaultHandlers.Ready(payload, readyData)
}
}

const cleanupInterval = 5 * time.Minute // 清理间隔时间

func StartCleanupRoutine() {
go func() {
for {
<-time.After(cleanupInterval)
cleanupDataMap()
}
}()
}

func cleanupDataMap() {
now := time.Now()
dataMap.Range(func(key, value interface{}) bool {
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return true
}

// 检查时间戳,清理超过一定时间的数据
if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval {
dataMap.Delete(key)
}

return true
})
}

0 comments on commit c49f871

Please sign in to comment.