Skip to content

Commit

Permalink
Add flowctl catalog delete subcommand
Browse files Browse the repository at this point in the history
Deletion of catalog specs requires several steps, and plenty of
opportunities for mistakes. This commit adds a single-step deletion
command, with confirmation.

Before:

- `flowctl draft crate`
- `flowctl catalog draft --name acmeCo/foo --delete`
- `flowctl catalog draft --name acmeCo/bar --delete`
- `flowctl draft publish`

After: `flowctl catalog delete --prefix acmeCo/`
  • Loading branch information
psFried committed Feb 1, 2023
1 parent 253c707 commit 0f46a3e
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 16 deletions.
132 changes: 132 additions & 0 deletions crates/flowctl/src/catalog/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::{api_exec, catalog, draft, CliContext};
use anyhow::Context;
use serde::Serialize;

#[derive(Debug, clap::Args)]
pub struct Delete {
#[clap(flatten)]
pub name_selector: catalog::NameSelector,
#[clap(flatten)]
pub type_selector: catalog::SpecTypeSelector,
/// Proceed with deletion without prompting for confirmation.
///
/// Normally, delete will stop and ask for confirmation before it proceeds. This flag disables
/// that confirmation. This is sometimes required in order to run flowctl non-interactively,
/// such as in a shell script.
pub dangerous_auto_approve: bool,
}

#[derive(Serialize, Debug)]
struct DraftSpec {
draft_id: String,
catalog_name: String,
expect_pub_id: String,
spec_type: serde_json::Value, // always null, since we're deleting
spec: serde_json::Value, // always null, since we're deleting
}

pub async fn do_delete(
ctx: &mut CliContext,
Delete {
name_selector,
type_selector,
dangerous_auto_approve,
}: &Delete,
) -> anyhow::Result<()> {
let list_args = catalog::List {
flows: false,
name_selector: name_selector.clone(),
type_selector: type_selector.clone(),
deleted: false,
};

let client = ctx.controlplane_client()?;
let specs = catalog::fetch_live_specs(
client.clone(),
&list_args,
vec![
"id",
"catalog_name",
"spec_type",
"updated_at",
"last_pub_id",
"last_pub_user_email",
"last_pub_user_id",
"last_pub_user_full_name",
],
)
.await
.context("fetching live specs")?;

if specs.is_empty() {
anyhow::bail!("no specs found matching given selector");
}

// show the user the specs before we ask for confirmation
ctx.write_all(specs.clone(), false)?;

if !(*dangerous_auto_approve || prompt_to_continue().await) {
anyhow::bail!("delete operation cancelled");
}

let draft = draft::create_draft(client.clone())
.await
.context("failed to create draft")?;
println!(
"Deleting {} item(s) using draft: {}",
specs.len(),
&draft.id
);
tracing::info!(draft_id = %draft.id, "created draft");

// create the draft specs now, so we can pass owned `specs` to `write_all`
let draft_specs = specs
.into_iter()
.map(|spec| DraftSpec {
draft_id: draft.id.clone(),
catalog_name: spec.catalog_name.clone(),
spec_type: serde_json::Value::Null,
spec: serde_json::Value::Null,
expect_pub_id: spec
.last_pub_id
.clone()
.expect("spec is missing last_pub_id"),
})
.collect::<Vec<DraftSpec>>();

api_exec::<Vec<serde_json::Value>>(
ctx.controlplane_client()?
.from("draft_specs")
//.select("catalog_name,spec_type")
.upsert(serde_json::to_string(&draft_specs).unwrap())
.on_conflict("draft_id,catalog_name"),
)
.await?;
tracing::debug!("added deletions to draft");

draft::publish(client.clone(), false, &draft.id).await?;

// extra newline before, since `publish` will output a bunch of logs
println!("\nsuccessfully deleted {} spec(s)", draft_specs.len());
Ok(())
}

async fn prompt_to_continue() -> bool {
tokio::task::spawn_blocking(|| {
println!(
"\nIf you continue, the listed specs will all be deleted. This cannot be undone.\n\
Enter the word 'delete' to continue, or anything else to abort:\n"
);
let mut buf = String::with_capacity(8);

match std::io::stdin().read_line(&mut buf) {
Ok(_) => buf.trim() == "delete",
Err(err) => {
tracing::error!(error = %err, "failed to read from stdin");
false
}
}
})
.await
.expect("failed to join spawned task")
}
70 changes: 55 additions & 15 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod delete;
mod publish;
mod pull_specs;
mod test;
Expand All @@ -23,6 +24,13 @@ pub struct Catalog {
pub enum Command {
/// List catalog specifications.
List(List),

/// Delete catalog specifications.
///
/// Permanently deletes catalog specifications.
/// **WARNING:** deleting a task is permanent and cannot be undone.
Delete(delete::Delete),

/// Pull down catalog specifications into a local directory.
///
/// Writes catalog specifications into a local directory so that
Expand Down Expand Up @@ -143,32 +151,50 @@ pub struct SpecTypeSelector {
}

impl SpecTypeSelector {
/// Adds postgrest query parameters based on the arugments provided to filter specs based on the `spec_type` column.
/// The `include_deleted` paraemeter specifies whether to include deleted specs. This is handled outside of the
/// `SpecTypeSelector` arguments, because it doesn't make sense in all contexts. For example, it wouldn't make sense
/// to have a `--deleted` selector in `flowctl catalog delete`, but it does make sense as part of `flowctl catalog list`.
pub fn add_live_specs_filters<'a>(
&self,
mut builder: postgrest::Builder<'a>,
include_deleted: bool,
) -> postgrest::Builder<'a> {
let all = &[
(CatalogSpecType::Capture, self.captures),
(CatalogSpecType::Collection, self.collections),
(CatalogSpecType::Materialization, self.materializations),
(CatalogSpecType::Test, self.tests),
(CatalogSpecType::Capture.as_ref(), "eq", self.captures),
(CatalogSpecType::Collection.as_ref(), "eq", self.collections),
(
CatalogSpecType::Materialization.as_ref(),
"eq",
self.materializations,
),
(CatalogSpecType::Test.as_ref(), "eq", self.tests),
// Deleted specs, will always be Some, so that an empty type selector
("null", "is", Some(include_deleted)),
];

// If any of the types were explicitly included, then we'll add
// an `or.` that only includes items for each explicitly included type.
if self.has_any_include_types() {
if self.has_any_include_types() || include_deleted {
let expr = all
.iter()
.filter(|(_, inc)| inc.unwrap_or(false))
.map(|(ty, _)| format!("spec_type.eq.{ty}"))
.filter(|(_, _, inc)| inc.unwrap_or(false))
.map(|(ty, op, _)| format!("spec_type.{op}.{ty}"))
.join(",");
builder = builder.or(expr);
} else {
// If no types were explicitly included, then we can just add
// an `neq.` for each explicitly excluded type, since postgrest
// implicitly applies AND logic there.
for (ty, _) in all.iter().filter(|(_, inc)| *inc == Some(false)) {
// If no types were explicitly included, then we can just filter out the types we _don't_ want.
// We need to use `IS NOT NULL` to filter out deleted specs, rather than using `neq`.
// Postgrest implicitly applies AND logic for these.
for (ty, _, _) in all
.iter()
.filter(|(_, op, inc)| *op == "eq" && *inc == Some(false))
{
builder = builder.neq("spec_type", ty);
}
if !include_deleted {
builder = builder.not("is", "spec_type", "null");
}
}
builder
}
Expand Down Expand Up @@ -218,6 +244,9 @@ pub struct List {
pub name_selector: NameSelector,
#[clap(flatten)]
pub type_selector: SpecTypeSelector,
/// Include deleted specs, which have no type.
#[clap(long)]
pub deleted: bool,
}

impl List {
Expand Down Expand Up @@ -265,6 +294,7 @@ impl Catalog {
pub async fn run(&self, ctx: &mut crate::CliContext) -> Result<(), anyhow::Error> {
match &self.cmd {
Command::List(list) => do_list(ctx, list).await,
Command::Delete(del) => delete::do_delete(ctx, del).await,
Command::PullSpecs(pull) => pull_specs::do_pull_specs(ctx, pull).await,
Command::Publish(publish) => publish::do_publish(ctx, publish).await,
Command::Test(source) => test::do_test(ctx, source).await,
Expand All @@ -280,24 +310,34 @@ pub async fn fetch_live_specs(
columns: Vec<&'static str>,
) -> anyhow::Result<Vec<LiveSpecRow>> {
let builder = cp_client.from("live_specs_ext").select(columns.join(","));
let builder = list.type_selector.add_live_specs_filters(builder);
let builder = list
.type_selector
.add_live_specs_filters(builder, list.deleted);
let builder = list.name_selector.add_live_specs_filters(builder);

let rows = api_exec(builder).await?;
Ok(rows)
}

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Clone)]
pub struct LiveSpecRow {
pub catalog_name: String,
pub id: String,
pub updated_at: crate::Timestamp,
pub spec_type: Option<CatalogSpecType>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_pub_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_pub_user_email: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_pub_user_full_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_pub_user_id: Option<uuid::Uuid>,
pub spec_type: Option<CatalogSpecType>,
pub updated_at: crate::Timestamp,
#[serde(skip_serializing_if = "Option::is_none")]
pub reads_from: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub writes_to: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub spec: Option<Box<serde_json::value::RawValue>>,
}

Expand Down
1 change: 1 addition & 0 deletions crates/flowctl/src/catalog/pull_specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Re
flows: true,
name_selector: args.name_selector.clone(),
type_selector: args.type_selector.clone(),
deleted: false, // deleted specs have nothing to pull
};

let live_specs = fetch_live_specs(client, &list_args, columns).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/flowctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ async fn fetch_async(resource: url::Url) -> Result<bytes::Bytes, anyhow::Error>
}
}

#[derive(serde::Deserialize, serde::Serialize, Debug)]
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
pub struct Timestamp(#[serde(with = "time::serde::rfc3339")] time::OffsetDateTime);

impl std::fmt::Display for Timestamp {
Expand Down

0 comments on commit 0f46a3e

Please sign in to comment.