From 0a64f574f16340f37d331ef95befc6a505243292 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 5 Apr 2024 12:38:35 -0300 Subject: [PATCH] output: flush complex nested values, not just map[string]string. send the full nested complex values when flushing to output plugins in `Message.Record` as a `map[string]interface{}` instead of as a simple `map[string]string`. This will allow receiving float64 and other simple types as well as nested records with key/value pairs and arrays. tests have also been added for this new functionality. Signed-off-by: Phillip Whelan --- cshared.go | 123 ++++++++++++++++-------------- cshared_test.go | 196 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 261 insertions(+), 58 deletions(-) diff --git a/cshared.go b/cshared.go index ec55e07..7349e26 100644 --- a/cshared.go +++ b/cshared.go @@ -194,6 +194,12 @@ func testFLBPluginInputCallback() ([]byte, error) { return C.GoBytes(data, C.int(csize)), nil } +func prepareOutputFlush(output OutputPlugin) error { + theOutput = output + FLBPluginOutputPreRun(0) + return nil +} + // Lock used to synchronize access to theInput variable. var theInputLock sync.Mutex @@ -417,6 +423,8 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { //export FLBPluginFlush //nolint:funlen //ignore length requirement for this function func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { + var err error + initWG.Wait() if theOutput == nil { @@ -424,7 +432,6 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { return output.FLB_RETRY } - var err error select { case <-runCtx.Done(): err = runCtx.Err() @@ -438,6 +445,59 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { } in := C.GoBytes(data, clength) + tag := C.GoString(ctag) + + return pluginFlush(tag, in) +} + +func decodeEntry(tag string, entry []any) (*Message, error) { + var t time.Time + + slice := reflect.ValueOf(entry) + if slice.Kind() != reflect.Slice || slice.Len() < 2 { + return nil, fmt.Errorf("unexpected entry length: %d", slice.Len()) + } + + ts := slice.Index(0).Interface() + switch ft := ts.(type) { + case bigEndianTime: + t = time.Time(ft) + case []interface{}: + s := reflect.ValueOf(ft) + st := s.Index(0).Interface() + ty, ok := st.(bigEndianTime) + if !ok { + return nil, fmt.Errorf("unable to decode time in record metadata") + } + t = time.Time(ty) + default: + return nil, fmt.Errorf("unexpected entry time type: %T", entry[0]) + } + + data := slice.Index(1) + recVal, ok := data.Interface().(map[interface{}]interface{}) + if !ok { + return nil, nil + } + + var rec map[string]interface{} + if d := len(recVal); d != 0 { + rec = make(map[string]interface{}, d) + for k, v := range recVal { + key, ok := k.(string) + if !ok { + return nil, fmt.Errorf("unexpected record key type: %T", k) + } + rec[key] = v + } + } + + return &Message{Time: t, Record: rec, tag: &tag}, nil +} + +func pluginFlush(tag string, in []byte) int { + var err error + h := &codec.MsgpackHandle{} err = h.SetBytesExt(reflect.TypeOf(bigEndianTime{}), 0, &bigEndianTime{}) if err != nil { @@ -471,66 +531,13 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { return output.FLB_ERROR } - slice := reflect.ValueOf(entry) - if slice.Kind() != reflect.Slice || slice.Len() < 2 { - fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", slice.Len()) - return output.FLB_ERROR - } - - var t time.Time - ts := slice.Index(0).Interface() - switch ft := ts.(type) { - case bigEndianTime: - t = time.Time(ft) - case []interface{}: - s := reflect.ValueOf(ft) - st := s.Index(0).Interface() - ty := st.(bigEndianTime) - t = time.Time(ty) - default: - fmt.Fprintf(os.Stderr, "unexpected entry time type: %T\n", entry[0]) + msg, err := decodeEntry(tag, entry) + if err != nil { + fmt.Fprintf(os.Stderr, "decode error: %s\n", err) return output.FLB_ERROR } - data := slice.Index(1) - recVal := data.Interface().(map[interface{}]interface{}) - - var rec map[string]string - if d := len(recVal); d != 0 { - rec = make(map[string]string, d) - for k, v := range recVal { - key, ok := k.(string) - if !ok { - fmt.Fprintf(os.Stderr, "unexpected record key type: %T\n", k) - return output.FLB_ERROR - } - - var val string - switch tv := v.(type) { - case []uint8: - val = string(tv) - case uint64: - val = strconv.FormatUint(tv, 10) - case int64: - val = strconv.FormatInt(tv, 10) - case bool: - val = strconv.FormatBool(tv) - default: - fmt.Fprintf(os.Stderr, "unexpected record value type: %T\n", v) - return output.FLB_ERROR - } - - rec[key] = val - } - } - - tag := C.GoString(ctag) - // C.free(unsafe.Pointer(ctag)) - - theChannel <- Message{Time: t, Record: rec, tag: &tag} - - // C.free(data) - // C.free(unsafe.Pointer(&clength)) + theChannel <- *msg } return output.FLB_OK diff --git a/cshared_test.go b/cshared_test.go index 86d294f..9d02d3d 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -1,14 +1,18 @@ package plugin import ( + "bytes" "context" "fmt" + "reflect" "sync" "sync/atomic" "testing" "time" "unsafe" + "github.com/calyptia/plugin/input" + "github.com/calyptia/plugin/metric" "github.com/calyptia/plugin/output" ) @@ -427,3 +431,195 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { concurrentCountFinish.Load()) } } + +type testOutputHandlerReflect struct { + param string + flushCounter metric.Counter + log Logger +} + +func (plug *testOutputHandlerReflect) Init(ctx context.Context, fbit *Fluentbit) error { + plug.flushCounter = fbit.Metrics.NewCounter("flush_total", "Total number of flushes", "gstdout") + plug.param = fbit.Conf.String("param") + plug.log = fbit.Logger + + return nil +} + +func (plug *testOutputHandlerReflect) Flush(ctx context.Context, ch <-chan Message) error { + // Iterate Records + count := 0 + printout := bytes.NewBuffer([]byte{}) + + for { + select { + case msg := <-ch: + rec := reflect.ValueOf(msg.Record) + printout.WriteString("[{") + if rec.Kind() == reflect.Map { + keyCount := 0 + for _, key := range rec.MapKeys() { + if keyCount > 0 { + printout.WriteString(", ") + } + strct := rec.MapIndex(key) + printout.WriteString( + fmt.Sprintf("\"%s\":\"%v\"", + key.Interface(), strct.Interface())) + keyCount++ + } + } + printout.WriteString("}]") + count++ + case <-ctx.Done(): + if count <= 0 { + return fmt.Errorf("no records flushed") + } + fmt.Print(printout.String()) + return nil + } + } +} + +type testOutputHandlerMapString struct { + param string + flushCounter metric.Counter + log Logger +} + +func (plug *testOutputHandlerMapString) Init(ctx context.Context, fbit *Fluentbit) error { + plug.flushCounter = fbit.Metrics.NewCounter("flush_total", "Total number of flushes", "gstdout") + plug.param = fbit.Conf.String("param") + plug.log = fbit.Logger + + return nil +} + +func (plug *testOutputHandlerMapString) Flush(ctx context.Context, ch <-chan Message) error { + count := 0 + printout := bytes.NewBuffer([]byte{}) + + for { + select { + case msg := <-ch: + printout.WriteString("[{") + keyCount := 0 + record, ok := msg.Record.(map[string]interface{}) + if !ok { + panic("unable to convert record") + } + for key, value := range record { + if keyCount > 0 { + printout.WriteString(", ") + } + + val, ok := value.(string) + if !ok { + panic("unable to convert value") + } + + printout.WriteString( + fmt.Sprintf("\"%s\":\"%v\"", key, val)) + keyCount++ + } + printout.WriteString("}]") + count++ + case <-ctx.Done(): + if count <= 0 { + return fmt.Errorf("no records flushed") + } + fmt.Print(printout.String()) + return nil + } + } +} + +// TestOutput is a simple output test. It also shows which format of records +// we currently support and how they should be handled. Feel free to use this +// code as an example of how to implement the Flush receive for output plugins. +// +// At the moment all Message.Records will be sent as a `map[string]interface{}`. +// Older plugins will have to do as testOutputHandlerMapString.Flush does +// and cast the actual value as a string. +func TestOutputSimulated(t *testing.T) { + var wg sync.WaitGroup + ctxt, cancel := context.WithCancel(context.Background()) + ch := make(chan Message) + tag := "tag" + + outputReflect := testOutputHandlerReflect{} + + wg.Add(1) + go func(ctxt context.Context, wg *sync.WaitGroup, ch <-chan Message) { + err := outputReflect.Flush(ctxt, ch) + if err != nil { + t.Error(err) + t.Fail() + } + wg.Done() + }(ctxt, &wg, ch) + + ch <- Message{ + Time: time.Now(), + Record: map[string]interface{}{ + "foo": "bar", + "foobar": "1", + }, + tag: &tag, + } + + cancel() + wg.Wait() + wg = sync.WaitGroup{} + ctxt, cancel = context.WithCancel(context.Background()) + + outputMapString := testOutputHandlerMapString{} + + wg.Add(1) + go func(ctxt context.Context, wg *sync.WaitGroup, ch <-chan Message) { + err := outputMapString.Flush(ctxt, ch) + if err != nil { + t.Error(err) + t.Fail() + } + wg.Done() + }(ctxt, &wg, ch) + + ch <- Message{ + Time: time.Now(), + Record: map[string]interface{}{ + "foo": "bar", + "foobar": "1", + }, + tag: &tag, + } + + cancel() + wg.Wait() + close(ch) +} + +func TestOutputFlush(t *testing.T) { + out := testOutputHandlerReflect{} + _ = prepareOutputFlush(&out) + + msg := Message{ + Record: map[interface{}]interface{}{ + "foo": "bar", + "bar": 0, + "foobar": 1.337, + }, + } + + tm := input.FLBTime{Time: msg.Time} + b, err := input.NewEncoder().Encode([]any{tm, msg.Record}) + if err != nil { + t.Error(err) + t.Fail() + } + + if err := pluginFlush("foobar", b); err == output.FLB_ERROR { + t.Error(err) + t.Fail() + } +}