Skip to content

Commit

Permalink
Add more config options (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Jun 11, 2024
1 parent afc0046 commit 3d4bd7f
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 42 deletions.
4 changes: 1 addition & 3 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ use crate::{connection_tables, to_micros};
use arroyo_rpc::config::config;
use cornucopia_async::{Database, DatabaseSource};

const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(10);

async fn compile_sql<'a>(
query: String,
local_udfs: &Vec<Udf>,
Expand Down Expand Up @@ -511,7 +509,7 @@ pub async fn create_pipeline(
let checkpoint_interval = pipeline_post
.checkpoint_interval_micros
.map(Duration::from_micros)
.unwrap_or(DEFAULT_CHECKPOINT_INTERVAL);
.unwrap_or(*config().default_checkpoint_interval);

let job_id = jobs::create_job(
&pipeline_post.name,
Expand Down
5 changes: 2 additions & 3 deletions crates/arroyo-controller/src/job_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub mod job_metrics;
const CHECKPOINTS_TO_KEEP: u32 = 4;
const CHECKPOINT_ROWS_TO_KEEP: u32 = 100;
const COMPACT_EVERY: u32 = 2;
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(Debug, PartialEq, Eq)]
pub enum WorkerState {
Expand All @@ -58,7 +57,7 @@ pub struct WorkerStatus {

impl WorkerStatus {
fn heartbeat_timeout(&self) -> bool {
self.last_heartbeat.elapsed() > HEARTBEAT_TIMEOUT
self.last_heartbeat.elapsed() > *config().pipeline.worker_heartbeat_timeout
}
}

Expand Down Expand Up @@ -369,7 +368,7 @@ impl RunningJobModel {
}

async fn compact_state(&mut self) -> anyhow::Result<()> {
if !config().controller.compaction.enabled {
if !config().pipeline.compaction.enabled {
info!("Compaction is disabled, skipping compaction");
return Ok(());
}
Expand Down
12 changes: 4 additions & 8 deletions crates/arroyo-controller/src/states/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ use serde_json::json;

use super::{JobContext, State, Transition};

// after this amount of time, we consider the job to be healthy and reset the restarts counter
const HEALTHY_DURATION: Duration = Duration::from_secs(2 * 60);

// how many times we allow the job to restart before moving it to failed
const RESTARTS_ALLOWED: usize = 10;

#[derive(Debug)]
pub struct Running {}

Expand All @@ -36,6 +30,8 @@ impl State for Running {
async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
stop_if_desired_running!(self, ctx.config);

let pipeline_config = &config().clone().pipeline;

let running_start = Instant::now();

let mut log_interval = tokio::time::interval(Duration::from_secs(60));
Expand Down Expand Up @@ -92,7 +88,7 @@ impl State for Running {
}
}
_ = tokio::time::sleep(Duration::from_millis(200)) => {
if ctx.status.restarts > 0 && running_start.elapsed() > HEALTHY_DURATION {
if ctx.status.restarts > 0 && running_start.elapsed() > *pipeline_config.healthy_duration {
let restarts = ctx.status.restarts;
ctx.status.restarts = 0;
if let Err(e) = ctx.status.update_db(&ctx.db).await {
Expand Down Expand Up @@ -120,7 +116,7 @@ impl State for Running {
"job_id": ctx.config.id,
"error": format!("{:?}", err),
}));
if ctx.status.restarts >= RESTARTS_ALLOWED as i32 {
if pipeline_config.allowed_restarts != -1 && ctx.status.restarts >= pipeline_config.allowed_restarts {
return Err(fatal(
"Job has restarted too many times",
err
Expand Down
21 changes: 11 additions & 10 deletions crates/arroyo-controller/src/states/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ use crate::{

use super::{running::Running, JobContext, State, Transition};

const WORKER_STARTUP_TIME: Duration = Duration::from_secs(10 * 60);
const TASK_STARTUP_TIME: Duration = Duration::from_secs(2 * 60);

#[derive(Debug, Clone)]
struct WorkerStatus {
id: WorkerId,
Expand Down Expand Up @@ -189,7 +186,7 @@ impl Scheduling {
slots_for_job = slots_needed,
slots_needed = s
);
if start.elapsed() > WORKER_STARTUP_TIME {
if start.elapsed() > *config().pipeline.worker_startup_time {
return Err(fatal(
"Not enough slots to schedule job",
anyhow!("scheduler error -- needed {} slots", slots_needed),
Expand Down Expand Up @@ -244,10 +241,13 @@ impl State for Scheduling {
let worker_connects = Arc::new(Mutex::new(HashMap::new()));
let mut handles = vec![];

let config = &config().pipeline;

let start = Instant::now();
loop {
let timeout = WORKER_STARTUP_TIME
.min(ctx.config.ttl.unwrap_or(WORKER_STARTUP_TIME))
let timeout = config
.worker_startup_time
.min(ctx.config.ttl.unwrap_or(*config.worker_startup_time))
.checked_sub(start.elapsed())
.unwrap_or(Duration::ZERO);

Expand All @@ -268,7 +268,7 @@ impl State for Scheduling {
_ = tokio::time::sleep(timeout) => {
return Err(ctx.retryable(self,
"timed out while waiting for workers to start",
anyhow!("timed out after {:?} while waiting for worker startup", WORKER_STARTUP_TIME), 3));
anyhow!("timed out after {:?} while waiting for worker startup", *config.worker_startup_time), 3));
}
}

Expand Down Expand Up @@ -499,8 +499,9 @@ impl State for Scheduling {
let start = Instant::now();
let mut started_tasks = HashSet::new();
while started_tasks.len() < ctx.program.task_count() {
let timeout = TASK_STARTUP_TIME
.min(ctx.config.ttl.unwrap_or(TASK_STARTUP_TIME))
let timeout = config
.task_startup_time
.min(ctx.config.ttl.unwrap_or(*config.task_startup_time))
.checked_sub(start.elapsed())
.unwrap_or(Duration::ZERO);

Expand Down Expand Up @@ -528,7 +529,7 @@ impl State for Scheduling {
_ = tokio::time::sleep(timeout) => {
return Err(ctx.retryable(self,
"timed out while waiting for tasks to start",
anyhow!("timed out after {:?} while waiting for worker startup", TASK_STARTUP_TIME), 3));
anyhow!("timed out after {:?} while waiting for worker startup", *config.task_startup_time), 3));
}
}
}
Expand Down
26 changes: 17 additions & 9 deletions crates/arroyo-rpc/default.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
checkpoint-url = "/tmp/arroyo/checkpoints"
default-checkpoint-interval = "10s"

[pipeline]
source-batch-size = 512
source-batch-linger = "100ms"
update-aggregate-flush-interval = "1s"
allowed-restarts = 20
worker-heartbeat-timeout = "30s"
healthy-duration = "2m"
worker-startup-time = "10m"
task-startup-time = "2m"

[pipeline.compaction]
enabled = false
checkpoints-to-compact = 4

# Services

Expand All @@ -11,11 +26,6 @@ bind-address = "0.0.0.0"
rpc-port = 9190
scheduler = "process"

[controller.compaction]
enabled = false
checkpoints-to-compact = 4


[compiler]
bind-address = "0.0.0.0"
rpc-port = 9191
Expand All @@ -40,10 +50,6 @@ task-slots = 16
bind-address = "0.0.0.0"
http-port = 8001

[pipeline]
source-batch-size = 512
source-batch-linger = "100ms"
update-aggregate-flush-interval = "1s"

# Schedulers

Expand All @@ -63,6 +69,8 @@ resources = { requests = { cpu = "900m", memory = "500Mi" } }
task-slots = 16
command = "/app/arroyo-bin worker"

# other

[database]
type = "postgres"

Expand Down
24 changes: 21 additions & 3 deletions crates/arroyo-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ pub struct Config {
/// URL of an object store or filesystem for storing checkpoints
pub checkpoint_url: String,

/// Default interval for checkpointing
pub default_checkpoint_interval: HumanReadableDuration,

/// The endpoint of the controller, used by other services to connect to it. This must be set
/// if running the controller on a separate machine from the other services or on a separate
/// process with a non-standard port.
Expand Down Expand Up @@ -273,8 +276,6 @@ pub struct ControllerConfig {

/// The scheduler to use
pub scheduler: Scheduler,

pub compaction: CompactionConfig,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -381,8 +382,25 @@ pub struct PipelineConfig {
/// Batch linger time (how long to wait before flushing)
pub source_batch_linger: HumanReadableDuration,

// How often to flush aggregates
/// How often to flush aggregates
pub update_aggregate_flush_interval: HumanReadableDuration,

/// How many restarts to allow before moving to failed (-1 for infinite)
pub allowed_restarts: i32,

/// After this amount of time, we consider the job to be healthy and reset the restarts counter
pub healthy_duration: HumanReadableDuration,

/// Number of seconds to wait for a worker heartbeat before considering it dead
pub worker_heartbeat_timeout: HumanReadableDuration,

/// Amount of time to wait for workers to start up before considering them failed
pub worker_startup_time: HumanReadableDuration,

/// Amount of time to wait for tasks to startup before considering it failed
pub task_startup_time: HumanReadableDuration,

pub compaction: CompactionConfig,
}

#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-state/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ParquetBackend {
operator_id: String,
epoch: u32,
) -> Result<HashMap<String, TableCheckpointMetadata>> {
let min_files_to_compact = config().controller.compaction.checkpoints_to_compact as usize;
let min_files_to_compact = config().pipeline.compaction.checkpoints_to_compact as usize;

let operator_checkpoint_metadata =
Self::load_operator_metadata(&job_id, &operator_id, epoch)
Expand Down
2 changes: 1 addition & 1 deletion k8s/arroyo/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ maintainers:
email: [email protected]
url: https://arroyo.dev

icon: https://raw.githubusercontent.com/ArroyoSystems/arroyo/master/images/arroyo_logo.png
icon: https://raw.githubusercontent.com/ArroyoSystems/arroyo/afc004609351f6e4cd2c1f467bb7bf33ad044ccd/images/arroyo_logo.png
4 changes: 0 additions & 4 deletions k8s/arroyo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ Arroyo control plane. This is the easiest way to get a production quality Arroyo
See the [docs](https://doc.arroyo.dev/deployment/kubernetes) for full information on how to use this helm chart.

Each version of the helm chart is associated by default with a particular release of Arroyo. The latest release
<<<<<<< Updated upstream
is [0.10.0](https://www.arroyo.dev/blog/arroyo-0-10-0).
=======
is [0.10.3](https://www.arroyo.dev/blog/arroyo-0-10-0).
>>>>>>> Stashed changes

## Quickstart

Expand Down

0 comments on commit 3d4bd7f

Please sign in to comment.