Skip to content

Commit

Permalink
feat: updated notification
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jun 9, 2024
1 parent 886c1ad commit d55156c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484
github.com/crawlab-team/crawlab-fs v0.6.3-2
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606065258-08b640f15e96
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240609070140-04a938cd57ef
github.com/crawlab-team/crawlab-vcs v0.6.2-0.20230629045457-afe0be0e2185
github.com/crawlab-team/go-trace v0.1.1
github.com/crawlab-team/template-parser v0.0.4-0.20221006034646-9bb77a7ae86e
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606060845-268ae28818d2 h1:S4F
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606060845-268ae28818d2/go.mod h1:m5H3K6QV7L//0MqBbmt0QvnxD7aEcQPny5VmZi+O+H8=
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606065258-08b640f15e96 h1:DXvaWwOYD6tsiN80sS/+yQxcApX7pvaEvmug5ROA0Wo=
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240606065258-08b640f15e96/go.mod h1:O7/zw0CRzX3DbFw5SAl9DIpbAsKthEvgm7R1ikxcIa0=
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240609070140-04a938cd57ef h1:FGDfwLfTr2WZSy3cUVYBX4Xz9xrHZzxQrFf3DA93RqY=
github.com/crawlab-team/crawlab-grpc v0.6.4-0.20240609070140-04a938cd57ef/go.mod h1:j5FaFuWfIxHbmFXedFaUZHO2DpLqS3QZ0x7W3APN0fQ=
github.com/crawlab-team/crawlab-vcs v0.6.2-0.20230629045457-afe0be0e2185 h1:A/XSUuGgGMn+z+lFd2ye2ClgIKhDZYUerhOL5jePQhU=
github.com/crawlab-team/crawlab-vcs v0.6.2-0.20230629045457-afe0be0e2185/go.mod h1:YHMYUEoSqfXUZHsWW/M/DaLh/zOpRtiElaRWcrGyv/I=
github.com/crawlab-team/go-trace v0.1.0/go.mod h1:LcWyn68HoT+d29CHM8L41pFHxsAcBMF1xjqJmWdyFh8=
Expand Down Expand Up @@ -301,6 +303,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
Expand Down
39 changes: 39 additions & 0 deletions grpc/server/task_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/crawlab-team/crawlab-core/models/delegate"
"github.com/crawlab-team/crawlab-core/models/models"
"github.com/crawlab-team/crawlab-core/models/service"
"github.com/crawlab-team/crawlab-core/notification"
"github.com/crawlab-team/crawlab-core/utils"
"github.com/crawlab-team/crawlab-db/mongo"
grpc "github.com/crawlab-team/crawlab-grpc"
Expand Down Expand Up @@ -109,6 +110,44 @@ func (svr TaskServer) Fetch(ctx context.Context, request *grpc.Request) (respons
return HandleSuccessWithData(tid)
}

func (svr TaskServer) SendNotification(ctx context.Context, request *grpc.Request) (response *grpc.Response, err error) {
svc := notification.GetService()
var e bson.M
if err := json.Unmarshal(request.Data, &e); err != nil {
return nil, trace.TraceError(err)
}
var t models.Task
if err := json.Unmarshal(request.Data, &t); err != nil {
return nil, trace.TraceError(err)
}
ts, err := svr.modelSvc.GetTaskStatById(t.Id)
if err != nil {
return nil, trace.TraceError(err)
}
settings, _, err := svc.GetSettingList(bson.M{
"enabled": true,
}, nil, nil)
if err != nil {
return nil, trace.TraceError(err)
}
for _, s := range settings {
switch s.TaskTrigger {
case constants.NotificationTriggerTaskFinish:
_ = svc.Send(&s, e)
case constants.NotificationTriggerTaskError:
if t.Status == constants.TaskStatusError || t.Status == constants.TaskStatusAbnormal {
_ = svc.Send(&s, e)
}
case constants.NotificationTriggerTaskEmptyResults:
if ts.ResultCount == 0 {
_ = svc.Send(&s, e)
}
case constants.NotificationTriggerTaskNever:
}
}
return nil, nil
}

func (svr TaskServer) handleInsertData(msg *grpc.StreamMessage) (err error) {
data, err := svr.deserialize(msg)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions notification/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,17 @@ Please find the task data as below.
return nil
}

func (svc *Service) sendMail(s *Setting, entity bson.M) (err error) {
func (svc *Service) Send(s *Setting, entity bson.M) (err error) {
switch s.Type {
case TypeMail:
return svc.SendMail(s, entity)
case TypeMobile:
return svc.SendMobile(s, entity)
}
return nil
}

func (svc *Service) SendMail(s *Setting, entity bson.M) (err error) {
// to
to, err := parser.Parse(s.Mail.To, entity)
if err != nil {
Expand Down Expand Up @@ -228,7 +238,7 @@ func (svc *Service) sendMail(s *Setting, entity bson.M) (err error) {
return nil
}

func (svc *Service) sendMobile(s *Setting, entity bson.M) (err error) {
func (svc *Service) SendMobile(s *Setting, entity bson.M) (err error) {
// webhook
webhook, err := parser.Parse(s.Mobile.Webhook, entity)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func (r *Runner) Run() (err error) {
// configure logging
r.configureLogging()

// send notification
r.sendNotification()

// start process
if err := r.cmd.Start(); err != nil {
return r.updateTask(constants.TaskStatusError, err)
Expand Down Expand Up @@ -483,6 +486,9 @@ func (r *Runner) updateTask(status string, e error) (err error) {
}
}

// send notification
r.sendNotification()

// update stats
go func() {
r._updateTaskStat(status)
Expand Down Expand Up @@ -556,6 +562,23 @@ func (r *Runner) _updateTaskStat(status string) {
}
}

func (r *Runner) sendNotification() {
data, err := json.Marshal(r.t)
if err != nil {
trace.PrintError(err)
return
}
req := &grpc.Request{
NodeKey: r.svc.GetNodeConfigService().GetNodeKey(),
Data: data,
}
_, err = r.c.GetTaskClient().SendNotification(context.Background(), req)
if err != nil {
trace.PrintError(err)
return
}
}

func (r *Runner) _updateSpiderStat(status string) {
// task stat
ts, err := r.svc.GetModelTaskStatService().GetTaskStatById(r.tid)
Expand Down

0 comments on commit d55156c

Please sign in to comment.