diff --git a/input_raw.go b/input_raw.go index 50a68d8a..a88a2039 100644 --- a/input_raw.go +++ b/input_raw.go @@ -81,7 +81,7 @@ func (i *RAWInput) listen(address string) { } // Receiving TCPMessage object - m := <- ch + m := <-ch i.data <- m } diff --git a/input_raw_test.go b/input_raw_test.go index 27408590..d61e72ea 100644 --- a/input_raw_test.go +++ b/input_raw_test.go @@ -31,16 +31,15 @@ func TestRAWInput(t *testing.T) { t.Fatal(err) } origin := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, } go origin.Serve(listener) defer listener.Close() originAddr := listener.Addr().String() - var respCounter, reqCounter int64 input := NewRAWInput(originAddr, EnginePcap, testRawExpire) @@ -63,7 +62,7 @@ func TestRAWInput(t *testing.T) { Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} - client := NewHTTPClient("http://" + listener.Addr().String(), &HTTPClientConfig{}) + client := NewHTTPClient("http://"+listener.Addr().String(), &HTTPClientConfig{}) go Start(quit) @@ -87,16 +86,15 @@ func TestRAWInputIPv6(t *testing.T) { t.Fatal(err) } origin := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, } go origin.Serve(listener) defer listener.Close() originAddr := listener.Addr().String() - var respCounter, reqCounter int64 input := NewRAWInput(originAddr, EnginePcap, testRawExpire) @@ -119,7 +117,7 @@ func TestRAWInputIPv6(t *testing.T) { Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} - client := NewHTTPClient("http://" + listener.Addr().String(), &HTTPClientConfig{}) + client := NewHTTPClient("http://"+listener.Addr().String(), &HTTPClientConfig{}) go Start(quit) diff --git a/raw_socket_listener/listener.go b/raw_socket_listener/listener.go index 045dfaa8..54a02eff 100644 --- a/raw_socket_listener/listener.go +++ b/raw_socket_listener/listener.go @@ -61,8 +61,8 @@ type Listener struct { messageExpire time.Duration - conn net.PacketConn - quit chan bool + conn net.PacketConn + quit chan bool readyCh chan bool } @@ -130,10 +130,10 @@ func (t *Listener) listen() { t.conn.Close() } return - case data := <- t.packetsChan: + case data := <-t.packetsChan: packet := ParseTCPPacket(data[:16], data[16:]) t.processTCPPacket(packet) - case <- gcTicker: + case <-gcTicker: now := time.Now() // Dispatch requests before responses @@ -175,13 +175,13 @@ func (t *Listener) dispatchMessage(message *TCPMessage) { if respID, ok := t.respWithoutReq[message.ResponseAck]; ok { if resp, rok := t.messages[respID]; rok { // if resp.AssocMessage == nil { - // log.Println("FOUND RESPONSE") - resp.AssocMessage = message - message.AssocMessage = resp + // log.Println("FOUND RESPONSE") + resp.AssocMessage = message + message.AssocMessage = resp - if resp.IsFinished() { - defer t.dispatchMessage(resp) - } + if resp.IsFinished() { + defer t.dispatchMessage(resp) + } // } } } @@ -333,7 +333,7 @@ func (t *Listener) readPcap() { // We need only packets with data inside // Check that the buffer is larger than the size of the TCP header if len(data) > int(dataOffset*4) { - newBuf := make([]byte, len(data) + 16) + newBuf := make([]byte, len(data)+16) copy(newBuf[:16], srcIP) copy(newBuf[16:], data) @@ -375,7 +375,7 @@ func (t *Listener) readRAWSocket() { if n > 0 { if t.isValidPacket(buf[:n]) { - newBuf := make([]byte, n + 4) + newBuf := make([]byte, n+4) copy(newBuf[16:], buf[:n]) copy(newBuf[:16], []byte(addr.(*net.IPAddr).IP)) @@ -550,9 +550,9 @@ func (t *Listener) processTCPPacket(packet *TCPPacket) { func (t *Listener) IsReady() bool { select { - case <- t.readyCh: + case <-t.readyCh: return true - case <- time.After(5 * time.Second): + case <-time.After(5 * time.Second): return false } } diff --git a/raw_socket_listener/listener_test.go b/raw_socket_listener/listener_test.go index 5e7f06c5..80c09776 100644 --- a/raw_socket_listener/listener_test.go +++ b/raw_socket_listener/listener_test.go @@ -3,10 +3,10 @@ package rawSocket import ( "bytes" "log" - "testing" - "time" "math/rand" "sync/atomic" + "testing" + "time" ) func TestRawListenerInput(t *testing.T) { @@ -262,7 +262,7 @@ func testChunkedSequence(t *testing.T, listener *Listener, packets ...*TCPPacket } if len(listener.messagesChan) != 0 { - t.Fatal("messagesChan non empty:", <- listener.messagesChan) + t.Fatal("messagesChan non empty:", <-listener.messagesChan) } if len(listener.messages) != 0 { @@ -331,14 +331,13 @@ func TestRawListenerChunkedWrongOrder(t *testing.T) { } } - func chunkedPostMessage() []*TCPPacket { ack := uint32(rand.Int63()) seq := uint32(rand.Int63()) reqPacket1 := buildPacket(true, ack, seq, []byte("POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n")) // Packet with data have different Seq - reqPacket2 := buildPacket(true, ack, seq + 47, []byte("1\r\na\r\n")) + reqPacket2 := buildPacket(true, ack, seq+47, []byte("1\r\na\r\n")) reqPacket3 := buildPacket(true, ack, reqPacket2.Seq+5, []byte("1\r\nb\r\n")) reqPacket4 := buildPacket(true, ack, reqPacket3.Seq+5, []byte("0\r\n\r\n")) @@ -359,13 +358,13 @@ func postMessage() []*TCPPacket { rand.Read(data) head := []byte("POST / HTTP/1.1\r\nContent-Length: 9958\r\n\r\n") - for i, _ := range head { + for i := range head { data[i] = head[i] } return []*TCPPacket{ buildPacket(true, ack, seq, data), - buildPacket(false, seq + uint32(len(data)), seq2, []byte("HTTP/1.1 200 OK\r\n")), + buildPacket(false, seq+uint32(len(data)), seq2, []byte("HTTP/1.1 200 OK\r\n")), } } @@ -376,7 +375,7 @@ func getMessage() []*TCPPacket { return []*TCPPacket{ buildPacket(true, ack, seq, []byte("GET / HTTP/1.1\r\n\r\n")), - buildPacket(false, seq + 18, seq2, []byte("HTTP/1.1 200 OK\r\n")), + buildPacket(false, seq+18, seq2, []byte("HTTP/1.1 200 OK\r\n")), } } @@ -387,22 +386,22 @@ func TestRawListenerBench(t *testing.T) { // Should re-construct message from all possible combinations for i := 0; i < 1000; i++ { - go func(){ + go func() { for j := 0; j < 100; j++ { var packets []*TCPPacket - if j % 5 == 0 { + if j%5 == 0 { packets = chunkedPostMessage() - } else if j % 3 == 0 { - packets = postMessage() + } else if j%3 == 0 { + packets = postMessage() } else { packets = getMessage() } for _, p := range packets { // Randomly drop packets - if (i + j) % 5 == 0 { - if rand.Int63() % 3 == 0 { + if (i+j)%5 == 0 { + if rand.Int63()%3 == 0 { continue } } @@ -422,11 +421,11 @@ func TestRawListenerBench(t *testing.T) { for { select { - case <- ch: - atomic.AddInt32(&count, 1) - case <-time.After(2000 * time.Millisecond): - log.Println("Emitted 200000 messages, captured: ", count, len(l.ackAliases), len(l.seqWithData), len(l.respAliases), len(l.respWithoutReq), len(l.packetsChan)) - return + case <-ch: + atomic.AddInt32(&count, 1) + case <-time.After(2000 * time.Millisecond): + log.Println("Emitted 200000 messages, captured: ", count, len(l.ackAliases), len(l.seqWithData), len(l.respAliases), len(l.respWithoutReq), len(l.packetsChan)) + return } } } diff --git a/raw_socket_listener/tcp_message.go b/raw_socket_listener/tcp_message.go index 2e399842..385e30dd 100644 --- a/raw_socket_listener/tcp_message.go +++ b/raw_socket_listener/tcp_message.go @@ -3,8 +3,8 @@ package rawSocket import ( "bytes" "crypto/sha1" - "encoding/hex" "encoding/binary" + "encoding/hex" "github.com/buger/gor/proto" "log" "strconv" @@ -20,12 +20,12 @@ var _ = log.Println // Message can be compiled from unique packets with same message_id which sorted by sequence // Message is received if we didn't receive any packets for 2000ms type TCPMessage struct { - Seq uint32 - Ack uint32 - ResponseAck uint32 - ResponseID tcpID - DataAck uint32 - DataSeq uint32 + Seq uint32 + Ack uint32 + ResponseAck uint32 + ResponseID tcpID + DataAck uint32 + DataSeq uint32 AssocMessage *TCPMessage Start time.Time @@ -210,11 +210,11 @@ func (t *TCPMessage) UpdateResponseAck() uint32 { copy(t.ResponseID[4:], lastPacket.Raw[2:4]) // Src port copy(t.ResponseID[6:], lastPacket.Raw[0:2]) // Dest port binary.BigEndian.PutUint32(t.ResponseID[8:12], t.ResponseAck) - } + } return t.ResponseAck } func (t *TCPMessage) ID() tcpID { return t.packets[0].ID -} \ No newline at end of file +} diff --git a/raw_socket_listener/tcp_message_test.go b/raw_socket_listener/tcp_message_test.go index 2df079e3..fc8ebc4a 100644 --- a/raw_socket_listener/tcp_message_test.go +++ b/raw_socket_listener/tcp_message_test.go @@ -2,9 +2,9 @@ package rawSocket import ( "bytes" + "encoding/binary" _ "log" "testing" - "encoding/binary" ) func buildPacket(isIncoming bool, Ack, Seq uint32, Data []byte) (packet *TCPPacket) { diff --git a/raw_socket_listener/tcp_packet.go b/raw_socket_listener/tcp_packet.go index bb6ef7da..fdfe5154 100644 --- a/raw_socket_listener/tcp_packet.go +++ b/raw_socket_listener/tcp_packet.go @@ -31,10 +31,10 @@ type TCPPacket struct { OrigAck uint32 DataOffset uint8 - Raw []byte + Raw []byte Data []byte Addr []byte - ID tcpID + ID tcpID } // ParseTCPPacket takes address and tcp payload and returns parsed TCPPacket @@ -49,8 +49,8 @@ func ParseTCPPacket(addr []byte, data []byte) (p *TCPPacket) { func (p *TCPPacket) GenID() { copy(p.ID[:16], p.Addr) - copy(p.ID[16:], p.Raw[0:2]) // Src port - copy(p.ID[18:], p.Raw[2:4]) // Dest port + copy(p.ID[16:], p.Raw[0:2]) // Src port + copy(p.ID[18:], p.Raw[2:4]) // Dest port copy(p.ID[20:], p.Raw[8:12]) // Ack } @@ -73,7 +73,7 @@ func (t *TCPPacket) ParseBasic() { } func (t *TCPPacket) Dump() []byte { - buf := make([]byte, len(t.Data) + 16 + 16) + buf := make([]byte, len(t.Data)+16+16) tcpBuf := buf[16:] binary.BigEndian.PutUint16(tcpBuf[2:4], t.DestPort)