From 681eccf23d886344e5c0bafb8420739ff63ff41c Mon Sep 17 00:00:00 2001
From: Yusuke Sakurai
Date: Thu, 20 Jun 2024 18:32:26 +0900
Subject: [PATCH] feat: support canary test for multiple load balancers (#80)

---
 .vscode/settings.json |   7 +
 canary_task.go        | 355 +++++++++++++++++++++++++++++
 go.mod                |   1 +
 go.sum                |   2 +
 rollout.go            | 452 ++++++------------------------------------
 rollout_test.go       |  24 ++-
 6 files changed, 450 insertions(+), 391 deletions(-)
 create mode 100644 .vscode/settings.json
 create mode 100644 canary_task.go

diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..1209016
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,7 @@
+{
+  "explorer.fileNesting.enabled": true,
+  "explorer.fileNesting.patterns": {
+    "*.go": "${capture}_test.go",
+    "go.mod": "go.sum"
+  }
+}
diff --git a/canary_task.go b/canary_task.go
new file mode 100644
index 0000000..aacbd6b
--- /dev/null
+++ b/canary_task.go
@@ -0,0 +1,355 @@
+package cage
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/apex/log"
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/ec2"
+	ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
+	"github.com/aws/aws-sdk-go-v2/service/ecs"
+	ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
+	elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
+	elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
+	"golang.org/x/xerrors"
+)
+
+type CanaryTarget struct {
+	targetGroupArn   *string
+	targetId         *string
+	targetPort       *int32
+	availabilityZone *string
+}
+
+type CanaryTask struct {
+	*cage
+	td                   *ecstypes.TaskDefinition
+	lb                   *ecstypes.LoadBalancer
+	networkConfiguration *ecstypes.NetworkConfiguration
+	platformVersion      *string
+	taskArn              *string
+	target               *CanaryTarget
+}
+
+func (c *CanaryTask) Start(ctx context.Context) error {
+	if c.Env.CanaryInstanceArn != "" {
+		// ec2
+		startTask := &ecs.StartTaskInput{
+			Cluster:              &c.Env.Cluster,
+			Group:                aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
+			NetworkConfiguration: c.networkConfiguration,
+			TaskDefinition:       c.td.TaskDefinitionArn,
+			ContainerInstances:   []string{c.Env.CanaryInstanceArn},
+		}
+		if o, err := c.Ecs.StartTask(ctx, startTask); err != nil {
+			return err
+		} else {
+			c.taskArn = o.Tasks[0].TaskArn
+		}
+	} else {
+		// fargate
+		if o, err := c.Ecs.RunTask(ctx, &ecs.RunTaskInput{
+			Cluster:              &c.Env.Cluster,
+			Group:                aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
+			NetworkConfiguration: c.networkConfiguration,
+			TaskDefinition:       c.td.TaskDefinitionArn,
+			LaunchType:           ecstypes.LaunchTypeFargate,
+			PlatformVersion:      c.platformVersion,
+		}); err != nil {
+			return err
+		} else {
+			c.taskArn = o.Tasks[0].TaskArn
+		}
+	}
+	return nil
+}
+
+func (c *CanaryTask) Wait(ctx context.Context) error {
+	log.Infof("🥚 waiting for canary task '%s' to be running...", *c.taskArn)
+	if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{
+		Cluster: &c.Env.Cluster,
+		Tasks:   []string{*c.taskArn},
+	}, c.MaxWait); err != nil {
+		return err
+	}
+	log.Infof("🐣 canary task '%s' is running!", *c.taskArn)
+	if err := c.waitUntilHealthCheckPassed(ctx); err != nil {
+		return err
+	}
+	log.Info("🤩 canary task container(s) are healthy!")
+	log.Infof("canary task '%s' ensured.", *c.taskArn)
+	if c.lb == nil {
+		log.Infof("no load balancer is attached to service '%s'. skip registration to target group", c.Env.Service)
+		return c.waitForIdleDuration(ctx)
+	} else {
+		if err := c.registerToTargetGroup(ctx); err != nil {
+			return err
+		}
+		log.Infof("😷 ensuring the canary task becomes healthy...")
+		if err := c.waitUntilTargetHealthy(ctx); err != nil {
+			return err
+		}
+		log.Info("🤩 canary task is healthy!")
+		return nil
+	}
+}
+
+func (c *CanaryTask) waitForIdleDuration(ctx context.Context) error {
+	log.Infof("wait %d seconds for canary task to be stable...", c.Env.CanaryTaskIdleDuration)
+	wait := make(chan bool)
+	go func() {
+		duration := c.Env.CanaryTaskIdleDuration
+		for duration > 0 {
+			log.Infof("still waiting...; %d seconds left", duration)
+			wt := 10
+			if duration < 10 {
+				wt = duration
+			}
+			<-c.Time.NewTimer(time.Duration(wt) * time.Second).C
+			duration -= 10
+		}
+		wait <- true
+	}()
+	<-wait
+	o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
+		Cluster: &c.Env.Cluster,
+		Tasks:   []string{*c.taskArn},
+	})
+	if err != nil {
+		return err
+	}
+	task := o.Tasks[0]
+	if *task.LastStatus != "RUNNING" {
+		return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)
+	}
+	return nil
+}
+
+func (c *CanaryTask) waitUntilHealthCheckPassed(ctx context.Context) error {
+	log.Infof("😷 ensuring canary task container(s) become healthy...")
+	containerHasHealthChecks := map[string]struct{}{}
+	for _, definition := range c.td.ContainerDefinitions {
+		if definition.HealthCheck != nil {
+			containerHasHealthChecks[*definition.Name] = struct{}{}
+		}
+	}
+	for count := 0; count < 10; count++ {
+		<-c.Time.NewTimer(time.Duration(15) * time.Second).C
+		log.Infof("canary task '%s': waiting for %d container(s) to become healthy", *c.taskArn, len(containerHasHealthChecks))
+		if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
+			Cluster: &c.Env.Cluster,
+			Tasks:   []string{*c.taskArn},
+		}); err != nil {
+			return err
+		} else {
+			task := o.Tasks[0]
+			if *task.LastStatus != "RUNNING" {
+				return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)
+			}
+
+			for _, container := range task.Containers {
+				if _, ok := containerHasHealthChecks[*container.Name]; !ok {
+					continue
+				}
+				if container.HealthStatus != ecstypes.HealthStatusHealthy {
+					log.Infof("container '%s' is not healthy: %s", *container.Name, container.HealthStatus)
+					continue
+				}
+				delete(containerHasHealthChecks, *container.Name)
+			}
+			if len(containerHasHealthChecks) == 0 {
+				return nil
+			}
+		}
+	}
+	return xerrors.Errorf("😨 canary task hasn't become healthy")
+}
+
+func (c *CanaryTask) registerToTargetGroup(ctx context.Context) error {
+	// Phase 3: Get task details after network interface is attached
+	var task ecstypes.Task
+	if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
+		Cluster: &c.Env.Cluster,
+		Tasks:   []string{*c.taskArn},
+	}); err != nil {
+		return err
+	} else {
+		task = o.Tasks[0]
+	}
+	var targetId *string
+	var targetPort *int32
+	var subnet ec2types.Subnet
+	for _, container := range c.td.ContainerDefinitions {
+		if *container.Name == *c.lb.ContainerName {
+			targetPort = container.PortMappings[0].HostPort
+		}
+	}
+	if targetPort == nil {
+		return xerrors.Errorf("couldn't find host port in container definition")
+	}
+	if c.Env.CanaryInstanceArn == "" { // Fargate
+		details := task.Attachments[0].Details
+		var subnetId *string
+		var privateIp *string
+		for _, v := range details {
+			if *v.Name == "subnetId" {
+				subnetId = v.Value
+			} else if *v.Name == "privateIPv4Address" {
+				privateIp = v.Value
+			}
+		}
+		if subnetId == nil || privateIp == nil {
+			return xerrors.Errorf("couldn't find subnetId or privateIPv4Address in task details")
+		}
+		if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
+			SubnetIds: []string{*subnetId},
+		}); err != nil {
+			return err
+		} else {
+			subnet = o.Subnets[0]
+		}
+		targetId = privateIp
+		log.Infof("canary task was placed: privateIp = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
+	} else {
+		var containerInstance ecstypes.ContainerInstance
+		if outputs, err := c.Ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
+			Cluster:            &c.Env.Cluster,
+			ContainerInstances: []string{c.Env.CanaryInstanceArn},
+		}); err != nil {
+			return err
+		} else {
+			containerInstance = outputs.ContainerInstances[0]
+		}
+		if o, err := c.Ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
+			InstanceIds: []string{*containerInstance.Ec2InstanceId},
+		}); err != nil {
+			return err
+		} else if sn, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
+			SubnetIds: []string{*o.Reservations[0].Instances[0].SubnetId},
+		}); err != nil {
+			return err
+		} else {
+			targetId = containerInstance.Ec2InstanceId
+			subnet = sn.Subnets[0]
+		}
+		log.Infof("canary task was placed: instanceId = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
+	}
+	if _, err := c.Alb.RegisterTargets(ctx, &elbv2.RegisterTargetsInput{
+		TargetGroupArn: c.lb.TargetGroupArn,
+		Targets: []elbv2types.TargetDescription{{
+			AvailabilityZone: subnet.AvailabilityZone,
+			Id:               targetId,
+			Port:             targetPort,
+		}},
+	}); err != nil {
+		return err
+	}
+	c.target = &CanaryTarget{
+		targetGroupArn:   c.lb.TargetGroupArn,
+		targetId:         targetId,
+		targetPort:       targetPort,
+		availabilityZone: subnet.AvailabilityZone,
+	}
+	return nil
+}
+
+func (c *CanaryTask) waitUntilTargetHealthy(
+	ctx context.Context,
+) error {
+	log.Infof("checking the health state of canary task...")
+	var unusedCount = 0
+	var initialized = false
+	var recentState *elbv2types.TargetHealthStateEnum
+	for {
+		<-c.Time.NewTimer(time.Duration(15) * time.Second).C
+		if o, err := c.Alb.DescribeTargetHealth(ctx, &elbv2.DescribeTargetHealthInput{
+			TargetGroupArn: c.target.targetGroupArn,
+			Targets: []elbv2types.TargetDescription{{
+				Id:               c.target.targetId,
+				Port:             c.target.targetPort,
+				AvailabilityZone: c.target.availabilityZone,
+			}},
+		}); err != nil {
+			return err
+		} else {
+			for _, desc := range o.TargetHealthDescriptions {
+				if *desc.Target.Id == *c.target.targetId && *desc.Target.Port == *c.target.targetPort {
+					recentState = &desc.TargetHealth.State
+				}
+			}
+			if recentState == nil {
+				return xerrors.Errorf("'%s' is not registered to the target group '%s'", *c.target.targetId, *c.target.targetGroupArn)
+			}
+			log.Infof("canary task '%s' (%s:%d) state is: %s", *c.taskArn, *c.target.targetId, *c.target.targetPort, *recentState)
+			switch *recentState {
+			case "healthy":
+				return nil
+			case "initial":
+				initialized = true
+				log.Infof("still checking the state...")
+				continue
+			case "unused":
+				unusedCount++
+				if !initialized && unusedCount < 5 {
+					continue
+				}
+			default:
+			}
+		}
+		// unhealthy, draining, unused
+		log.Errorf("😨 canary task '%s' is unhealthy", *c.taskArn)
+		return xerrors.Errorf(
+			"canary task '%s' (%s:%d) hasn't become healthy. The most recent state: %s",
+			*c.taskArn, *c.target.targetId, *c.target.targetPort, *recentState,
+		)
+	}
+}
+
+func (c *CanaryTask) Stop(ctx context.Context) error {
+	if c.target == nil {
+		log.Info("no load balancer is attached to service. skip deregistration.")
Skip deregisteration.") + } else { + log.Infof("deregistering the canary task from target group '%s'...", c.target.targetId) + if _, err := c.Alb.DeregisterTargets(ctx, &elbv2.DeregisterTargetsInput{ + TargetGroupArn: c.target.targetGroupArn, + Targets: []elbv2types.TargetDescription{{ + AvailabilityZone: c.target.availabilityZone, + Id: c.target.targetId, + Port: c.target.targetPort, + }}, + }); err != nil { + return err + } + if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{ + TargetGroupArn: c.target.targetGroupArn, + Targets: []elbv2types.TargetDescription{{ + AvailabilityZone: c.target.availabilityZone, + Id: c.target.targetId, + Port: c.target.targetPort, + }}, + }, c.MaxWait); err != nil { + return err + } + log.Infof( + "canary task '%s' has successfully been deregistered from target group '%s'", + *c.taskArn, c.target.targetId, + ) + } + log.Infof("stopping the canary task '%s'...", *c.taskArn) + if _, err := c.Ecs.StopTask(ctx, &ecs.StopTaskInput{ + Cluster: &c.Env.Cluster, + Task: c.taskArn, + }); err != nil { + return err + } + if err := ecs.NewTasksStoppedWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ + Cluster: &c.Env.Cluster, + Tasks: []string{*c.taskArn}, + }, c.MaxWait); err != nil { + return err + } + log.Infof("canary task '%s' has successfully been stopped", *c.taskArn) + return nil +} diff --git a/go.mod b/go.mod index ffc818c..0f41e7c 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect + golang.org/x/sync v0.7.0 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index da0e13b..1649eec 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/rollout.go b/rollout.go index 5d071d5..47e9601 100644 --- a/rollout.go +++ b/rollout.go @@ -2,17 +2,11 @@ package cage import ( "context" - "fmt" - "time" "github.com/apex/log" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/ec2" - ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/aws/aws-sdk-go-v2/service/ecs" ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types" - elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" - elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ) @@ -22,50 +16,35 @@ type RollOutInput struct { } type RollOutResult struct { - StartTime time.Time - EndTime time.Time ServiceIntact bool } func 
-	ret := &RollOutResult{
-		StartTime:     c.Time.Now(),
+	result := &RollOutResult{
 		ServiceIntact: true,
 	}
-	var aggregatedError error
-	throw := func(err error) (*RollOutResult, error) {
-		ret.EndTime = c.Time.Now()
-		aggregatedError = err
-		return ret, err
-	}
-	defer func(result *RollOutResult) {
-		ret.EndTime = c.Time.Now()
-	}(ret)
-	var service ecstypes.Service
 	if out, err := c.Ecs.DescribeServices(ctx, &ecs.DescribeServicesInput{
 		Cluster: &c.Env.Cluster,
 		Services: []string{
 			c.Env.Service,
 		},
 	}); err != nil {
-		log.Errorf("failed to describe current service due to: %s", err)
-		return throw(err)
+		return result, xerrors.Errorf("failed to describe current service due to: %w", err)
 	} else if len(out.Services) == 0 {
-		return throw(xerrors.Errorf("service '%s' doesn't exist. Run 'cage up' or create service before rolling out", c.Env.Service))
+		return result, xerrors.Errorf("service '%s' doesn't exist. Run 'cage up' or create service before rolling out", c.Env.Service)
 	} else {
-		service = out.Services[0]
-	}
-	if *service.Status != "ACTIVE" {
-		return throw(xerrors.Errorf("😵 '%s' status is '%s'. Stop rolling out", c.Env.Service, *service.Status))
-	}
-	if service.LaunchType == ecstypes.LaunchTypeEc2 && c.Env.CanaryInstanceArn == "" {
-		return throw(xerrors.Errorf("🥺 --canaryInstanceArn is required when LaunchType = 'EC2'"))
+		service := out.Services[0]
+		if *service.Status != "ACTIVE" {
+			return result, xerrors.Errorf("😵 '%s' status is '%s'. Stop rolling out", c.Env.Service, *service.Status)
+		}
+		if service.LaunchType == ecstypes.LaunchTypeEc2 && c.Env.CanaryInstanceArn == "" {
+			return result, xerrors.Errorf("🥺 --canaryInstanceArn is required when LaunchType = 'EC2'")
+		}
 	}
 	log.Infof("ensuring next task definition...")
 	var nextTaskDefinition *ecstypes.TaskDefinition
 	if o, err := c.CreateNextTaskDefinition(ctx); err != nil {
-		log.Errorf("failed to register next task definition due to: %s", err)
-		return throw(err)
+		return result, xerrors.Errorf("failed to register next task definition due to: %w", err)
 	} else {
 		nextTaskDefinition = o
 	}
@@ -73,46 +52,39 @@ func (c *cage) RollOut(ctx context.Context, input *RollOutInput) (*RollOutResult
 		log.Info("--updateService flag is set. use provided service configurations for canary test instead of current service")
 	}
 	log.Infof("starting canary task...")
-	canaryTask, startCanaryTaskErr := c.StartCanaryTask(ctx, nextTaskDefinition, input)
+	canaryTasks, startCanaryTaskErr := c.StartCanaryTasks(ctx, nextTaskDefinition, input)
 	// ensure canary task stopped after rolling out either success or failure
-	defer func(canaryTask *CanaryTask, result *RollOutResult) {
-		if canaryTask.taskArn == nil {
-			return
-		}
-		if err := c.StopCanaryTask(ctx, canaryTask); err != nil {
-			log.Fatalf("failed to stop canary task '%s': %s", *canaryTask.taskArn, err)
+	defer func() {
+		// recover so that the cleanup below always runs, even if rolling out panicked
+		_ = recover()
+		eg := errgroup.Group{}
+		for _, canaryTask := range canaryTasks {
+			if canaryTask.taskArn == nil {
+				continue
+			}
+			canaryTask := canaryTask // capture the range variable for the closure (pre-Go 1.22 loop semantics)
+			eg.Go(func() error {
+				err := canaryTask.Stop(ctx)
+				if err != nil {
+					log.Errorf("failed to stop canary task '%s': %s", *canaryTask.taskArn, err)
+				}
+				return err
+			})
 		}
-		if aggregatedError == nil {
-			log.Infof(
-				"🐥 service '%s' successfully rolled out to '%s:%d'!",
-				c.Env.Service, *nextTaskDefinition.Family, nextTaskDefinition.Revision,
-			)
-		} else {
-			log.Errorf("😥 %s", aggregatedError)
+		if err := eg.Wait(); err != nil {
+			log.Errorf("failed to stop canary tasks due to: %s", err)
 		}
-	}(&canaryTask, ret)
+	}()
 	if startCanaryTaskErr != nil {
-		log.Errorf("failed to start canary task due to: %s", startCanaryTaskErr)
-		return throw(startCanaryTaskErr)
+		return result, xerrors.Errorf("failed to start canary task due to: %w", startCanaryTaskErr)
 	}
-	log.Infof("😷 ensuring canary task container(s) to become healthy...")
-	if err := c.waitUntilContainersBecomeHealthy(ctx, *canaryTask.taskArn, nextTaskDefinition); err != nil {
-		return throw(err)
+	eg := errgroup.Group{}
+	for _, canaryTask := range canaryTasks {
+		canaryTask := canaryTask // capture the range variable for the closure (pre-Go 1.22 loop semantics)
+		eg.Go(func() error {
+			return canaryTask.Wait(ctx)
+		})
 	}
-	log.Info("🤩 canary task container(s) is healthy!")
-	log.Infof("canary task '%s' ensured.", *canaryTask.taskArn)
-	if canaryTask.target != nil {
-		log.Infof("😷 ensuring canary task to become healthy...")
-		if err := c.EnsureTaskHealthy(
-			ctx,
-			*canaryTask.taskArn,
-			canaryTask.target,
-		); err != nil {
-			return throw(err)
-		}
-		log.Info("🤩 canary task is healthy!")
+	if err := eg.Wait(); err != nil {
+		return result, xerrors.Errorf("failed to wait for canary task due to: %w", err)
 	}
-	ret.ServiceIntact = false
 	log.Infof(
 		"updating the task definition of '%s' into '%s:%d'...",
 		c.Env.Service, *nextTaskDefinition.Family, nextTaskDefinition.Revision,
 	)
@@ -131,110 +103,32 @@ func (c *cage) RollOut(ctx context.Context, input *RollOutInput) (*RollOutResult
 		updateInput.VolumeConfigurations = c.Env.ServiceDefinitionInput.VolumeConfigurations
 	}
 	if _, err := c.Ecs.UpdateService(ctx, updateInput); err != nil {
-		return throw(err)
+		return result, err
 	}
+	result.ServiceIntact = false
 	log.Infof("waiting for service '%s' to be stable...", c.Env.Service)
 	if err := ecs.NewServicesStableWaiter(c.Ecs).Wait(ctx, &ecs.DescribeServicesInput{
 		Cluster:  &c.Env.Cluster,
 		Services: []string{c.Env.Service},
 	}, c.MaxWait); err != nil {
-		return throw(err)
+		return result, err
 	}
 	log.Infof("🥴 service '%s' has become to be stable!", c.Env.Service)
-	ret.EndTime = c.Time.Now()
-	return ret, nil
-}
-
-func (c *cage) EnsureTaskHealthy(
-	ctx context.Context,
-	taskArn string,
-	p *CanaryTarget,
-) error {
-	log.Infof("checking the health state of canary task...")
-	var unusedCount = 0
-	var initialized = false
-	var recentState *elbv2types.TargetHealthStateEnum
-	for {
-		<-c.Time.NewTimer(time.Duration(15) * time.Second).C
-		if o, err := c.Alb.DescribeTargetHealth(ctx, &elbv2.DescribeTargetHealthInput{
-			TargetGroupArn: &p.targetGroupArn,
-			Targets: []elbv2types.TargetDescription{{
-				Id:               &p.targetId,
-				Port:             &p.targetPort,
-				AvailabilityZone: &p.availabilityZone,
-			}},
-		}); err != nil {
-			return err
-		} else {
-			recentState = GetTargetIsHealthy(o, &p.targetId, &p.targetPort)
-			if recentState == nil {
-				return xerrors.Errorf("'%s' is not registered to the target group '%s'", p.targetId, p.targetGroupArn)
-			}
-			log.Infof("canary task '%s' (%s:%d) state is: %s", taskArn, p.targetId, p.targetPort, *recentState)
-			switch *recentState {
-			case "healthy":
-				return nil
-			case "initial":
-				initialized = true
-				log.Infof("still checking the state...")
-				continue
-			case "unused":
-				unusedCount++
-				if !initialized && unusedCount < 5 {
-					continue
-				}
-			default:
-			}
-		}
-		// unhealthy, draining, unused
-		log.Errorf("😨 canary task '%s' is unhealthy", taskArn)
-		return xerrors.Errorf(
-			"canary task '%s' (%s:%d) hasn't become to be healthy. The most recent state: %s",
-			taskArn, p.targetId, p.targetPort, *recentState,
-		)
-	}
-}
-
-func GetTargetIsHealthy(o *elbv2.DescribeTargetHealthOutput, targetId *string, targetPort *int32) *elbv2types.TargetHealthStateEnum {
-	for _, desc := range o.TargetHealthDescriptions {
-		if *desc.Target.Id == *targetId && *desc.Target.Port == *targetPort {
-			return &desc.TargetHealth.State
-		}
-	}
-	return nil
-}
-
-func (c *cage) DescribeSubnet(ctx context.Context, subnetId *string) (ec2types.Subnet, error) {
-	if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
-		SubnetIds: []string{*subnetId},
-	}); err != nil {
-		return ec2types.Subnet{}, err
-	} else {
-		return o.Subnets[0], nil
-	}
-}
-
-type CanaryTarget struct {
-	targetGroupArn   string
-	targetId         string
-	targetPort       int32
-	availabilityZone string
-}
-
-type CanaryTask struct {
-	taskArn *string
-	target  *CanaryTarget
+	log.Infof(
+		"🐥 service '%s' successfully rolled out to '%s:%d'!",
+		c.Env.Service, *nextTaskDefinition.Family, nextTaskDefinition.Revision,
+	)
+	return result, nil
 }
 
-func (c *cage) StartCanaryTask(
+func (c *cage) StartCanaryTasks(
 	ctx context.Context,
 	nextTaskDefinition *ecstypes.TaskDefinition,
 	input *RollOutInput,
-) (CanaryTask, error) {
+) ([]*CanaryTask, error) {
 	var networkConfiguration *ecstypes.NetworkConfiguration
 	var platformVersion *string
 	var loadBalancers []ecstypes.LoadBalancer
-	var result CanaryTask
 	if input.UpdateService {
 		networkConfiguration = c.Env.ServiceDefinitionInput.NetworkConfiguration
 		platformVersion = c.Env.ServiceDefinitionInput.PlatformVersion
@@ -244,7 +138,7 @@ func (c *cage) StartCanaryTask(
 		Cluster:  &c.Env.Cluster,
 		Services: []string{c.Env.Service},
 	}); err != nil {
-		return result, err
+		return nil, err
 	} else {
 		service := o.Services[0]
 		networkConfiguration = service.NetworkConfiguration
@@ -252,247 +146,25 @@ func (c *cage) StartCanaryTask(
 		platformVersion = service.PlatformVersion
 		loadBalancers = service.LoadBalancers
 		}
 	}
-	// Phase 1: Start canary task
-	var taskArn *string
-	if c.Env.CanaryInstanceArn != "" {
-		// ec2
-		startTask := &ecs.StartTaskInput{
-			Cluster:              &c.Env.Cluster,
-			Group:                aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
-			NetworkConfiguration: networkConfiguration,
-			TaskDefinition:       nextTaskDefinition.TaskDefinitionArn,
-			ContainerInstances:   []string{c.Env.CanaryInstanceArn},
-		}
-		if o, err := c.Ecs.StartTask(ctx, startTask); err != nil {
-			return result, err
-		} else {
-			taskArn = o.Tasks[0].TaskArn
-		}
-	} else {
-		// fargate
-		if o, err := c.Ecs.RunTask(ctx, &ecs.RunTaskInput{
-			Cluster:              &c.Env.Cluster,
-			Group:                aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
-			NetworkConfiguration: networkConfiguration,
-			TaskDefinition:       nextTaskDefinition.TaskDefinitionArn,
-			LaunchType:           ecstypes.LaunchTypeFargate,
-			PlatformVersion:      platformVersion,
-		}); err != nil {
-			return result, err
-		} else {
-			taskArn = o.Tasks[0].TaskArn
-		}
-	}
-	result.taskArn = taskArn
-	// Phase 2: Wait until canary task is running
-	log.Infof("🥚 waiting for canary task '%s' is running...", *taskArn)
-	if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{
-		Cluster: &c.Env.Cluster,
-		Tasks:   []string{*taskArn},
-	}, c.MaxWait); err != nil {
-		return result, err
-	}
-	log.Infof("🐣 canary task '%s' is running!", *taskArn)
+	var results []*CanaryTask
 	if len(loadBalancers) == 0 {
-		log.Infof("no load balancer is attached to service '%s'. skip registration to target group", c.Env.Service)
-		log.Infof("wait %d seconds for ensuring the task goes stable", c.Env.CanaryTaskIdleDuration)
-		wait := make(chan bool)
-		go func() {
-			duration := c.Env.CanaryTaskIdleDuration
-			for duration > 0 {
-				log.Infof("still waiting...; %d seconds left", duration)
-				wt := 10
-				if duration < 10 {
-					wt = duration
-				}
-				<-c.Time.NewTimer(time.Duration(wt) * time.Second).C
-				duration -= 10
-			}
-			wait <- true
-		}()
-		<-wait
-		o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
-			Cluster: &c.Env.Cluster,
-			Tasks:   []string{*taskArn},
-		})
-		if err != nil {
-			return result, err
+		task := &CanaryTask{
+			c, nextTaskDefinition, nil, networkConfiguration, platformVersion, nil, nil,
 		}
-		task := o.Tasks[0]
-		if *task.LastStatus != "RUNNING" {
-			return result, xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)
+		results = append(results, task)
+		if err := task.Start(ctx); err != nil {
+			return results, err
 		}
-		return result, nil
-	}
-	// Phase 3: Get task details after network interface is attached
-	var task ecstypes.Task
-	if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
-		Cluster: &c.Env.Cluster,
-		Tasks:   []string{*taskArn},
-	}); err != nil {
-		return result, err
 	} else {
-		task = o.Tasks[0]
-	}
-	var targetId *string
-	var targetPort *int32
-	var subnet ec2types.Subnet
-	for _, container := range nextTaskDefinition.ContainerDefinitions {
-		if *container.Name == *loadBalancers[0].ContainerName {
-			targetPort = container.PortMappings[0].HostPort
-		}
-	}
-	if targetPort == nil {
-		return result, xerrors.Errorf("couldn't find host port in container definition")
-	}
-	if c.Env.CanaryInstanceArn == "" { // Fargate
-		details := task.Attachments[0].Details
-		var subnetId *string
-		var privateIp *string
-		for _, v := range details {
-			if *v.Name == "subnetId" {
-				subnetId = v.Value
-			} else if *v.Name == "privateIPv4Address" {
-				privateIp = v.Value
-			}
-		}
-		if subnetId == nil || privateIp == nil {
-			return result, xerrors.Errorf("couldn't find subnetId or privateIPv4Address in task details")
-		}
-		if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
-			SubnetIds: []string{*subnetId},
-		}); err != nil {
-			return result, err
-		} else {
-			subnet = o.Subnets[0]
-		}
-		targetId = privateIp
-		log.Infof("canary task was placed: privateIp = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
-	} else {
-		var containerInstance ecstypes.ContainerInstance
-		if outputs, err := c.Ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
-			Cluster:            &c.Env.Cluster,
-			ContainerInstances: []string{c.Env.CanaryInstanceArn},
-		}); err != nil {
-			return result, err
-		} else {
-			containerInstance = outputs.ContainerInstances[0]
-		}
-		if o, err := c.Ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
-			InstanceIds: []string{*containerInstance.Ec2InstanceId},
-		}); err != nil {
-			return result, err
-		} else if sn, err := c.DescribeSubnet(ctx, o.Reservations[0].Instances[0].SubnetId); err != nil {
-			return result, err
-		} else {
-			targetId = containerInstance.Ec2InstanceId
-			subnet = sn
-		}
-		log.Infof("canary task was placed: instanceId = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
-	}
-	if _, err := c.Alb.RegisterTargets(ctx, &elbv2.RegisterTargetsInput{
-		TargetGroupArn: loadBalancers[0].TargetGroupArn,
-		Targets: []elbv2types.TargetDescription{{
-			AvailabilityZone: subnet.AvailabilityZone,
-			Id:               targetId,
-			Port:             targetPort,
-		}},
-	}); err != nil {
-		return result, err
-	}
-	result.target = &CanaryTarget{
-		targetGroupArn:   *loadBalancers[0].TargetGroupArn,
-		targetId:         *targetId,
-		targetPort:       *targetPort,
-		availabilityZone: *subnet.AvailabilityZone,
-	}
-	return result, nil
-}
-
-func (c *cage) waitUntilContainersBecomeHealthy(ctx context.Context, taskArn string, nextTaskDefinition *ecstypes.TaskDefinition) error {
-	containerHasHealthChecks := map[string]struct{}{}
-	for _, definition := range nextTaskDefinition.ContainerDefinitions {
-		if definition.HealthCheck != nil {
-			containerHasHealthChecks[*definition.Name] = struct{}{}
-		}
-	}
-
-	for count := 0; count < 10; count++ {
-		<-c.Time.NewTimer(time.Duration(15) * time.Second).C
-		log.Infof("canary task '%s' waits until %d container(s) become healthy", taskArn, len(containerHasHealthChecks))
-		if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
-			Cluster: &c.Env.Cluster,
-			Tasks:   []string{taskArn},
-		}); err != nil {
-			return err
-		} else {
-			task := o.Tasks[0]
-			if *task.LastStatus != "RUNNING" {
-				return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)
+		for _, lb := range loadBalancers {
+			lb := lb // copy: each CanaryTask must hold its own load balancer (pre-Go 1.22 loop semantics)
+			task := &CanaryTask{
+				c, nextTaskDefinition, &lb, networkConfiguration, platformVersion, nil, nil,
 			}
-
-			for _, container := range task.Containers {
-				if _, ok := containerHasHealthChecks[*container.Name]; !ok {
-					continue
-				}
-				if container.HealthStatus != ecstypes.HealthStatusHealthy {
-					log.Infof("container '%s' is not healthy: %s", *container.Name, container.HealthStatus)
-					continue
-				}
-				delete(containerHasHealthChecks, *container.Name)
-			}
-			if len(containerHasHealthChecks) == 0 {
-				return nil
+			results = append(results, task)
+			if err := task.Start(ctx); err != nil {
+				return results, err
 			}
 		}
 	}
-	return xerrors.Errorf("😨 canary task hasn't become to be healthy")
-}
-
-func (c *cage) StopCanaryTask(ctx context.Context, input *CanaryTask) error {
-	if input.target == nil {
-		log.Info("no load balancer is attached to service. Skip deregisteration.")
Skip deregisteration.") - } else { - log.Infof("deregistering the canary task from target group '%s'...", input.target.targetId) - if _, err := c.Alb.DeregisterTargets(ctx, &elbv2.DeregisterTargetsInput{ - TargetGroupArn: &input.target.targetGroupArn, - Targets: []elbv2types.TargetDescription{{ - AvailabilityZone: &input.target.availabilityZone, - Id: &input.target.targetId, - Port: &input.target.targetPort, - }}, - }); err != nil { - return err - } - if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{ - TargetGroupArn: &input.target.targetGroupArn, - Targets: []elbv2types.TargetDescription{{ - AvailabilityZone: &input.target.availabilityZone, - Id: &input.target.targetId, - Port: &input.target.targetPort, - }}, - }, c.MaxWait); err != nil { - return err - } - log.Infof( - "canary task '%s' has successfully been deregistered from target group '%s'", - *input.taskArn, input.target.targetId, - ) - } - - log.Infof("stopping the canary task '%s'...", *input.taskArn) - if _, err := c.Ecs.StopTask(ctx, &ecs.StopTaskInput{ - Cluster: &c.Env.Cluster, - Task: input.taskArn, - }); err != nil { - return err - } - if err := ecs.NewTasksStoppedWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ - Cluster: &c.Env.Cluster, - Tasks: []string{*input.taskArn}, - }, c.MaxWait); err != nil { - return err - } - log.Infof("canary task '%s' has successfully been stopped", *input.taskArn) - return nil + return results, nil } diff --git a/rollout_test.go b/rollout_test.go index fced6e2..13544c5 100644 --- a/rollout_test.go +++ b/rollout_test.go @@ -51,6 +51,28 @@ func TestCage_RollOut_FARGATE(t *testing.T) { assert.Equal(t, v, mctx.RunningTaskSize()) } }) + t.Run("multiple load balancers", func(t *testing.T) { + log.Info("====") + envars := test.DefaultEnvars() + lb := envars.ServiceDefinitionInput.LoadBalancers[0] + envars.ServiceDefinitionInput.LoadBalancers = []ecstypes.LoadBalancer{lb, lb} + ctrl := gomock.NewController(t) + + mctx, ecsMock, albMock, ec2Mock := test.Setup(ctrl, envars, 1, "FARGATE") + cagecli := cage.NewCage(&cage.Input{ + Env: envars, + ECS: ecsMock, + ALB: albMock, + EC2: ec2Mock, + Time: test.NewFakeTime(), + }) + ctx := context.Background() + result, err := cagecli.RollOut(ctx, &cage.RollOutInput{}) + assert.NoError(t, err) + assert.False(t, result.ServiceIntact) + assert.Equal(t, 1, mctx.ActiveServiceSize()) + assert.Equal(t, 1, mctx.RunningTaskSize()) + }) t.Run("wait until canary task is registered to target group", func(t *testing.T) { envars := test.DefaultEnvars() ctrl := gomock.NewController(t) @@ -186,7 +208,7 @@ func TestCage_RollOut_FARGATE(t *testing.T) { }, } result, err := cagecli.RollOut(ctx, &cage.RollOutInput{UpdateService: true}) - assert.EqualError(t, err, "couldn't find host port in container definition") + assert.EqualError(t, err, "failed to wait for canary task due to: couldn't find host port in container definition") assert.Equal(t, result.ServiceIntact, true) assert.Equal(t, 1, mctx.RunningTaskSize()) })