Skip to content

Commit

Permalink
add mutex to destination but cc mutex need to be fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
janusec2 committed Feb 9, 2023
1 parent 95df93f commit 88e4539
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 24 deletions.
2 changes: 2 additions & 0 deletions backend/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func CheckOfflineDestinations(nowTimeStamp int64) {
if err != nil {
utils.DebugPrintln("Unmarshal K8S API", err)
}
dest.Mutex.Lock()
dest.Pods = ""
for _, podItem := range pods.Items {
if podItem.Status.Phase == "Running" {
Expand All @@ -67,6 +68,7 @@ func CheckOfflineDestinations(nowTimeStamp int64) {
}
dest.CheckTime = nowTimeStamp
dest.Online = true
dest.Mutex.Unlock()
}
}
}
Expand Down
73 changes: 51 additions & 22 deletions backend/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,64 @@ import (
"janusec/utils"
"net/http"
"strings"
"sync"
"time"
)

func UpdatePods(dest *models.Destination, nowTimeStamp int64) {
dest.IsUpdating = true
dest.Mutex.Lock() // write lock
request, _ := http.NewRequest("GET", dest.PodsAPI, nil)
request.Header.Set("Content-Type", "application/json")
resp, err := utils.GetResponse(request)
if err != nil {
utils.DebugPrintln("Check K8S API GetResponse", err)
dest.CheckTime = nowTimeStamp
dest.Online = false
}
pods := models.PODS{}
err = json.Unmarshal(resp, &pods)
if err != nil {
utils.DebugPrintln("Unmarshal K8S API", err)
}
dest.Pods = ""
for _, podItem := range pods.Items {
if podItem.Status.Phase == "Running" {
if len(dest.Pods) > 0 {
dest.Pods += "|"
}
dest.Pods += podItem.Status.PodIP + ":" + dest.PodPort
}
}
dest.Mutex.Unlock()
dest.IsUpdating = false
}

func SelectPodFromDestination(dest *models.Destination, srcIP string, r *http.Request) string {
nowTimeStamp := time.Now().Unix()
if len(dest.Pods) == 0 || (nowTimeStamp-dest.CheckTime) > 60 {
// check k8s api if exceed 60 seconds
request, _ := http.NewRequest("GET", dest.PodsAPI, nil)
request.Header.Set("Content-Type", "application/json")
resp, err := utils.GetResponse(request)
if err != nil {
utils.DebugPrintln("Check K8S API GetResponse", err)
dest.CheckTime = nowTimeStamp
dest.Online = false
}
pods := models.PODS{}
err = json.Unmarshal(resp, &pods)
if err != nil {
utils.DebugPrintln("Unmarshal K8S API", err)
var isEmptyPods bool
if len(dest.Pods) == 0 {
isEmptyPods = true
} else {
isEmptyPods = false
}
wg := new(sync.WaitGroup)
if !dest.IsUpdating && (isEmptyPods || (nowTimeStamp-dest.CheckTime) > 60) {
if isEmptyPods {
wg.Add(1)
}
dest.Pods = ""
for _, podItem := range pods.Items {
if podItem.Status.Phase == "Running" {
if len(dest.Pods) > 0 {
dest.Pods += "|"
}
dest.Pods += podItem.Status.PodIP + ":" + dest.PodPort
// check k8s api if exceed 60 seconds
go func(dest *models.Destination, nowTimeStamp int64, wg *sync.WaitGroup) {
UpdatePods(dest, nowTimeStamp)
if isEmptyPods {
wg.Done()
}
}
}(dest, nowTimeStamp, wg)
}
if isEmptyPods {
wg.Wait()
}
dest.Mutex.RLock()
// select target pod from dest.Pods directly
dests := strings.Split(dest.Pods, "|")
// According to Hash(IP+UA)
Expand All @@ -53,6 +81,7 @@ func SelectPodFromDestination(dest *models.Destination, srcIP string, r *http.Re
}
hashUInt32 := h.Sum32()
destIndex := hashUInt32 % uint32(len(dests))
dest.Mutex.RUnlock()
return dests[destIndex]
}

Expand Down
2 changes: 1 addition & 1 deletion data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
// IsPrimary i.e. Is Primary Node
IsPrimary bool
// Version of JANUSEC
Version = "1.3.0"
Version = "1.3.1"
// NodeKey share with all nodes
NodeKey []byte
)
Expand Down
4 changes: 4 additions & 0 deletions firewall/cc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func CCAttackTick(appID int64) {
clientID := key.(string)
stat := value.(*models.ClientStat)
//fmt.Println("CCAttackTick:", appID, clientID, stat)
stat.Mutex.Lock()
if stat.IsBadIP {
stat.RemainSeconds -= ccPolicy.IntervalMilliSeconds / 1000.0
if stat.RemainSeconds <= 0 {
Expand Down Expand Up @@ -83,6 +84,7 @@ func CCAttackTick(appID int64) {
}
stat.SlowCount += stat.QuickCount
stat.QuickCount = 0
stat.Mutex.Unlock()
return true
})
}
Expand Down Expand Up @@ -135,6 +137,8 @@ func IsCCAttack(r *http.Request, app *models.Application, srcIP string) (bool, *
clientID := data.SHA256Hash(preHashContent)
clientIDStat, _ := appCCCount.LoadOrStore(clientID, &models.ClientStat{QuickCount: 0, SlowCount: 0, TimeFrameCount: 0, IsBadIP: false, RemainSeconds: 0})
clientStat := clientIDStat.(*models.ClientStat)
clientStat.Mutex.Lock()
defer clientStat.Mutex.Unlock()
if clientStat.IsBadIP {
needLog := false
if clientStat.QuickCount == 0 {
Expand Down
2 changes: 2 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ func ReverseHandlerFunc(w http.ResponseWriter, r *http.Request) {
conn, err := net.Dial("tcp", targetDest)
dest.CheckTime = nowTimeStamp
if err != nil {
dest.Mutex.Lock()
dest.Online = false
dest.Mutex.Unlock()
utils.DebugPrintln("DialContext error", err)
if data.NodeSetting.SMTP.SMTPEnabled {
sendOfflineNotification(app, targetDest)
Expand Down
4 changes: 4 additions & 0 deletions models/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ type Destination struct {
// Online status of Destination (IP:Port), added in V0.9.11
Online bool `json:"online"`
CheckTime int64 `json:"check_time"`

// added in 1.3.1, K8s routine updating and avoid race
Mutex sync.RWMutex `json:"-"`
IsUpdating bool `json:"-"`
}

// PODS for k8s /api/v1/namespaces/default/pods
Expand Down
4 changes: 4 additions & 0 deletions models/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package models

import (
"database/sql"
"sync"
)

type PolicyKey string
Expand Down Expand Up @@ -137,6 +138,9 @@ type ClientStat struct {

// RemainSeconds used for block time frame
RemainSeconds float64 //time.Duration

// added v1.3.1
Mutex sync.Mutex
}

type VulnType struct {
Expand Down
2 changes: 1 addition & 1 deletion release_batch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ printf "Creating installation package\n"
printf "Checklist:\n"
printf "* Angular Admin Version Check. \n"
printf "* Janusec Version Check. \n"
version="1.3.0"
version="1.3.1"
printf "Version: ${version} \n"

read -r -p "Are You Sure? [Y/n] " option
Expand Down

0 comments on commit 88e4539

Please sign in to comment.