-
Notifications
You must be signed in to change notification settings - Fork 20
/
command.go
117 lines (95 loc) · 2.16 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package nsq
import (
"bufio"
"strings"
"github.com/pkg/errors"
)
// Command is the common interface of every type that represents one of the
// commands of the NSQ protocol.
type Command interface {
	// Name reports the name of the command.
	Name() string

	// Write serializes the command to the buffered output w.
	Write(w *bufio.Writer) error
}
// ReadCommand reads a command from the buffered input r, returning it or an
// error if something went wrong.
//
// One line, terminated by '\n', is read from r and used to select the
// command. IDENTIFY, AUTH, CLS and NOP must match the whole line; SUB, PUB,
// MPUB, RDY, FIN, REQ and TOUCH must be followed by a single space, and the
// remainder of the line is handed to the matching reader. The readers for
// IDENTIFY, PUB, MPUB and AUTH are also given r.
//
// An error is returned when the line cannot be read (including when the input
// ends before a newline is seen) or when the line matches no known command.
//
//	if cmd, err := nsq.ReadCommand(r); err != nil {
//		// handle the error
//		...
//	} else {
//		switch c := cmd.(type) {
//		case nsq.Pub:
//			...
//		}
//	}
func ReadCommand(r *bufio.Reader) (cmd Command, err error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return nil, errors.Wrap(err, "reading command")
	}

	// ReadString returns a nil error only when the data ends with the
	// delimiter, so the trailing '\n' is guaranteed to be present here and no
	// separate "missing newline" check is needed.
	line = line[:len(line)-1]

	// The case order mirrors the protocol dispatch order; the slice offsets
	// skip the command name and the space that follows it.
	switch {
	case line == "IDENTIFY":
		return readIdentify(r)

	case strings.HasPrefix(line, "SUB "):
		return readSub(line[4:])

	case strings.HasPrefix(line, "PUB "):
		return readPub(line[4:], r)

	case strings.HasPrefix(line, "MPUB "):
		return readMPub(line[5:], r)

	case strings.HasPrefix(line, "RDY "):
		return readRdy(line[4:])

	case strings.HasPrefix(line, "FIN "):
		return readFin(line[4:])

	case strings.HasPrefix(line, "REQ "):
		return readReq(line[4:])

	case strings.HasPrefix(line, "TOUCH "):
		return readTouch(line[6:])

	case line == "AUTH":
		return readAuth(r)

	case line == "CLS":
		return Cls{}, nil

	case line == "NOP":
		return Nop{}, nil
	}

	return nil, errors.New("unknown command " + line)
}
// readNextWord splits text at its first space character: word is everything
// before that space and next is everything after it. When text contains no
// space, word is the whole of text and next is empty.
func readNextWord(text string) (word string, next string) {
	sep := strings.IndexByte(text, ' ')
	if sep < 0 {
		return text, ""
	}
	return text[:sep], text[sep+1:]
}
// sendCommand sends cmd on cmdChan and reports whether the send happened.
//
// Sending on a closed channel panics; that panic is recovered here, in which
// case the function returns its zero result, false.
func sendCommand(cmdChan chan<- Command, cmd Command) bool {
	defer func() {
		// Swallow the panic raised when the channel was already closed.
		_ = recover()
	}()

	cmdChan <- cmd
	return true
}
// closeCommand closes cmdChan. Any panic raised by the close (for instance
// because the channel was already closed) is recovered, which makes the
// operation safe to call more than once.
func closeCommand(cmdChan chan<- Command) {
	defer func() {
		// Ignore the panic so that closing is idempotent.
		_ = recover()
	}()

	close(cmdChan)
}