Skip to content

Commit

Permalink
gateway: status: add gatewayhost to event wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
gioelecerati committed Dec 17, 2024
1 parent 3ac5972 commit a0f2725
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
15 changes: 10 additions & 5 deletions cmd/livepeer/starter/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ func startKafkaProducer(cfg LivepeerConfig) error {
broadcasterEthAddress = *cfg.EthAcctAddr
}

var kafkaProducerConfig = lpmon.KafkaProducerConfig{
BootstrapServers: *cfg.KafkaBootstrapServers,
Username: *cfg.KafkaUsername,
Password: *cfg.KafkaPassword,
GatewayTopic: *cfg.KafkaGatewayTopic,
GatewayHost: *cfg.GatewayHost,
BroadcasterEthAddress: broadcasterEthAddress,
}

Check warning on line 27 in cmd/livepeer/starter/kafka.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/kafka.go#L19-L27

Added lines #L19 - L27 were not covered by tests
return lpmon.InitKafkaProducer(
*cfg.KafkaBootstrapServers,
*cfg.KafkaUsername,
*cfg.KafkaPassword,
*cfg.KafkaGatewayTopic,
broadcasterEthAddress,
kafkaProducerConfig,

Check warning on line 29 in cmd/livepeer/starter/kafka.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/kafka.go#L29

Added line #L29 was not covered by tests
)
}
51 changes: 32 additions & 19 deletions monitor/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,25 @@ type KafkaProducer struct {
topic string
events chan GatewayEvent
gatewayAddress string
gatewayHost string
}

type KafkaProducerConfig struct {
BootstrapServers string
Username string
Password string
GatewayTopic string
GatewayHost string
BroadcasterEthAddress string
}

type GatewayEvent struct {
ID *string `json:"id,omitempty"`
Type *string `json:"type"`
Timestamp *string `json:"timestamp"`
Gateway *string `json:"gateway,omitempty"`
Data interface{} `json:"data"`
ID *string `json:"id,omitempty"`
Type *string `json:"type"`
Timestamp *string `json:"timestamp"`
Gateway *string `json:"gateway,omitempty"`
GatewayHost *string `json:"gateway_host,omitempty"`
Data interface{} `json:"data"`
}

type PipelineStatus struct {
Expand All @@ -54,8 +65,8 @@ type PipelineStatus struct {

var kafkaProducer *KafkaProducer

func InitKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) error {
producer, err := newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress)
func InitKafkaProducer(config KafkaProducerConfig) error {
producer, err := newKafkaProducer(config)

Check warning on line 69 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L68-L69

Added lines #L68 - L69 were not covered by tests
if err != nil {
return err
}
Expand All @@ -64,12 +75,12 @@ func InitKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress s
return nil
}

func newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) (*KafkaProducer, error) {
func newKafkaProducer(config KafkaProducerConfig) (*KafkaProducer, error) {

Check warning on line 78 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L78

Added line #L78 was not covered by tests
dialer := &kafka.Dialer{
Timeout: KafkaRequestTimeout,
SASLMechanism: plain.Mechanism{
Username: user,
Password: password,
Username: config.Username,
Password: config.Password,

Check warning on line 83 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L82-L83

Added lines #L82 - L83 were not covered by tests
},
DualStack: true,
TLS: &tls.Config{
Expand All @@ -78,17 +89,18 @@ func newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress st
}

writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{bootstrapServers},
Topic: topic,
Brokers: []string{config.BootstrapServers},
Topic: config.GatewayTopic,

Check warning on line 93 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L92-L93

Added lines #L92 - L93 were not covered by tests
Balancer: kafka.CRC32Balancer{},
Dialer: dialer,
})

return &KafkaProducer{
writer: writer,
topic: topic,
topic: config.GatewayTopic,

Check warning on line 100 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L100

Added line #L100 was not covered by tests
events: make(chan GatewayEvent, KafkaChannelSize),
gatewayAddress: gatewayAddress,
gatewayAddress: config.BroadcasterEthAddress,
gatewayHost: config.GatewayHost,

Check warning on line 103 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}, nil
}

Expand Down Expand Up @@ -153,11 +165,12 @@ func SendQueueEventAsync(eventType string, data interface{}) {
timestampMs := time.Now().UnixMilli()

event := GatewayEvent{
ID: stringPtr(randomID),
Gateway: stringPtr(kafkaProducer.gatewayAddress),
Type: &eventType,
Timestamp: stringPtr(fmt.Sprint(timestampMs)),
Data: data,
ID: stringPtr(randomID),
Gateway: stringPtr(kafkaProducer.gatewayAddress),
GatewayHost: stringPtr(kafkaProducer.gatewayHost),
Type: &eventType,
Timestamp: stringPtr(fmt.Sprint(timestampMs)),
Data: data,

Check warning on line 173 in monitor/kafka.go

View check run for this annotation

Codecov / codecov/patch

monitor/kafka.go#L168-L173

Added lines #L168 - L173 were not covered by tests
}

select {
Expand Down

0 comments on commit a0f2725

Please sign in to comment.