From aeb9a82914cc9a78e477fca96440682d393650d3 Mon Sep 17 00:00:00 2001 From: 0x0eul <0x0eul@gmail.com> Date: Wed, 17 Jul 2024 09:48:42 +0800 Subject: [PATCH] Feat(source): franzKafka support tls --- pkg/source/franz/config.go | 1 + pkg/source/franz/kafka.go | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/pkg/source/franz/config.go b/pkg/source/franz/config.go index 98303465b..6444f4d67 100644 --- a/pkg/source/franz/config.go +++ b/pkg/source/franz/config.go @@ -47,6 +47,7 @@ type Config struct { AutoOffsetReset string `yaml:"autoOffsetReset,omitempty" default:"latest"` SASL franz.SASL `yaml:"sasl,omitempty"` + TLS franz.TLS `yaml:"tls,omitempty"` AddonMeta *bool `yaml:"addonMeta,omitempty" default:"true"` } diff --git a/pkg/source/franz/kafka.go b/pkg/source/franz/kafka.go index e7b042851..662dd52a2 100644 --- a/pkg/source/franz/kafka.go +++ b/pkg/source/franz/kafka.go @@ -18,6 +18,7 @@ package franz import ( "context" + "crypto/tls" "fmt" "github.com/loggie-io/loggie/pkg/core/api" "github.com/loggie-io/loggie/pkg/core/event" @@ -149,6 +150,16 @@ func (k *Source) Start() error { } } + //set tls + if c.TLS.Enabled == true { + var tlsCfg *tls.Config + var err error + if tlsCfg, err = franz.NewTLSConfig(c.TLS.CaCertFiles, c.TLS.ClientCertFile, c.TLS.ClientKeyFile, c.TLS.EndpIdentAlgo == ""); err != nil { + return err + } + opts = append(opts, kgo.DialTLSConfig(tlsCfg)) + } + // new client cl, err := kgo.NewClient(opts...) if err != nil {