Skip to content

Commit

Permalink
feat: complete websocket agent
Browse files Browse the repository at this point in the history
  • Loading branch information
IllTamer committed Jan 25, 2024
1 parent c0190b9 commit 8bbb123
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 103 deletions.
3 changes: 2 additions & 1 deletion cmd/perpe/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package perp
import (
global "github.com/IUnlimit/perpetua/internal"
"github.com/IUnlimit/perpetua/internal/handle"
"github.com/IUnlimit/perpetua/internal/handle/api"
"github.com/bytedance/gopkg/util/gopool"
log "github.com/sirupsen/logrus"
)

func EnableAgent() {
var config = global.Config.Http
gopool.Go(func() {
handle.EnableHttpService(config.Port)
api.EnableHttpService(config.Port)
})

err := handle.CreateNTQQWebSocket()
Expand Down
15 changes: 12 additions & 3 deletions internal/global.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package global

import "github.com/IUnlimit/perpetua/internal/model"
import (
"github.com/IUnlimit/perpetua/internal/model"
"regexp"
)

// MsgData websocket message data type
type MsgData map[string]interface{}

// ParentPath 配置文件目录路径
// ParentPath perp files path
const ParentPath = "perpetua/"

// LgrFolder lgr文件存放路径
// LgrFolder lgr bin directory
const LgrFolder = "Lagrange.OneBot/"

// EchoPrefix is prefix for generating echos
const EchoPrefix = "perp"

// EchoRegx to match ${EchoPrefix}#${uuid}#client-echo
var EchoRegx = regexp.MustCompile(`([^#]+)#([^#]+)#(.+)`)

// Config perpetua config.yml
var Config *model.Config

Expand Down
5 changes: 3 additions & 2 deletions internal/handle/http.go → internal/handle/api/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package handle
package api

import (
"fmt"
"github.com/IUnlimit/perpetua/internal/handle"
"github.com/IUnlimit/perpetua/internal/logger"
"github.com/IUnlimit/perpetua/internal/utils"
"github.com/gin-gonic/gin"
Expand All @@ -24,7 +25,7 @@ func GetWebSocketPort(ctx *gin.Context) {

port := listen.Addr().(*net.TCPAddr).Port
// open new ws service, wait for minutes
CreateWSInstance(port)
handle.CreateWSInstance(port)
log.Info("Create new websocket connection on port: ", port)

utils.SendResponse(ctx, port)
Expand Down
47 changes: 47 additions & 0 deletions internal/handle/echo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package handle

import (
global "github.com/IUnlimit/perpetua/internal"
collections "github.com/chenjiandongx/go-queue"
)

var echoMap *EchoMap

// EchoMap 利用 echo 完成 NTQQ -> client 消息调度 (发送时标记echo)
type EchoMap struct {
Receive chan bool

dataMap map[string]*collections.Queue
}

func NewEchoMap() *EchoMap {
return &EchoMap{
dataMap: make(map[string]*collections.Queue),
Receive: make(chan bool),
}
}

// JustPut echo
func (em *EchoMap) JustPut(id string, data *global.MsgData) {
queue := echoMap.dataMap[id]
if queue == nil {
queue = collections.NewQueue()
echoMap.dataMap[id] = queue
}
queue.Put(data)
}

func (em *EchoMap) JustGet(id string, consumer func(*global.MsgData)) {
queue := echoMap.dataMap[id]
if queue == nil {
return
}

for {
e, ok := queue.Get()
if !ok {
return
}
consumer(e.(*global.MsgData))
}
}
94 changes: 94 additions & 0 deletions internal/handle/echo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package handle

import (
"context"
"encoding/json"
"fmt"
global "github.com/IUnlimit/perpetua/internal"
collections "github.com/chenjiandongx/go-queue"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"testing"
)

func TestEcho(t *testing.T) {
id := uuid.NewString()
str := fmt.Sprintf("%s#%s#a", global.EchoPrefix, id)
re := global.EchoRegx
matches := re.FindStringSubmatch(str)

if len(matches) == 4 {
group1 := matches[1]
group2 := matches[2]
group3 := matches[3]

fmt.Println("Group 1:", group1)
fmt.Println("Group 2:", group2)
fmt.Println("Group 3:", group3)
} else {
fmt.Println("No match found, len: ", len(uuid.NewString()))
}
}

func TestUpdateEcho(t *testing.T) {
message := `{
"action": "send_private_msg",
"params": {
"user_id": 765743073,
"message": "hello"
},
"echo": ""
}`
var msgData global.MsgData
err := json.Unmarshal([]byte(message), &msgData)
if err != nil {
log.Errorf("[<-Client] Failed to unmarshal client message: %s", string(message))
return
}

// sign with echo field
id := uuid.NewString()
echo := msgData["echo"].(string)
echo = fmt.Sprintf("%s#%s#%s", global.EchoPrefix, id, echo)
msgData["echo"] = echo
log.Printf("[<-Client] Update client(port-%d) message echo: %s", 666, echo)
}

func TestSend(t *testing.T) {
message := `{
"action": "send_private_msg",
"params": {
"user_id": 765743073,
"message": "hello"
},
"echo": ""
}`
var msgData global.MsgData
err := json.Unmarshal([]byte(message), &msgData)
if err != nil {
log.Errorf("[<-Client] Failed to unmarshal client message: %s", string(message))
return
}

bytes, _ := json.Marshal(msgData)
fmt.Println(string(bytes))
}

func TestQueue(t *testing.T) {
queue := collections.NewQueue()
queue.Put("666")
for {
e, ok := queue.Get()
if !ok {
return
}
fmt.Println(e)
}
}

func TestSlice(t *testing.T) {
receivers := make([]*Handler, 1)
handler := NewHandler(context.Background())
receivers = append(receivers, handler)
fmt.Println(len(receivers))
}
38 changes: 31 additions & 7 deletions internal/handle/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,42 @@ import (
"context"
global "github.com/IUnlimit/perpetua/internal"
collections "github.com/chenjiandongx/go-queue"
"github.com/google/uuid"
"sync"
)

// handleList stores handle.Handler for client websocket
var handleList []*Handler

type Handler struct {
ctx context.Context
wg sync.WaitGroup
receive chan bool
Receive chan bool
Lock sync.Mutex

id string
ctx context.Context
// waiting goroutine count
waitCount int
queue *collections.Queue
// thread safe queue
queue *collections.Queue
wg sync.WaitGroup
}

func (h *Handler) AddMessage(uuid string) {
h.queue.Put(uuid)
}

// GetMessage from local cache
func (h *Handler) GetMessage(invoke func(data *global.MsgData)) {
for e, _ := h.queue.Get(); e != nil; {
func (h *Handler) GetMessage(consumer func(data global.MsgData)) {
for {
e, ok := h.queue.Get()
if !ok {
return
}
data, _ := globalCache.cache.Get(e)
if data == nil {
continue
}
invoke(data.(*global.MsgData))
consumer(data.(global.MsgData))
}
}

Expand All @@ -57,10 +66,25 @@ func (h *Handler) WaitDone() {
h.wg.Wait()
}

func (h *Handler) GetId() string {
return h.id
}

func NewHandler(ctx context.Context) *Handler {
return &Handler{
ctx: ctx,
id: uuid.NewString(),
Receive: make(chan bool),
waitCount: 0,
queue: collections.NewQueue(),
}
}

func FindHandler(id string) *Handler {
for _, handler := range handleList {
if handler.id == id {
return handler
}
}
return nil
}
9 changes: 9 additions & 0 deletions internal/handle/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ package handle

import (
global "github.com/IUnlimit/perpetua/internal"
"github.com/gorilla/websocket"
"net/http"
)

func Init() {
echoMap = NewEchoMap()
globalCache = NewCache(global.Config.MsgExpireTime)
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// allow all sources conn
return true
},
}
}
1 change: 0 additions & 1 deletion internal/handle/loadbalance.go

This file was deleted.

Loading

0 comments on commit 8bbb123

Please sign in to comment.