From c208ea293b7789d8f7e99e87fbae201cb93df30e Mon Sep 17 00:00:00 2001 From: Michael Li Date: Thu, 19 Dec 2024 20:23:07 -0800 Subject: [PATCH] Add statsdreceiver Unixgram Support (#36608) #### Description Adds `unixgram` transport for the `statsdreceiver`. Additionally, creates a new `packetServer` base class for both `UDS` and `UDP*` transport types #### Link to tracking issue https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21385 #### Testing Added a unit test --------- Co-authored-by: Christos Markou --- .chloggen/statsdreceiver-uds.yaml | 27 ++++++ receiver/statsdreceiver/README.md | 9 +- .../internal/transport/client/client.go | 9 ++ .../internal/transport/packet_server.go | 84 +++++++++++++++++++ .../internal/transport/transport.go | 7 +- .../internal/transport/udp_server.go | 60 ++----------- .../internal/transport/uds_server.go | 42 ++++++++++ receiver/statsdreceiver/receiver.go | 12 ++- receiver/statsdreceiver/receiver_test.go | 18 ++++ 9 files changed, 204 insertions(+), 64 deletions(-) create mode 100644 .chloggen/statsdreceiver-uds.yaml create mode 100644 receiver/statsdreceiver/internal/transport/packet_server.go create mode 100644 receiver/statsdreceiver/internal/transport/uds_server.go diff --git a/.chloggen/statsdreceiver-uds.yaml b/.chloggen/statsdreceiver-uds.yaml new file mode 100644 index 000000000000..b6307d452d6b --- /dev/null +++ b/.chloggen/statsdreceiver-uds.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: statsdreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add UDS support to statsdreceiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21385] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index 48130dfd35cd..c302cde0eda1 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -19,12 +19,13 @@ Use case: it does not support horizontal pool of collectors. Desired work case i ## Configuration -The following settings are required: - -- `endpoint` (default = `localhost:8125`): Address and port to listen on. +The Following settings are optional: +- `endpoint`: Address and port to listen on. + - For `udp` and `tcp` based `transport`, this config will default to `localhost:8125` + - For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock` -The Following settings are optional: +- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/internal/transport/transport.go). - `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 8b9fd7f06ba3..ee1aab425cf9 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -51,6 +51,15 @@ func (s *StatsD) connect() error { if err != nil { return err } + case "unixgram": + unixAddr, err := net.ResolveUnixAddr(s.transport, s.address) + if err != nil { + return err + } + s.conn, err = net.DialUnix(s.transport, nil, unixAddr) + if err != nil { + return err + } default: return fmt.Errorf("unknown/unsupported transport: %s", s.transport) } diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go new file mode 100644 index 000000000000..c1fbe2b7f946 --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import ( + "errors" + "net" + + "go.opentelemetry.io/collector/consumer" +) + +type packetServer struct { + packetConn net.PacketConn + transport Transport +} + +// ListenAndServe starts the server ready to receive metrics. +func (u *packetServer) ListenAndServe( + nextConsumer consumer.Metrics, + reporter Reporter, + transferChan chan<- Metric, +) error { + if nextConsumer == nil || reporter == nil { + return errNilListenAndServeParameters + } + + buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) + for { + n, addr, err := u.packetConn.ReadFrom(buf) + if addr == nil && u.transport == UDS { + addr = &udsAddr{ + network: u.transport.String(), + address: u.packetConn.LocalAddr().String(), + } + } + + if n > 0 { + u.handlePacket(n, buf, addr, transferChan) + } + if err != nil { + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + u.transport, + u.packetConn.LocalAddr(), + err) + var netErr net.Error + if errors.As(err, &netErr) { + if netErr.Timeout() { + continue + } + } + return err + } + } +} + +// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. +func (u *packetServer) handlePacket( + numBytes int, + data []byte, + addr net.Addr, + transferChan chan<- Metric, +) { + splitPacket := NewSplitBytes(data[:numBytes], '\n') + for splitPacket.Next() { + chunk := splitPacket.Chunk() + if len(chunk) > 0 { + transferChan <- Metric{string(chunk), addr} + } + } +} + +type udsAddr struct { + network string + address string +} + +func (u *udsAddr) Network() string { + return u.network +} + +func (u *udsAddr) String() string { + return u.address +} diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index c065e30c746f..5e9113abcb87 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -21,6 +21,7 @@ const ( TCP Transport = "tcp" TCP4 Transport = "tcp4" TCP6 Transport = "tcp6" + UDS Transport = "unixgram" ) // NewTransport creates a Transport based on the transport string or returns an empty Transport. @@ -31,6 +32,8 @@ func NewTransport(ts string) Transport { return trans case TCP, TCP4, TCP6: return trans + case UDS: + return trans } return Transport("") } @@ -38,7 +41,7 @@ func NewTransport(ts string) Transport { // String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. func (trans Transport) String() string { switch trans { - case UDP, UDP4, UDP6, TCP, TCP4, TCP6: + case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS: return string(trans) } return "" @@ -47,7 +50,7 @@ func (trans Transport) String() string { // IsPacketTransport returns true if the transport is packet based. func (trans Transport) IsPacketTransport() bool { switch trans { - case UDP, UDP4, UDP6: + case UDP, UDP4, UDP6, UDS: return true } return false diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 3ad113c80654..823949b7ca4a 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -4,16 +4,12 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" import ( - "errors" "fmt" "net" - - "go.opentelemetry.io/collector/consumer" ) type udpServer struct { - packetConn net.PacketConn - transport Transport + packetServer } // Ensure that Server is implemented on UDP Server. @@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) { } return &udpServer{ - packetConn: conn, - transport: transport, + packetServer: packetServer{ + packetConn: conn, + transport: transport, + }, }, nil } -// ListenAndServe starts the server ready to receive metrics. -func (u *udpServer) ListenAndServe( - nextConsumer consumer.Metrics, - reporter Reporter, - transferChan chan<- Metric, -) error { - if nextConsumer == nil || reporter == nil { - return errNilListenAndServeParameters - } - - buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) - for { - n, addr, err := u.packetConn.ReadFrom(buf) - if n > 0 { - u.handlePacket(n, buf, addr, transferChan) - } - if err != nil { - reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", - u.transport, - u.packetConn.LocalAddr(), - err) - var netErr net.Error - if errors.As(err, &netErr) { - if netErr.Timeout() { - continue - } - } - return err - } - } -} - // Close closes the server. func (u *udpServer) Close() error { return u.packetConn.Close() } - -// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. -func (u *udpServer) handlePacket( - numBytes int, - data []byte, - addr net.Addr, - transferChan chan<- Metric, -) { - splitPacket := NewSplitBytes(data[:numBytes], '\n') - for splitPacket.Next() { - chunk := splitPacket.Chunk() - if len(chunk) > 0 { - transferChan <- Metric{string(chunk), addr} - } - } -} diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go new file mode 100644 index 000000000000..9e019624e85d --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/uds_server.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import ( + "fmt" + "net" + "os" +) + +type udsServer struct { + packetServer +} + +// Ensure that Server is implemented on UDS Server. +var _ (Server) = (*udsServer)(nil) + +// NewUDSServer creates a transport.Server using Unixgram as its transport. +func NewUDSServer(transport Transport, socketPath string) (Server, error) { + if !transport.IsPacketTransport() { + return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) + } + + conn, err := net.ListenPacket(transport.String(), socketPath) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) + } + + return &udsServer{ + packetServer: packetServer{ + packetConn: conn, + transport: transport, + }, + }, nil +} + +// Close closes the server. +func (u *udsServer) Close() error { + os.Remove(u.packetConn.LocalAddr().String()) + return u.packetConn.Close() +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 81e11a67b1e5..f5114d6c66dc 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -46,8 +46,14 @@ func newReceiver( config Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { + trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) + if config.NetAddr.Endpoint == "" { - config.NetAddr.Endpoint = "localhost:8125" + if trans == transport.UDS { + config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock" + } else { + config.NetAddr.Endpoint = "localhost:8125" + } } rep, err := newReporter(set) @@ -55,7 +61,6 @@ func newReceiver( return nil, err } - trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ LongLivedCtx: true, ReceiverID: set.ID, @@ -80,13 +85,14 @@ func newReceiver( } func buildTransportServer(config Config) (transport.Server, error) { - // TODO: Add unix socket transport implementations trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) switch trans { case transport.UDP, transport.UDP4, transport.UDP6: return transport.NewUDPServer(trans, config.NetAddr.Endpoint) case transport.TCP, transport.TCP4, transport.TCP6: return transport.NewTCPServer(trans, config.NetAddr.Endpoint) + case transport.UDS: + return transport.NewUDSServer(trans, config.NetAddr.Endpoint) } return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport)) diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 11a12d9d951e..abc425ac2628 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -106,6 +106,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { return c }, }, + { + name: "UDS server with 4s interval", + addr: "/tmp/statsd_test.sock", + configFn: func() *Config { + return &Config{ + NetAddr: confignet.AddrConfig{ + Endpoint: "/tmp/statsd_test.sock", + Transport: confignet.TransportTypeUnixgram, + }, + AggregationInterval: 4 * time.Second, + } + }, + clientFn: func(t *testing.T, addr string) *client.StatsD { + c, err := client.NewStatsD("unixgram", addr) + require.NoError(t, err) + return c + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {