Skip to content

Commit

Permalink
Create absraction for fair queues.. so it's possible to switch out th…
Browse files Browse the repository at this point in the history
…e sorted set implementation
  • Loading branch information
rowanseymour committed Nov 14, 2024
1 parent 1fafb6f commit 632710f
Show file tree
Hide file tree
Showing 26 changed files with 96 additions and 100 deletions.
3 changes: 1 addition & 2 deletions core/hooks/create_broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// CreateBroadcastsHook is our hook for creating broadcasts
Expand All @@ -34,7 +33,7 @@ func (h *createBroadcastsHook) Apply(ctx context.Context, rt *runtime.Runtime, t
return fmt.Errorf("error creating broadcast: %w", err)
}

err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &msgs.SendBroadcastTask{Broadcast: bcast}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &msgs.SendBroadcastTask{Broadcast: bcast}, false)
if err != nil {
return fmt.Errorf("error queuing broadcast task: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/hooks/create_starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// CreateStartsHook is our hook to fire our scene starts
Expand Down Expand Up @@ -74,7 +73,7 @@ func (h *createStartsHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *s
}
}

err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &starts.StartFlowTask{FlowStart: start}, false)
if err != nil {
return fmt.Errorf("error queuing flow start: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/campaigns/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/nyaruka/redisx"
)

Expand Down Expand Up @@ -118,7 +117,7 @@ func (c *QueueEventsCron) queueFiresTask(rp *redis.Pool, orgID models.OrgID, tas
rc := rp.Get()
defer rc.Close()

err := tasks.Queue(rc, tasks.BatchQueue, orgID, task, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.BatchQueue, orgID, task, false)
if err != nil {
return fmt.Errorf("error queuing task: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/campaigns/fire_campaign_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/campaigns"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/nyaruka/redisx"
"github.com/nyaruka/redisx/assertredis"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -54,7 +53,7 @@ func TestFireCampaignEvents(t *testing.T) {
CampaignName: campaign.Name,
}

err := tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false)
assert.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand Down
5 changes: 2 additions & 3 deletions core/tasks/handler/handle_contact_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// TypeHandleContactEvent is the task type for flagging that a contact has handler tasks to be handled
Expand Down Expand Up @@ -57,7 +56,7 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim
if len(locks) == 0 {
rc := rt.RP.Get()
defer rc.Close()
err = tasks.Queue(rc, tasks.HandlerQueue, oa.OrgID(), &HandleContactEventTask{ContactID: t.ContactID}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.HandlerQueue, oa.OrgID(), &HandleContactEventTask{ContactID: t.ContactID}, false)
if err != nil {
return fmt.Errorf("error re-adding contact task after failing to get lock: %w", err)
}
Expand Down Expand Up @@ -186,7 +185,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID
// queue this to our ivr starter, it will take care of creating the calls then calling back in
rc := rt.RP.Get()
defer rc.Close()
err = tasks.Queue(rc, tasks.BatchQueue, orgID, task, queues.HighPriority)
err = tasks.Queue(rc, tasks.BatchQueue, orgID, task, true)
if err != nil {
return fmt.Errorf("error queuing ivr flow start: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/handler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// Task is the interface for all contact tasks - tasks which operate on a single contact in real time
Expand Down Expand Up @@ -73,7 +72,7 @@ func queueTask(rc redis.Conn, orgID models.OrgID, contactID models.ContactID, ta
}

// then add a handle task for that contact on our global handler queue to
err = tasks.Queue(rc, tasks.HandlerQueue, orgID, &HandleContactEventTask{ContactID: contactID}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.HandlerQueue, orgID, &HandleContactEventTask{ContactID: contactID}, false)
if err != nil {
return fmt.Errorf("error queuing handle task: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions core/tasks/interrupts/interrupt_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -55,7 +54,7 @@ func TestInterruptChannel(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.TwilioChannel.ID).Returns(0)

// queue and perform a task to interrupt the Twilio channel
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.TwilioChannel.ID}, queues.DefaultPriority)
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.TwilioChannel.ID}, false)
testsuite.FlushTasks(t, rt)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(1)
Expand Down Expand Up @@ -83,7 +82,7 @@ func TestInterruptChannel(t *testing.T) {
})

// queue and perform a task to interrupt the Vonage channel
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.VonageChannel.ID}, queues.DefaultPriority)
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.VonageChannel.ID}, false)
testsuite.FlushTasks(t, rt)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and failed_reason = 'R' and channel_id = $1`, testdata.VonageChannel.ID).Returns(6)
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -35,7 +34,7 @@ func TestRetryCallsCron(t *testing.T) {
err := models.InsertFlowStarts(ctx, rt.DB, []*models.FlowStart{start})
require.NoError(t, err)

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

service.callError = nil
Expand Down
7 changes: 3 additions & 4 deletions core/tasks/ivr/start_ivr_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/require"
)

Expand All @@ -41,7 +40,7 @@ func TestIVR(t *testing.T) {

service.callError = fmt.Errorf("unable to create call")

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand All @@ -53,7 +52,7 @@ func TestIVR(t *testing.T) {
service.callError = nil
service.callID = ivr.CallID("call1")

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand All @@ -64,7 +63,7 @@ func TestIVR(t *testing.T) {
service.callError = nil
service.callID = ivr.CallID("call1")

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/msgs/send_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/search"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

const (
Expand Down Expand Up @@ -116,7 +115,7 @@ func createBroadcastBatches(ctx context.Context, rt *runtime.Runtime, oa *models
isLast := (i == len(idBatches)-1)

batch := bcast.CreateBatch(idBatch, isFirst, isLast)
err = tasks.Queue(rc, q, bcast.OrgID, &SendBroadcastBatchTask{BroadcastBatch: batch}, queues.DefaultPriority)
err = tasks.Queue(rc, q, bcast.OrgID, &SendBroadcastBatchTask{BroadcastBatch: batch}, false)
if err != nil {
if i == 0 {
return fmt.Errorf("error queuing broadcast batch: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions core/tasks/msgs/send_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestBroadcastsFromEvents(t *testing.T) {
groups []*assets.GroupReference
contacts []*flows.ContactReference
urns []urns.URN
queue *queues.FairSorted
queue queues.Fair
expectedBatchCount int
expectedMsgCount int
expectedMsgText string
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestBroadcastsFromEvents(t *testing.T) {
bcast, err := models.NewBroadcastFromEvent(ctx, rt.DB, oa, event)
assert.NoError(t, err)

err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &msgs.SendBroadcastTask{Broadcast: bcast}, queues.DefaultPriority)
err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &msgs.SendBroadcastTask{Broadcast: bcast}, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestSendBroadcastTask(t *testing.T) {
query string
exclusions models.Exclusions
createdByID models.UserID
queue *queues.FairSorted
queue queues.Fair
expectedBatches int
expectedMsgs map[string]int
}{
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestSendBroadcastTask(t *testing.T) {

task := &msgs.SendBroadcastTask{Broadcast: bcast}

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/schedules/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

func init() {
Expand Down Expand Up @@ -135,7 +134,7 @@ func (c *schedulesCron) Run(ctx context.Context, rt *runtime.Runtime) (map[strin

// add our task if we have one
if task != nil {
err = tasks.Queue(rc, tasks.BatchQueue, s.OrgID, task, queues.HighPriority)
err = tasks.Queue(rc, tasks.BatchQueue, s.OrgID, task, true)
if err != nil {
log.Error(fmt.Sprintf("error queueing %s task from schedule", task.Type()), "error", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/starts/start_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

const (
Expand Down Expand Up @@ -136,7 +135,7 @@ func createFlowStartBatches(ctx context.Context, rt *runtime.Runtime, oa *models
batchTask = &StartFlowBatchTask{FlowStartBatch: batch}
}

err = tasks.Queue(rc, q, start.OrgID, batchTask, queues.DefaultPriority)
err = tasks.Queue(rc, q, start.OrgID, batchTask, false)
if err != nil {
if i == 0 {
return fmt.Errorf("error queuing flow start batch: %w", err)
Expand Down
11 changes: 5 additions & 6 deletions core/tasks/starts/start_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -35,7 +34,7 @@ func TestStartFlowBatchTask(t *testing.T) {
batch2 := start1.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, false, true, 4)

// start the first batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch1}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch1}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -54,7 +53,7 @@ func TestStartFlowBatchTask(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT status FROM flows_flowstart WHERE id = $1`, start1.ID).Returns("S")

// start the second and final batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch2}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch2}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -71,7 +70,7 @@ func TestStartFlowBatchTask(t *testing.T) {
start2Batch2 := start2.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, false, true, 4)

// start the first batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch1}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch1}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -81,7 +80,7 @@ func TestStartFlowBatchTask(t *testing.T) {
rt.DB.MustExec(`UPDATE flows_flowstart SET status = 'I' WHERE id = $1`, start2.ID)

// start the second batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch2}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch2}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -104,7 +103,7 @@ func TestStartFlowBatchTaskNonPersistedStart(t *testing.T) {
batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, true, true, 2)

// start the first batch...
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch}, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand Down
6 changes: 3 additions & 3 deletions core/tasks/starts/start_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestStartFlowTask(t *testing.T) {
query string
excludeInAFlow bool
excludeStartedPreviously bool
queue *queues.FairSorted
queue queues.Fair
expectedContactCount int
expectedBatchCount int
expectedTotalCount int
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestStartFlowTask(t *testing.T) {
err := models.InsertFlowStarts(ctx, rt.DB, []*models.FlowStart{start})
assert.NoError(t, err)

err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestStartFlowTaskNonPersistedStart(t *testing.T) {
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, testdata.SingleMessage.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID})

err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand Down
2 changes: 1 addition & 1 deletion core/tasks/starts/throttle_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func init() {
}

type ThrottleQueueCron struct {
Queue *queues.FairSorted
Queue queues.Fair
}

func (c *ThrottleQueueCron) Next(last time.Time) time.Time {
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/starts/throttle_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestThrottleQueue(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, map[string]any{"paused": 0, "resumed": 0}, res)

queue.Push(rc, "type1", 1, "task1", queues.DefaultPriority)
queue.Push(rc, "type1", 1, "task1", false)

res, err = cron.Run(ctx, rt)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Perform(ctx context.Context, rt *runtime.Runtime, task *queues.Task) error
}

// Queue adds the given task to the given queue
func Queue(rc redis.Conn, q *queues.FairSorted, orgID models.OrgID, task Task, priority queues.Priority) error {
func Queue(rc redis.Conn, q queues.Fair, orgID models.OrgID, task Task, priority bool) error {
return q.Push(rc, task.Type(), int(orgID), task, priority)
}

Expand Down
Loading

0 comments on commit 632710f

Please sign in to comment.