diff --git a/internal/pkg/queue/dequeue.go b/internal/pkg/queue/dequeue.go index 2d3a762e..501d081c 100644 --- a/internal/pkg/queue/dequeue.go +++ b/internal/pkg/queue/dequeue.go @@ -2,6 +2,7 @@ package queue import ( "fmt" + "time" "github.com/internetarchive/Zeno/internal/pkg/queue/index" ) @@ -9,6 +10,8 @@ import ( // Dequeue removes and returns the next item from the queue // It blocks until an item is available func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { + startTime := time.Now() + if q.closed { return nil, ErrQueueClosed } @@ -64,5 +67,8 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { q.updateDequeueStats(item.URL.Host) + // We only evaluate the average time for the dequeue that succeeded + q.addSample(time.Since(startTime), DequeueSample) + return item, nil } diff --git a/internal/pkg/queue/enqueue.go b/internal/pkg/queue/enqueue.go index 899334b8..ae6d2cb0 100644 --- a/internal/pkg/queue/enqueue.go +++ b/internal/pkg/queue/enqueue.go @@ -4,9 +4,12 @@ import ( "errors" "fmt" "io" + "time" ) func (q *PersistentGroupedQueue) Enqueue(item *Item) error { + startTime := time.Now() + if item == nil { return errors.New("cannot enqueue nil item") } @@ -47,5 +50,8 @@ func (q *PersistentGroupedQueue) Enqueue(item *Item) error { // Update empty status q.Empty.Set(false) + // We only evaluate the average time for the dequeue that succeeded + q.addSample(time.Since(startTime), EnqueueSample) + return nil } diff --git a/internal/pkg/queue/queue.go b/internal/pkg/queue/queue.go index 79207c68..c7f525a4 100644 --- a/internal/pkg/queue/queue.go +++ b/internal/pkg/queue/queue.go @@ -4,7 +4,6 @@ import ( "encoding/gob" "errors" "fmt" - "log/slog" "net/url" "os" "path" @@ -26,12 +25,9 @@ type PersistentGroupedQueue struct { metadataDecoder *gob.Decoder index *index.IndexManager stats *QueueStats - hostOrder []string currentHost int mutex sync.RWMutex closed bool - - logger *slog.Logger } type Item struct { diff --git a/internal/pkg/queue/stats.go b/internal/pkg/queue/stats.go index fe204474..3f4035c4 100644 --- a/internal/pkg/queue/stats.go +++ b/internal/pkg/queue/stats.go @@ -8,23 +8,48 @@ import ( "time" ) +type Sample struct { + timestamp time.Time + duration time.Duration +} + +type Window struct { + duration time.Duration + count int + sum time.Duration +} + +type OpAverageDuration struct { + Minute int64 `json:"minute"` + Minute10 int64 `json:"minute_10"` + Minute30 int64 `json:"minute_30"` + Hour int64 `json:"hour"` +} + type QueueStats struct { sync.Mutex `json:"-"` - FirstEnqueueTime time.Time `json:"first_enqueue_time"` - LastEnqueueTime time.Time `json:"last_enqueue_time"` - FirstDequeueTime time.Time `json:"first_dequeue_time"` - LastDequeueTime time.Time `json:"last_dequeue_time"` - ElementsPerHost map[string]int `json:"elements_per_host"` - HostDistribution map[string]float64 `json:"host_distribution"` - TopHosts []HostStat `json:"top_hosts"` - TotalElements int `json:"total_elements"` - UniqueHosts int `json:"unique_hosts"` - EnqueueCount int `json:"enqueue_count"` - DequeueCount int `json:"dequeue_count"` - AverageTimeBetweenEnqueues time.Duration `json:"average_time_between_enqueues"` - AverageTimeBetweenDequeues time.Duration `json:"average_time_between_dequeues"` - AverageElementsPerHost float64 `json:"average_elements_per_host"` + FirstEnqueueTime time.Time `json:"first_enqueue_time"` + LastEnqueueTime time.Time `json:"last_enqueue_time"` + FirstDequeueTime time.Time `json:"first_dequeue_time"` + LastDequeueTime time.Time `json:"last_dequeue_time"` + ElementsPerHost map[string]int `json:"elements_per_host"` + HostDistribution map[string]float64 `json:"host_distribution"` + TopHosts []HostStat `json:"top_hosts"` + TotalElements int `json:"total_elements"` + UniqueHosts int `json:"unique_hosts"` + EnqueueCount int `json:"enqueue_count"` + DequeueCount int `json:"dequeue_count"` + AverageElementsPerHost float64 `json:"average_elements_per_host"` + + // Sample durations used to calculate average operation durations + EnqueueSamples []Sample `json:"-"` + DequeueSamples []Sample `json:"-"` + UpdateEnqueueStatsSamples []Sample `json:"-"` + UpdateDequeueStatsSamples []Sample `json:"-"` + + AverageEnqueueDuration OpAverageDuration `json:"average_enqueue_duration_ms"` + AverageDequeueDuration OpAverageDuration `json:"average_dequeue_duration_ms"` } type HostStat struct { @@ -32,6 +57,13 @@ type HostStat struct { Elements int `json:"elements"` } +type SampleType int + +const ( + EnqueueSample SampleType = iota + DequeueSample +) + func (q *PersistentGroupedQueue) GetStats() *QueueStats { q.genStats() return q.stats @@ -67,18 +99,100 @@ func (q *PersistentGroupedQueue) genStats() { } } - // Calculate additional q.Stats + // Calculate additional stats if q.stats.UniqueHosts > 0 { q.stats.AverageElementsPerHost = float64(q.stats.TotalElements) / float64(q.stats.UniqueHosts) } else { q.stats.AverageElementsPerHost = 0 } - if q.stats.DequeueCount > 0 { - q.stats.AverageTimeBetweenDequeues = time.Since(q.stats.FirstDequeueTime) / time.Duration(q.stats.DequeueCount) + // Calculate average operation durations + q.stats.AverageEnqueueDuration = q.calculateAverageDuration(q.stats.EnqueueSamples) + q.stats.AverageDequeueDuration = q.calculateAverageDuration(q.stats.DequeueSamples) +} + +func (q *PersistentGroupedQueue) calculateAverageDuration(samples []Sample) OpAverageDuration { + opAvgDuration := OpAverageDuration{} + + if len(samples) == 0 { + return opAvgDuration + } + + var totalMinute, totalMinute10, totalMinute30, totalHour time.Duration + var countMinute, countMinute10, countMinute30, countHour int + + now := time.Now() + + for _, sample := range samples { + if sample.timestamp.After(now.Add(-1 * time.Minute)) { + totalMinute += sample.duration + countMinute++ + } + if sample.timestamp.After(now.Add(-10 * time.Minute)) { + totalMinute10 += sample.duration + countMinute10++ + } + if sample.timestamp.After(now.Add(-30 * time.Minute)) { + totalMinute30 += sample.duration + countMinute30++ + } + if sample.timestamp.After(now.Add(-1 * time.Hour)) { + totalHour += sample.duration + countHour++ + } + } + + if countMinute > 0 { + opAvgDuration.Minute = (totalMinute / time.Duration(countMinute)).Milliseconds() + } + if countMinute10 > 0 { + opAvgDuration.Minute10 = (totalMinute10 / time.Duration(countMinute10)).Milliseconds() + } + if countMinute30 > 0 { + opAvgDuration.Minute30 = (totalMinute30 / time.Duration(countMinute30)).Milliseconds() + } + if countHour > 0 { + opAvgDuration.Hour = (totalHour / time.Duration(countHour)).Milliseconds() + } + + return opAvgDuration +} + +func (q *PersistentGroupedQueue) addSample(duration time.Duration, sampleType SampleType) { + newSample := Sample{ + timestamp: time.Now(), + duration: duration, + } + + switch sampleType { + case EnqueueSample: + q.stats.EnqueueSamples = append(q.stats.EnqueueSamples, newSample) + case DequeueSample: + q.stats.DequeueSamples = append(q.stats.DequeueSamples, newSample) + } + + // Remove old samples to prevent unbounded growth + q.cleanupSamples(sampleType) +} + +func (q *PersistentGroupedQueue) cleanupSamples(sampleType SampleType) { + threshold := time.Now().Add(-time.Hour) + + cleanup := func(samples []Sample) []Sample { + newSamples := make([]Sample, 0, len(samples)) + for _, sample := range samples { + if sample.timestamp.After(threshold) { + newSamples = append(newSamples, sample) + } + } + return newSamples } - if q.stats.EnqueueCount > 0 { - q.stats.AverageTimeBetweenEnqueues = time.Since(q.stats.FirstEnqueueTime) / time.Duration(q.stats.EnqueueCount) + + switch sampleType { + case EnqueueSample: + q.stats.EnqueueSamples = cleanup(q.stats.EnqueueSamples) + case DequeueSample: + q.stats.DequeueSamples = cleanup(q.stats.DequeueSamples) } }