diff --git a/core/tasks/timeouts/bulk_timeout.go b/core/tasks/timeouts/bulk_timeout.go new file mode 100644 index 000000000..d0cd48603 --- /dev/null +++ b/core/tasks/timeouts/bulk_timeout.go @@ -0,0 +1,53 @@ +package timeouts + +import ( + "context" + "fmt" + "time" + + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/tasks" + "github.com/nyaruka/mailroom/core/tasks/handler" + "github.com/nyaruka/mailroom/core/tasks/handler/ctasks" + "github.com/nyaruka/mailroom/runtime" +) + +// TypeBulkTimeout is the type of the task +const TypeBulkTimeout = "bulk_timeout" + +func init() { + tasks.RegisterType(TypeBulkTimeout, func() tasks.Task { return &BulkTimeoutTask{} }) +} + +// BulkTimeoutTask is the payload of the task +type BulkTimeoutTask struct { + Timeouts []Timeout `json:"timeouts"` +} + +func (t *BulkTimeoutTask) Type() string { + return TypeBulkTimeout +} + +// Timeout is the maximum amount of time the task can run for +func (t *BulkTimeoutTask) Timeout() time.Duration { + return time.Hour +} + +func (t *BulkTimeoutTask) WithAssets() models.Refresh { + return models.RefreshNone +} + +// Perform creates the actual task +func (t *BulkTimeoutTask) Perform(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets) error { + rc := rt.RP.Get() + defer rc.Close() + + for _, timeout := range t.Timeouts { + err := handler.QueueTask(rc, oa.OrgID(), timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn)) + if err != nil { + return fmt.Errorf("error queuing handle task for timeout on session #%d: %w", timeout.SessionID, err) + } + } + + return nil +} diff --git a/core/tasks/timeouts/bulk_timeout_test.go b/core/tasks/timeouts/bulk_timeout_test.go new file mode 100644 index 000000000..0ed5d4f66 --- /dev/null +++ b/core/tasks/timeouts/bulk_timeout_test.go @@ -0,0 +1,36 @@ +package timeouts_test + +import ( + "testing" + "time" + + "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/mailroom/core/tasks/timeouts" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + "github.com/stretchr/testify/assert" +) + +func TestBulkTimeout(t *testing.T) { + _, rt := testsuite.Runtime() + defer testsuite.Reset(testsuite.ResetRedis) + + defer dates.SetNowFunc(time.Now) + dates.SetNowFunc(dates.NewFixedNow(time.Date(2024, 11, 15, 13, 59, 0, 0, time.UTC))) + + testsuite.QueueBatchTask(t, rt, testdata.Org1, &timeouts.BulkTimeoutTask{ + Timeouts: []timeouts.Timeout{ + {SessionID: 123456, ContactID: testdata.Cathy.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 57, 0, 0, time.UTC)}, + {SessionID: 234567, ContactID: testdata.Bob.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 58, 0, 0, time.UTC)}, + }, + }) + + assert.Equal(t, map[string]int{"bulk_timeout": 1}, testsuite.FlushTasks(t, rt, []string{"batch", "throttled"})) + + testsuite.AssertContactTasks(t, testdata.Org1, testdata.Cathy, []string{ + `{"type":"timeout_event","task":{"session_id":123456,"time":"2024-11-15T13:57:00Z"},"queued_on":"2024-11-15T13:59:00Z"}`, + }) + testsuite.AssertContactTasks(t, testdata.Org1, testdata.Bob, []string{ + `{"type":"timeout_event","task":{"session_id":234567,"time":"2024-11-15T13:58:00Z"},"queued_on":"2024-11-15T13:59:00Z"}`, + }) +} diff --git a/core/tasks/timeouts/cron.go b/core/tasks/timeouts/cron.go index 465653cc2..93a5bd1f4 100644 --- a/core/tasks/timeouts/cron.go +++ b/core/tasks/timeouts/cron.go @@ -97,8 +97,8 @@ ORDER BY timeout_on ASC LIMIT 25000` type Timeout struct { - SessionID models.SessionID `db:"session_id"` - OrgID models.OrgID `db:"org_id"` - ContactID models.ContactID `db:"contact_id"` - TimeoutOn time.Time `db:"timeout_on"` + SessionID models.SessionID `db:"session_id" json:"session_id"` + OrgID models.OrgID `db:"org_id" json:"-"` + ContactID models.ContactID `db:"contact_id" json:"contact_id"` + TimeoutOn time.Time `db:"timeout_on" json:"timeout_on"` }