-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.go
102 lines (93 loc) · 2.27 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package easykafka
import (
"github.com/segmentio/kafka-go"
"net"
"regexp"
"strconv"
)
// strP copies s into freshly allocated storage and returns a pointer to that
// copy. Each call yields a distinct pointer.
func strP(s string) *string {
	p := new(string)
	*p = s
	return p
}
// intP copies i into freshly allocated storage and returns a pointer to that
// copy. Each call yields a distinct pointer.
func intP(i int) *int {
	p := new(int)
	*p = i
	return p
}
// mustConnect dials the given brokers over TCP, in the order listed, and
// returns the first connection that is established.
//
// It panics with a descriptive message when brokers is empty, and with the
// error of the last dial attempt when none of the brokers can be reached.
// The returned connection is not closed here.
func mustConnect(brokers []string) *kafka.Conn {
	if len(brokers) == 0 {
		// Fail with a clear message instead of an index-out-of-range panic.
		panic("easykafka: mustConnect requires at least one broker address")
	}
	var lastErr error
	for _, addr := range brokers {
		conn, err := kafka.Dial("tcp", addr)
		if err == nil {
			return conn
		}
		// Remember the failure and fall through to the next broker.
		lastErr = err
	}
	panic(lastErr)
}
// getLeaderConn asks conn for the cluster controller and dials a new TCP
// connection to the controller's host and port.
//
// It panics if the controller lookup or the dial fails. conn itself is not
// closed here.
func getLeaderConn(conn *kafka.Conn) *kafka.Conn {
	broker, err := conn.Controller()
	if err != nil {
		panic(err)
	}
	addr := net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port))
	leader, err := kafka.Dial("tcp", addr)
	if err != nil {
		panic(err)
	}
	return leader
}
// matchTopicsFromConnectionByRegex reads the partitions visible through conn
// and returns the names of the topics matched by at least one of patterns.
//
// Each topic appears once, in the order it is first encountered among the
// partitions. The result is nil when nothing matches. It panics if the
// partitions cannot be read.
func matchTopicsFromConnectionByRegex(conn *kafka.Conn, patterns ...*regexp.Regexp) []string {
	parts, err := conn.ReadPartitions()
	if err != nil {
		panic(err)
	}
	seen := map[string]struct{}{}
	var matched []string
	for i := range parts {
		name := parts[i].Topic
		for _, pattern := range patterns {
			if !pattern.MatchString(name) {
				continue
			}
			// A topic has one entry per partition; record it only once.
			if _, dup := seen[name]; dup {
				continue
			}
			seen[name] = struct{}{}
			matched = append(matched, name)
		}
	}
	return matched
}
// matchTopicsFromConnection reads the partitions visible through conn and
// returns those of the requested topics that appear among them.
//
// Each topic appears once, in the order it is first encountered among the
// partitions. The result is nil when none of the topics is found. It panics
// if the partitions cannot be read.
func matchTopicsFromConnection(conn *kafka.Conn, topics ...string) []string {
	partitions, err := conn.ReadPartitions()
	if err != nil {
		panic(err)
	}
	// pending holds the requested topics not yet seen among the partitions.
	// Removing a topic once it is found also de-duplicates the result, since
	// a topic has one entry per partition.
	pending := make(map[string]struct{}, len(topics))
	for _, t := range topics {
		pending[t] = struct{}{}
	}
	var matchingTopics []string
	for _, partition := range partitions {
		if len(pending) == 0 {
			// Every requested topic has already been found.
			break
		}
		if _, wanted := pending[partition.Topic]; !wanted {
			continue
		}
		delete(pending, partition.Topic)
		matchingTopics = append(matchingTopics, partition.Topic)
	}
	return matchingTopics
}
// scrapTopicsFromMessages collects the distinct topics referenced by
// messages, in order of first appearance.
//
// Nil entries in messages are skipped. The result is nil when no topic is
// collected.
func scrapTopicsFromMessages(messages []*kafka.Message) []string {
	var topics []string
	seen := make(map[string]struct{})
	for _, m := range messages {
		if m == nil {
			// A nil entry carries no topic; skip it rather than dereference it.
			continue
		}
		if _, dup := seen[m.Topic]; dup {
			continue
		}
		seen[m.Topic] = struct{}{}
		topics = append(topics, m.Topic)
	}
	return topics
}