Skip to content

Commit

Permalink
Merge pull request #105 from duftler/mush-rush-into-rosco
Browse files Browse the repository at this point in the history
Refactor rosco to run jobs directly instead of relying on rush.
  • Loading branch information
Travis Tomsu authored Jun 16, 2016
2 parents 5529639 + ce46066 commit 1ec7b9c
Show file tree
Hide file tree
Showing 29 changed files with 706 additions and 963 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ git clone [email protected]:spinnaker/spinnaker.git
docker-machine create --virtualbox-disk-size 8192 --virtualbox-memory 8192 -d virtualbox spinnaker
eval $(docker-machine env spinnaker)
cd spinnaker/experimental/docker-compose
docker-compose up -d redis rush
docker-compose up -d redis
```

## Verify redis
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ allprojects {
group = "com.netflix.spinnaker.rosco"

spinnaker {
dependenciesVersion = "0.19.0"
dependenciesVersion = "0.40.0"
}

configurations.all {
Expand Down
1 change: 1 addition & 0 deletions rosco-core/rosco-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ dependencies {
spinnaker.group('retrofitDefault')
compile spinnaker.dependency('rxJava')
spinnaker.group('jackson')
compile spinnaker.dependency("commonsExec")
compile spinnaker.dependency("frigga")
compile spinnaker.dependency('jacksonGuava')
compile spinnaker.dependency('jedis')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.netflix.spinnaker.rosco.api
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.swagger.annotations.ApiModelProperty

/**
* The details of a completed bake.
Expand All @@ -29,6 +30,7 @@ import groovy.transform.ToString
@EqualsAndHashCode(includes = "id")
@ToString(includeNames = true)
class Bake {
@ApiModelProperty(value="The id of the bake job.")
String id
String ami
String image_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.netflix.spinnaker.rosco.api

import com.fasterxml.jackson.annotation.JsonIgnore
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.swagger.annotations.ApiModelProperty

/**
* The state of a bake as returned by the Bakery API when a bake is created. Once complete it provides a link to the
Expand All @@ -32,6 +34,7 @@ class BakeStatus implements Serializable {
/**
* The bake status id.
*/
@ApiModelProperty(value="The id of the bake request.")
String id

State state
Expand All @@ -43,10 +46,20 @@ class BakeStatus implements Serializable {
*
* @see BakeryController#lookupBake
*/
@ApiModelProperty(value="The id of the bake job. Can be passed to lookupBake() to retrieve the details of the newly-baked image.")
String resource_id

@JsonIgnore
String logsContent

@JsonIgnore
long createdTimestamp

@JsonIgnore
long updatedTimestamp

static enum State {
PENDING, RUNNING, COMPLETED, SUSPENDED, CANCELED
RUNNING, COMPLETED, CANCELED
}

static enum Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@
package com.netflix.spinnaker.rosco.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.rosco.api.BakeStatus
import com.netflix.spinnaker.rosco.executor.BakePoller
import com.netflix.spinnaker.rosco.persistence.BakeStore
import com.netflix.spinnaker.rosco.persistence.RedisBackedBakeStore
import com.netflix.spinnaker.rosco.providers.registry.CloudProviderBakeHandlerRegistry
import com.netflix.spinnaker.rosco.providers.registry.DefaultCloudProviderBakeHandlerRegistry
import com.netflix.spinnaker.rosco.providers.util.DefaultImageNameFactory
import com.netflix.spinnaker.rosco.providers.util.DockerFriendlyPackerCommandFactory
import com.netflix.spinnaker.rosco.providers.util.ImageNameFactory
import com.netflix.spinnaker.rosco.providers.util.LocalJobFriendlyPackerCommandFactory
import com.netflix.spinnaker.rosco.providers.util.PackerCommandFactory
import groovy.transform.CompileStatic
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Scope
Expand Down Expand Up @@ -89,58 +85,10 @@ class RoscoConfiguration {
return new LocalJobFriendlyPackerCommandFactory()
}

@Bean
@ConditionalOnProperty('rush.docker.enabled')
PackerCommandFactory dockerFriendlyPackerCommandFactory() {
return new DockerFriendlyPackerCommandFactory()
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
ObjectMapper mapper() {
return new ObjectMapper()
}

@Bean
@ConfigurationProperties('executionStatusToBakeStates')
ExecutionStatusToBakeStateMap executionStatusToBakeStateMap() {
new ExecutionStatusToBakeStateMap()
}

static class ExecutionStatusToBakeStateMap {
List<ExecutionStatusToBakeState> associations

public BakeStatus.State convertExecutionStatusToBakeState(String executionStatus) {
associations.find {
it.executionStatus == executionStatus
}?.bakeState
}
}

static class ExecutionStatusToBakeState {
String executionStatus
BakeStatus.State bakeState
}

@Bean
@ConfigurationProperties('executionStatusToBakeResults')
ExecutionStatusToBakeResultMap executionStatusToBakeResultMap() {
new ExecutionStatusToBakeResultMap()
}

static class ExecutionStatusToBakeResultMap {
List<ExecutionStatusToBakeResult> associations

public BakeStatus.Result convertExecutionStatusToBakeResult(String executionStatus) {
associations.find {
it.executionStatus == executionStatus
}?.bakeResult
}
}

static class ExecutionStatusToBakeResult {
String executionStatus
BakeStatus.Result bakeResult
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,99 +18,143 @@ package com.netflix.spinnaker.rosco.executor

import com.netflix.spinnaker.rosco.api.Bake
import com.netflix.spinnaker.rosco.api.BakeStatus
import com.netflix.spinnaker.rosco.config.RoscoConfiguration
import com.netflix.spinnaker.rosco.jobs.JobExecutor
import com.netflix.spinnaker.rosco.persistence.BakeStore
import com.netflix.spinnaker.rosco.providers.registry.CloudProviderBakeHandlerRegistry
import com.netflix.spinnaker.rosco.rush.api.RushService
import com.netflix.spinnaker.rosco.rush.api.ScriptExecution
import com.netflix.spinnaker.rosco.rush.api.ScriptRequest
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.ApplicationListener
import org.springframework.context.event.ContextRefreshedEvent
import org.springframework.stereotype.Component
import retrofit.RetrofitError
import retrofit.mime.TypedByteArray
import rx.functions.Action0
import rx.schedulers.Schedulers

import java.util.concurrent.TimeUnit

/**
* BakePoller periodically queries the bake store for incomplete bakes. For each incomplete bake, it queries
* the scripting engine for an up-to-date status and logs. The status and logs are then persisted via the bake
* the job executor for an up-to-date status and logs. The status and logs are then persisted via the bake
* store. When a bake completes, it is the BakePoller that persists the completed bake details via the bake store.
* The polling interval defaults to 15 seconds and can be overridden by specifying the pollingIntervalSeconds
* property.
* The polling interval defaults to 15 seconds and can be overridden by specifying the
* rosco.polling.pollingIntervalSeconds property.
*/
@Slf4j
@Component
class BakePoller implements ApplicationListener<ContextRefreshedEvent> {

@Value('${pollingIntervalSeconds:15}')
int pollingIntervalSeconds

@Autowired
BakeStore bakeStore
String roscoInstanceId

@Autowired
ScriptRequest baseScriptRequest
@Value('${rosco.polling.pollingIntervalSeconds:15}')
int pollingIntervalSeconds

@Autowired
RushService rushService
@Value('${rosco.polling.orphanedJobPollingIntervalSeconds:30}')
int orphanedJobPollingIntervalSeconds

@Value('${rosco.polling.orphanedJobTimeoutMinutes:30}')
long orphanedJobTimeoutMinutes

@Autowired
CloudProviderBakeHandlerRegistry cloudProviderBakeHandlerRegistry
BakeStore bakeStore

@Autowired
RoscoConfiguration.ExecutionStatusToBakeStateMap executionStatusToBakeStateMap
JobExecutor executor

@Autowired
RoscoConfiguration.ExecutionStatusToBakeResultMap executionStatusToBakeResultMap
CloudProviderBakeHandlerRegistry cloudProviderBakeHandlerRegistry

@Override
void onApplicationEvent(ContextRefreshedEvent event) {
log.info("Starting polling agent for rosco instance $roscoInstanceId...")

// Update this rosco instance's incomplete bakes.
Schedulers.io().createWorker().schedulePeriodically(
{
try {
rx.Observable.from(bakeStore.incompleteBakeIds)
.subscribe(
{ String statusId ->
updateBakeStatusAndLogs(statusId)
rx.Observable.from(bakeStore.thisInstanceIncompleteBakeIds)
.subscribe(
{ String incompleteBakeId ->
try {
updateBakeStatusAndLogs(incompleteBakeId)
} catch (Exception e) {
log.error("Polling Error:", e)
}
},
{
log.error("Error: ${it.message}")
},
{} as Action0
)
} catch (Exception e) {
log.error("Polling Error:", e)
}
)
} as Action0, 0, pollingIntervalSeconds, TimeUnit.SECONDS
)

// Check _all_ rosco instances' incomplete bakes for staleness.
Schedulers.io().createWorker().schedulePeriodically(
{
rx.Observable.from(bakeStore.allIncompleteBakeIds.entrySet())
.subscribe(
{ Map.Entry<String, Set<String>> entry ->
String roscoInstanceId = entry.key
Set<String> incompleteBakeIds = entry.value

if (roscoInstanceId != this.roscoInstanceId) {
try {
rx.Observable.from(incompleteBakeIds)
.subscribe(
{ String statusId ->
BakeStatus bakeStatus = bakeStore.retrieveBakeStatusById(statusId)

// The updatedTimestamp key will not be present if the in-flight bake is managed by an
// older-style (i.e. rosco/rush) rosco instance.
if (bakeStatus?.updatedTimestamp) {
long currentTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(bakeStore.timeInMilliseconds)
long lastUpdatedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(bakeStatus.updatedTimestamp)
long eTimeMinutes = TimeUnit.SECONDS.toMinutes(currentTimeSeconds - lastUpdatedTimeSeconds)

if (eTimeMinutes >= orphanedJobTimeoutMinutes) {
log.info("The staleness of bake $statusId ($eTimeMinutes minutes) has met or exceeded the " +
"value of orphanedJobTimeoutMinutes ($orphanedJobTimeoutMinutes minutes).")

boolean cancellationSucceeded = bakeStore.cancelBakeById(statusId)

if (!cancellationSucceeded) {
bakeStore.removeFromIncompletes(roscoInstanceId, statusId)
}
}
}
},
{
log.error("Error: ${it.message}")
},
{} as Action0
)
} catch (Exception e) {
log.error("Polling Error:", e)
}
}
},
{
log.error("Error: ${it.message}")
},
{} as Action0
)
} as Action0, 0, orphanedJobPollingIntervalSeconds, TimeUnit.SECONDS
)
}

// TODO(duftler): Support retries here, or at least tolerate some number of communication failures across
// regular polling intervals before considering the bake a failure.
void updateBakeStatusAndLogs(String statusId) {
try {
ScriptExecution scriptExecution = rushService.scriptDetails(statusId).toBlocking().single()
Map logsContentMap = rushService.getLogs(statusId, baseScriptRequest).toBlocking().single()
BakeStatus.State state = executionStatusToBakeStateMap.convertExecutionStatusToBakeState(scriptExecution.status)
BakeStatus bakeStatus = executor.updateJob(statusId)

if (state == BakeStatus.State.COMPLETED) {
completeBake(statusId, logsContentMap?.logsContent)
if (bakeStatus) {
if (bakeStatus.state == BakeStatus.State.COMPLETED) {
completeBake(statusId, bakeStatus.logsContent)
}

bakeStore.updateBakeStatus(new BakeStatus(id: scriptExecution.id,
resource_id: scriptExecution.id,
state: state,
result: executionStatusToBakeResultMap.convertExecutionStatusToBakeResult(scriptExecution.status)),
logsContentMap)
} catch (RetrofitError e) {
handleRetrofitError(e, "Unable to retrieve status for '$statusId'.", statusId)

bakeStore.updateBakeStatus(bakeStatus)
} else {
String errorMessage = "Unable to retrieve status for '$statusId'."
log.error(errorMessage)
bakeStore.storeBakeError(statusId, errorMessage)
bakeStore.cancelBakeById(statusId)
}
}
Expand Down Expand Up @@ -139,13 +183,4 @@ class BakePoller implements ApplicationListener<ContextRefreshedEvent> {

log.error("Unable to retrieve bake details for '$bakeId'.")
}

private handleRetrofitError(RetrofitError e, String errMessage, String id) {
log.error(errMessage, e)

def errorBytes = ((TypedByteArray)e?.response?.body)?.bytes
def errorMessage = errorBytes ? new String(errorBytes) : "{}"

bakeStore.storeBakeError(id, errorMessage)
}
}
Loading

0 comments on commit 1ec7b9c

Please sign in to comment.