-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathyclogging.go
115 lines (95 loc) · 3.15 KB
/
yclogging.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"C"
"fmt"
"unsafe"
"github.com/fluent/fluent-bit-go/output"
ycsdk "github.com/yandex-cloud/go-sdk"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/yandex-cloud/fluent-bit-plugin-yandex/v2/config"
"github.com/yandex-cloud/fluent-bit-plugin-yandex/v2/metadata"
plugin2 "github.com/yandex-cloud/fluent-bit-plugin-yandex/v2/plugin"
"github.com/yandex-cloud/fluent-bit-plugin-yandex/v2/yclient"
)
//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int {
fmt.Println("yc-logging: registering")
return output.FLBPluginRegister(def, "yc-logging", "Yandex Cloud Logging output")
}
//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
fmt.Println("yc-logging: init")
getConfigValue := func(key string) string {
return config.GetKey(plugin, key)
}
metadataProvider := metadata.NewCachingProvider(ycsdk.InstanceMetadataAddr)
destination, err := config.GetDestination(getConfigValue, metadataProvider)
if err != nil {
return output.FLB_ERROR
}
defaults, err := config.GetDefaults(getConfigValue, metadataProvider)
if err != nil {
return output.FLB_ERROR
}
authorization, err := config.GetAuthorization(getConfigValue, metadataProvider)
if err != nil {
return output.FLB_ERROR
}
endpoint := config.GetEndpoint(getConfigValue)
CAFileName := config.GetCAFileName(getConfigValue)
ingestionClient, err := yclient.New(destination, defaults, authorization, endpoint, CAFileName)
if err != nil {
return output.FLB_ERROR
}
impl, err := plugin2.New(getConfigValue, metadataProvider, ingestionClient)
if err != nil {
fmt.Printf("yc-logging: init err: %s\n", err.Error())
return output.FLB_ERROR
}
output.FLBPluginSetContext(plugin, impl)
return output.FLB_OK
}
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
tagStr := C.GoString(tag)
plugin := output.FLBPluginGetContext(ctx).(*plugin2.Plugin)
dec := output.NewDecoder(data, int(length))
provider := func() (ret int, ts interface{}, rec map[interface{}]interface{}) {
return output.GetRecord(dec)
}
resourceToEntries := plugin.Transform(provider, tagStr)
results, resCount := plugin.WriteAll(resourceToEntries)
for i := 0; i < resCount; i++ {
err := <-results
if err == nil {
continue
}
code := status.Code(err)
switch code {
case codes.PermissionDenied:
// kick client reinit
fmt.Printf("yc-logging: reinit on write error %s: %q\n", code.String(), err.Error())
if initErr := plugin.InitClient(); initErr != nil {
fmt.Printf("yc-logging: reinit failed: %q\n", initErr.Error())
} else {
fmt.Printf("yc-logging: reinit succeded\n")
}
return output.FLB_RETRY
case codes.ResourceExhausted, codes.FailedPrecondition, codes.Unavailable,
codes.Canceled, codes.DeadlineExceeded:
fmt.Printf("yc-logging: write retriable error %s: %q\n", code.String(), err.Error())
return output.FLB_RETRY
default:
fmt.Printf("yc-logging: write failed %s: %q\n", code.String(), err.Error())
return output.FLB_ERROR
}
}
return output.FLB_OK
}
//export FLBPluginExit
func FLBPluginExit() int {
return output.FLB_OK
}
func main() {
}