Skip to content

Commit

Permalink
feat: add endpoint metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanViast committed Oct 17, 2024
1 parent ac6a86e commit 682cec9
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 43 deletions.
60 changes: 60 additions & 0 deletions pkg/filters/proxies/providerproxy/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package providerproxy

import (
"net/http"
"strconv"
"time"

"github.com/megaease/easegress/v2/pkg/util/prometheushelper"
"github.com/prometheus/client_golang/prometheus"
)

type (
metrics struct {
TotalRequests *prometheus.CounterVec
RequestsDuration prometheus.ObserverVec
}
)

func (m *ProviderProxy) newMetrics() *metrics {

commonLabels := prometheus.Labels{
"pipelineName": m.Name(),
"kind": Kind,
"clusterName": m.spec.Super().Options().ClusterName,
"clusterRole": m.spec.Super().Options().ClusterRole,
"instanceName": m.spec.Super().Options().Name,
}
prometheusLabels := []string{
"clusterName", "clusterRole", "instanceName", "pipelineName", "kind",
"policy", "statusCode", "provider",
}

return &metrics{
TotalRequests: prometheushelper.NewCounter(
"providerproxy_total_requests",
"the total count of http requests", prometheusLabels).MustCurryWith(commonLabels),
RequestsDuration: prometheushelper.NewHistogram(
prometheus.HistogramOpts{
Name: "providerproxy_requests_duration",
Help: "request processing duration histogram of a backend",
Buckets: prometheushelper.DefaultDurationBuckets(),
}, prometheusLabels).MustCurryWith(commonLabels),
}
}

type RequestStat struct {
StatusCode int // e.g. 200
Duration time.Duration
Method *string // rpc provider method e.g. eth_blockNumber
}

func (m *ProviderProxy) collectMetrics(providerUrl string, response *http.Response) {
labels := prometheus.Labels{
"policy": m.spec.Policy,
"statusCode": strconv.Itoa(response.StatusCode),
"provider": providerUrl,
}

m.metrics.TotalRequests.With(labels).Inc()
}
61 changes: 43 additions & 18 deletions pkg/filters/proxies/providerproxy/providerproxy.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
/*
* Copyright (c) 2017, The Easegress Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package providerproxy

import (
"errors"
"math/rand"
"net/http"
"net/url"

"github.com/megaease/easegress/v2/pkg/context"
"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/filters/proxies/providerproxy/selector"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
)

const (
Expand All @@ -19,9 +37,11 @@ const (

type (
ProviderProxy struct {
super *supervisor.Supervisor
spec *Spec
client *http.Client
providerSelector *ProviderSelector
providerSelector selector.ProviderSelector
metrics *metrics
}

Spec struct {
Expand All @@ -30,16 +50,11 @@ type (
Urls []string `yaml:"urls"`
Interval string `yaml:"interval,omitempty" jsonschema:"format=duration"`
Lag uint64 `yaml:"lag,omitempty" jsonschema:"default=100"`
Policy string `yaml:"policy,omitempty" jsonschema:"default=roundRobin"`
}
)

func (m *ProviderProxy) SelectNode() (*url.URL, error) {
if m.providerSelector == nil {
urls := m.spec.Urls
randomIndex := rand.Intn(len(urls))
rpcUrl := urls[randomIndex]
return url.Parse(rpcUrl)
}
rpcUrl, err := m.providerSelector.ChooseServer()
if err != nil {
return nil, err
Expand All @@ -48,6 +63,7 @@ func (m *ProviderProxy) SelectNode() (*url.URL, error) {
}

func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {

reqUrl, err := m.SelectNode()
if err != nil {
logger.Errorf(err.Error())
Expand All @@ -57,11 +73,18 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
logger.Infof("select rpc provider: %s", reqUrl.String())
req := ctx.GetInputRequest().(*httpprot.Request)
forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method(), reqUrl.String(), req.GetPayload())
if err != nil {
logger.Errorf(err.Error())
return err.Error()
}

for key := range req.HTTPHeader() {
forwardReq.Header.Add(key, req.HTTPHeader().Get(key))
}

response, err := m.client.Do(forwardReq)
defer m.collectMetrics(reqUrl.String(), response)

if err != nil {
logger.Errorf(err.Error())
return err.Error()
Expand All @@ -73,7 +96,7 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
return err.Error()
}

if err = outputResponse.FetchPayload(1024 * 1024); err != nil {
if err = outputResponse.FetchPayload(-1); err != nil {
logger.Errorf("%s: failed to fetch response payload: %v, please consider to set serverMaxBodySize of SimpleHTTPProxy to -1.", m.Name(), err)
return err.Error()
}
Expand All @@ -90,12 +113,14 @@ var kind = &filters.Kind{
return &Spec{
Urls: make([]string, 0),
Interval: "1s",
Policy: "roundRobin",
}
},
CreateInstance: func(spec filters.Spec) filters.Filter {
providerSpec := spec.(*Spec)
return &ProviderProxy{
spec: providerSpec,
super: spec.Super(),
client: http.DefaultClient,
}
},
Expand Down Expand Up @@ -130,16 +155,16 @@ func (m *ProviderProxy) reload() {
client := http.DefaultClient
m.client = client

if len(m.spec.Urls) > 1 {
providerSelectorSpec := ProviderSelectorSpec{
Urls: m.spec.Urls,
Interval: m.spec.Interval,
Lag: m.spec.Lag,
}

providerSelector := NewProviderSelector(providerSelectorSpec)
m.providerSelector = &providerSelector
providerSelectorSpec := selector.ProviderSelectorSpec{
Urls: m.spec.Urls,
Interval: m.spec.Interval,
Lag: m.spec.Lag,
}

m.metrics = m.newMetrics()

providerSelector := selector.CreateProviderSelectorByPolicy(m.spec.Policy, providerSelectorSpec, m.super)
m.providerSelector = providerSelector
}

// Status returns status.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,50 @@
package providerproxy
/*
* Copyright (c) 2017, The Easegress Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package selector

import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/prometheushelper"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
)

type ProviderSelectorSpec struct {
Urls []string `json:"urls"`
Interval string `json:"interval,omitempty" jsonschema:"format=duration"`
Lag uint64 `json:"lag,omitempty" jsonschema:"default=100"`
}

// GetInterval returns the interval duration.
func (ps *ProviderSelectorSpec) GetInterval() time.Duration {
interval, _ := time.ParseDuration(ps.Interval)
if interval <= 0 {
interval = time.Second
}
return interval
}

type ProviderWeight struct {
Url string
BlockNumber uint64
Client *RPCClient
}

type ProviderSelector struct {
type BlockLagProviderSelector struct {
done chan struct{}
providers []ProviderWeight
lag uint64
metrics *metrics
}

func NewProviderSelector(spec ProviderSelectorSpec) ProviderSelector {
func NewBlockLagProviderSelector(spec ProviderSelectorSpec, super *supervisor.Supervisor) ProviderSelector {

providers := make([]ProviderWeight, 0)

Expand All @@ -58,10 +63,11 @@ func NewProviderSelector(spec ProviderSelectorSpec) ProviderSelector {
})
}

ps := ProviderSelector{
ps := BlockLagProviderSelector{
done: make(chan struct{}),
providers: providers,
lag: spec.Lag,
metrics: newMetrics(super),
}
ticker := time.NewTicker(intervalDuration)
ps.checkServers()
Expand All @@ -84,8 +90,7 @@ type ProviderBlock struct {
block uint64
}

func (ps ProviderSelector) checkServers() {
log.Println("check block number")
func (ps BlockLagProviderSelector) checkServers() {
eg := new(errgroup.Group)
blockNumberChannel := make(chan ProviderBlock, len(ps.providers))
startTime := time.Now().Local()
Expand Down Expand Up @@ -128,20 +133,28 @@ func (ps ProviderSelector) checkServers() {
for i := 0; i < len(ps.providers); i++ {
blockIndex := <-blockNumberChannel
ps.providers[blockIndex.index].BlockNumber = blockIndex.block
labels := prometheus.Labels{
"provider": ps.providers[blockIndex.index].Url,
}
ps.metrics.ProviderBlockHeight.With(labels).Set(float64(blockIndex.block))
}
logger.Debugf("update block number time: %s", time.Since(startTime))
}

func (ps ProviderSelector) Close() {
func (ps BlockLagProviderSelector) Close() {
close(ps.done)
}

func (ps ProviderSelector) ChooseServer() (string, error) {
func (ps BlockLagProviderSelector) ChooseServer() (string, error) {

if len(ps.providers) == 0 {
return "", fmt.Errorf("no provider available")
}

if len(ps.providers) == 1 {
return ps.providers[0].Url, nil
}

var bestProvider ProviderWeight
for _, provider := range ps.providers {
if provider.BlockNumber == 0 {
Expand All @@ -159,3 +172,27 @@ func (ps ProviderSelector) ChooseServer() (string, error) {

return ps.providers[0].Url, nil
}

type metrics struct {
ProviderBlockHeight *prometheus.GaugeVec
}

func newMetrics(super *supervisor.Supervisor) *metrics {
commonLabels := prometheus.Labels{
"pipelineName": super.Options().Name,
"kind": "BlockLagProviderSelector",
"clusterName": super.Options().ClusterName,
"clusterRole": super.Options().ClusterRole,
"instanceName": super.Options().Name,
}
prometheusLabels := []string{
"clusterName", "clusterRole", "instanceName", "pipelineName", "kind",
"provider",
}

return &metrics{
ProviderBlockHeight: prometheushelper.NewGauge(
"provider_block_height",
"the block height of provider", prometheusLabels).MustCurryWith(commonLabels),
}
}
Loading

0 comments on commit 682cec9

Please sign in to comment.