Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature request: add a FileExporter to match (experimental) spec #146

Open
StephenWithPH opened this issue Feb 2, 2024 · 0 comments
Open

Comments

@StephenWithPH
Copy link

It would be useful to have a FileExporter in addition to the current CollectorExporter and ConsoleExporter. This would allow users of the library to sink the telemetry locally in cases where there wasn't connectivity.

The spec can be found at: https://opentelemetry.io/docs/specs/otel/protocol/file-exporter

I was able to hack in a very prototype version that worked. In order to get to proof-of-concept, I copy/pasted the internal code necessary to get hands on the protobufs so I could use their built-in toProto3Json method.

class FileExporter implements sdk.SpanExporter {
  const FileExporter._internal({required IOSink sink}) : _sink = sink;

  final IOSink _sink;

  static Future<FileExporter?> init({
    required String fileName,
  }) async {
    try {
      final dir = await getApplicationSupportDirectory();
      final fullPath = p.join(dir.path, fileName);
      final file = LocalFileSystem().file(fullPath);
      file.createSync(recursive: true); // assure that we can
      final sink = File(file.absolute.path).openWrite();

      final instance = FileExporter._internal(sink: sink,);
      return instance;
    } on Object catch (obj) {
      // never crash the caller!
      print(obj);
      return null;
    }
  }

  @override
  void export(List<sdk.ReadOnlySpan> spans) {
    if (spans.isEmpty) {
      return;
    }

    final pbs = _spansToProtobuf(spans);


    _sink.writeAll(pbs.map((e) => "${e.toProto3Json()}\n"));
  }

  @override
  void forceFlush() {
    _sink.flush();
  }

  @override
  void shutdown() {
    forceFlush();
    _sink.close();
    throw UnimplementedError();
  }

  Iterable<pb_trace.ResourceSpans> _spansToProtobuf(
      List<sdk.ReadOnlySpan> spans) {
    // use a map of maps to group spans by resource and instrumentation library
    final rsm =
    <sdk.Resource, Map<sdk.InstrumentationScope, List<pb_trace.Span>>>{};
    for (final span in spans) {
      final il = rsm[span.resource] ??
          <sdk.InstrumentationScope, List<pb_trace.Span>>{};
      il[span.instrumentationScope] =
          il[span.instrumentationScope] ?? <pb_trace.Span>[]
            ..add(_spanToProtobuf(span));
      rsm[span.resource] = il;
    }

    final rss = <pb_trace.ResourceSpans>[];
    for (final il in rsm.entries) {
      // for each distinct resource, construct the protobuf equivalent
      final attrs = <pb_common.KeyValue>[];
      for (final attr in il.key.attributes.keys) {
        attrs.add(pb_common.KeyValue(
            key: attr,
            value: _attributeValueToProtobuf(il.key.attributes.get(attr)!)));
      }
      final rs = pb_trace.ResourceSpans(
          resource: pb_resource.Resource(attributes: attrs), scopeSpans: []);
      // for each distinct instrumentation library, construct the protobuf equivalent
      for (final ils in il.value.entries) {
        rs.scopeSpans.add(pb_trace.ScopeSpans(
            spans: ils.value,
            scope: pb_common.InstrumentationScope(
                name: ils.key.name, version: ils.key.version)));
      }
      rss.add(rs);
    }
    return rss;
  }

  pb_trace.Span _spanToProtobuf(sdk.ReadOnlySpan span) {
    pb_trace.Status_StatusCode statusCode;
    switch (span.status.code) {
      case api.StatusCode.unset:
        statusCode = pb_trace.Status_StatusCode.STATUS_CODE_UNSET;
        break;
      case api.StatusCode.error:
        statusCode = pb_trace.Status_StatusCode.STATUS_CODE_ERROR;
        break;
      case api.StatusCode.ok:
        statusCode = pb_trace.Status_StatusCode.STATUS_CODE_OK;
        break;
    }

    pb_trace.Span_SpanKind spanKind;
    switch (span.kind) {
      case api.SpanKind.client:
        spanKind = pb_trace.Span_SpanKind.SPAN_KIND_CLIENT;
        break;
      case api.SpanKind.consumer:
        spanKind = pb_trace.Span_SpanKind.SPAN_KIND_CONSUMER;
        break;
      case api.SpanKind.internal:
        spanKind = pb_trace.Span_SpanKind.SPAN_KIND_INTERNAL;
        break;
      case api.SpanKind.producer:
        spanKind = pb_trace.Span_SpanKind.SPAN_KIND_PRODUCER;
        break;
      case api.SpanKind.server:
        spanKind = pb_trace.Span_SpanKind.SPAN_KIND_SERVER;
        break;
      default:
        spanKind = pb_trace.Span_SpanKind.SPAN_KIND_UNSPECIFIED;
    }

    return pb_trace.Span(
      traceId: span.spanContext.traceId.get(),
      spanId: span.spanContext.spanId.get(),
      parentSpanId: span.parentSpanId.get(),
      name: span.name,
      startTimeUnixNano: span.startTime,
      endTimeUnixNano: span.endTime,
      attributes: span.attributes.keys.map((key) =>
          pb_common.KeyValue(
              key: key,
              value: _attributeValueToProtobuf(span.attributes.get(key)!))),
      status:
      pb_trace.Status(code: statusCode, message: span.status.description),
      kind: spanKind,
      links: _spanLinksToProtobuf(span.links),
      events: _spanEventsToPb(span.events),
    );
  }

  Iterable<pb_trace.Span_Link> _spanLinksToProtobuf(List<api.SpanLink> links) {
    final pbLinks = <pb_trace.Span_Link>[];
    for (final link in links) {
      final attrs = <pb_common.KeyValue>[];
      for (final attr in link.attributes) {
        attrs.add(pb_common.KeyValue(
            key: attr.key, value: _attributeValueToProtobuf(attr.value)));
      }
      pbLinks.add(pb_trace.Span_Link(
          traceId: link.context.traceId.get(),
          spanId: link.context.spanId.get(),
          traceState: link.context.traceState.toString(),
          attributes: attrs));
    }
    return pbLinks;
  }

  Iterable<pb_trace.Span_Event> _spanEventsToPb(List<SpanEvent> events) {
    final List<pb_trace.Span_Event> pbEvents = [];
    for (final event in events) {
      final attrs = <pb_common.KeyValue>[];
      for (final attr in event.attributes) {
        attrs.add(pb_common.KeyValue(
            key: attr.key, value: _attributeValueToProtobuf(attr.value)));
      }
      final e = pb_trace.Span_Event(
          timeUnixNano: event.eventTime, name: event.name, attributes: attrs);
      pbEvents.add(e);
    }
    return pbEvents;
  }

  pb_common.AnyValue _attributeValueToProtobuf(Object value) {
    switch (value.runtimeType) {
      case String:
        return pb_common.AnyValue(stringValue: value as String);
      case bool:
        return pb_common.AnyValue(boolValue: value as bool);
      case double:
        return pb_common.AnyValue(doubleValue: value as double);
      case int:
        return pb_common.AnyValue(intValue: Int64(value as int));
      case List:
        final list = value as List;
        if (list.isNotEmpty) {
          switch (list[0].runtimeType) {
            case String:
              final values = [] as List<pb_common.AnyValue>;
              for (final str in list) {
                values.add(pb_common.AnyValue(stringValue: str));
              }
              return pb_common.AnyValue(
                  arrayValue: pb_common.ArrayValue(values: values));
            case bool:
              final values = [] as List<pb_common.AnyValue>;
              for (final b in list) {
                values.add(pb_common.AnyValue(boolValue: b));
              }
              return pb_common.AnyValue(
                  arrayValue: pb_common.ArrayValue(values: values));
            case double:
              final values = [] as List<pb_common.AnyValue>;
              for (final d in list) {
                values.add(pb_common.AnyValue(doubleValue: d));
              }
              return pb_common.AnyValue(
                  arrayValue: pb_common.ArrayValue(values: values));
            case int:
              final values = [] as List<pb_common.AnyValue>;
              for (final i in list) {
                values.add(pb_common.AnyValue(intValue: i));
              }
              return pb_common.AnyValue(
                  arrayValue: pb_common.ArrayValue(values: values));
          }
        }
    }
    return pb_common.AnyValue();
  }

}
@StephenWithPH StephenWithPH changed the title add a FileExporter to match (experimental) spec feature request: add a FileExporter to match (experimental) spec Feb 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant