diff --git a/crates/arroyo-operator/src/context.rs b/crates/arroyo-operator/src/context.rs index 28281ab42..32fb3610f 100644 --- a/crates/arroyo-operator/src/context.rs +++ b/crates/arroyo-operator/src/context.rs @@ -27,7 +27,6 @@ use std::time::{Instant, SystemTime}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio::sync::Notify; -use tracing::log::debug; use tracing::{trace, warn}; pub type QueueItem = ArrowMessage; diff --git a/crates/arroyo-sql-testing/src/smoke_tests.rs b/crates/arroyo-sql-testing/src/smoke_tests.rs index bde186622..82fdc2f5e 100644 --- a/crates/arroyo-sql-testing/src/smoke_tests.rs +++ b/crates/arroyo-sql-testing/src/smoke_tests.rs @@ -16,9 +16,7 @@ use tokio::sync::mpsc::{channel, Receiver}; use crate::udfs::get_udfs; use arroyo_rpc::config; -use arroyo_rpc::grpc::rpc::{ - CheckpointMetadata, StopMode, TaskCheckpointCompletedReq, TaskCheckpointEventReq, -}; +use arroyo_rpc::grpc::rpc::{StopMode, TaskCheckpointCompletedReq, TaskCheckpointEventReq}; use arroyo_rpc::{CompactionResult, ControlMessage, ControlResp}; use arroyo_state::checkpoint_state::CheckpointState; use arroyo_types::{to_micros, CheckpointBarrier}; diff --git a/crates/arroyo-worker/src/engine.rs b/crates/arroyo-worker/src/engine.rs index cc03688b7..5bc60d3a4 100644 --- a/crates/arroyo-worker/src/engine.rs +++ b/crates/arroyo-worker/src/engine.rs @@ -32,7 +32,7 @@ use bincode::{Decode, Encode}; use futures::stream::FuturesUnordered; use futures::StreamExt; use petgraph::graph::{DiGraph, NodeIndex}; -use petgraph::visit::{EdgeRef, IntoEdgesDirected, NodeRef}; +use petgraph::visit::EdgeRef; use petgraph::{dot, Direction}; use std::collections::{BTreeMap, HashMap}; use std::fmt::{Debug, Formatter}; @@ -224,7 +224,7 @@ impl Program { let checkpoint_metadata = if let Some(epoch) = restore_epoch { info!("Restoring checkpoint {} for job {}", epoch, job_id); Some( - StateBackend::load_checkpoint_metadata(&job_id, epoch) + StateBackend::load_checkpoint_metadata(job_id, epoch) .await .unwrap_or_else(|_| { panic!("failed to load checkpoint metadata for epoch {}", epoch) @@ -532,7 +532,7 @@ impl Engine { for idx in node_indexes { futures.push(self.schedule_node( - &self.program.control_tx.as_ref().unwrap(), + self.program.control_tx.as_ref().unwrap(), idx, ready.clone(), )); diff --git a/crates/arroyo-worker/src/lib.rs b/crates/arroyo-worker/src/lib.rs index f6dde776a..87c013663 100644 --- a/crates/arroyo-worker/src/lib.rs +++ b/crates/arroyo-worker/src/lib.rs @@ -44,7 +44,6 @@ use arroyo_df::physical::new_registry; use arroyo_rpc::config::config; use arroyo_server_common::shutdown::ShutdownGuard; use arroyo_server_common::wrap_start; -use arroyo_state::{BackingStore, StateBackend}; pub mod arrow;