Skip to content

Commit

Permalink
feat: add bytes_sent and bytes_received as metrics (#624)
Browse files Browse the repository at this point in the history
feat: add bytes_sent and bytes_received as metrics
  • Loading branch information
rhatgadkar-goog authored Sep 27, 2024
1 parent 58057c7 commit 4aa27a5
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ Supported metrics include:
refresh operations
- `alloydbconn/refresh_failure_count`: The number of failed refresh
operations.
- `alloydbconn/bytes_sent`: The number of bytes sent to an AlloyDB instance.
- `alloydbconn/bytes_received`: The number of bytes received from an AlloyDB
instance.

Supported traces include:

Expand Down
28 changes: 26 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption)
return newInstrumentedConn(tlsConn, func() {
n := atomic.AddUint64(cache.openConns, ^uint64(0))
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, inst.String())
}), nil
}, d.dialerID, inst.String()), nil
}

// removeCached stops all background refreshes and deletes the connection
Expand Down Expand Up @@ -537,10 +537,12 @@ func (b *buffer) put(buf *[]byte) {

// newInstrumentedConn initializes an instrumentedConn that on closing will
// decrement the number of open connects and record the result.
func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
func newInstrumentedConn(conn net.Conn, closeFunc func(), dialerID, instance string) *instrumentedConn {
return &instrumentedConn{
Conn: conn,
closeFunc: closeFunc,
dialerID: dialerID,
instance: instance,
}
}

Expand All @@ -549,6 +551,28 @@ func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
type instrumentedConn struct {
net.Conn
closeFunc func()
dialerID string
instance string
}

// Read delegates to the underlying net.Conn interface and records number of
// bytes read.
func (i *instrumentedConn) Read(b []byte) (int, error) {
bytesRead, err := i.Conn.Read(b)
if err == nil {
go trace.RecordBytesReceived(context.Background(), int64(bytesRead), i.instance, i.dialerID)
}
return bytesRead, err
}

// Write delegates to the underlying net.Conn interface and records number of
// bytes written.
func (i *instrumentedConn) Write(b []byte) (int, error) {
bytesWritten, err := i.Conn.Write(b)
if err == nil {
go trace.RecordBytesSent(context.Background(), int64(bytesWritten), i.instance, i.dialerID)
}
return bytesWritten, err
}

// Close delegates to the underlying net.Conn interface and reports the close
Expand Down
38 changes: 38 additions & 0 deletions internal/trace/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ var (
"A failed certificate refresh operation",
stats.UnitDimensionless,
)
mBytesSent = stats.Int64(
"alloydbconn/bytes_sent",
"The bytes sent to an AlloyDB instance",
stats.UnitDimensionless,
)
mBytesReceived = stats.Int64(
"alloydbconn/bytes_received",
"The bytes received from an AlloyDB instance",
stats.UnitDimensionless,
)

latencyView = &view.View{
Name: "alloydbconn/dial_latency",
Expand Down Expand Up @@ -94,6 +104,20 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{keyInstance, keyDialerID, keyErrorCode},
}
bytesSentView = &view.View{
Name: "alloydbconn/bytes_sent",
Measure: mBytesSent,
Description: "The number of bytes sent to an AlloyDB instance",
Aggregation: view.Sum(),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}
bytesReceivedView = &view.View{
Name: "alloydbconn/bytes_received",
Measure: mBytesReceived,
Description: "The number of bytes received from an AlloyDB instance",
Aggregation: view.Sum(),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}

registerOnce sync.Once
registerErr error
Expand All @@ -110,6 +134,8 @@ func InitMetrics() error {
dialFailureView,
refreshCountView,
failedRefreshCountView,
bytesSentView,
bytesReceivedView,
); rErr != nil {
registerErr = fmt.Errorf("failed to initialize metrics: %v", rErr)
}
Expand Down Expand Up @@ -157,6 +183,18 @@ func RecordRefreshResult(ctx context.Context, instance, dialerID string, err err
stats.Record(ctx, mSuccessfulRefresh.M(1))
}

// RecordBytesSent reports the number of bytes sent to an AlloyDB instance.
func RecordBytesSent(ctx context.Context, num int64, instance, dialerID string) {
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mBytesSent.M(num))
}

// RecordBytesReceived reports the number of bytes received from an AlloyDB instance.
func RecordBytesReceived(ctx context.Context, num int64, instance, dialerID string) {
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mBytesReceived.M(num))
}

// errorCode returns an error code as given from the AlloyDB Admin API, provided
// the error wraps a googleapi.Error type. If multiple error codes are returned
// from the API, then a comma-separated string of all codes is returned.
Expand Down
37 changes: 37 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package alloydbconn

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -118,6 +119,24 @@ func wantCountMetric(t *testing.T, wantName string, ms []metric) {
)
}

// wantSumMetric ensures the provided metrics include a metric with the wanted
// name and at least one data point.
func wantSumMetric(t *testing.T, wantName string, ms []metric) {
t.Helper()
gotNames := make(map[string]view.AggregationData)
for _, m := range ms {
gotNames[m.name] = m.data
_, ok := m.data.(*view.SumData)
if m.name == wantName && ok {
return
}
}
t.Fatalf(
"metric name want = %v with SumData, all metrics = %v",
wantName, dump(t, gotNames),
)
}

func TestDialerWithMetrics(t *testing.T) {
spy := &spyMetricsExporter{}
view.RegisterExporter(spy)
Expand Down Expand Up @@ -163,6 +182,22 @@ func TestDialerWithMetrics(t *testing.T) {
if err != nil {
t.Fatalf("expected Dial to succeed, but got error: %v", err)
}
// write to conn to test bytes_sent and bytes_received
buf := &bytes.Buffer{}
err = buf.WriteByte('a')
if err != nil {
t.Fatalf("buf.WriteByte failed: %v", err)
}
// Doing a read before doing a write, because when this unit test runs on
// Windows, it fails when the write is done before the read.
_, err = conn2.Read(buf.Bytes())
if err != nil {
t.Fatalf("conn.Read failed: %v", err)
}
_, err = conn2.Write(buf.Bytes())
if err != nil {
t.Fatalf("conn.Write failed: %v", err)
}
defer conn2.Close()
// dial a bogus instance
_, err = d.Dial(ctx,
Expand All @@ -179,6 +214,8 @@ func TestDialerWithMetrics(t *testing.T) {
wantLastValueMetric(t, "alloydbconn/open_connections", spy.data(), 2)
wantDistributionMetric(t, "alloydbconn/dial_latency", spy.data())
wantCountMetric(t, "alloydbconn/refresh_success_count", spy.data())
wantSumMetric(t, "alloydbconn/bytes_sent", spy.data())
wantSumMetric(t, "alloydbconn/bytes_received", spy.data())

// failure metrics from dialing bogus instance
wantCountMetric(t, "alloydbconn/dial_failure_count", spy.data())
Expand Down

0 comments on commit 4aa27a5

Please sign in to comment.