-
Notifications
You must be signed in to change notification settings - Fork 4
/
worker.go
196 lines (180 loc) · 4.53 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package imapmq
import (
"bytes"
"errors"
"fmt"
"io"
"net/mail"
"strconv"
"time"
"github.com/mxk/go-imap/imap"
)
// ErrParseMail is returned when a mail cannot be extracted from an IMAP
// server response. In this file getMail returns it when the response is nil,
// when the response carries no message info, or when the message info has no
// RFC822 attribute.
var ErrParseMail = errors.New("mail parse error")
// job is a unit of work executed against an IMAP client. Every job type in
// this file implements exec; the worker goroutine calls it with the client it
// owns, one job at a time.
type job interface {
// exec runs the job using the given IMAP client.
exec(*imap.Client)
}
// jobResult is used by the dequeueJob to report its result asynchronously
// over a channel.
type jobResult struct {
msg *Message // the dequeued message; nil whenever err is non-nil
err error // nil on success; io.EOF when the queue held no message
}
// dequeueJob represents the intent to dequeue one message from a queue. Its
// exec method reports the outcome by sending a jobResult on c.
type dequeueJob struct {
q *Queue // queue to dequeue from
c chan *jobResult // channel on which exec sends the result
}
// errDequeueRace is returned by dequeue when the conditional STORE came back
// labelled MODIFIED, i.e. the message changed between the fetch and the
// attempt to flag it. It is the only error dequeueJob.exec retries on.
var errDequeueRace = errors.New("imapmq: message modified concurrently")

// exec dequeues one message from the job's queue and sends the outcome on the
// job's result channel.
//
// Dequeuing uses the Conditional Store IMAP extension (RFC4551) to prevent
// race conditions when multiple clients dequeue concurrently: when dequeue
// reports errDequeueRace the attempt is simply repeated. Every other outcome
// is final and is reported to the caller: a message, io.EOF for an empty
// queue, or the error that stopped the attempt. Retrying on arbitrary errors
// (a dropped connection, an unparsable message) would loop forever without
// ever answering the caller.
func (j *dequeueJob) exec(c *imap.Client) {
	if err := j.q.switchTo(c, true); err != nil {
		j.c <- &jobResult{nil, err}
		return
	}
	for {
		msg, err := dequeue(c, j.q)
		if err == errDequeueRace {
			// Lost the race against another client; try again.
			continue
		}
		j.c <- &jobResult{msg, err}
		return
	}
}

// dequeue fetches the oldest mail (sequence number 1) and marks it for
// deletion with a conditional STORE.
//
// It returns io.EOF when no message could be found, errDequeueRace when the
// STORE response is labelled MODIFIED, and otherwise whatever error the fetch
// or the STORE produced. The message is nil whenever the error is non-nil.
//
// The q parameter is not used by the body; it is kept so the signature stays
// unchanged for existing callers.
func dequeue(c *imap.Client, q *Queue) (*Message, error) {
	mail, info, err := fetchMail(c, 1)
	if err != nil {
		return nil, err
	}
	cmd, err := flagDelete(c, info)
	if err != nil {
		return nil, err
	}
	rsp, err := cmd.Result(imap.OK)
	if err != nil {
		return nil, err
	}
	if rsp.Label == "MODIFIED" {
		return nil, errDequeueRace
	}
	return (*Message)(mail), nil
}
// flagDelete marks a message for deletion by issuing a UID STORE that sets
// the \Deleted flag on the message identified by info.UID.
//
// It uses CONDSTORE's MODSEQ to prevent race conditions: the STORE carries an
// UNCHANGEDSINCE modifier built from the MODSEQ attribute of info.
//
// It returns the issued command, or an error when info has no usable MODSEQ
// attribute, when the UID cannot be turned into a sequence set, or when
// issuing the command fails.
func flagDelete(c *imap.Client, info *imap.MessageInfo) (*imap.Command, error) {
	// The attribute is expected to be a non-empty list of fields. Checking
	// the assertion and the length turns a missing or malformed attribute
	// into an error instead of a panic.
	fields, ok := info.Attrs["MODSEQ"].([]imap.Field)
	if !ok || len(fields) == 0 {
		return nil, fmt.Errorf("imapmq: no MODSEQ attribute for UID %d", info.UID)
	}
	// Format the UID as unsigned so large values cannot turn negative on
	// platforms where int is 32 bits wide.
	uids, err := imap.NewSeqSet(strconv.FormatUint(uint64(info.UID), 10))
	if err != nil {
		return nil, err
	}
	// %v prints an integer exactly as %d would, and also prints a value held
	// as a string as plain digits.
	item := fmt.Sprintf("(UNCHANGEDSINCE %v) FLAGS", fields[0])
	return c.UIDStore(uids, item, imap.NewFlagSet("\\Deleted"))
}
// publishJob represents the intent of publishing a message to a topic in a
// queue. The literal field holds the complete mail (subject and body); exec
// appends it under the queue's name.
type publishJob struct {
q *Queue // queue to publish to
literal imap.Literal // the complete mail to append
}
// exec switches the client to the job's queue and appends the job's literal
// under the queue's name, waiting for the command to complete. A failure in
// either step is handed to the queue's error handler; nothing is returned.
func (j *publishJob) exec(c *imap.Client) {
	queue := j.q
	if err := queue.switchTo(c, false); err != nil {
		queue.mq.handleErr(err)
		return
	}
	if _, err := imap.Wait(c.Append(queue.name, nil, nil, j.literal)); err != nil {
		queue.mq.handleErr(err)
	}
}
// notifyJob represents the intent of notifying subscribers of a new message.
type notifyJob struct {
q *Queue // queue whose subscribers are notified
msgID uint32 // passed to fetchMail as the sequence number of the message
}
// exec fetches the new message and notifies subscribers that subscribed to
// the message's subject. Any subscriber that subscribed to "*" will receive
// all messages from the queue.
//
// Errors from switching to the queue or fetching the mail are handed to the
// queue's error handler. Both sends are non-blocking: if a channel cannot
// accept the message right away, the message is dropped for that channel.
//
// A message whose subject is literally "*" is offered to the wildcard channel
// only once; without the guard below the same channel would be tried twice
// and could receive a duplicate.
func (j *notifyJob) exec(c *imap.Client) {
	if err := j.q.switchTo(c, true); err != nil {
		j.q.mq.handleErr(err)
		return
	}
	mail, _, err := fetchMail(c, int(j.msgID))
	if err != nil {
		j.q.mq.handleErr(err)
		return
	}
	msg := (*Message)(mail)

	// Wildcard subscribers first.
	select {
	case j.q.subs["*"] <- msg:
	default:
	}

	topic := mail.Header.Get("Subject")
	if topic == "*" {
		// Same channel as above; already offered.
		return
	}
	select {
	case j.q.subs[topic] <- msg:
	default:
	}
}
// fetchMail fetches the mail at sequence number seq, requesting the RFC822,
// UID and MODSEQ items, and returns a parsed `mail.Message` instance along
// with the message metadata.
//
// It returns io.EOF when the fetch completed without any data (no mail
// found), and otherwise the error from building the sequence set, from the
// fetch itself, or from parsing the mail. Both results are nil whenever the
// error is non-nil.
func fetchMail(c *imap.Client, seq int) (*mail.Message, *imap.MessageInfo, error) {
	// Surface an invalid sequence number as an error rather than handing a
	// nil set to Fetch.
	set, err := imap.NewSeqSet(strconv.Itoa(seq))
	if err != nil {
		return nil, nil, err
	}
	cmd, err := imap.Wait(c.Fetch(set, "RFC822 UID MODSEQ"))
	if err != nil {
		return nil, nil, err
	}
	if len(cmd.Data) == 0 {
		return nil, nil, io.EOF
	}
	// Only the first response is used.
	rsp := cmd.Data[0]
	m, err := getMail(rsp)
	if err != nil {
		return nil, nil, err
	}
	return m, rsp.MessageInfo(), nil
}
// getMail builds a `mail.Message` from the response by parsing the bytes of
// its RFC822 attribute. It returns ErrParseMail when the response is nil, has
// no message info, or has no RFC822 attribute; errors from mail.ReadMessage
// are returned as they are.
func getMail(rsp *imap.Response) (*mail.Message, error) {
	if rsp == nil {
		return nil, ErrParseMail
	}
	if info := rsp.MessageInfo(); info != nil {
		if raw := info.Attrs["RFC822"]; raw != nil {
			return mail.ReadMessage(bytes.NewReader(imap.AsBytes(raw)))
		}
	}
	return nil, ErrParseMail
}
// worker opens an IMAP client from cfg and starts a goroutine that executes
// incoming jobs on it, one at a time, in the order they are received. It
// returns the send side of the (unbuffered) job channel, or the error from
// creating the client.
//
// The goroutine stops when done yields; on its way out it closes the job
// channel and then logs the client out with a 30 second timeout.
//
// NOTE(review): the channel is closed by the receiving goroutine, so a send
// attempted after shutdown would panic. Confirm that senders stop before, or
// together with, done firing.
func worker(cfg Config, done <-chan interface{}) (chan<- job, error) {
	client, err := newIMAPClient(cfg)
	if err != nil {
		return nil, err
	}
	queue := make(chan job)
	go func() {
		// Deferred calls run last-in first-out: the channel is closed first,
		// then the client is logged out.
		defer client.Logout(30 * time.Second)
		defer close(queue)
		for {
			select {
			case <-done:
				return
			case next := <-queue:
				next.exec(client)
			}
		}
	}()
	return queue, nil
}