Skip to content

Commit

Permalink
Apply FMT
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed May 11, 2016
1 parent 47bb312 commit 768c5f9
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 59 deletions.
2 changes: 1 addition & 1 deletion input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (i *RAWInput) listen(address string) {
}

// Receiving TCPMessage object
m := <- ch
m := <-ch

i.data <- m
}
Expand Down
18 changes: 8 additions & 10 deletions input_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ func TestRAWInput(t *testing.T) {
t.Fatal(err)
}
origin := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
go origin.Serve(listener)
defer listener.Close()

originAddr := listener.Addr().String()


var respCounter, reqCounter int64

input := NewRAWInput(originAddr, EnginePcap, testRawExpire)
Expand All @@ -63,7 +62,7 @@ func TestRAWInput(t *testing.T) {
Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}

client := NewHTTPClient("http://" + listener.Addr().String(), &HTTPClientConfig{})
client := NewHTTPClient("http://"+listener.Addr().String(), &HTTPClientConfig{})

go Start(quit)

Expand All @@ -87,16 +86,15 @@ func TestRAWInputIPv6(t *testing.T) {
t.Fatal(err)
}
origin := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
go origin.Serve(listener)
defer listener.Close()

originAddr := listener.Addr().String()


var respCounter, reqCounter int64

input := NewRAWInput(originAddr, EnginePcap, testRawExpire)
Expand All @@ -119,7 +117,7 @@ func TestRAWInputIPv6(t *testing.T) {
Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}

client := NewHTTPClient("http://" + listener.Addr().String(), &HTTPClientConfig{})
client := NewHTTPClient("http://"+listener.Addr().String(), &HTTPClientConfig{})

go Start(quit)

Expand Down
28 changes: 14 additions & 14 deletions raw_socket_listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type Listener struct {

messageExpire time.Duration

conn net.PacketConn
quit chan bool
conn net.PacketConn
quit chan bool
readyCh chan bool
}

Expand Down Expand Up @@ -130,10 +130,10 @@ func (t *Listener) listen() {
t.conn.Close()
}
return
case data := <- t.packetsChan:
case data := <-t.packetsChan:
packet := ParseTCPPacket(data[:16], data[16:])
t.processTCPPacket(packet)
case <- gcTicker:
case <-gcTicker:
now := time.Now()

// Dispatch requests before responses
Expand Down Expand Up @@ -175,13 +175,13 @@ func (t *Listener) dispatchMessage(message *TCPMessage) {
if respID, ok := t.respWithoutReq[message.ResponseAck]; ok {
if resp, rok := t.messages[respID]; rok {
// if resp.AssocMessage == nil {
// log.Println("FOUND RESPONSE")
resp.AssocMessage = message
message.AssocMessage = resp
// log.Println("FOUND RESPONSE")
resp.AssocMessage = message
message.AssocMessage = resp

if resp.IsFinished() {
defer t.dispatchMessage(resp)
}
if resp.IsFinished() {
defer t.dispatchMessage(resp)
}
// }
}
}
Expand Down Expand Up @@ -333,7 +333,7 @@ func (t *Listener) readPcap() {
// We need only packets with data inside
// Check that the buffer is larger than the size of the TCP header
if len(data) > int(dataOffset*4) {
newBuf := make([]byte, len(data) + 16)
newBuf := make([]byte, len(data)+16)
copy(newBuf[:16], srcIP)
copy(newBuf[16:], data)

Expand Down Expand Up @@ -375,7 +375,7 @@ func (t *Listener) readRAWSocket() {

if n > 0 {
if t.isValidPacket(buf[:n]) {
newBuf := make([]byte, n + 4)
newBuf := make([]byte, n+4)
copy(newBuf[16:], buf[:n])
copy(newBuf[:16], []byte(addr.(*net.IPAddr).IP))

Expand Down Expand Up @@ -550,9 +550,9 @@ func (t *Listener) processTCPPacket(packet *TCPPacket) {

func (t *Listener) IsReady() bool {
select {
case <- t.readyCh:
case <-t.readyCh:
return true
case <- time.After(5 * time.Second):
case <-time.After(5 * time.Second):
return false
}
}
Expand Down
37 changes: 18 additions & 19 deletions raw_socket_listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package rawSocket
import (
"bytes"
"log"
"testing"
"time"
"math/rand"
"sync/atomic"
"testing"
"time"
)

func TestRawListenerInput(t *testing.T) {
Expand Down Expand Up @@ -262,7 +262,7 @@ func testChunkedSequence(t *testing.T, listener *Listener, packets ...*TCPPacket
}

if len(listener.messagesChan) != 0 {
t.Fatal("messagesChan non empty:", <- listener.messagesChan)
t.Fatal("messagesChan non empty:", <-listener.messagesChan)
}

if len(listener.messages) != 0 {
Expand Down Expand Up @@ -331,14 +331,13 @@ func TestRawListenerChunkedWrongOrder(t *testing.T) {
}
}


func chunkedPostMessage() []*TCPPacket {
ack := uint32(rand.Int63())
seq := uint32(rand.Int63())

reqPacket1 := buildPacket(true, ack, seq, []byte("POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"))
// Packet with data have different Seq
reqPacket2 := buildPacket(true, ack, seq + 47, []byte("1\r\na\r\n"))
reqPacket2 := buildPacket(true, ack, seq+47, []byte("1\r\na\r\n"))
reqPacket3 := buildPacket(true, ack, reqPacket2.Seq+5, []byte("1\r\nb\r\n"))
reqPacket4 := buildPacket(true, ack, reqPacket3.Seq+5, []byte("0\r\n\r\n"))

Expand All @@ -359,13 +358,13 @@ func postMessage() []*TCPPacket {
rand.Read(data)

head := []byte("POST / HTTP/1.1\r\nContent-Length: 9958\r\n\r\n")
for i, _ := range head {
for i := range head {
data[i] = head[i]
}

return []*TCPPacket{
buildPacket(true, ack, seq, data),
buildPacket(false, seq + uint32(len(data)), seq2, []byte("HTTP/1.1 200 OK\r\n")),
buildPacket(false, seq+uint32(len(data)), seq2, []byte("HTTP/1.1 200 OK\r\n")),
}
}

Expand All @@ -376,7 +375,7 @@ func getMessage() []*TCPPacket {

return []*TCPPacket{
buildPacket(true, ack, seq, []byte("GET / HTTP/1.1\r\n\r\n")),
buildPacket(false, seq + 18, seq2, []byte("HTTP/1.1 200 OK\r\n")),
buildPacket(false, seq+18, seq2, []byte("HTTP/1.1 200 OK\r\n")),
}
}

Expand All @@ -387,22 +386,22 @@ func TestRawListenerBench(t *testing.T) {

// Should re-construct message from all possible combinations
for i := 0; i < 1000; i++ {
go func(){
go func() {
for j := 0; j < 100; j++ {
var packets []*TCPPacket

if j % 5 == 0 {
if j%5 == 0 {
packets = chunkedPostMessage()
} else if j % 3 == 0 {
packets = postMessage()
} else if j%3 == 0 {
packets = postMessage()
} else {
packets = getMessage()
}

for _, p := range packets {
// Randomly drop packets
if (i + j) % 5 == 0 {
if rand.Int63() % 3 == 0 {
if (i+j)%5 == 0 {
if rand.Int63()%3 == 0 {
continue
}
}
Expand All @@ -422,11 +421,11 @@ func TestRawListenerBench(t *testing.T) {

for {
select {
case <- ch:
atomic.AddInt32(&count, 1)
case <-time.After(2000 * time.Millisecond):
log.Println("Emitted 200000 messages, captured: ", count, len(l.ackAliases), len(l.seqWithData), len(l.respAliases), len(l.respWithoutReq), len(l.packetsChan))
return
case <-ch:
atomic.AddInt32(&count, 1)
case <-time.After(2000 * time.Millisecond):
log.Println("Emitted 200000 messages, captured: ", count, len(l.ackAliases), len(l.seqWithData), len(l.respAliases), len(l.respWithoutReq), len(l.packetsChan))
return
}
}
}
18 changes: 9 additions & 9 deletions raw_socket_listener/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package rawSocket
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"encoding/binary"
"encoding/hex"
"github.com/buger/gor/proto"
"log"
"strconv"
Expand All @@ -20,12 +20,12 @@ var _ = log.Println
// Message can be compiled from unique packets with same message_id which sorted by sequence
// Message is received if we didn't receive any packets for 2000ms
type TCPMessage struct {
Seq uint32
Ack uint32
ResponseAck uint32
ResponseID tcpID
DataAck uint32
DataSeq uint32
Seq uint32
Ack uint32
ResponseAck uint32
ResponseID tcpID
DataAck uint32
DataSeq uint32

AssocMessage *TCPMessage
Start time.Time
Expand Down Expand Up @@ -210,11 +210,11 @@ func (t *TCPMessage) UpdateResponseAck() uint32 {
copy(t.ResponseID[4:], lastPacket.Raw[2:4]) // Src port
copy(t.ResponseID[6:], lastPacket.Raw[0:2]) // Dest port
binary.BigEndian.PutUint32(t.ResponseID[8:12], t.ResponseAck)
}
}

return t.ResponseAck
}

func (t *TCPMessage) ID() tcpID {
return t.packets[0].ID
}
}
2 changes: 1 addition & 1 deletion raw_socket_listener/tcp_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package rawSocket

import (
"bytes"
"encoding/binary"
_ "log"
"testing"
"encoding/binary"
)

func buildPacket(isIncoming bool, Ack, Seq uint32, Data []byte) (packet *TCPPacket) {
Expand Down
10 changes: 5 additions & 5 deletions raw_socket_listener/tcp_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ type TCPPacket struct {
OrigAck uint32
DataOffset uint8

Raw []byte
Raw []byte
Data []byte
Addr []byte
ID tcpID
ID tcpID
}

// ParseTCPPacket takes address and tcp payload and returns parsed TCPPacket
Expand All @@ -49,8 +49,8 @@ func ParseTCPPacket(addr []byte, data []byte) (p *TCPPacket) {

func (p *TCPPacket) GenID() {
copy(p.ID[:16], p.Addr)
copy(p.ID[16:], p.Raw[0:2]) // Src port
copy(p.ID[18:], p.Raw[2:4]) // Dest port
copy(p.ID[16:], p.Raw[0:2]) // Src port
copy(p.ID[18:], p.Raw[2:4]) // Dest port
copy(p.ID[20:], p.Raw[8:12]) // Ack
}

Expand All @@ -73,7 +73,7 @@ func (t *TCPPacket) ParseBasic() {
}

func (t *TCPPacket) Dump() []byte {
buf := make([]byte, len(t.Data) + 16 + 16)
buf := make([]byte, len(t.Data)+16+16)
tcpBuf := buf[16:]

binary.BigEndian.PutUint16(tcpBuf[2:4], t.DestPort)
Expand Down

0 comments on commit 768c5f9

Please sign in to comment.