Skip to content

Commit

Permalink
output: flush complex nested values, not just map[string]string.
Browse files Browse the repository at this point in the history
send the full nested complex values when flushing to output plugins
in `Message.Record` as a `map[string]interface{}` instead of as a
simple `map[string]string`. This will allow receiving float64 and
other simple types as well as nested records with key/value pairs
and arrays.

tests have also been added for this new functionality.

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Apr 5, 2024
1 parent 585f27a commit 0a64f57
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 58 deletions.
123 changes: 65 additions & 58 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ func testFLBPluginInputCallback() ([]byte, error) {
return C.GoBytes(data, C.int(csize)), nil
}

func prepareOutputFlush(output OutputPlugin) error {
theOutput = output
FLBPluginOutputPreRun(0)
return nil
}

// Lock used to synchronize access to theInput variable.
var theInputLock sync.Mutex

Expand Down Expand Up @@ -417,14 +423,15 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
//export FLBPluginFlush
//nolint:funlen //ignore length requirement for this function
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
var err error

initWG.Wait()

if theOutput == nil {
fmt.Fprintf(os.Stderr, "no output registered\n")
return output.FLB_RETRY
}

var err error
select {
case <-runCtx.Done():
err = runCtx.Err()
Expand All @@ -438,6 +445,59 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
}

in := C.GoBytes(data, clength)
tag := C.GoString(ctag)

return pluginFlush(tag, in)
}

func decodeEntry(tag string, entry []any) (*Message, error) {
var t time.Time

slice := reflect.ValueOf(entry)
if slice.Kind() != reflect.Slice || slice.Len() < 2 {
return nil, fmt.Errorf("unexpected entry length: %d", slice.Len())
}

ts := slice.Index(0).Interface()
switch ft := ts.(type) {
case bigEndianTime:
t = time.Time(ft)
case []interface{}:
s := reflect.ValueOf(ft)
st := s.Index(0).Interface()
ty, ok := st.(bigEndianTime)
if !ok {
return nil, fmt.Errorf("unable to decode time in record metadata")
}
t = time.Time(ty)
default:
return nil, fmt.Errorf("unexpected entry time type: %T", entry[0])
}

data := slice.Index(1)
recVal, ok := data.Interface().(map[interface{}]interface{})
if !ok {
return nil, nil
}

var rec map[string]interface{}
if d := len(recVal); d != 0 {
rec = make(map[string]interface{}, d)
for k, v := range recVal {
key, ok := k.(string)
if !ok {
return nil, fmt.Errorf("unexpected record key type: %T", k)
}
rec[key] = v
}
}

return &Message{Time: t, Record: rec, tag: &tag}, nil
}

func pluginFlush(tag string, in []byte) int {
var err error

h := &codec.MsgpackHandle{}
err = h.SetBytesExt(reflect.TypeOf(bigEndianTime{}), 0, &bigEndianTime{})
if err != nil {
Expand Down Expand Up @@ -471,66 +531,13 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
return output.FLB_ERROR
}

slice := reflect.ValueOf(entry)
if slice.Kind() != reflect.Slice || slice.Len() < 2 {
fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", slice.Len())
return output.FLB_ERROR
}

var t time.Time
ts := slice.Index(0).Interface()
switch ft := ts.(type) {
case bigEndianTime:
t = time.Time(ft)
case []interface{}:
s := reflect.ValueOf(ft)
st := s.Index(0).Interface()
ty := st.(bigEndianTime)
t = time.Time(ty)
default:
fmt.Fprintf(os.Stderr, "unexpected entry time type: %T\n", entry[0])
msg, err := decodeEntry(tag, entry)
if err != nil {
fmt.Fprintf(os.Stderr, "decode error: %s\n", err)
return output.FLB_ERROR
}

data := slice.Index(1)
recVal := data.Interface().(map[interface{}]interface{})

var rec map[string]string
if d := len(recVal); d != 0 {
rec = make(map[string]string, d)
for k, v := range recVal {
key, ok := k.(string)
if !ok {
fmt.Fprintf(os.Stderr, "unexpected record key type: %T\n", k)
return output.FLB_ERROR
}

var val string
switch tv := v.(type) {
case []uint8:
val = string(tv)
case uint64:
val = strconv.FormatUint(tv, 10)
case int64:
val = strconv.FormatInt(tv, 10)
case bool:
val = strconv.FormatBool(tv)
default:
fmt.Fprintf(os.Stderr, "unexpected record value type: %T\n", v)
return output.FLB_ERROR
}

rec[key] = val
}
}

tag := C.GoString(ctag)
// C.free(unsafe.Pointer(ctag))

theChannel <- Message{Time: t, Record: rec, tag: &tag}

// C.free(data)
// C.free(unsafe.Pointer(&clength))
theChannel <- *msg
}

return output.FLB_OK
Expand Down
196 changes: 196 additions & 0 deletions cshared_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package plugin

import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"

"github.com/calyptia/plugin/input"
"github.com/calyptia/plugin/metric"
"github.com/calyptia/plugin/output"
)

Expand Down Expand Up @@ -427,3 +431,195 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
concurrentCountFinish.Load())
}
}

type testOutputHandlerReflect struct {
param string
flushCounter metric.Counter
log Logger
}

func (plug *testOutputHandlerReflect) Init(ctx context.Context, fbit *Fluentbit) error {
plug.flushCounter = fbit.Metrics.NewCounter("flush_total", "Total number of flushes", "gstdout")
plug.param = fbit.Conf.String("param")
plug.log = fbit.Logger

return nil
}

func (plug *testOutputHandlerReflect) Flush(ctx context.Context, ch <-chan Message) error {
// Iterate Records
count := 0
printout := bytes.NewBuffer([]byte{})

for {
select {
case msg := <-ch:
rec := reflect.ValueOf(msg.Record)
printout.WriteString("[{")
if rec.Kind() == reflect.Map {
keyCount := 0
for _, key := range rec.MapKeys() {
if keyCount > 0 {
printout.WriteString(", ")
}
strct := rec.MapIndex(key)
printout.WriteString(
fmt.Sprintf("\"%s\":\"%v\"",
key.Interface(), strct.Interface()))
keyCount++
}
}
printout.WriteString("}]")
count++
case <-ctx.Done():
if count <= 0 {
return fmt.Errorf("no records flushed")
}
fmt.Print(printout.String())
return nil
}
}
}

type testOutputHandlerMapString struct {
param string
flushCounter metric.Counter
log Logger
}

func (plug *testOutputHandlerMapString) Init(ctx context.Context, fbit *Fluentbit) error {
plug.flushCounter = fbit.Metrics.NewCounter("flush_total", "Total number of flushes", "gstdout")
plug.param = fbit.Conf.String("param")
plug.log = fbit.Logger

return nil
}

func (plug *testOutputHandlerMapString) Flush(ctx context.Context, ch <-chan Message) error {
count := 0
printout := bytes.NewBuffer([]byte{})

for {
select {
case msg := <-ch:
printout.WriteString("[{")
keyCount := 0
record, ok := msg.Record.(map[string]interface{})
if !ok {
panic("unable to convert record")
}
for key, value := range record {
if keyCount > 0 {
printout.WriteString(", ")
}

val, ok := value.(string)
if !ok {
panic("unable to convert value")
}

printout.WriteString(
fmt.Sprintf("\"%s\":\"%v\"", key, val))
keyCount++
}
printout.WriteString("}]")
count++
case <-ctx.Done():
if count <= 0 {
return fmt.Errorf("no records flushed")
}
fmt.Print(printout.String())
return nil
}
}
}

// TestOutput is a simple output test. It also shows which format of records
// we currently support and how they should be handled. Feel free to use this
// code as an example of how to implement the Flush receive for output plugins.
//
// At the moment all Message.Records will be sent as a `map[string]interface{}`.
// Older plugins will have to do as testOutputHandlerMapString.Flush does
// and cast the actual value as a string.
func TestOutputSimulated(t *testing.T) {
var wg sync.WaitGroup
ctxt, cancel := context.WithCancel(context.Background())
ch := make(chan Message)
tag := "tag"

outputReflect := testOutputHandlerReflect{}

wg.Add(1)
go func(ctxt context.Context, wg *sync.WaitGroup, ch <-chan Message) {
err := outputReflect.Flush(ctxt, ch)
if err != nil {
t.Error(err)
t.Fail()
}
wg.Done()
}(ctxt, &wg, ch)

ch <- Message{
Time: time.Now(),
Record: map[string]interface{}{
"foo": "bar",
"foobar": "1",
},
tag: &tag,
}

cancel()
wg.Wait()
wg = sync.WaitGroup{}
ctxt, cancel = context.WithCancel(context.Background())

outputMapString := testOutputHandlerMapString{}

wg.Add(1)
go func(ctxt context.Context, wg *sync.WaitGroup, ch <-chan Message) {
err := outputMapString.Flush(ctxt, ch)
if err != nil {
t.Error(err)
t.Fail()
}
wg.Done()
}(ctxt, &wg, ch)

ch <- Message{
Time: time.Now(),
Record: map[string]interface{}{
"foo": "bar",
"foobar": "1",
},
tag: &tag,
}

cancel()
wg.Wait()
close(ch)
}

func TestOutputFlush(t *testing.T) {
out := testOutputHandlerReflect{}
_ = prepareOutputFlush(&out)

msg := Message{
Record: map[interface{}]interface{}{
"foo": "bar",
"bar": 0,
"foobar": 1.337,
},
}

tm := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{tm, msg.Record})
if err != nil {
t.Error(err)
t.Fail()
}

if err := pluginFlush("foobar", b); err == output.FLB_ERROR {
t.Error(err)
t.Fail()
}
}

0 comments on commit 0a64f57

Please sign in to comment.