[elasticsearchexporter] Direct serialization without objmodel in OTel mode (open-telemetry#37032)

Directly serializes pdata to JSON in OTel mode
* Improved performance: no intermediate `objmodel.Document` needs to be
created first
* Fixes the issue discovered in
open-telemetry#37021
where map bodies with dotted field names were incorrectly de-dotted (see
the sketch below)

---------

Co-authored-by: Carson Ip <[email protected]>
Co-authored-by: Christos Markou <[email protected]>
3 people authored Jan 15, 2025
1 parent 65e8851 commit 09d7cde
Showing 11 changed files with 1,075 additions and 612 deletions.
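A minimal sketch of the de-dotting fix referenced above (this is not the exporter's actual encoder; names and output are illustrative). In OTel mode, map bodies are now serialized verbatim, whereas the old objmodel path expanded dotted keys into nested objects:

package main

import (
	"encoding/json"
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	body := pcommon.NewValueMap()
	body.Map().PutStr("service.name", "checkout")

	// Direct serialization keeps the dotted key exactly as sent:
	out, _ := json.Marshal(body.Map().AsRaw())
	fmt.Println(string(out)) // {"service.name":"checkout"}

	// The old objmodel de-dotting would instead have produced
	// {"service":{"name":"checkout"}}, changing the document shape.
}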
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_optimized-json-encoding.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: More efficient JSON encoding for OTel mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37032]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Increases throughput for metrics by 2x and for logs and traces by 3x

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
107 changes: 56 additions & 51 deletions exporter/elasticsearchexporter/exporter.go
@@ -4,7 +4,6 @@
 package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -20,7 +19,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.uber.org/zap"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
 )
 
 type elasticsearchExporter struct {
@@ -36,6 +35,8 @@ type elasticsearchExporter struct {
 
 	wg          sync.WaitGroup // active sessions
 	bulkIndexer bulkIndexer
+
+	bufferPool *pool.BufferPool
 }
 
 func newExporter(
@@ -69,6 +70,7 @@ func newExporter(
 		model:          model,
 		logstashFormat: cfg.LogstashFormat,
 		otel:           otel,
+		bufferPool:     pool.NewBufferPool(),
 	}
 }
 
@@ -173,11 +175,14 @@ func (e *elasticsearchExporter) pushLogRecord(
 		fIndex = formattedIndex
 	}
 
-	document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
+	buf := e.bufferPool.NewPooledBuffer()
+	err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buf.Buffer)
 	if err != nil {
+		buf.Recycle()
 		return fmt.Errorf("failed to encode log event: %w", err)
 	}
-	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
+	// not recycling after Add returns an error as we don't know if it's already recycled
+	return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
 }
 
 func (e *elasticsearchExporter) pushMetricsData(
@@ -193,21 +198,18 @@
 	}
 	defer session.End()
 
-	var (
-		validationErrs []error // log instead of returning these so that upstream does not retry
-		errs           []error
-	)
+	var errs []error
 	resourceMetrics := metrics.ResourceMetrics()
 	for i := 0; i < resourceMetrics.Len(); i++ {
 		resourceMetric := resourceMetrics.At(i)
 		resource := resourceMetric.Resource()
 		scopeMetrics := resourceMetric.ScopeMetrics()
 
-		resourceDocs := make(map[string]map[uint32]objmodel.Document)
-
 		for j := 0; j < scopeMetrics.Len(); j++ {
+			var validationErrs []error // log instead of returning these so that upstream does not retry
 			scopeMetrics := scopeMetrics.At(j)
 			scope := scopeMetrics.Scope()
+			groupedDataPointsByIndex := make(map[string]map[uint32][]dataPoint)
 			for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
 				metric := scopeMetrics.Metrics().At(k)
 
@@ -216,13 +218,17 @@
 					if err != nil {
 						return err
 					}
-					if _, ok := resourceDocs[fIndex]; !ok {
-						resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
+					groupedDataPoints, ok := groupedDataPointsByIndex[fIndex]
+					if !ok {
+						groupedDataPoints = make(map[uint32][]dataPoint)
+						groupedDataPointsByIndex[fIndex] = groupedDataPoints
 					}
 
-					if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource,
-						resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp); err != nil {
-						return err
+					dpHash := e.model.hashDataPoint(dp)
+					dataPoints, ok := groupedDataPoints[dpHash]
+					if !ok {
+						groupedDataPoints[dpHash] = []dataPoint{dp}
+					} else {
+						groupedDataPoints[dpHash] = append(dataPoints, dp)
 					}
 					return nil
 				}
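Aside: the hunk above replaces the objmodel upsert with grouping. Data points are bucketed first by target index and then by a hash of the point's identity, so that points belonging in the same Elasticsearch document are encoded together in one pass further down. A condensed, self-contained sketch of the same idiom (dataPoint here is a simplified stand-in; hashDataPoint is assumed to hash identity dimensions such as timestamp and attributes, and the index name is illustrative):

package main

// dataPoint stands in for the exporter's dataPoint interface.
type dataPoint struct{ hash uint32 }

func main() {
	grouped := make(map[string]map[uint32][]dataPoint) // index -> identity hash -> points
	addPoint := func(index string, dp dataPoint) {
		byHash, ok := grouped[index]
		if !ok {
			byHash = make(map[uint32][]dataPoint)
			grouped[index] = byHash
		}
		// append handles the nil-slice case, so the if/else in the diff
		// could equally be written as this single append
		byHash[dp.hash] = append(byHash[dp.hash], dp)
	}
	addPoint("metrics-generic.otel-default", dataPoint{hash: 7})
	addPoint("metrics-generic.otel-default", dataPoint{hash: 7})
	_ = grouped
}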
@@ -232,7 +238,7 @@
 				dps := metric.Sum().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
-					if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
+					if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil {
 						validationErrs = append(validationErrs, err)
 						continue
 					}
@@ -241,7 +247,7 @@
 				dps := metric.Gauge().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
-					if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
+					if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil {
 						validationErrs = append(validationErrs, err)
 						continue
 					}
@@ -254,7 +260,7 @@
 				dps := metric.ExponentialHistogram().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
-					if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil {
+					if err := upsertDataPoint(newExponentialHistogramDataPoint(metric, dp)); err != nil {
 						validationErrs = append(validationErrs, err)
 						continue
 					}
@@ -267,7 +273,7 @@
 				dps := metric.Histogram().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
-					if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil {
+					if err := upsertDataPoint(newHistogramDataPoint(metric, dp)); err != nil {
 						validationErrs = append(validationErrs, err)
 						continue
 					}
@@ -276,37 +282,35 @@
 				dps := metric.Summary().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
-					if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil {
+					if err := upsertDataPoint(newSummaryDataPoint(metric, dp)); err != nil {
 						validationErrs = append(validationErrs, err)
 						continue
 					}
 				}
 			}
 		}
-	}
-
-	if len(validationErrs) > 0 {
-		e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
-	}
 
-	for fIndex, docs := range resourceDocs {
-		for _, doc := range docs {
-			var (
-				docBytes []byte
-				err      error
-			)
-			docBytes, err = e.model.encodeDocument(doc)
-			if err != nil {
-				errs = append(errs, err)
-				continue
-			}
-			if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil {
-				if cerr := ctx.Err(); cerr != nil {
-					return cerr
-				}
-				errs = append(errs, err)
-			}
-		}
-	}
+			for fIndex, groupedDataPoints := range groupedDataPointsByIndex {
+				for _, dataPoints := range groupedDataPoints {
+					buf := e.bufferPool.NewPooledBuffer()
+					dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer)
+					if err != nil {
+						buf.Recycle()
+						errs = append(errs, err)
+						continue
+					}
+					if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil {
+						// not recycling after Add returns an error as we don't know if it's already recycled
+						if cerr := ctx.Err(); cerr != nil {
+							return cerr
+						}
+						errs = append(errs, err)
+					}
+				}
+			}
+			if len(validationErrs) > 0 {
+				e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
+			}
 		}
 	}
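Aside: encodeMetrics now also returns the dynamic templates for each document, which session.Add forwards with the corresponding bulk action; the Elasticsearch bulk API accepts a dynamic_templates map on index/create actions to pick a mapping template per new field. Illustratively (the field and template names below are hypothetical, not taken from this diff):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A bulk "create" action line carrying dynamic templates, which tell
	// Elasticsearch which mapping template applies to each new field.
	action := map[string]any{
		"create": map[string]any{
			"dynamic_templates": map[string]string{"metrics.request.duration": "histogram"},
		},
	}
	line, _ := json.Marshal(action)
	fmt.Println(string(line)) // {"create":{"dynamic_templates":{"metrics.request.duration":"histogram"}}}
}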

@@ -411,11 +415,14 @@ func (e *elasticsearchExporter) pushTraceRecord(
 		fIndex = formattedIndex
 	}
 
-	document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
+	buf := e.bufferPool.NewPooledBuffer()
+	err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer)
 	if err != nil {
+		buf.Recycle()
 		return fmt.Errorf("failed to encode trace record: %w", err)
 	}
-	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
+	// not recycling after Add returns an error as we don't know if it's already recycled
+	return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
 }
 
 func (e *elasticsearchExporter) pushSpanEvent(
@@ -440,14 +447,12 @@ func (e *elasticsearchExporter) pushSpanEvent(
 		}
 		fIndex = formattedIndex
 	}
-
-	document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
-	if document == nil {
+	buf := e.bufferPool.NewPooledBuffer()
+	e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer)
+	if buf.Buffer.Len() == 0 {
+		buf.Recycle()
 		return nil
 	}
-	docBytes, err := e.model.encodeDocument(*document)
-	if err != nil {
-		return err
-	}
-	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil)
+	// not recycling after Add returns an error as we don't know if it's already recycled
+	return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
 }
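A note on the pooling pattern used throughout this diff: the internal/pool package itself is not shown here. Below is a minimal sketch of what a sync.Pool-backed implementation could look like, assuming only the names visible above (BufferPool, NewPooledBuffer, Recycle, and the exported Buffer field); the real package may differ.

package pool

import (
	"bytes"
	"sync"
)

// BufferPool hands out reusable byte buffers to avoid per-document allocations.
type BufferPool struct {
	pool *sync.Pool
}

func NewBufferPool() *BufferPool {
	return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}}
}

// PooledBuffer couples a buffer with the pool it must be returned to.
type PooledBuffer struct {
	Buffer *bytes.Buffer
	pool   *sync.Pool
}

func (p *BufferPool) NewPooledBuffer() PooledBuffer {
	return PooledBuffer{Buffer: p.pool.Get().(*bytes.Buffer), pool: p.pool}
}

// Recycle resets the buffer and returns it to the pool for reuse.
func (b PooledBuffer) Recycle() {
	b.Buffer.Reset()
	b.pool.Put(b.Buffer)
}

Passing buf (rather than buf.Buffer) to session.Add transfers ownership: the session is expected to recycle the buffer once the bytes are flushed. That is why the callers above deliberately do not recycle after Add returns an error; the session may already have done so.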