Skip to content

Commit

Permalink
Merge pull request #848 from forta-network/kisel/FORTA-1545-refactor-…
Browse files Browse the repository at this point in the history
…docker-resources-polling

Refactor docker resources polling
  • Loading branch information
dkeysil authored Jan 29, 2024
2 parents 3f94683 + 850a0df commit acb524f
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 169 deletions.
2 changes: 1 addition & 1 deletion services/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func GetBotLifecycleComponents(ctx context.Context, botLifeConfig BotLifecycleCo

botClient := containers.NewBotClient(
cfg.Log, cfg.ResourcesConfig, cfg.AdvancedConfig.TokenExchangeURL,
dockerClient, botImageClient,
dockerClient, botImageClient, botLifeConfig.MessageClient,
)
lifecycleMetrics := metrics.NewLifecycleClient(botLifeConfig.MessageClient)
lifecycleMediator := mediator.New(botLifeConfig.MessageClient, lifecycleMetrics)
Expand Down
82 changes: 79 additions & 3 deletions services/components/containers/bot_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/errdefs"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/docker"
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/services/components/metrics"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

Expand All @@ -21,6 +26,8 @@ const (

ImagePullCooldownThreshold = 5
ImagePullCooldownDuration = time.Hour * 4

DockerResourcesPollingInterval = time.Second * 10
)

// BotClient launches a bot.
Expand All @@ -39,13 +46,15 @@ type botClient struct {
tokenExchangeURL string
client clients.DockerClient
botImageClient clients.DockerClient
msgClient clients.MessageClient
}

// NewBotClient creates a new bot client to manage bot containers.
func NewBotClient(
logConfig config.LogConfig, resourcesConfig config.ResourcesConfig,
tokenExchangeURL string,
client clients.DockerClient, botImageClient clients.DockerClient,
msgClient clients.MessageClient,
) *botClient {
botImageClient.SetImagePullCooldown(ImagePullCooldownThreshold, ImagePullCooldownDuration)
return &botClient{
Expand All @@ -54,6 +63,7 @@ func NewBotClient(
tokenExchangeURL: tokenExchangeURL,
client: client,
botImageClient: botImageClient,
msgClient: msgClient,
}
}

Expand Down Expand Up @@ -88,7 +98,6 @@ func (bc *botClient) LaunchBot(ctx context.Context, botConfig config.AgentConfig
switch {
case err == nil:
// do not create a new container - we already have it

case errors.Is(err, docker.ErrContainerNotFound):
// if the bot container doesn't exist, create and start the container
botContainerCfg := NewBotContainerConfig(
Expand All @@ -98,15 +107,69 @@ func (bc *botClient) LaunchBot(ctx context.Context, botConfig config.AgentConfig
if err != nil {
return fmt.Errorf("failed to start bot container: %v", err)
}

default:
return fmt.Errorf("unexpected error while getting the bot container '%s': %v", botConfig.ContainerName(), err)
}

// at this point we have created a new bot container and a new bridge network for the bot
// or found the existing container and the network: it's time to ensure that all service containers
// are reattached to the bot's network
return bc.attachServiceContainers(ctx, botNetworkID)
err = bc.attachServiceContainers(ctx, botNetworkID)
if err != nil {
return fmt.Errorf("failed to attach service containers to the bot network: %v", err)
}

go bc.pollDockerResources(botConfig.ContainerName(), botConfig)

return nil
}

// pollDockerResources polls docker resources for bot container and sends them to the publisher.
func (bc *botClient) pollDockerResources(containerID string, agentConfig config.AgentConfig) {
ctx := context.Background()
ticker := initTicker(DockerResourcesPollingInterval)
defer ticker.Stop()

for t := range ticker.C {
logrus.WithField("containerID", containerID).Debug("polling docker resources")
// request docker stats
resources, err := bc.client.ContainerStats(ctx, containerID)
if errdefs.IsNotFound(err) {
logrus.WithError(err).
WithField("containerID", containerID).
WithField("agentID", agentConfig.ID).
Warn("bot container can't be found, stopping docker resources poller")
return
} else if err != nil {
logrus.WithError(err).Error("error while getting container stats", containerID)
continue
}

var (
bytesSent uint64
bytesRecv uint64
)

for _, network := range resources.NetworkStats {
bytesSent += network.TxBytes
bytesRecv += network.RxBytes
}

logrus.WithField("containerID", containerID).
WithField("resources", resources).
Debug("sending docker resources metrics")

metrics.SendAgentMetrics(bc.msgClient, []*protocol.AgentMetric{
metrics.CreateAgentResourcesMetric(
agentConfig, t, domain.MetricDockerResourcesCPU, float64(resources.CPUStats.CPUUsage.TotalUsage)),
metrics.CreateAgentResourcesMetric(
agentConfig, t, domain.MetricDockerResourcesMemory, float64(resources.MemoryStats.Usage)),
metrics.CreateAgentResourcesMetric(
agentConfig, t, domain.MetricDockerResourcesNetworkSent, float64(bytesSent)),
metrics.CreateAgentResourcesMetric(
agentConfig, t, domain.MetricDockerResourcesNetworkReceive, float64(bytesRecv)),
})
}
}

func (bc *botClient) attachServiceContainers(ctx context.Context, botNetworkID string) error {
Expand Down Expand Up @@ -223,3 +286,16 @@ func (bc *botClient) StartWaitBotContainer(ctx context.Context, containerID stri
}
return bc.client.WaitContainerStart(ctx, containerID)
}

func initTicker(interval time.Duration) *time.Ticker {
nextTick := time.Now().Truncate(interval).Add(interval)
initialSleepDuration := time.Until(nextTick)

// Sleep until the next interval
time.Sleep(initialSleepDuration)

// Start a ticker that ticks every interval
ticker := time.NewTicker(interval)

return ticker
}
57 changes: 56 additions & 1 deletion services/components/containers/bot_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"testing"

"github.com/docker/docker/api/types"
"github.com/docker/docker/errdefs"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/clients/docker"
"github.com/forta-network/forta-node/clients/messaging"
mock_clients "github.com/forta-network/forta-node/clients/mocks"
"github.com/forta-network/forta-node/config"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
Expand All @@ -30,6 +35,7 @@ type BotClientTestSuite struct {

client *mock_clients.MockDockerClient
botImageClient *mock_clients.MockDockerClient
msgClient *mock_clients.MockMessageClient

botClient *botClient

Expand All @@ -46,10 +52,11 @@ func (s *BotClientTestSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.client = mock_clients.NewMockDockerClient(ctrl)
s.botImageClient = mock_clients.NewMockDockerClient(ctrl)
s.msgClient = mock_clients.NewMockMessageClient(ctrl)

s.botImageClient.EXPECT().SetImagePullCooldown(ImagePullCooldownThreshold, ImagePullCooldownDuration)

s.botClient = NewBotClient(config.LogConfig{}, config.ResourcesConfig{}, "", s.client, s.botImageClient)
s.botClient = NewBotClient(config.LogConfig{}, config.ResourcesConfig{}, "", s.client, s.botImageClient, s.msgClient)
}

func (s *BotClientTestSuite) TestEnsureBotImages() {
Expand Down Expand Up @@ -97,7 +104,53 @@ func (s *BotClientTestSuite) TestLaunchBot_Exists() {
s.client.EXPECT().AttachNetwork(gomock.Any(), testContainerID, testBotNetworkID).Return(nil)
}

resources := &docker.ContainerResources{
CPUStats: docker.CPUStats{
CPUUsage: docker.CPUUsage{
TotalUsage: 33,
},
},
MemoryStats: docker.MemoryStats{
Usage: 100,
},
NetworkStats: map[string]docker.NetworkStats{
"eth0": {
RxBytes: 123,
TxBytes: 456,
},
},
}
executed := make(chan bool)
s.client.EXPECT().ContainerStats(gomock.Any(), botConfig.ContainerName()).Return(resources, nil).Times(1)
s.msgClient.EXPECT().PublishProto(messaging.SubjectMetricAgent, gomock.Any()).Do(func(v1, v2 interface{}) {
metrics := v2.(*protocol.AgentMetricList)
assert.Len(s.T(), metrics.Metrics, 4)

// CPU metric
assert.Equal(s.T(), botConfig.ID, metrics.Metrics[0].AgentId)
assert.Equal(s.T(), domain.MetricDockerResourcesCPU, metrics.Metrics[0].Name)
assert.Equal(s.T(), float64(33), metrics.Metrics[0].Value)

// Memory metric
assert.Equal(s.T(), botConfig.ID, metrics.Metrics[1].AgentId)
assert.Equal(s.T(), domain.MetricDockerResourcesMemory, metrics.Metrics[1].Name)
assert.Equal(s.T(), float64(100), metrics.Metrics[1].Value)

// Network bytes received metric
assert.Equal(s.T(), botConfig.ID, metrics.Metrics[3].AgentId)
assert.Equal(s.T(), domain.MetricDockerResourcesNetworkReceive, metrics.Metrics[3].Name)
assert.Equal(s.T(), float64(123), metrics.Metrics[3].Value)

// Network bytes sent metric
assert.Equal(s.T(), botConfig.ID, metrics.Metrics[2].AgentId)
assert.Equal(s.T(), domain.MetricDockerResourcesNetworkSent, metrics.Metrics[2].Name)
assert.Equal(s.T(), float64(456), metrics.Metrics[2].Value)

close(executed)
})

s.r.NoError(s.botClient.LaunchBot(context.Background(), botConfig))
<-executed
}

func (s *BotClientTestSuite) TestLaunchBot_GetContainerError() {
Expand Down Expand Up @@ -130,6 +183,8 @@ func (s *BotClientTestSuite) TestLaunchBot_DoesNotExist() {
s.client.EXPECT().AttachNetwork(gomock.Any(), testContainerID, testBotNetworkID).Return(nil)
}

s.client.EXPECT().ContainerStats(gomock.Any(), botConfig.ContainerName()).Return(nil, errdefs.NotFound(errors.New(""))).MaxTimes(1)

s.r.NoError(s.botClient.LaunchBot(context.Background(), botConfig))
}

Expand Down
74 changes: 0 additions & 74 deletions services/components/containers/resources.go

This file was deleted.

Loading

0 comments on commit acb524f

Please sign in to comment.