diff --git a/go.mod b/go.mod index 67c81a0..c037f1f 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.0 github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484 github.com/crawlab-team/crawlab-fs v0.6.3-2 - github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606065258-08b640f15e96 + github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240609070140-04a938cd57ef github.com/crawlab-team/crawlab-vcs v0.6.2-0.20230629045457-afe0be0e2185 github.com/crawlab-team/go-trace v0.1.1 github.com/crawlab-team/template-parser v0.0.4-0.20221006034646-9bb77a7ae86e diff --git a/go.sum b/go.sum index 00505b0..65aacd7 100644 --- a/go.sum +++ b/go.sum @@ -159,6 +159,8 @@ github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606060845-268ae28818d2 h1:S4F github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606060845-268ae28818d2/go.mod h1:m5H3K6QV7L//0MqBbmt0QvnxD7aEcQPny5VmZi+O+H8= github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606065258-08b640f15e96 h1:DXvaWwOYD6tsiN80sS/+yQxcApX7pvaEvmug5ROA0Wo= github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606065258-08b640f15e96/go.mod h1:O7/zw0CRzX3DbFw5SAl9DIpbAsKthEvgm7R1ikxcIa0= +github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240609070140-04a938cd57ef h1:FGDfwLfTr2WZSy3cUVYBX4Xz9xrHZzxQrFf3DA93RqY= +github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240609070140-04a938cd57ef/go.mod h1:j5FaFuWfIxHbmFXedFaUZHO2DpLqS3QZ0x7W3APN0fQ= github.com/crawlab-team/crawlab-vcs v0.6.2-0.20230629045457-afe0be0e2185 h1:A/XSUuGgGMn+z+lFd2ye2ClgIKhDZYUerhOL5jePQhU= github.com/crawlab-team/crawlab-vcs v0.6.2-0.20230629045457-afe0be0e2185/go.mod h1:YHMYUEoSqfXUZHsWW/M/DaLh/zOpRtiElaRWcrGyv/I= github.com/crawlab-team/go-trace v0.1.0/go.mod h1:LcWyn68HoT+d29CHM8L41pFHxsAcBMF1xjqJmWdyFh8= @@ -301,6 +303,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= diff --git a/grpc/server/task_server.go b/grpc/server/task_server.go index 8992f1b..7d28461 100644 --- a/grpc/server/task_server.go +++ b/grpc/server/task_server.go @@ -12,6 +12,7 @@ import ( "github.com/crawlab-team/crawlab-core/models/delegate" "github.com/crawlab-team/crawlab-core/models/models" "github.com/crawlab-team/crawlab-core/models/service" + "github.com/crawlab-team/crawlab-core/notification" "github.com/crawlab-team/crawlab-core/utils" "github.com/crawlab-team/crawlab-db/mongo" grpc "github.com/crawlab-team/crawlab-grpc" @@ -109,6 +110,44 @@ func (svr TaskServer) Fetch(ctx context.Context, request *grpc.Request) (respons return HandleSuccessWithData(tid) } +func (svr TaskServer) SendNotification(ctx context.Context, request *grpc.Request) (response *grpc.Response, err error) { + svc := notification.GetService() + var e bson.M + if err := json.Unmarshal(request.Data, &e); err != nil { + return nil, trace.TraceError(err) + } + var t models.Task + if err := json.Unmarshal(request.Data, &t); err != nil { + return nil, trace.TraceError(err) + } + ts, err := svr.modelSvc.GetTaskStatById(t.Id) + if err != nil { + return nil, trace.TraceError(err) + } + settings, _, err := svc.GetSettingList(bson.M{ + "enabled": true, + }, nil, nil) + if err != nil { + return nil, trace.TraceError(err) + } + for _, s := range settings { + switch s.TaskTrigger { + case constants.NotificationTriggerTaskFinish: + _ = svc.Send(&s, e) + case constants.NotificationTriggerTaskError: + if t.Status == constants.TaskStatusError || t.Status == constants.TaskStatusAbnormal { + _ = svc.Send(&s, e) + } + case constants.NotificationTriggerTaskEmptyResults: + if ts.ResultCount == 0 { + _ = svc.Send(&s, e) + } + case constants.NotificationTriggerTaskNever: + } + } + return nil, nil +} + func (svr TaskServer) handleInsertData(msg *grpc.StreamMessage) (err error) { data, err := svr.deserialize(msg) if err != nil { diff --git a/notification/service.go b/notification/service.go index 16151ff..b1dda58 100644 --- a/notification/service.go +++ b/notification/service.go @@ -192,7 +192,17 @@ Please find the task data as below. return nil } -func (svc *Service) sendMail(s *Setting, entity bson.M) (err error) { +func (svc *Service) Send(s *Setting, entity bson.M) (err error) { + switch s.Type { + case TypeMail: + return svc.SendMail(s, entity) + case TypeMobile: + return svc.SendMobile(s, entity) + } + return nil +} + +func (svc *Service) SendMail(s *Setting, entity bson.M) (err error) { // to to, err := parser.Parse(s.Mail.To, entity) if err != nil { @@ -228,7 +238,7 @@ func (svc *Service) sendMail(s *Setting, entity bson.M) (err error) { return nil } -func (svc *Service) sendMobile(s *Setting, entity bson.M) (err error) { +func (svc *Service) SendMobile(s *Setting, entity bson.M) (err error) { // webhook webhook, err := parser.Parse(s.Mobile.Webhook, entity) if err != nil { diff --git a/task/handler/runner.go b/task/handler/runner.go index 510d4c6..0c83449 100644 --- a/task/handler/runner.go +++ b/task/handler/runner.go @@ -115,6 +115,9 @@ func (r *Runner) Run() (err error) { // configure logging r.configureLogging() + // send notification + r.sendNotification() + // start process if err := r.cmd.Start(); err != nil { return r.updateTask(constants.TaskStatusError, err) @@ -483,6 +486,9 @@ func (r *Runner) updateTask(status string, e error) (err error) { } } + // send notification + r.sendNotification() + // update stats go func() { r._updateTaskStat(status) @@ -556,6 +562,23 @@ func (r *Runner) _updateTaskStat(status string) { } } +func (r *Runner) sendNotification() { + data, err := json.Marshal(r.t) + if err != nil { + trace.PrintError(err) + return + } + req := &grpc.Request{ + NodeKey: r.svc.GetNodeConfigService().GetNodeKey(), + Data: data, + } + _, err = r.c.GetTaskClient().SendNotification(context.Background(), req) + if err != nil { + trace.PrintError(err) + return + } +} + func (r *Runner) _updateSpiderStat(status string) { // task stat ts, err := r.svc.GetModelTaskStatService().GetTaskStatById(r.tid)