Skip to content

Commit

Permalink
feat: nats core and jetstream upgrade are completed
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Oct 2, 2024
1 parent 7782b48 commit e7ea761
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 18 deletions.
65 changes: 64 additions & 1 deletion internal/client/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ type Client struct {

// New initializes NATS connection.
func New(config Config, logger *zap.Logger) *Client {
conn := "jetstream"
if !config.IsJetstream {
conn = "core"
}

client := &Client{
jetstream: nil,
connection: nil,

config: config,

logger: logger,
metrics: NewMetrics(),
metrics: NewMetrics(conn),
}

client.connect()
Expand Down Expand Up @@ -97,10 +102,12 @@ func (client *Client) UpdateOrCreateStream(ctx context.Context) {
info, err := stream.Info(ctx)
if err == nil {
client.updateStream(ctx, client.config.Streams[i], info)

return
}
} else if errors.Is(err, jetstream.ErrStreamNotFound) && client.config.NewStreamAllow {
client.createStream(ctx, client.config.Streams[i])

return
}

Expand Down Expand Up @@ -148,6 +155,11 @@ func (client *Client) StartBlackboxTest(ctx context.Context) {
go client.jetstreamPublish(ctx, stream.Subject, stream.Name)
go client.jetstreamSubscribe(messageChannel, stream.Name)
}
} else {
for _, stream := range client.config.Streams {
go client.coreSubscribe(stream.Subject)
go client.corePublish(stream.Subject)
}
}
}

Expand Down Expand Up @@ -210,6 +222,42 @@ func (client *Client) jetstreamSubscribe(h <-chan *Message, streamName string) {
}
}

func (client *Client) coreSubscribe(subject string) {
clusterName := client.connection.ConnectedClusterName()

messageHandler, h := client.messageHandlerCoreFactory()

if _, err := client.connection.Subscribe(subject, messageHandler); err != nil {
client.logger.Panic("Consuming failed", zap.Error(err))
}

for msg := range h {
var publishTime time.Time

if err := publishTime.UnmarshalBinary(msg.Data); err != nil {
client.logger.Error("unable to unmarshal binary data for publishTime.")
client.logger.Info("received message but could not calculate latency due to unmarshalling error.",
zap.String("subject", msg.Subject),
)

return
}

latency := time.Since(publishTime).Seconds()

client.metrics.Latency.With(prometheus.Labels{
"cluster": clusterName,
}).Observe(latency)

client.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulSubscribe,
"cluster": clusterName,
}).Add(1)

client.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
}
}

func (client *Client) corePublish(subject string) {
clusterName := client.connection.ConnectedClusterName()

Expand Down Expand Up @@ -298,6 +346,21 @@ func (client *Client) messageHandlerJetstreamFactory() (jetstream.MessageHandler
}, ch
}

func (client *Client) messageHandlerCoreFactory() (nats.MsgHandler, <-chan *Message) {
ch := make(chan *Message)

return func(msg *nats.Msg) {
ch <- &Message{
Subject: msg.Subject,
Data: msg.Data,
}

if err := msg.Ack(); err != nil {
client.logger.Error("Failed to acknowledge the message", zap.Error(err))
}
}, ch
}

// Close closes NATS connection.
func (client *Client) Close() {
if err := client.connection.FlushTimeout(client.config.FlushTimeout); err != nil {
Expand Down
40 changes: 23 additions & 17 deletions internal/client/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom
}

// nolint: funlen
func NewMetrics() Metrics {
func NewMetrics(conn string) Metrics {
latencyBuckets := []float64{
0.001,
0.0015,
Expand Down Expand Up @@ -97,27 +97,33 @@ func NewMetrics() Metrics {

return Metrics{
Connection: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "connection_errors_total",
Help: "total number of disconnections and reconnections",
ConstLabels: nil,
Namespace: Namespace,
Subsystem: Subsystem,
Name: "connection_errors_total",
Help: "total number of disconnections and reconnections",
ConstLabels: prometheus.Labels{
"conn": conn,
},
}, []string{"type", "cluster"}),
// nolint: exhaustruct
Latency: newHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "latency",
Help: "from publish to consume duration in seconds",
ConstLabels: nil,
Buckets: latencyBuckets,
Namespace: Namespace,
Subsystem: Subsystem,
Name: "latency",
Help: "from publish to consume duration in seconds",
ConstLabels: prometheus.Labels{
"conn": conn,
},
Buckets: latencyBuckets,
}, []string{"stream", "cluster"}),
SuccessCounter: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "success_counter",
Help: "publish and consume success rate",
ConstLabels: nil,
Namespace: Namespace,
Subsystem: Subsystem,
Name: "success_counter",
Help: "publish and consume success rate",
ConstLabels: prometheus.Labels{
"conn": conn,
},
}, []string{"type", "stream", "cluster"}),
}
}

0 comments on commit e7ea761

Please sign in to comment.