Skip to content

Commit

Permalink
🎨 add kafka sender factory function. #13
Browse files Browse the repository at this point in the history
  • Loading branch information
perillaroc committed Nov 9, 2020
1 parent 18ada81 commit d95d877
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 44 deletions.
34 changes: 27 additions & 7 deletions common/sender/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,59 @@ func CreateBrokerSender(
rabbitMQServer string,
exchange string,
routeKey string,
writeTimeout time.Duration) Sender {
writeTimeout time.Duration,
) Sender {
rabbitmqTarget := RabbitMQTarget{
Server: rabbitMQServer,
Exchange: exchange,
RouteKey: routeKey,
WriteTimeout: writeTimeout,
}

brokerSender := BrokerSender{
currentSender := BrokerSender{
BrokerAddress: brokerAddress,
BrokerTryNo: brokerTryNo,
Target: rabbitmqTarget,
}

return &brokerSender
return &currentSender
}

func CreateRabbitMQSender(
server string,
exchange string,
routeKey string,
writeTimeout time.Duration) Sender {
rabbitmqTarget := RabbitMQTarget{
target := RabbitMQTarget{
Server: server,
Exchange: exchange,
RouteKey: routeKey,
WriteTimeout: writeTimeout,
}

rabbitSender := RabbitMQSender{
Target: rabbitmqTarget,
currentSender := RabbitMQSender{
Target: target,
Debug: true,
}

return &currentSender
}

func CreateKafkaSender(
brokers []string,
topic string,
writeTimeout time.Duration,
) Sender {
target := KafkaTarget{
Brokers: brokers,
Topic: topic,
WriteTimeout: writeTimeout,
}

currentSender := KafkaSender{
Target: target,
Debug: true,
}

return &rabbitSender
return &currentSender
}
19 changes: 0 additions & 19 deletions common/sender/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
log "github.com/sirupsen/logrus"
"time"
)

Expand All @@ -20,26 +19,13 @@ type KafkaSender struct {
}

func (s *KafkaSender) SendMessage(message []byte) error {
log.Debug("creating writer...")

w := kafka.Writer{
Addr: kafka.TCP(s.Target.Brokers...),
Topic: s.Target.Topic,
Balancer: &kafka.LeastBytes{},
WriteTimeout: s.Target.WriteTimeout,
}

//w := kafka.NewWriter(kafka.WriterConfig{
// Brokers: s.Target.Brokers,
// Topic: s.Target.Topic,
// Balancer: &kafka.LeastBytes{},
// WriteTimeout: s.Target.WriteTimeout,
//})

//log.Debug("creating writer...done")
//
//log.Debug("sending message...")

err := w.WriteMessages(context.Background(),
kafka.Message{
Value: message,
Expand All @@ -50,12 +36,7 @@ func (s *KafkaSender) SendMessage(message []byte) error {
return fmt.Errorf("send message failed: %s", err)
}

//log.Info("sending message...done")
//log.Debug("closing writer...")

w.Close()

//log.Debug("closing writer...done")

return nil
}
25 changes: 7 additions & 18 deletions test/client_to_kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ func init() {
&brokerServers,
"brokers",
[]string{},
"brokers",
"kafka brokers",
)

rootCmd.Flags().IntVar(
&workerCount,
"worker-count",
1,
"count of worker to send message",
"count of worker to send messages",
)

rootCmd.Flags().StringVar(
&logDirectory,
"log-dir",
"",
"log director",
"log directory",
)
rootCmd.MarkFlagRequired("brokers")
rootCmd.MarkFlagRequired("log-dir")
Expand All @@ -63,7 +63,7 @@ var rootCmd = &cobra.Command{
defer logFile.Close()
c := time.Tick(1 * time.Second)
for _ = range c {
SendMessage(
sendMessage(
index,
workerLog,
)
Expand All @@ -85,7 +85,7 @@ const (
writeTimeOut = 2 * time.Second
)

func SendMessage(index int, workerLog *log.Logger) {
func sendMessage(index int, workerLog *log.Logger) {
data, err := common.CreateEcflowClientMessage("--init=31134")
message := common.EventMessage{
App: "nwpc-message-client",
Expand All @@ -95,21 +95,10 @@ func SendMessage(index int, workerLog *log.Logger) {
}

messageBytes, _ := json.Marshal(message)
//log.WithFields(log.Fields{
// "index": index,
//}).Infof("sending message...")
target := sender.KafkaTarget{
Brokers: brokerServers,
Topic: topic,
WriteTimeout: writeTimeOut,
}

rabbitSender := sender.KafkaSender{
Target: target,
Debug: true,
}
currentSender := sender.CreateKafkaSender(brokerServers, topic, writeTimeOut)

err = rabbitSender.SendMessage(messageBytes)
err = currentSender.SendMessage(messageBytes)
if err != nil {
workerLog.WithFields(log.Fields{
"index": index,
Expand Down

0 comments on commit d95d877

Please sign in to comment.