diff --git a/common/sender/factory.go b/common/sender/factory.go index 44f69ea..19f2e1b 100644 --- a/common/sender/factory.go +++ b/common/sender/factory.go @@ -8,7 +8,8 @@ func CreateBrokerSender( rabbitMQServer string, exchange string, routeKey string, - writeTimeout time.Duration) Sender { + writeTimeout time.Duration, +) Sender { rabbitmqTarget := RabbitMQTarget{ Server: rabbitMQServer, Exchange: exchange, @@ -16,13 +17,13 @@ func CreateBrokerSender( WriteTimeout: writeTimeout, } - brokerSender := BrokerSender{ + currentSender := BrokerSender{ BrokerAddress: brokerAddress, BrokerTryNo: brokerTryNo, Target: rabbitmqTarget, } - return &brokerSender + return ¤tSender } func CreateRabbitMQSender( @@ -30,17 +31,36 @@ func CreateRabbitMQSender( exchange string, routeKey string, writeTimeout time.Duration) Sender { - rabbitmqTarget := RabbitMQTarget{ + target := RabbitMQTarget{ Server: server, Exchange: exchange, RouteKey: routeKey, WriteTimeout: writeTimeout, } - rabbitSender := RabbitMQSender{ - Target: rabbitmqTarget, + currentSender := RabbitMQSender{ + Target: target, + Debug: true, + } + + return ¤tSender +} + +func CreateKafkaSender( + brokers []string, + topic string, + writeTimeout time.Duration, +) Sender { + target := KafkaTarget{ + Brokers: brokers, + Topic: topic, + WriteTimeout: writeTimeout, + } + + currentSender := KafkaSender{ + Target: target, Debug: true, } - return &rabbitSender + return ¤tSender } diff --git a/common/sender/kafka.go b/common/sender/kafka.go index 3c3ee96..23485d1 100644 --- a/common/sender/kafka.go +++ b/common/sender/kafka.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/segmentio/kafka-go" - log "github.com/sirupsen/logrus" "time" ) @@ -20,8 +19,6 @@ type KafkaSender struct { } func (s *KafkaSender) SendMessage(message []byte) error { - log.Debug("creating writer...") - w := kafka.Writer{ Addr: kafka.TCP(s.Target.Brokers...), Topic: s.Target.Topic, @@ -29,17 +26,6 @@ func (s *KafkaSender) SendMessage(message []byte) error { WriteTimeout: s.Target.WriteTimeout, } - //w := kafka.NewWriter(kafka.WriterConfig{ - // Brokers: s.Target.Brokers, - // Topic: s.Target.Topic, - // Balancer: &kafka.LeastBytes{}, - // WriteTimeout: s.Target.WriteTimeout, - //}) - - //log.Debug("creating writer...done") - // - //log.Debug("sending message...") - err := w.WriteMessages(context.Background(), kafka.Message{ Value: message, @@ -50,12 +36,7 @@ func (s *KafkaSender) SendMessage(message []byte) error { return fmt.Errorf("send message failed: %s", err) } - //log.Info("sending message...done") - //log.Debug("closing writer...") - w.Close() - //log.Debug("closing writer...done") - return nil } diff --git a/test/client_to_kafka/main.go b/test/client_to_kafka/main.go index b000fb8..c7ef2d1 100644 --- a/test/client_to_kafka/main.go +++ b/test/client_to_kafka/main.go @@ -27,21 +27,21 @@ func init() { &brokerServers, "brokers", []string{}, - "brokers", + "kafka brokers", ) rootCmd.Flags().IntVar( &workerCount, "worker-count", 1, - "count of worker to send message", + "count of worker to send messages", ) rootCmd.Flags().StringVar( &logDirectory, "log-dir", "", - "log director", + "log directory", ) rootCmd.MarkFlagRequired("brokers") rootCmd.MarkFlagRequired("log-dir") @@ -63,7 +63,7 @@ var rootCmd = &cobra.Command{ defer logFile.Close() c := time.Tick(1 * time.Second) for _ = range c { - SendMessage( + sendMessage( index, workerLog, ) @@ -85,7 +85,7 @@ const ( writeTimeOut = 2 * time.Second ) -func SendMessage(index int, workerLog *log.Logger) { +func sendMessage(index int, workerLog *log.Logger) { data, err := common.CreateEcflowClientMessage("--init=31134") message := common.EventMessage{ App: "nwpc-message-client", @@ -95,21 +95,10 @@ func SendMessage(index int, workerLog *log.Logger) { } messageBytes, _ := json.Marshal(message) - //log.WithFields(log.Fields{ - // "index": index, - //}).Infof("sending message...") - target := sender.KafkaTarget{ - Brokers: brokerServers, - Topic: topic, - WriteTimeout: writeTimeOut, - } - rabbitSender := sender.KafkaSender{ - Target: target, - Debug: true, - } + currentSender := sender.CreateKafkaSender(brokerServers, topic, writeTimeOut) - err = rabbitSender.SendMessage(messageBytes) + err = currentSender.SendMessage(messageBytes) if err != nil { workerLog.WithFields(log.Fields{ "index": index,