diff --git a/pkg/blockbuilder/scheduler/jobs.go b/pkg/blockbuilder/scheduler/jobs.go index 56124b7061..a227f0268a 100644 --- a/pkg/blockbuilder/scheduler/jobs.go +++ b/pkg/blockbuilder/scheduler/jobs.go @@ -14,44 +14,92 @@ import ( var ( errNoJobAvailable = errors.New("no job available") errJobNotFound = errors.New("job not found") - errJobNotAssigned = errors.New("job not assigned to worker") + errJobNotAssigned = errors.New("job not assigned to given worker") + errBadEpoch = errors.New("bad epoch") ) type jobQueue struct { - leaseTime time.Duration - logger log.Logger + leaseExpiry time.Duration + logger log.Logger mu sync.Mutex + epoch int64 jobs map[string]*job unassigned jobHeap } -func newJobQueue(leaseTime time.Duration, logger log.Logger) *jobQueue { +func newJobQueue(leaseExpiry time.Duration, logger log.Logger) *jobQueue { return &jobQueue{ - leaseTime: leaseTime, - logger: logger, + leaseExpiry: leaseExpiry, + logger: logger, jobs: make(map[string]*job), } } // assign assigns the highest-priority unassigned job to the given worker. -func (s *jobQueue) assign(workerID string) (string, jobSpec, error) { +func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) { if workerID == "" { - return "", jobSpec{}, errors.New("workerID cannot not be empty") + return jobKey{}, jobSpec{}, errors.New("workerID cannot be empty") } s.mu.Lock() defer s.mu.Unlock() if s.unassigned.Len() == 0 { - return "", jobSpec{}, errNoJobAvailable + return jobKey{}, jobSpec{}, errNoJobAvailable } j := heap.Pop(&s.unassigned).(*job) + j.key.epoch = s.epoch + s.epoch++ j.assignee = workerID - j.leaseExpiry = time.Now().Add(s.leaseTime) - return j.id, j.spec, nil + j.leaseExpiry = time.Now().Add(s.leaseExpiry) + return j.key, j.spec, nil +} + +// importJob imports a job with the given ID and spec into the jobQueue. This is +// meant to be used during recovery, when we're reconstructing the jobQueue from +// worker updates. +func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error { + if key.id == "" { + return errors.New("jobID cannot be empty") + } + if workerID == "" { + return errors.New("workerID cannot be empty") + } + + s.mu.Lock() + defer s.mu.Unlock() + + // When we start assigning new jobs, the epochs need to be compatible with + // these "imported" jobs. + s.epoch = max(s.epoch, key.epoch+1) + + j, ok := s.jobs[key.id] + if ok { + if key.epoch < j.key.epoch { + return errBadEpoch + } else if key.epoch == j.key.epoch { + if j.assignee != workerID { + return errJobNotAssigned + } + } else { + // Otherwise, this caller is the new authority, so we accept the update. + j.assignee = workerID + j.key = key + j.spec = spec + } + } else { + s.jobs[key.id] = &job{ + key: key, + assignee: workerID, + leaseExpiry: time.Now().Add(s.leaseExpiry), + failCount: 0, + spec: spec, + } + } + return nil } // addOrUpdate adds a new job or updates an existing job with the given spec. @@ -60,8 +108,8 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) { defer s.mu.Unlock() if j, ok := s.jobs[id]; ok { + // We can only update an unassigned job. if j.assignee == "" { - // We can only update an unassigned job. j.spec = spec } return @@ -69,9 +117,12 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) { // Otherwise, add a new job. 
j := &job{ - id: id, + key: jobKey{ + id: id, + epoch: 0, + }, assignee: "", - leaseExpiry: time.Now().Add(s.leaseTime), + leaseExpiry: time.Now().Add(s.leaseExpiry), failCount: 0, spec: spec, } @@ -81,8 +132,8 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) { // renewLease renews the lease of the job with the given ID for the given // worker. -func (s *jobQueue) renewLease(jobID, workerID string) error { - if jobID == "" { +func (s *jobQueue) renewLease(key jobKey, workerID string) error { + if key.id == "" { return errors.New("jobID cannot be empty") } if workerID == "" { @@ -92,22 +143,25 @@ func (s *jobQueue) renewLease(jobID, workerID string) error { s.mu.Lock() defer s.mu.Unlock() - j, ok := s.jobs[jobID] + j, ok := s.jobs[key.id] if !ok { return errJobNotFound } if j.assignee != workerID { return errJobNotAssigned } + if j.key.epoch != key.epoch { + return errBadEpoch + } - j.leaseExpiry = time.Now().Add(s.leaseTime) + j.leaseExpiry = time.Now().Add(s.leaseExpiry) return nil } // completeJob completes the job with the given ID for the given worker, // removing it from the jobQueue. -func (s *jobQueue) completeJob(jobID, workerID string) error { - if jobID == "" { +func (s *jobQueue) completeJob(key jobKey, workerID string) error { + if key.id == "" { return errors.New("jobID cannot be empty") } if workerID == "" { @@ -117,15 +171,18 @@ func (s *jobQueue) completeJob(jobID, workerID string) error { s.mu.Lock() defer s.mu.Unlock() - j, ok := s.jobs[jobID] + j, ok := s.jobs[key.id] if !ok { return errJobNotFound } if j.assignee != workerID { return errJobNotAssigned } + if j.key.epoch != key.epoch { + return errBadEpoch + } - delete(s.jobs, jobID) + delete(s.jobs, key.id) return nil } @@ -147,7 +204,7 @@ func (s *jobQueue) clearExpiredLeases() { } type job struct { - id string + key jobKey assignee string leaseExpiry time.Time @@ -157,6 +214,13 @@ type job struct { spec jobSpec } +type jobKey struct { + id string + // The assignment epoch. This is used to break ties when multiple workers + // have knowledge of the same job. 
+ epoch int64 +} + type jobSpec struct { topic string partition int32 diff --git a/pkg/blockbuilder/scheduler/jobs_test.go b/pkg/blockbuilder/scheduler/jobs_test.go index e9bc7d5540..dae0327c47 100644 --- a/pkg/blockbuilder/scheduler/jobs_test.go +++ b/pkg/blockbuilder/scheduler/jobs_test.go @@ -17,67 +17,71 @@ import ( func TestAssign(t *testing.T) { s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) - j0id, j0spec, err := s.assign("w0") - require.Empty(t, j0id) + j0, j0spec, err := s.assign("w0") + require.Empty(t, j0.id) require.Zero(t, j0spec) require.ErrorIs(t, err, errNoJobAvailable) s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()}) - j1id, j1spec, err := s.assign("w0") - require.NotEmpty(t, j1id) + j1, j1spec, err := s.assign("w0") + require.NotEmpty(t, j1.id) require.NotZero(t, j1spec) require.NoError(t, err) - require.Equal(t, "w0", s.jobs[j1id].assignee) + require.Equal(t, "w0", s.jobs[j1.id].assignee) - j2id, j2spec, err := s.assign("w0") - require.Zero(t, j2id) + j2, j2spec, err := s.assign("w0") + require.Zero(t, j2.id) require.Zero(t, j2spec) require.ErrorIs(t, err, errNoJobAvailable) s.addOrUpdate("job2", jobSpec{topic: "hello2", commitRecTs: time.Now()}) - j3id, j3spec, err := s.assign("w0") - require.NotZero(t, j3id) + j3, j3spec, err := s.assign("w0") + require.NotZero(t, j3.id) require.NotZero(t, j3spec) require.NoError(t, err) - require.Equal(t, "w0", s.jobs[j3id].assignee) + require.Equal(t, "w0", s.jobs[j3.id].assignee) } func TestAssignComplete(t *testing.T) { s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) { - err := s.completeJob("rando job", "w0") + err := s.completeJob(jobKey{"rando job", 965}, "w0") require.ErrorIs(t, err, errJobNotFound) } s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()}) - jid, jspec, err := s.assign("w0") - require.NotZero(t, jid) + jk, jspec, err := s.assign("w0") + require.NotZero(t, jk) require.NotZero(t, jspec) require.NoError(t, err) - j, ok := s.jobs[jid] + j, ok := s.jobs[jk.id] require.True(t, ok) require.Equal(t, "w0", j.assignee) { - err := s.completeJob("rando job", "w0") + err := s.completeJob(jobKey{"rando job", 64}, "w0") require.ErrorIs(t, err, errJobNotFound) } { - err := s.completeJob(j.id, "rando worker") + err := s.completeJob(jk, "rando worker") require.ErrorIs(t, err, errJobNotAssigned) } + { + err := s.completeJob(jobKey{jk.id, 9999}, "w0") + require.ErrorIs(t, err, errBadEpoch) + } { - err := s.completeJob(j.id, "w0") + err := s.completeJob(jk, "w0") require.NoError(t, err) - err2 := s.completeJob(j.id, "w0") + err2 := s.completeJob(jk, "w0") require.ErrorIs(t, err2, errJobNotFound) } - j2id, j2spec, err := s.assign("w0") - require.Zero(t, j2id, "should be no job available") + j2k, j2spec, err := s.assign("w0") + require.Zero(t, j2k.id, "should be no job available") require.Zero(t, j2spec, "should be no job available") require.ErrorIs(t, err, errNoJobAvailable) } @@ -85,12 +89,12 @@ func TestAssignComplete(t *testing.T) { func TestLease(t *testing.T) { s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()}) - jid, jspec, err := s.assign("w0") - require.NotZero(t, jid) + jk, jspec, err := s.assign("w0") + require.NotZero(t, jk.id) require.NotZero(t, jspec) require.NoError(t, err) - j, ok := s.jobs[jid] + j, ok := s.jobs[jk.id] require.True(t, ok) require.Equal(t, "w0", j.assignee) @@ -98,36 +102,58 @@ func TestLease(t *testing.T) { j.leaseExpiry = time.Now().Add(-1 * time.Minute) 
s.clearExpiredLeases() - j2id, j2spec, err := s.assign("w1") - require.NotZero(t, j2id, "should be able to assign a job whose lease was invalidated") + j2k, j2spec, err := s.assign("w1") + require.NotZero(t, j2k.id, "should be able to assign a job whose lease was invalidated") require.NotZero(t, j2spec, "should be able to assign a job whose lease was invalidated") require.Equal(t, j.spec, j2spec) require.NoError(t, err) - j2, ok := s.jobs[j2id] + j2, ok := s.jobs[j2k.id] require.True(t, ok) require.Equal(t, "w1", j2.assignee) t.Run("renewals", func(t *testing.T) { prevExpiry := j2.leaseExpiry - e1 := s.renewLease(j2.id, "w1") + e1 := s.renewLease(j2k, "w1") require.NoError(t, e1) require.True(t, j2.leaseExpiry.After(prevExpiry)) - e2 := s.renewLease(j2.id, "w0") + e2 := s.renewLease(j2k, "w0") require.ErrorIs(t, e2, errJobNotAssigned) - e3 := s.renewLease("job_404", "w0") + e3 := s.renewLease(jobKey{"job_404", 1}, "w0") require.ErrorIs(t, e3, errJobNotFound) }) } +// TestImportJob tests the importJob method - the method that is called to learn +// about jobs in-flight from a previous scheduler instance. +func TestImportJob(t *testing.T) { + s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) + spec := jobSpec{commitRecTs: time.Now().Add(-1 * time.Hour)} + require.NoError(t, s.importJob(jobKey{"job1", 122}, "w0", spec)) + require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec)) + require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 122}, "w0", spec)) + require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 60}, "w98", spec)) + require.ErrorIs(t, errJobNotAssigned, s.importJob(jobKey{"job1", 123}, "w512", spec)) + require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec)) + + j, ok := s.jobs["job1"] + require.True(t, ok) + require.Equal(t, jobKey{"job1", 123}, j.key) + require.Equal(t, spec, j.spec) + require.Equal(t, "w2", j.assignee) +} + func TestMinHeap(t *testing.T) { n := 517 jobs := make([]*job, n) order := make([]int, n) for i := 0; i < n; i++ { jobs[i] = &job{ - id: fmt.Sprintf("job%d", i), + key: jobKey{ + id: fmt.Sprintf("job%d", i), + epoch: 0, + }, spec: jobSpec{topic: "hello", commitRecTs: time.Unix(int64(i), 0)}, } order[i] = i diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 6625af7e43..466062c5ed 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -6,13 +6,17 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/status" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/twmb/franz-go/pkg/kadm" + "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/blockbuilder" "github.com/grafana/mimir/pkg/storage/ingest" @@ -27,6 +31,11 @@ type BlockBuilderScheduler struct { logger log.Logger register prometheus.Registerer metrics schedulerMetrics + + mu sync.Mutex + committed kadm.Offsets + observations obsMap + observationComplete bool } func New( @@ -35,11 +44,14 @@ func New( reg prometheus.Registerer, ) (*BlockBuilderScheduler, error) { s := &BlockBuilderScheduler{ - jobs: newJobQueue(cfg.JobLeaseExpiry, logger), + jobs: nil, cfg: cfg, logger: logger, register: reg, metrics: newSchedulerMetrics(reg), + + committed: make(kadm.Offsets), + observations: make(obsMap), } s.Service = services.NewBasicService(s.starting, s.running, s.stopping) return s, nil @@ -65,6 +77,37 @@ func (s 
*BlockBuilderScheduler) stopping(_ error) error { } func (s *BlockBuilderScheduler) running(ctx context.Context) error { + // The first thing we do when starting up is to complete the startup process where we: + // 1. obtain an initial set of offset info from Kafka + // 2. listen to worker updates for a while to learn the state of the world + // When both of those are complete, we transition from observation mode to normal operation. + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + lag, err := s.fetchLag(ctx) + if err != nil { + panic(err) + } + s.committed = commitOffsetsFromLag(lag) + }() + go func() { + defer wg.Done() + time.Sleep(s.cfg.StartupObserveTime) + }() + + wg.Wait() + + if err := ctx.Err(); err != nil { + return err + } + + s.completeObservationMode() + + // Now that that's done, we can start the main loop. + updateTick := time.NewTicker(s.cfg.SchedulingInterval) defer updateTick.Stop() for { @@ -78,14 +121,31 @@ func (s *BlockBuilderScheduler) running(ctx context.Context) error { } } +func (s *BlockBuilderScheduler) completeObservationMode() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.observationComplete { + return + } + + s.jobs = newJobQueue(s.cfg.JobLeaseExpiry, s.logger) + + if err := s.finalizeObservations(); err != nil { + level.Warn(s.logger).Log("msg", "failed to compute state from observations", "err", err) + // (what to do here?) + } + + s.observations = nil + s.observationComplete = true +} + func (s *BlockBuilderScheduler) updateSchedule(ctx context.Context) { startTime := time.Now() defer func() { s.metrics.updateScheduleDuration.Observe(time.Since(startTime).Seconds()) }() - // TODO: Commit the offsets back to Kafka if dirty. - lag, err := blockbuilder.GetGroupLag(ctx, s.adminClient, s.cfg.Kafka.Topic, s.cfg.ConsumerGroup, 0) if err != nil { level.Warn(s.logger).Log("msg", "failed to get group lag", "err", err) @@ -136,32 +196,159 @@ func (s *BlockBuilderScheduler) updateSchedule(ctx context.Context) { }) } +func (s *BlockBuilderScheduler) fetchLag(ctx context.Context) (kadm.GroupLag, error) { + boff := backoff.New(ctx, backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: time.Second, + MaxRetries: 10, + }) + var lastErr error + for boff.Ongoing() { + groupLag, err := blockbuilder.GetGroupLag(ctx, s.adminClient, s.cfg.Kafka.Topic, s.cfg.ConsumerGroup, 0) + if err != nil { + lastErr = fmt.Errorf("lag: %w", err) + boff.Wait() + continue + } + + return groupLag, nil + } + + return kadm.GroupLag{}, lastErr +} + +// commitOffsetsFromLag computes the committed offset info from the given lag. +func commitOffsetsFromLag(lag kadm.GroupLag) kadm.Offsets { + offsets := make(kadm.Offsets) + for _, ps := range lag { + for _, gl := range ps { + offsets.Add(gl.Commit) + } + } + return offsets +} + // assignJob returns an assigned job for the given workerID. // (This is a temporary method for unit tests until we have RPCs.) -func (s *BlockBuilderScheduler) assignJob(workerID string) (string, jobSpec, error) { +func (s *BlockBuilderScheduler) assignJob(workerID string) (jobKey, jobSpec, error) { + s.mu.Lock() + doneObserving := s.observationComplete + s.mu.Unlock() + + if !doneObserving { + return jobKey{}, jobSpec{}, status.Error(codes.Unavailable, "observation period not complete") + } + return s.jobs.assign(workerID) } // updateJob takes a job update from the client and records it, if necessary. // (This is a temporary method for unit tests until we have RPCs.) 
-func (s *BlockBuilderScheduler) updateJob(jobID, workerID string, complete bool, _ jobSpec) error { - // TODO: Right here we should ignore the update if the job isn't beyond - // what's in our local snapshot of committed offsets. +func (s *BlockBuilderScheduler) updateJob(key jobKey, workerID string, complete bool, j jobSpec) error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.observationComplete { + if err := s.updateObservation(key, workerID, complete, j); err != nil { + return fmt.Errorf("observe update: %w", err) + } + + s.logger.Log("msg", "recovered job", "key", key, "worker", workerID) + return nil + } + + if c, ok := s.committed.Lookup(s.cfg.Kafka.Topic, j.partition); ok { + if j.startOffset <= c.At { + // Update of a completed/committed job. Ignore. + s.logger.Log("msg", "ignored historical job", "key", key, "worker", workerID) + return nil + } + } if complete { - if err := s.jobs.completeJob(jobID, workerID); err != nil { + if err := s.jobs.completeJob(key, workerID); err != nil { // job not found is fine, as clients will be re-informing us. if !errors.Is(err, errJobNotFound) { return fmt.Errorf("complete job: %w", err) } } - // TODO: Move forward our local snapshot of committed offsets. + // TODO: Push forward the local notion of the committed offset. + + s.logger.Log("msg", "completed job", "key", key, "worker", workerID) } else { // It's an in-progress job whose lease we need to renew. - if err := s.jobs.renewLease(jobID, workerID); err != nil { + if err := s.jobs.renewLease(key, workerID); err != nil { return fmt.Errorf("renew lease: %w", err) } + s.logger.Log("msg", "renewed lease", "key", key, "worker", workerID) } return nil } + +func (s *BlockBuilderScheduler) updateObservation(key jobKey, workerID string, complete bool, j jobSpec) error { + rj, ok := s.observations[key.id] + if !ok { + s.observations[key.id] = &observation{ + key: key, + spec: j, + workerID: workerID, + complete: complete, + } + return nil + } + + // Otherwise, we've seen it before. Higher epochs win, and cause earlier ones to fail. + + if key.epoch < rj.key.epoch { + return errBadEpoch + } + + rj.key = key + rj.spec = j + rj.workerID = workerID + rj.complete = complete + return nil +} + +// finalizeObservations considers the observations and offsets from Kafka, rectifying them into +// the starting state of the scheduler's normal operation. +func (s *BlockBuilderScheduler) finalizeObservations() error { + for _, rj := range s.observations { + if rj.complete { + // Completed. + if o, ok := s.committed.Lookup(rj.spec.topic, rj.spec.partition); ok { + if rj.spec.endOffset > o.At { + // Completed jobs can push forward the offsets we've learned from Kafka. + o.At = rj.spec.endOffset + o.Metadata = "{}" // TODO: take the new meta from the completion message. + s.committed[rj.spec.topic][rj.spec.partition] = o + } + } else { + s.committed.Add(kadm.Offset{ + Topic: rj.spec.topic, + Partition: rj.spec.partition, + At: rj.spec.endOffset, + Metadata: "{}", // TODO: take the new meta from the completion message. + }) + } + } else { + // An in-progress job. + // These don't affect offsets (yet), they just get added to the job queue. 
+ if err := s.jobs.importJob(rj.key, rj.workerID, rj.spec); err != nil { + return fmt.Errorf("import job: %w", err) + } + } + } + + return nil +} + +type obsMap map[string]*observation + +type observation struct { + key jobKey + spec jobSpec + workerID string + complete bool +} diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 0e2c62807a..c9721192ef 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "math/rand" "strings" "testing" "time" @@ -33,10 +34,8 @@ func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client { return writeClient } -func TestClientInterface(t *testing.T) { - _, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 4, "ingest") - cli := mustKafkaClient(t, kafkaAddr) - +func mustSchedulerWithKafkaAddr(t *testing.T, addr string) (*BlockBuilderScheduler, *kgo.Client) { + cli := mustKafkaClient(t, addr) cfg := Config{ Kafka: ingest.KafkaConfig{ Topic: "ingest", @@ -48,23 +47,100 @@ func TestClientInterface(t *testing.T) { sched, err := New(cfg, test.NewTestingLogger(t), reg) sched.adminClient = kadm.NewClient(cli) require.NoError(t, err) + return sched, cli +} - // Do some things a client might do. +func mustScheduler(t *testing.T) (*BlockBuilderScheduler, *kgo.Client) { + _, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 4, "ingest") + return mustSchedulerWithKafkaAddr(t, kafkaAddr) +} - require.ErrorIs(t, - sched.jobs.completeJob("job1", "w0"), - errJobNotFound, - ) +func TestStartup(t *testing.T) { + sched, _ := mustScheduler(t) + // (a new scheduler starts in observation mode.) + + { + _, _, err := sched.assignJob("w0") + require.ErrorContains(t, err, "observation period not complete") + } now := time.Now() - sched.jobs.addOrUpdate("ingest/64/1000", jobSpec{ - topic: "ingest", - partition: 64, - startOffset: 1000, - endOffset: 2000, - commitRecTs: now.Add(-2 * time.Hour), - }) + // Some jobs that ostensibly exist, but scheduler doesn't know about. + j1 := job{ + key: jobKey{ + id: "ingest/64/1000", + epoch: 10, + }, + spec: jobSpec{ + topic: "ingest", + partition: 64, + startOffset: 1000, + commitRecTs: now.Add(-1 * time.Hour), + }, + } + j2 := job{ + key: jobKey{ + id: "ingest/65/256", + epoch: 11, + }, + spec: jobSpec{ + topic: "ingest", + partition: 65, + startOffset: 256, + commitRecTs: now.Add(-2 * time.Hour), + }, + } + j3 := job{ + key: jobKey{ + id: "ingest/66/57", + epoch: 12, + }, + spec: jobSpec{ + topic: "ingest", + partition: 66, + startOffset: 57, + commitRecTs: now.Add(-3 * time.Hour), + }, + } + + // Clients will be pinging with their updates for some time. + + require.NoError(t, sched.updateJob(j1.key, "w0", false, j1.spec)) + + require.NoError(t, sched.updateJob(j2.key, "w0", true, j2.spec)) + + require.NoError(t, sched.updateJob(j3.key, "w0", false, j3.spec)) + require.NoError(t, sched.updateJob(j3.key, "w0", false, j3.spec)) + require.NoError(t, sched.updateJob(j3.key, "w0", false, j3.spec)) + require.NoError(t, sched.updateJob(j3.key, "w0", true, j3.spec)) + + // Convert the observations to actual jobs. + sched.completeObservationMode() + + // Now that we're out of observation mode, we should know about all the jobs. 
+ + require.NoError(t, sched.updateJob(j1.key, "w0", false, j1.spec)) + require.NoError(t, sched.updateJob(j1.key, "w0", false, j1.spec)) + + require.NoError(t, sched.updateJob(j2.key, "w0", true, j2.spec)) + + require.NoError(t, sched.updateJob(j3.key, "w0", true, j3.spec)) + + _, ok := sched.jobs.jobs[j1.key.id] + require.True(t, ok) + + // And eventually they'll all complete. + require.NoError(t, sched.updateJob(j1.key, "w0", true, j1.spec)) + require.NoError(t, sched.updateJob(j2.key, "w0", true, j2.spec)) + require.NoError(t, sched.updateJob(j3.key, "w0", true, j3.spec)) + + { + _, _, err := sched.assignJob("w0") + require.ErrorIs(t, err, errNoJobAvailable) + } + + // And we can resume normal operation: sched.jobs.addOrUpdate("ingest/65/256", jobSpec{ topic: "ingest", partition: 65, @@ -73,58 +149,157 @@ func TestClientInterface(t *testing.T) { commitRecTs: now.Add(-1 * time.Hour), }) - jobID, jobSpec, err := sched.assignJob("w0") + a1key, a1spec, err := sched.assignJob("w0") require.NoError(t, err) - require.NotZero(t, jobSpec) - require.Equal(t, "ingest/64/1000", jobID) - - // Heartbeat a bunch of times. - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - - // Complete a bunch of times. - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - - // Take the next job. - jobID, jobSpec, err = sched.assignJob("w0") - require.NoError(t, err) - require.NotZero(t, jobSpec) - require.Equal(t, "ingest/65/256", jobID) - - // Heartbeat a bunch of times. - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", false, jobSpec)) - - // Complete a bunch of times. - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - require.NoError(t, sched.updateJob(jobID, "w0", true, jobSpec)) - - // And repeat completion with the first job. Like clients will do. - require.NoError(t, sched.updateJob("ingest/64/1000", "w0", true, jobSpec)) - require.NoError(t, sched.updateJob("ingest/64/1000", "w0", true, jobSpec)) - require.NoError(t, sched.updateJob("ingest/64/1000", "w0", true, jobSpec)) - require.NoError(t, sched.updateJob("ingest/64/1000", "w0", true, jobSpec)) - require.NoError(t, sched.updateJob("ingest/64/1000", "w0", true, jobSpec)) + require.NotZero(t, a1spec) + require.Equal(t, "ingest/65/256", a1key.id) +} + +func TestObservations(t *testing.T) { + sched, _ := mustScheduler(t) + // Initially we're in observation mode. We have Kafka's start offsets, but no client jobs. 
+ + sched.committed = kadm.Offsets{ + "ingest": { + 1: kadm.Offset{ + Topic: "ingest", + Partition: 1, + At: 5000, + }, + 2: kadm.Offset{ + Topic: "ingest", + Partition: 2, + At: 800, + }, + 3: kadm.Offset{ + Topic: "ingest", + Partition: 3, + At: 974, + }, + 4: kadm.Offset{ + Topic: "ingest", + Partition: 4, + At: 500, + }, + 5: kadm.Offset{ + Topic: "ingest", + Partition: 5, + At: 12000, + }, + }, + } { - jobID, jobSpec, err := sched.assignJob("w0") - require.ErrorIs(t, err, errNoJobAvailable) - require.Zero(t, jobSpec) - require.Zero(t, jobID) + nq := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) + sched.jobs = nq + err := sched.finalizeObservations() + require.NoError(t, err) + require.Len(t, nq.jobs, 0, "No observations, no jobs") + } + + type observation struct { + key jobKey + spec jobSpec + workerID string + complete bool + expectErr error + } + var clientData []observation + const ( + complete = true + inProgress = false + ) + maybeBadEpoch := errors.New("maybe bad epoch") + mkJob := func(isComplete bool, worker string, partition int32, id string, epoch int64, commitRecTs time.Time, endOffset int64, expectErr error) { + clientData = append(clientData, observation{ + key: jobKey{id: id, epoch: epoch}, + spec: jobSpec{ + topic: "ingest", + partition: partition, + commitRecTs: commitRecTs, + endOffset: endOffset, + }, + workerID: worker, + complete: isComplete, + expectErr: expectErr, + }) + } + + // Rig up a bunch of data that clients are collectively sending. + + // Partition 1: one job in progress. + mkJob(inProgress, "w0", 1, "ingest/1/5524", 10, time.Unix(200, 0), 6000, nil) + + // Partition 2: Many complete jobs, followed by an in-progress job. + mkJob(complete, "w0", 2, "ingest/2/1", 3, time.Unix(1, 0), 15, nil) + mkJob(complete, "w0", 2, "ingest/2/16", 4, time.Unix(2, 0), 31, nil) + mkJob(complete, "w0", 2, "ingest/2/32", 4, time.Unix(3, 0), 45, nil) + mkJob(complete, "w0", 2, "ingest/2/1000", 11, time.Unix(500, 0), 2000, nil) + mkJob(inProgress, "w0", 2, "ingest/2/2001", 12, time.Unix(600, 0), 2199, nil) + + // (Partition 3 has no updates.) + + // Partition 4 has a series of completed jobs that are entirely after what was found in Kafka. + mkJob(complete, "w0", 4, "ingest/4/500", 15, time.Unix(500, 0), 599, nil) + mkJob(complete, "w1", 4, "ingest/4/600", 16, time.Unix(600, 0), 699, nil) + mkJob(complete, "w2", 4, "ingest/4/700", 17, time.Unix(700, 0), 799, nil) + mkJob(complete, "w3", 4, "ingest/4/800", 18, time.Unix(800, 0), 899, nil) + // Here's a conflicting completion report from a worker whose lease was revoked at one point. It should be effectively dropped. + mkJob(complete, "w99", 4, "ingest/4/600", 6, time.Unix(600, 0), 699, maybeBadEpoch) + + // Partition 5 has a number of conflicting in-progress reports. + mkJob(inProgress, "w100", 5, "ingest/5/12000", 30, time.Unix(200, 0), 6000, maybeBadEpoch) + mkJob(inProgress, "w101", 5, "ingest/5/12000", 31, time.Unix(200, 0), 6000, maybeBadEpoch) + mkJob(inProgress, "w102", 5, "ingest/5/12000", 32, time.Unix(200, 0), 6000, maybeBadEpoch) + mkJob(inProgress, "w103", 5, "ingest/5/12000", 33, time.Unix(200, 0), 6000, maybeBadEpoch) + mkJob(inProgress, "w104", 5, "ingest/5/12000", 34, time.Unix(200, 0), 6000, nil) + + // Partition 6 has a complete job, but wasn't among the offsets we learned from Kafka. + mkJob(complete, "w0", 6, "ingest/6/500", 48, time.Unix(500, 0), 599, nil) + // Partition 7 has an in-progress job, but wasn't among the offsets we learned from Kafka. 
+ mkJob(complete, "w1", 7, "ingest/7/92874", 52, time.Unix(1500, 0), 93874, nil) + + rnd := rand.New(rand.NewSource(64_000)) + + sendUpdates := func() { + for range 3 { + // Simulate the arbitrary order of client updates. + rnd.Shuffle(len(clientData), func(i, j int) { clientData[i], clientData[j] = clientData[j], clientData[i] }) + for _, c := range clientData { + t.Log("sending update", c.key, c.workerID) + err := sched.updateJob(c.key, c.workerID, c.complete, c.spec) + if errors.Is(c.expectErr, maybeBadEpoch) { + require.True(t, errors.Is(err, errBadEpoch) || err == nil, "expected either bad epoch or no error, got %v", err) + } else { + require.NoError(t, err) + } + } + } } + + sendUpdates() + + sched.completeObservationMode() + requireOffset(t, sched.committed, "ingest", 1, 5000, "ingest/1 is in progress, so we should not move the offset") + requireOffset(t, sched.committed, "ingest", 2, 2000, "ingest/2 job was complete, so it should move the offset forward") + requireOffset(t, sched.committed, "ingest", 3, 974, "ingest/3 should be unchanged - no updates") + requireOffset(t, sched.committed, "ingest", 4, 899, "ingest/4 should be moved forward to account for the completed jobs") + requireOffset(t, sched.committed, "ingest", 5, 12000, "ingest/5 has nothing new completed") + requireOffset(t, sched.committed, "ingest", 6, 599, "ingest/6 should have been added to the offsets") + + require.Len(t, sched.jobs.jobs, 3) + require.Equal(t, 35, int(sched.jobs.epoch)) + + // Now verify that the same set of updates can be sent now that we're out of observation mode. + + sendUpdates() +} + +func requireOffset(t *testing.T, offs kadm.Offsets, topic string, partition int32, expected int64, msgAndArgs ...interface{}) { + t.Helper() + o, ok := offs.Lookup(topic, partition) + require.True(t, ok, msgAndArgs...) + require.Equal(t, expected, o.At, msgAndArgs...) } func TestMonitor(t *testing.T) { @@ -132,19 +307,10 @@ func TestMonitor(t *testing.T) { t.Cleanup(func() { cancel(errors.New("test done")) }) _, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 4, "ingest") - cli := mustKafkaClient(t, kafkaAddr) + sched, cli := mustSchedulerWithKafkaAddr(t, kafkaAddr) + reg := sched.register.(*prometheus.Registry) - cfg := Config{ - Kafka: ingest.KafkaConfig{ - Topic: "ingest", - }, - ConsumerGroup: "test-builder", - SchedulingInterval: 1000000 * time.Hour, - } - reg := prometheus.NewPedanticRegistry() - sched, err := New(cfg, test.NewTestingLogger(t), reg) - sched.adminClient = kadm.NewClient(cli) - require.NoError(t, err) + sched.completeObservationMode() // Partition i gets i records. for i := int32(0); i < 4; i++ {
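The new jobKey.epoch is the fencing token for this patch: assign stamps each assignment from the queue's monotonically increasing counter, renewLease and completeJob reject callers presenting a mismatched epoch with errBadEpoch, and importJob lets the highest epoch win when reconstructing state from worker reports. Below is a minimal same-package sketch of that flow, assuming a hypothetical helper name (epochFlowSketch) and a no-op logger; it is illustrative only and not part of the patch.

package scheduler

import (
	"errors"
	"fmt"
	"time"

	"github.com/go-kit/log"
)

// epochFlowSketch shows the assign -> renew -> complete cycle and how a
// stale epoch is fenced off. Hypothetical helper, not in the patch.
func epochFlowSketch() error {
	q := newJobQueue(5*time.Minute, log.NewNopLogger())
	q.addOrUpdate("ingest/64/1000", jobSpec{
		topic:       "ingest",
		partition:   64,
		startOffset: 1000,
		commitRecTs: time.Now(),
	})

	// assign stamps the returned key with the queue's epoch counter.
	key, _, err := q.assign("w0")
	if err != nil {
		return err
	}

	// Renewals and completions must echo the exact key that was assigned.
	if err := q.renewLease(key, "w0"); err != nil {
		return err
	}

	// A key with an older epoch (e.g. from a lease that was later reassigned)
	// is rejected rather than silently accepted.
	stale := jobKey{id: key.id, epoch: key.epoch - 1}
	if err := q.renewLease(stale, "w0"); !errors.Is(err, errBadEpoch) {
		return fmt.Errorf("expected errBadEpoch, got %w", err)
	}

	return q.completeJob(key, "w0")
}

TestImportJob exercises the same rule during recovery: for a given job ID, a report with a higher epoch replaces the current assignee, an equal epoch must come from the same worker, and a lower epoch gets errBadEpoch.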
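finalizeObservations applies a single rule to completed jobs: a partition's committed offset can only move forward, and partitions that were absent from the offsets fetched from Kafka are added. The sketch below restates that rule as a hypothetical free function (advanceCommitted does not exist in the patch), using only the kadm.Offsets operations the patch already relies on; metadata handling is omitted since the patch leaves it as a TODO.

package scheduler

import "github.com/twmb/franz-go/pkg/kadm"

// advanceCommitted mirrors the offset rule in finalizeObservations: a
// completed job may only push a partition's committed offset forward, and
// previously unknown partitions are added. Hypothetical helper for
// illustration only.
func advanceCommitted(committed kadm.Offsets, topic string, partition int32, endOffset int64) {
	if o, ok := committed.Lookup(topic, partition); ok {
		if endOffset > o.At {
			o.At = endOffset
			committed[topic][partition] = o
		}
		return
	}
	committed.Add(kadm.Offset{Topic: topic, Partition: partition, At: endOffset})
}

This is the behaviour TestObservations asserts: partition 2 advances to 2000, partition 4 to 899, partition 6 appears at 599 even though Kafka had no committed offset for it, and partitions with only in-progress jobs (1 and 5) are left untouched.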
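Until the observation window closes, assignJob returns a gRPC Unavailable status instead of handing out work. The sketch below is a hedged, caller-side illustration of how that could be treated as retriable, reusing the dskit backoff pattern fetchLag already uses; pollForJob is hypothetical, and the real worker-facing RPC client is out of scope for this change.

package scheduler

import (
	"context"
	"errors"
	"time"

	"github.com/gogo/status"
	"github.com/grafana/dskit/backoff"
	"google.golang.org/grpc/codes"
)

// pollForJob retries assignJob while the scheduler is still observing or has
// no work yet. Hypothetical helper, not part of the patch.
func pollForJob(ctx context.Context, s *BlockBuilderScheduler, workerID string) (jobKey, jobSpec, error) {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 10,
	})
	var lastErr error
	for boff.Ongoing() {
		key, spec, err := s.assignJob(workerID)
		if err == nil {
			return key, spec, nil
		}
		// Unavailable (observation period) and errNoJobAvailable are treated
		// as transient; anything else is returned to the caller.
		if status.Code(err) != codes.Unavailable && !errors.Is(err, errNoJobAvailable) {
			return jobKey{}, jobSpec{}, err
		}
		lastErr = err
		boff.Wait()
	}
	return jobKey{}, jobSpec{}, lastErr
}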