diff --git a/cmd/livepeer/starter/kafka.go b/cmd/livepeer/starter/kafka.go index 880b04f1f..f2fbdffac 100644 --- a/cmd/livepeer/starter/kafka.go +++ b/cmd/livepeer/starter/kafka.go @@ -16,11 +16,16 @@ func startKafkaProducer(cfg LivepeerConfig) error { broadcasterEthAddress = *cfg.EthAcctAddr } + var kafkaProducerConfig = lpmon.KafkaProducerConfig{ + BootstrapServers: *cfg.KafkaBootstrapServers, + Username: *cfg.KafkaUsername, + Password: *cfg.KafkaPassword, + GatewayTopic: *cfg.KafkaGatewayTopic, + GatewayHost: *cfg.GatewayHost, + BroadcasterEthAddress: broadcasterEthAddress, + } + return lpmon.InitKafkaProducer( - *cfg.KafkaBootstrapServers, - *cfg.KafkaUsername, - *cfg.KafkaPassword, - *cfg.KafkaGatewayTopic, - broadcasterEthAddress, + kafkaProducerConfig, ) } diff --git a/monitor/kafka.go b/monitor/kafka.go index bf31d293b..ef0788936 100644 --- a/monitor/kafka.go +++ b/monitor/kafka.go @@ -25,14 +25,25 @@ type KafkaProducer struct { topic string events chan GatewayEvent gatewayAddress string + gatewayHost string +} + +type KafkaProducerConfig struct { + BootstrapServers string + Username string + Password string + GatewayTopic string + GatewayHost string + BroadcasterEthAddress string } type GatewayEvent struct { - ID *string `json:"id,omitempty"` - Type *string `json:"type"` - Timestamp *string `json:"timestamp"` - Gateway *string `json:"gateway,omitempty"` - Data interface{} `json:"data"` + ID *string `json:"id,omitempty"` + Type *string `json:"type"` + Timestamp *string `json:"timestamp"` + Gateway *string `json:"gateway,omitempty"` + GatewayHost *string `json:"gateway_host,omitempty"` + Data interface{} `json:"data"` } type PipelineStatus struct { @@ -54,8 +65,8 @@ type PipelineStatus struct { var kafkaProducer *KafkaProducer -func InitKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) error { - producer, err := newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress) +func InitKafkaProducer(config KafkaProducerConfig) error { + producer, err := newKafkaProducer(config) if err != nil { return err } @@ -64,12 +75,12 @@ func InitKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress s return nil } -func newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) (*KafkaProducer, error) { +func newKafkaProducer(config KafkaProducerConfig) (*KafkaProducer, error) { dialer := &kafka.Dialer{ Timeout: KafkaRequestTimeout, SASLMechanism: plain.Mechanism{ - Username: user, - Password: password, + Username: config.Username, + Password: config.Password, }, DualStack: true, TLS: &tls.Config{ @@ -78,17 +89,18 @@ func newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress st } writer := kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{bootstrapServers}, - Topic: topic, + Brokers: []string{config.BootstrapServers}, + Topic: config.GatewayTopic, Balancer: kafka.CRC32Balancer{}, Dialer: dialer, }) return &KafkaProducer{ writer: writer, - topic: topic, + topic: config.GatewayTopic, events: make(chan GatewayEvent, KafkaChannelSize), - gatewayAddress: gatewayAddress, + gatewayAddress: config.BroadcasterEthAddress, + gatewayHost: config.GatewayHost, }, nil } @@ -153,11 +165,12 @@ func SendQueueEventAsync(eventType string, data interface{}) { timestampMs := time.Now().UnixMilli() event := GatewayEvent{ - ID: stringPtr(randomID), - Gateway: stringPtr(kafkaProducer.gatewayAddress), - Type: &eventType, - Timestamp: stringPtr(fmt.Sprint(timestampMs)), - Data: data, + ID: stringPtr(randomID), + Gateway: stringPtr(kafkaProducer.gatewayAddress), + GatewayHost: stringPtr(kafkaProducer.gatewayHost), + Type: &eventType, + Timestamp: stringPtr(fmt.Sprint(timestampMs)), + Data: data, } select {