Skip to content

Commit

Permalink
Merge branch 'main' into beta492
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoshinonyaruko authored Oct 13, 2024
2 parents b2de7c5 + 107b096 commit dbf20f6
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 1 deletion.
112 changes: 111 additions & 1 deletion botgo/websocket/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -26,6 +27,18 @@ const DefaultQueueSize = 10000
// 定义全局变量
var global_s int64

// PayloadWithTimestamp 存储带时间戳的 WSPayload
type PayloadWithTimestamp struct {
Payload *dto.WSPayload
Timestamp time.Time
}

var dataMap sync.Map

func init() {
StartCleanupRoutine()
}

// Setup 依赖注册
func Setup() {
websocket.Register(&Client{})
Expand Down Expand Up @@ -187,6 +200,33 @@ func (c *Client) Session() *dto.Session {
return c.session
}

// func (c *Client) readMessageToQueue() {
// for {
// _, message, err := c.conn.ReadMessage()
// if err != nil {
// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message))
// close(c.messageQueue)
// c.closeChan <- err
// return
// }
// payload := &dto.WSPayload{}
// if err := json.Unmarshal(message, payload); err != nil {
// log.Errorf("%s json failed, %v", c.session, err)
// continue
// }
// // 更新 global_s 的值
// atomic.StoreInt64(&global_s, payload.S)

// payload.RawMessage = message
// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))
// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
// if c.isHandleBuildIn(payload) {
// continue
// }
// c.messageQueue <- payload
// }
// }

func (c *Client) readMessageToQueue() {
for {
_, message, err := c.conn.ReadMessage()
Expand All @@ -201,19 +241,61 @@ func (c *Client) readMessageToQueue() {
log.Errorf("%s json failed, %v", c.session, err)
continue
}
// 更新 global_s 的值
atomic.StoreInt64(&global_s, payload.S)

payload.RawMessage = message
log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))

// 不过滤心跳事件
if payload.OPCode != 11 {
// 计算数据的哈希值
dataHash := calculateDataHash(payload.Data)

// 检查是否已存在相同的 Data
if existingPayload, ok := getDataFromSyncMap(dataHash); ok {
// 如果已存在相同的 Data,则丢弃当前消息
log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload)
continue
}

// 将新的 payload 存入 sync.Map
storeDataToSyncMap(dataHash, payload)
}

// 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
if c.isHandleBuildIn(payload) {
continue
}

c.messageQueue <- payload
}
}

func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) {
value, ok := dataMap.Load(dataHash)
if !ok {
return nil, false
}
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return nil, false
}
return payloadWithTimestamp.Payload, true
}

func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) {
payloadWithTimestamp := &PayloadWithTimestamp{
Payload: payload,
Timestamp: time.Now(),
}
dataMap.Store(dataHash, payloadWithTimestamp)
}

func calculateDataHash(data interface{}) string {
dataBytes, _ := json.Marshal(data)
return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法
}

// 在全局范围通过atomic访问s值与message_id的映射
func GetGlobalS() int64 {
return atomic.LoadInt64(&global_s)
Expand Down Expand Up @@ -301,3 +383,31 @@ func (c *Client) readyHandler(payload *dto.WSPayload) {
event.DefaultHandlers.Ready(payload, readyData)
}
}

const cleanupInterval = 5 * time.Minute // 清理间隔时间

func StartCleanupRoutine() {
go func() {
for {
<-time.After(cleanupInterval)
cleanupDataMap()
}
}()
}

func cleanupDataMap() {
now := time.Now()
dataMap.Range(func(key, value interface{}) bool {
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return true
}

// 检查时间戳,清理超过一定时间的数据
if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval {
dataMap.Delete(key)
}

return true
})
}
Empty file added go
Empty file.
78 changes: 78 additions & 0 deletions httpapi/httpapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,84 @@ func handleSendPrivateMessageSSESP(c *gin.Context, api openapi.OpenAPI, apiV2 op

}

// handleSendPrivateMessageSSE 处理发送私聊SSE消息的请求
func handleSendPrivateMessageSSESP(c *gin.Context, api openapi.OpenAPI, apiV2 openapi.OpenAPI) {
// 根据请求方法解析参数
if c.Request.Method == http.MethodGet {
var req struct {
GroupID string `json:"group_id" form:"group_id"`
UserID string `json:"user_id" form:"user_id"`
Message string `json:"message" form:"message"`
AutoEscape bool `json:"auto_escape" form:"auto_escape"`
}
// 从URL查询参数解析
if err := c.ShouldBindQuery(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
var InterfaceBody structs.InterfaceBody
if err := json.Unmarshal([]byte(req.Message), &InterfaceBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid message format"})
return
}

client := &HttpAPIClient{}
// 创建 ActionMessage 实例
message := callapi.ActionMessage{
Action: "send_private_msg_sse",
Params: callapi.ParamsContent{
GroupID: req.GroupID, // 注意这里需要转换类型,因为 GroupID 是 int64
UserID: req.UserID,
Message: InterfaceBody,
},
}
// 调用处理函数
retmsg, err := handlers.HandleSendPrivateMsgSSE(client, api, apiV2, message)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}

// 返回处理结果
c.Header("Content-Type", "application/json")
c.String(http.StatusOK, retmsg)
} else {
var req struct {
GroupID string `json:"group_id" form:"group_id"`
UserID string `json:"user_id" form:"user_id"`
Message interface{} `json:"message" form:"message"`
AutoEscape bool `json:"auto_escape" form:"auto_escape"`
}
// 从JSON或表单数据解析
if err := c.ShouldBind(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

client := &HttpAPIClient{}
// 创建 ActionMessage 实例
message := callapi.ActionMessage{
Action: "send_private_msg_sse",
Params: callapi.ParamsContent{
GroupID: req.GroupID, // 注意这里需要转换类型,因为 GroupID 是 int64
UserID: req.UserID,
Message: req.Message,
},
}
// 调用处理函数
retmsg, err := handlers.HandleSendPrivateMsgSSE(client, api, apiV2, message)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}

// 返回处理结果
c.Header("Content-Type", "application/json")
c.String(http.StatusOK, retmsg)
}

}

// handleSendPrivateMessageSSE 处理发送私聊SSE消息的请求
func handleSendPrivateMessageSSE(c *gin.Context, api openapi.OpenAPI, apiV2 openapi.OpenAPI) {
// 根据请求方法解析参数
Expand Down

0 comments on commit dbf20f6

Please sign in to comment.