Skip to content

Commit

Permalink
Better tracking of SFTP events
Browse files Browse the repository at this point in the history
  • Loading branch information
DaneEveritt committed Jul 10, 2022
1 parent 59fbd2b commit f28e062
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 60 deletions.
11 changes: 7 additions & 4 deletions internal/cron/activity_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import (
)

var key = []byte("events")
var processing system.AtomicBool
var activityCron system.AtomicBool

func processActivityLogs(m *server.Manager, c int64) error {
// Don't execute this cron if there is currently one running. Once this task is completed
// go ahead and mark it as no longer running.
if !processing.SwapIf(true) {
log.WithField("subsystem", "cron").Warn("cron: process overlap detected, skipping this run")
if !activityCron.SwapIf(true) {
log.WithField("subsystem", "cron").WithField("cron", "activity_logs").Warn("cron: process overlap detected, skipping this run")
return nil
}
defer processing.Store(false)
defer activityCron.Store(false)

var list [][]byte
err := database.DB().View(func(tx *nutsdb.Tx) error {
Expand All @@ -30,6 +30,9 @@ func processActivityLogs(m *server.Manager, c int64) error {
// release the lock on this process.
end := int(c)
if s, err := tx.LSize(database.ServerActivityBucket, key); err != nil {
if errors.Is(err, nutsdb.ErrBucket) {
return nil
}
return errors.WithStackIf(err)
} else if s < end || s == 0 {
if s == 0 {
Expand Down
16 changes: 14 additions & 2 deletions internal/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"time"
)

const ErrCronRunning = errors.Sentinel("cron: job already running")

var o system.AtomicBool

// Scheduler configures the internal cronjob system for Wings and returns the scheduler
// instance to the caller. This should only be called once per application lifecycle, additional
// calls will result in an error being returned.
func Scheduler(m *server.Manager) (*gocron.Scheduler, error) {
if o.Load() {
if !o.SwapIf(true) {
return nil, errors.New("cron: cannot call scheduler more than once in application lifecycle")
}
o.Store(true)
l, err := time.LoadLocation(config.Get().System.Timezone)
if err != nil {
return nil, errors.Wrap(err, "cron: failed to parse configured system timezone")
Expand All @@ -32,5 +33,16 @@ func Scheduler(m *server.Manager) (*gocron.Scheduler, error) {
}
})

_, _ = s.Tag("sftp").Every(20).Seconds().Do(func() {
runner := sftpEventProcessor{mu: system.NewAtomicBool(false), manager: m}
if err := runner.Run(); err != nil {
if errors.Is(err, ErrCronRunning) {
log.WithField("cron", "sftp_events").Warn("cron: job already running, skipping...")
} else {
log.WithField("error", err).Error("cron: failed to process sftp events")
}
}
})

return s, nil
}
174 changes: 174 additions & 0 deletions internal/cron/sftp_cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package cron

import (
"bytes"
"emperror.dev/errors"
"encoding/gob"
"github.com/pterodactyl/wings/internal/database"
"github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/sftp"
"github.com/pterodactyl/wings/system"
"github.com/xujiajun/nutsdb"
"path/filepath"
)

type UserDetail struct {
UUID string
IP string
}

type Users map[UserDetail][]sftp.EventRecord
type Events map[sftp.Event]Users

type sftpEventProcessor struct {
mu *system.AtomicBool
manager *server.Manager
}

// Run executes the cronjob and processes sftp activities into normal activity log entries
// by merging together similar records. This helps to reduce the sheer amount of data that
// gets passed back to the Panel and provides simpler activity logging.
func (sep *sftpEventProcessor) Run() error {
if !sep.mu.SwapIf(true) {
return errors.WithStack(ErrCronRunning)
}
defer sep.mu.Store(false)

set, err := sep.Events()
if err != nil {
return err
}

for s, el := range set {
events := make(Events)
// Take all of the events that we've pulled out of the system for every server and then
// parse them into a more usable format in order to create activity log entries for each
// user, ip, and server instance.
for _, e := range el {
u := UserDetail{UUID: e.User, IP: e.IP}
if _, ok := events[e.Event]; !ok {
events[e.Event] = make(Users)
}
if _, ok := events[e.Event][u]; !ok {
events[e.Event][u] = []sftp.EventRecord{}
}
events[e.Event][u] = append(events[e.Event][u], e)
}

// Now that we have all of the events, go ahead and create a normal activity log entry
// for each instance grouped by user & IP for easier Panel reporting.
for k, v := range events {
for u, records := range v {
files := make([]interface{}, len(records))
for i, r := range records {
if r.Action.Target != "" {
files[i] = map[string]string{
"from": filepath.Clean(r.Action.Entity),
"to": filepath.Clean(r.Action.Target),
}
} else {
files[i] = filepath.Clean(r.Action.Entity)
}
}

entry := server.Activity{
Server: s,
User: u.UUID,
Event: server.Event("server:sftp." + string(k)),
Metadata: server.ActivityMeta{"files": files},
IP: u.IP,
// Just assume that the first record in the set is the oldest and the most relevant
// of the timestamps to use.
Timestamp: records[0].Timestamp,
}

if err := entry.Save(); err != nil {
return errors.Wrap(err, "cron: failed to save new event for server")
}

if err := sep.Cleanup([]byte(s)); err != nil {
return errors.Wrap(err, "cron: failed to cleanup events")
}
}
}
}

return nil
}

// Cleanup runs through all of the events we have currently tracked in the bucket and removes
// them once we've managed to process them and created the associated server activity events.
func (sep *sftpEventProcessor) Cleanup(key []byte) error {
return database.DB().Update(func(tx *nutsdb.Tx) error {
s, err := sep.sizeOf(tx, key)
if err != nil {
return err
}
if s == 0 {
return nil
} else if s < sep.limit() {
for i := 0; i < s; i++ {
if _, err := tx.LPop(database.SftpActivityBucket, key); err != nil {
return errors.WithStack(err)
}
}
} else {
if err := tx.LTrim(database.ServerActivityBucket, key, sep.limit()-1, -1); err != nil {
return errors.WithStack(err)
}
}
return nil
})
}

// Events pulls all of the events in the SFTP event bucket and parses them into an iterable
// set allowing Wings to process the events and send them back to the Panel instance.
func (sep *sftpEventProcessor) Events() (map[string][]sftp.EventRecord, error) {
set := make(map[string][]sftp.EventRecord, len(sep.manager.Keys()))
err := database.DB().View(func(tx *nutsdb.Tx) error {
for _, k := range sep.manager.Keys() {
lim := sep.limit()
if s, err := sep.sizeOf(tx, []byte(k)); err != nil {
return err
} else if s == 0 {
continue
} else if s < lim {
lim = -1
}
list, err := tx.LRange(database.SftpActivityBucket, []byte(k), 0, lim)
if err != nil {
return errors.WithStack(err)
}
set[k] = make([]sftp.EventRecord, len(list))
for i, l := range list {
if err := gob.NewDecoder(bytes.NewBuffer(l)).Decode(&set[k][i]); err != nil {
return errors.WithStack(err)
}
}
}
return nil
})

return set, err
}

// sizeOf is a wrapper around a nutsdb transaction to get the size of a key in the
// bucket while also accounting for some expected error conditions and handling those
// automatically.
func (sep *sftpEventProcessor) sizeOf(tx *nutsdb.Tx, key []byte) (int, error) {
s, err := tx.LSize(database.SftpActivityBucket, key)
if err != nil {
if errors.Is(err, nutsdb.ErrBucket) {
return 0, nil
}
return 0, errors.WithStack(err)
}
return s, nil
}

// limit returns the number of records that are processed for each server at
// once. This will then be translated into a variable number of activity log
// events, with the worst case being a single event with "n" associated files.
func (sep *sftpEventProcessor) limit() int {
return 500
}
1 change: 1 addition & 0 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var syncer sync.Once

const (
ServerActivityBucket = "server_activity"
SftpActivityBucket = "sftp_activity"
)

func initialize() error {
Expand Down
4 changes: 4 additions & 0 deletions server/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (ra RequestActivity) IP() string {
return ra.ip
}

func (ra *RequestActivity) User() string {
return ra.user
}

// SetUser clones the RequestActivity struct and sets a new user value on the copy
// before returning it.
func (ra RequestActivity) SetUser(u string) RequestActivity {
Expand Down
18 changes: 18 additions & 0 deletions server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ func (m *Manager) Client() remote.Client {
return m.client
}

// Len returns the count of servers stored in the manager instance.
func (m *Manager) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.servers)
}

// Keys returns all of the server UUIDs stored in the manager set.
func (m *Manager) Keys() []string {
m.mu.RLock()
defer m.mu.RUnlock()
keys := make([]string, len(m.servers))
for i, s := range m.servers {
keys[i] = s.ID()
}
return keys
}

// Put replaces all the current values in the collection with the value that
// is passed through.
func (m *Manager) Put(s []*Server) {
Expand Down
82 changes: 82 additions & 0 deletions sftp/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package sftp

import (
"bytes"
"emperror.dev/errors"
"encoding/gob"
"github.com/apex/log"
"github.com/pterodactyl/wings/internal/database"
"github.com/xujiajun/nutsdb"
"regexp"
"time"
)

type eventHandler struct {
ip string
user string
server string
}

type Event string
type FileAction struct {
// Entity is the targeted file or directory (depending on the event) that the action
// is being performed _against_, such as "/foo/test.txt". This will always be the full
// path to the element.
Entity string
// Target is an optional (often blank) field that only has a value in it when the event
// is specifically modifying the entity, such as a rename or move event. In that case
// the Target field will be the final value, such as "/bar/new.txt"
Target string
}

type EventRecord struct {
Event Event
Action FileAction
IP string
User string
Timestamp time.Time
}

const (
EventWrite = Event("write")
EventCreate = Event("create")
EventCreateDirectory = Event("create-directory")
EventRename = Event("rename")
EventDelete = Event("delete")
)

var ipTrimRegex = regexp.MustCompile(`(:\d*)?$`)

// Log logs an event into the Wings bucket for SFTP activity which then allows a seperate
// cron to run and parse the events into a more manageable stream of event data to send
// back to the Panel instance.
func (eh *eventHandler) Log(e Event, fa FileAction) error {
r := EventRecord{
Event: e,
Action: fa,
IP: ipTrimRegex.ReplaceAllString(eh.ip, ""),
User: eh.user,
Timestamp: time.Now().UTC(),
}

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(r); err != nil {
return errors.Wrap(err, "sftp: failed to encode event")
}

return database.DB().Update(func(tx *nutsdb.Tx) error {
if err := tx.RPush(database.SftpActivityBucket, []byte(eh.server), buf.Bytes()); err != nil {
return errors.Wrap(err, "sftp: failed to push event to stack")
}
return nil
})
}

// MustLog is a wrapper around log that will trigger a fatal error and exit the application
// if an error is encountered during the logging of the event.
func (eh *eventHandler) MustLog(e Event, fa FileAction) {
if err := eh.Log(e, fa); err != nil {
log.WithField("error", err).Fatal("sftp: failed to log event")
}
}
Loading

0 comments on commit f28e062

Please sign in to comment.