Skip to content

Commit

Permalink
build: implement kafka health check. (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
mo3et authored Nov 15, 2024
1 parent 45a1486 commit ca13221
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion mq/kafka/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package kafka
import (
"context"
"fmt"

"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
)

func Check(ctx context.Context, conf *Config, topics []string) error {
func CheckTopics(ctx context.Context, conf *Config, topics []string) error {
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
if err != nil {
return err
Expand Down Expand Up @@ -49,3 +50,30 @@ func Check(ctx context.Context, conf *Config, topics []string) error {
}
return nil
}

func CheckHealth(ctx context.Context, conf *Config) error {
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
if err != nil {
return err
}
cli, err := sarama.NewClient(conf.Addr, kfk)
if err != nil {
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
}
defer cli.Close()

// Get broker list
brokers := cli.Brokers()
if len(brokers) == 0 {
return errs.New("no brokers found").Wrap()
}

// Check if all brokers are reachable
for _, broker := range brokers {
if err := broker.Open(kfk); err != nil {
return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr())
}
}

return nil
}

0 comments on commit ca13221

Please sign in to comment.