forked from go-zeromq/zmq4
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmsg.go
143 lines (123 loc) · 2.68 KB
/
msg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright 2018 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmq4
import (
"bytes"
"fmt"
"io"
)
// MsgType identifies the kind of a Msg; it is stored in the Msg.Type field.
type MsgType byte

// Known message types.
const (
	// UsrMsg is the zero value of MsgType, so a Msg built without an
	// explicit Type is a UsrMsg.
	UsrMsg MsgType = iota
	// CmdMsg is the type Msg.isCmd tests for.
	CmdMsg
)
// Msg is a ZMTP message, possibly composed of multiple frames.
type Msg struct {
// Frames holds the content of the message, one byte slice per frame.
Frames [][]byte
// Type is what isCmd compares against CmdMsg; its zero value is UsrMsg.
Type MsgType
// NOTE(review): multipart is neither read nor written in this part of the
// file; presumably it marks a message that is part of a multi-frame
// transfer. Confirm against the code that sets it.
multipart bool
// err is the error returned by Err.
err error
}
// NewMsg returns a message made of the single given frame.
//
// The frame is stored as is, without being copied, so the returned message
// shares its memory with the caller's slice.
func NewMsg(frame []byte) Msg {
	var msg Msg
	msg.Frames = [][]byte{frame}
	return msg
}
// NewMsgFrom returns a message made of the given frames, in order.
//
// The frames slice is stored as is, without being copied.
func NewMsgFrom(frames ...[]byte) Msg {
	var msg Msg
	msg.Frames = frames
	return msg
}
// NewMsgString returns a message made of a single frame holding the bytes
// of the given string.
func NewMsgString(frame string) Msg {
	content := []byte(frame)
	return Msg{Frames: [][]byte{content}}
}
// NewMsgFromString returns a message with one frame per input string, in
// the same order. Each frame holds a copy of the bytes of its string.
func NewMsgFromString(frames []string) Msg {
	out := make([][]byte, len(frames))
	for i := range frames {
		// Appending to a nil slice leaves the frame nil for an empty string.
		out[i] = append([]byte(nil), frames[i]...)
	}
	return Msg{Frames: out}
}
// isCmd reports whether the Type of the message is CmdMsg.
func (msg Msg) isCmd() bool {
	return msg.Type == CmdMsg
}
// Err returns the error stored in the message, or nil when there is none.
func (msg Msg) Err() error {
	return msg.err
}
// Bytes returns the concatenated content of all its frames.
//
// The result is always a newly allocated, non-nil slice; it does not share
// memory with the frames of the message.
func (msg Msg) Bytes() []byte {
	out := make([]byte, msg.size())
	off := 0
	for i := range msg.Frames {
		// copy reports how many bytes it wrote, which is the frame length.
		off += copy(out[off:], msg.Frames[i])
	}
	return out
}
// size returns the total number of bytes held by all frames of the message.
func (msg Msg) size() int {
	var total int
	for i := range msg.Frames {
		total += len(msg.Frames[i])
	}
	return total
}
// String returns a printable representation of the message, of the form
// Msg{Frames:{"frame1", "frame2"}}, where each frame is rendered as a
// double-quoted, escaped string.
func (msg Msg) String() string {
	var out bytes.Buffer
	out.WriteString("Msg{Frames:{")
	sep := "" // nothing goes before the first frame
	for _, frame := range msg.Frames {
		out.WriteString(sep)
		fmt.Fprintf(&out, "%q", frame)
		sep = ", "
	}
	out.WriteString("}}")
	return out.String()
}
// Clone returns a deep copy of the message.
//
// Every frame is copied into newly allocated storage, so the returned
// message shares no frame memory with the original. The other fields (Type,
// multipart and err) are carried over unchanged, so that, for instance, the
// clone of a command message is still reported as a command by isCmd.
func (msg Msg) Clone() Msg {
	// The receiver is already a by-value copy of the message: start from it
	// to keep the scalar fields, then replace the frames, which would
	// otherwise still alias the original's memory.
	o := msg
	o.Frames = make([][]byte, len(msg.Frames))
	for i, frame := range msg.Frames {
		o.Frames[i] = make([]byte, len(frame))
		copy(o.Frames[i], frame)
	}
	return o
}
// Cmd is a ZMTP Cmd as per:
//
// https://rfc.zeromq.org/spec:23/ZMTP/#formal-grammar
//
// marshalZMTP encodes a Cmd as one byte holding the length of Name,
// followed by the bytes of Name, followed by Body; unmarshalZMTP decodes
// that same layout.
type Cmd struct {
// Name is the command name. marshalZMTP rejects names longer than 255
// bytes, as the length has to fit in a single byte.
Name string
// Body is everything that follows the name in the encoded command.
Body []byte
}
// unmarshalZMTP decodes data into cmd.
//
// The first byte of data is the length of the command name; the name
// follows, and whatever remains becomes the body. The body is a sub-slice
// of data, not a copy.
//
// It returns io.ErrUnexpectedEOF when data is empty and ErrBadCmd when data
// is too short to hold a name of the announced length. cmd is left
// untouched on error.
func (cmd *Cmd) unmarshalZMTP(data []byte) error {
	if len(data) == 0 {
		return io.ErrUnexpectedEOF
	}

	nameLen, rest := int(data[0]), data[1:]
	if len(rest) < nameLen {
		return ErrBadCmd
	}

	cmd.Name = string(rest[:nameLen])
	cmd.Body = rest[nameLen:]
	return nil
}
// marshalZMTP encodes cmd as a newly allocated byte slice: one byte holding
// the length of the name, then the name, then the body.
//
// It returns ErrBadCmd when the name is longer than 255 bytes, since its
// length could then not be stored in a single byte.
func (cmd *Cmd) marshalZMTP() ([]byte, error) {
	nameLen := len(cmd.Name)
	if nameLen > 255 {
		return nil, ErrBadCmd
	}

	out := make([]byte, 1+nameLen+len(cmd.Body))
	out[0] = byte(nameLen)
	copy(out[1:], cmd.Name)
	copy(out[1+nameLen:], cmd.Body)
	return out, nil
}
// ZMTP commands as per:
//
// https://rfc.zeromq.org/spec:23/ZMTP/#commands
//
// Each constant is an untyped string constant whose value is the upper-case
// name of one command.
// NOTE(review): presumably these are the values carried in Cmd.Name;
// confirm against the callers.
const (
CmdCancel = "CANCEL"
CmdError = "ERROR"
CmdHello = "HELLO"
CmdInitiate = "INITIATE"
CmdPing = "PING"
CmdPong = "PONG"
CmdReady = "READY"
CmdSubscribe = "SUBSCRIBE"
CmdUnsubscribe = "UNSUBSCRIBE"
CmdWelcome = "WELCOME"
)