// Copyright 2014 Martin Angers and Contributors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package fetchbot

import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/temoto/robotstxt"
)

var (
	// ErrEmptyHost is returned if a command to be enqueued has a URL with an
	// empty host.
	ErrEmptyHost = errors.New("fetchbot: invalid empty host")

	// ErrDisallowed is returned when the requested URL is disallowed by the
	// robots.txt policy.
	ErrDisallowed = errors.New("fetchbot: disallowed by robots.txt")

	// ErrQueueClosed is returned when a Send call is made on a closed Queue.
	ErrQueueClosed = errors.New("fetchbot: send on a closed queue")
)

// Parse the robots.txt relative path a single time at startup; parsing this
// constant, valid path cannot fail, so the error is safely discarded.
var robotsTxtParsedPath, _ = url.Parse("/robots.txt")

const (
	// DefaultCrawlDelay represents the delay to use if there is no robots.txt
	// specified delay.
	DefaultCrawlDelay = 5 * time.Second

	// DefaultUserAgent is the default user agent string.
	DefaultUserAgent = "Fetchbot (https://github.com/PuerkitoBio/fetchbot)"

	// DefaultWorkerIdleTTL is the default time-to-live of an idle host worker goroutine.
	// If no URL is sent for a given host within this duration, this host's goroutine
	// is disposed of.
	DefaultWorkerIdleTTL = 30 * time.Second
)

// Doer defines the method required to use a type as the Fetcher's HttpClient.
// The *http.Client type of the standard net/http package satisfies this
// interface.
type Doer interface {
	Do(*http.Request) (*http.Response, error)
}
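
// A minimal sketch of a custom Doer: wrapping the default client to log each
// request before issuing it (loggingDoer is illustrative, not part of this
// package):
//
//	type loggingDoer struct{ c *http.Client }
//
//	func (d loggingDoer) Do(req *http.Request) (*http.Response, error) {
//		log.Printf("%s %s", req.Method, req.URL)
//		return d.c.Do(req)
//	}
//
// A Fetcher can then be configured with f.HttpClient = loggingDoer{http.DefaultClient}.
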
// A Fetcher defines the parameters for running a web crawler.
type Fetcher struct {
	// The Handler to be called for each request. All successfully enqueued requests
	// produce a Handler call.
	Handler Handler

	// DisablePoliteness disables fetching and using the robots.txt policies of
	// hosts.
	DisablePoliteness bool

	// Default delay to use between requests to the same host if there is no
	// robots.txt crawl delay or if DisablePoliteness is true.
	CrawlDelay time.Duration

	// The *http.Client to use for the requests. If nil, defaults to the net/http
	// package's default client. The field should be named HTTPClient to comply
	// with golint, but renaming it would be a breaking change; won't fix.
	HttpClient Doer

	// The user-agent string to use for robots.txt validation and URL fetching.
	UserAgent string

	// The time a host-dedicated worker goroutine can stay idle, with no Command
	// to enqueue, before it is stopped and cleared from memory.
	WorkerIdleTTL time.Duration

	// AutoClose makes the fetcher close its queue automatically once the number
	// of hosts reaches 0. A host is removed once it has been idle for the
	// WorkerIdleTTL duration.
	AutoClose bool

	// q holds the Queue to send data to the fetcher and optionally close (stop) it.
	q *Queue

	// dbg is a channel used to push debug information.
	dbgmu     sync.Mutex
	dbg       chan *DebugInfo
	debugging bool

	// hosts maps each host name to its dedicated requests channel, and mu protects
	// concurrent access to the hosts field.
	mu    sync.Mutex
	hosts map[string]chan Command
}

// DebugInfo holds information to introspect the Fetcher's state.
type DebugInfo struct {
	NumHosts int
}

// New returns an initialized Fetcher.
func New(h Handler) *Fetcher {
	return &Fetcher{
		Handler:       h,
		CrawlDelay:    DefaultCrawlDelay,
		HttpClient:    http.DefaultClient,
		UserAgent:     DefaultUserAgent,
		WorkerIdleTTL: DefaultWorkerIdleTTL,
		dbg:           make(chan *DebugInfo, 1),
	}
}
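
// A minimal usage sketch, assuming the package's HandlerFunc adapter to turn a
// function into a Handler:
//
//	f := fetchbot.New(fetchbot.HandlerFunc(func(ctx *fetchbot.Context, res *http.Response, err error) {
//		if err != nil {
//			fmt.Printf("error: %s\n", err)
//			return
//		}
//		fmt.Printf("[%d] %s %s\n", res.StatusCode, ctx.Cmd.Method(), ctx.Cmd.URL())
//	}))
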
// Queue offers methods to send Commands to the Fetcher, and to Stop the crawling process.
// It is safe to use from concurrent goroutines.
type Queue struct {
	ch chan Command

	// signal channels
	closed, cancelled, done chan struct{}

	wg sync.WaitGroup
}

// Close closes the Queue so that no more Commands can be sent. It blocks until
// the Fetcher drains all pending commands. After the call, the Fetcher is stopped.
// Attempts to enqueue new URLs after Close has been called always result in
// an ErrQueueClosed error.
func (q *Queue) Close() error {
	// Make sure it is not already closed, since closing a closed channel
	// is a run-time panic.
	select {
	case <-q.closed:
		// Already closed, no-op
		return nil
	default:
		// Close the signal-channel
		close(q.closed)
		// Send a nil Command to make sure the processQueue method sees the close signal.
		q.ch <- nil
		// Wait for the Fetcher to drain.
		q.wg.Wait()
		// Unblock any callers waiting on q.Block
		close(q.done)
		return nil
	}
}

// Block blocks the current goroutine until the Queue is closed and all pending
// commands are drained.
func (q *Queue) Block() {
	<-q.done
}

// Done returns a channel that is closed when the Queue is closed (either
// via Close or Cancel). Multiple calls always return the same channel.
func (q *Queue) Done() <-chan struct{} {
	return q.done
}

// Cancel closes the Queue and drains the pending commands without processing
// them, allowing for a fast "stop immediately"-ish operation.
func (q *Queue) Cancel() error {
	select {
	case <-q.cancelled:
		// Already cancelled, no-op
		return nil
	default:
		// Mark the queue as cancelled
		close(q.cancelled)
		// Close the Queue; this waits for pending commands to drain and
		// unblocks any callers waiting on q.Block.
		return q.Close()
	}
}
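
// A sketch of a time-boxed crawl using Cancel (the 10-second budget and URL
// are illustrative):
//
//	q := f.Start()
//	time.AfterFunc(10*time.Second, func() { q.Cancel() })
//	q.SendStringGet("http://example.com/")
//	q.Block()
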
// Send enqueues a Command into the Fetcher. If the Queue has been closed, it
// returns ErrQueueClosed. The Command's URL must have a Host.
func (q *Queue) Send(c Command) error {
	if c == nil {
		return ErrEmptyHost
	}
	if u := c.URL(); u == nil || u.Host == "" {
		return ErrEmptyHost
	}
	select {
	case <-q.closed:
		return ErrQueueClosed
	default:
		q.ch <- c
	}
	return nil
}

// SendString enqueues a method and some URL strings into the Fetcher. It returns
// an error if a URL string cannot be parsed, or if the Queue has been closed.
// The first return value is the number of URLs successfully enqueued.
func (q *Queue) SendString(method string, rawurl ...string) (int, error) {
	return q.sendWithMethod(method, rawurl)
}

// SendStringHead enqueues the URL strings to be fetched with a HEAD method.
// It returns an error if a URL string cannot be parsed, or if the Queue has been closed.
// The first return value is the number of URLs successfully enqueued.
func (q *Queue) SendStringHead(rawurl ...string) (int, error) {
	return q.sendWithMethod("HEAD", rawurl)
}

// SendStringGet enqueues the URL strings to be fetched with a GET method.
// It returns an error if a URL string cannot be parsed, or if the Queue has been closed.
// The first return value is the number of URLs successfully enqueued.
func (q *Queue) SendStringGet(rawurl ...string) (int, error) {
	return q.sendWithMethod("GET", rawurl)
}
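
// For example, a mix of methods might be enqueued as follows (URLs are
// illustrative):
//
//	if n, err := q.SendStringHead("http://example.com/a", "http://example.com/b"); err != nil {
//		// n URLs were enqueued before the error (parse failure or ErrQueueClosed)
//		fmt.Println(n, err)
//	}
//	if _, err := q.SendStringGet("http://example.com/c"); err != nil {
//		fmt.Println(err)
//	}
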
// sendWithMethod parses the URL strings and enqueues them as *Cmd. It returns
// the number of URLs successfully enqueued, and an error if a URL string cannot
// be parsed or the Queue has been closed.
func (q *Queue) sendWithMethod(method string, rawurl []string) (int, error) {
	for i, v := range rawurl {
		parsed, err := url.Parse(v)
		if err != nil {
			return i, err
		}
		if err := q.Send(&Cmd{U: parsed, M: method}); err != nil {
			return i, err
		}
	}
	return len(rawurl), nil
}

// Start starts the Fetcher, and returns the Queue to use to send Commands to be fetched.
func (f *Fetcher) Start() *Queue {
	f.hosts = make(map[string]chan Command)

	f.q = &Queue{
		ch:        make(chan Command, 1),
		closed:    make(chan struct{}),
		cancelled: make(chan struct{}),
		done:      make(chan struct{}),
	}

	// Start the one and only queue processing goroutine.
	f.q.wg.Add(1)
	go f.processQueue()

	return f.q
}
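
// The typical lifecycle, sketched (h is some Handler; error handling elided):
//
//	f := fetchbot.New(h)
//	q := f.Start()
//	q.SendStringGet("http://example.com/")
//	q.Close() // no more commands; drain pending ones and stop
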
// Debug returns the channel on which debugging information is sent. It is not
// intended to be used by package users.
func (f *Fetcher) Debug() <-chan *DebugInfo {
	f.dbgmu.Lock()
	defer f.dbgmu.Unlock()
	f.debugging = true
	return f.dbg
}
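
// A sketch of consuming the debug channel from a monitoring goroutine (the
// channel is never closed, so this loop runs for the life of the program):
//
//	go func() {
//		for di := range f.Debug() {
//			fmt.Printf("fetchbot: %d active hosts\n", di.NumHosts)
//		}
//	}()
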
// processQueue runs the queue in its own goroutine.
func (f *Fetcher) processQueue() {
loop:
	for v := range f.q.ch {
		if v == nil {
			// Special case: when the Queue is closed, a nil Command is sent. Use this
			// indicator to check for the close signal, instead of checking on every
			// loop iteration.
			select {
			case <-f.q.closed:
				// Close signal, exit loop
				break loop
			default:
				// Not closed; a nil Command is never valid, skip it.
				continue
			}
		}
		select {
		case <-f.q.cancelled:
			// Queue got cancelled, drain
			continue
		default:
			// Keep going
		}

		// Get the URL to enqueue
		u := v.URL()

		// Check if a channel is already started for this host
		f.mu.Lock()
		in, ok := f.hosts[u.Host]
		if !ok {
			// Start a new channel and goroutine for this host.
			var rob *url.URL
			if !f.DisablePoliteness {
				// Must send the robots.txt request.
				rob = u.ResolveReference(robotsTxtParsedPath)
			}
			// Create the infinite queue: the in channel to send on, and the out channel
			// to read from in the host's goroutine, and add it to the hosts map.
			var out chan Command
			in, out = make(chan Command, 1), make(chan Command, 1)
			f.hosts[u.Host] = in
			f.mu.Unlock()
			f.q.wg.Add(1)
			// Start the infinite queue goroutine for this host
			go sliceIQ(in, out)
			// Start the working goroutine for this host
			go f.processChan(out, u.Host)

			if !f.DisablePoliteness {
				// Enqueue the robots.txt request first.
				in <- robotCommand{&Cmd{U: rob, M: "GET"}}
			}
		} else {
			f.mu.Unlock()
		}
		// Send the request
		in <- v

		// Send debug info, but do not block if the channel is full
		f.dbgmu.Lock()
		if f.debugging {
			f.mu.Lock()
			select {
			case f.dbg <- &DebugInfo{len(f.hosts)}:
			default:
			}
			f.mu.Unlock()
		}
		f.dbgmu.Unlock()
	}

	// Close all host channels now that it is impossible to send on those. Those
	// are the `in` channels of the infinite queues. Closing them drains any
	// pending commands, triggering the handlers for each one in the worker
	// goroutine; the infinite queue goroutine then terminates and closes the
	// `out` channel, which in turn terminates the worker goroutine.
	f.mu.Lock()
	for _, ch := range f.hosts {
		close(ch)
	}
	f.hosts = make(map[string]chan Command)
	f.mu.Unlock()

	f.q.wg.Done()
}

// processChan is the goroutine for a host's worker, processing requests for
// all of that host's URLs.
func (f *Fetcher) processChan(ch <-chan Command, hostKey string) {
	var (
		agent *robotstxt.Group
		wait  <-chan time.Time
		ttl   <-chan time.Time
		delay = f.CrawlDelay
	)

loop:
	for {
		select {
		case <-f.q.cancelled:
			break loop
		case v, ok := <-ch:
			if !ok {
				// Terminate this goroutine, channel is closed
				break loop
			}

			// Wait for the prescribed delay
			if wait != nil {
				<-wait
			}

			// Was it cancelled during the wait? Check again.
			select {
			case <-f.q.cancelled:
				break loop
			default:
				// Keep going
			}

			switch r, ok := v.(robotCommand); {
			case ok:
				// This is the robots.txt request
				agent = f.getRobotAgent(r)
				// Initialize the crawl delay
				if agent != nil && agent.CrawlDelay > 0 {
					delay = agent.CrawlDelay
				}
				wait = time.After(delay)
			case agent == nil || agent.Test(v.URL().Path):
				// Path allowed, process the request
				res, err := f.doRequest(v)
				f.visit(v, res, err)
				// No delay on error - the remote host was not reached
				if err == nil {
					wait = time.After(delay)
				} else {
					wait = nil
				}
			default:
				// Path disallowed by robots.txt
				f.visit(v, nil, ErrDisallowed)
				wait = nil
			}
			// Every time a command is received, reset the ttl channel
			ttl = time.After(f.WorkerIdleTTL)
		case <-ttl:
			// Worker has been idle for WorkerIdleTTL, terminate it
			f.mu.Lock()
			inch, ok := f.hosts[hostKey]
			delete(f.hosts, hostKey)

			// Close the queue if AutoClose is set and there are no more hosts.
			if f.AutoClose && len(f.hosts) == 0 {
				go f.q.Close()
			}
			f.mu.Unlock()
			if ok {
				close(inch)
			}
			break loop
		}
	}

	// Drain ch until it is closed, to prevent the producer goroutine from leaking.
	for range ch {
	}

	f.q.wg.Done()
}

// getRobotAgent fetches the robots.txt and returns its User-Agent-specific group.
func (f *Fetcher) getRobotAgent(r robotCommand) *robotstxt.Group {
	res, err := f.doRequest(r)
	if err != nil {
		// TODO: Ignore robots.txt request error?
		fmt.Fprintf(os.Stderr, "fetchbot: error fetching robots.txt: %s\n", err)
		return nil
	}
	if res.Body != nil {
		defer res.Body.Close()
	}
	robData, err := robotstxt.FromResponse(res)
	if err != nil {
		// TODO: Ignore robots.txt parse error?
		fmt.Fprintf(os.Stderr, "fetchbot: error parsing robots.txt: %s\n", err)
		return nil
	}
	return robData.FindGroup(f.UserAgent)
}

// visit calls the Handler for this Command and closes the response's body.
func (f *Fetcher) visit(cmd Command, res *http.Response, err error) {
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}
	// If the Command implements Handler, call that handler, otherwise
	// dispatch to the Fetcher's Handler.
	if h, ok := cmd.(Handler); ok {
		h.Handle(&Context{Cmd: cmd, Q: f.q}, res, err)
		return
	}
	f.Handler.Handle(&Context{Cmd: cmd, Q: f.q}, res, err)
}
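
// This enables per-command handlers; a sketch (myCmd is illustrative, written
// from a client package's perspective):
//
//	type myCmd struct {
//		*fetchbot.Cmd
//	}
//
//	func (c *myCmd) Handle(ctx *fetchbot.Context, res *http.Response, err error) {
//		// handle this specific command's response, bypassing the Fetcher's Handler
//	}
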
// doRequest prepares and executes the request for this Command.
func (f *Fetcher) doRequest(cmd Command) (*http.Response, error) {
	req, err := http.NewRequest(cmd.Method(), cmd.URL().String(), nil)
	if err != nil {
		return nil, err
	}

	// If the Command implements some other recognized interfaces, set
	// the request accordingly (see cmd.go for the list of interfaces).
	// First, the Header values.
	if hd, ok := cmd.(HeaderProvider); ok {
		for k, v := range hd.Header() {
			req.Header[k] = v
		}
	}

	// BasicAuth has higher priority than an Authorization header set by
	// a HeaderProvider.
	if ba, ok := cmd.(BasicAuthProvider); ok {
		req.SetBasicAuth(ba.BasicAuth())
	}

	// Cookies are added to the request, even if some cookies were set
	// by a HeaderProvider.
	if ck, ok := cmd.(CookiesProvider); ok {
		for _, c := range ck.Cookies() {
			req.AddCookie(c)
		}
	}

	// For the body of the request, a ReaderProvider has higher priority
	// than a ValuesProvider.
	if rd, ok := cmd.(ReaderProvider); ok {
		rdr := rd.Reader()
		rc, ok := rdr.(io.ReadCloser)
		if !ok {
			rc = ioutil.NopCloser(rdr)
		}
		req.Body = rc
	} else if val, ok := cmd.(ValuesProvider); ok {
		v := val.Values()
		req.Body = ioutil.NopCloser(strings.NewReader(v.Encode()))
		if req.Header.Get("Content-Type") == "" {
			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
		}
	}

	// If there was no User-Agent explicitly set by a HeaderProvider,
	// set it to the default value.
	if req.Header.Get("User-Agent") == "" {
		req.Header.Set("User-Agent", f.UserAgent)
	}

	// Do the request.
	res, err := f.HttpClient.Do(req)
	if err != nil {
		return nil, err
	}
	return res, nil
}
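
// A sketch of a Command that sets custom headers via the HeaderProvider
// interface (cmdWithHeader is illustrative, written from a client package's
// perspective):
//
//	type cmdWithHeader struct {
//		*fetchbot.Cmd
//		hdr http.Header
//	}
//
//	func (c *cmdWithHeader) Header() http.Header { return c.hdr }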