Skip to content

Commit

Permalink
feat: add ParquetEncryptionMode, to permit unencrypted files in initial key rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
srh authored Sep 12, 2024
1 parent 419678c commit 1963799
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 25 deletions.
23 changes: 18 additions & 5 deletions parquet/src/file/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,41 @@ pub struct ParquetEncryptionKeyInfo {
pub key: ParquetEncryptionKey,
}

/// Tells what mode (and also the key value(s)) a file is to be encrypted in (when writing) or is
/// permitted to be encrypted in (when reading).
///
/// A [`ParquetEncryptionConfig`] holds a list of these modes: the last one is the write mode, and
/// a file read under that config must match one of the listed modes.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ParquetEncryptionMode {
/// Means the file is unencrypted. Listing this mode in a config permits reading files that
/// were written before encryption was turned on and have not been key-rotated away yet.
Unencrypted,
/// Means the file is encrypted with encrypted footer mode, using the given key. The same
/// key is used for all the columns too, in this implementation.
EncryptedFooter(ParquetEncryptionKeyInfo),
}

/// Describes general parquet encryption configuration -- new files are written in the
/// write_key() mode, but old files can be read in any of the valid read_keys() modes.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ParquetEncryptionConfig {
// The last key is the write key, and all the keys are valid read keys.
keys: Vec<ParquetEncryptionKeyInfo>,
// The last mode is the write mode (i.e. it has the write key), and all the modes, including the
// last one, are valid read modes (i.e. valid read keys, or Unencrypted mode, if a user turned on
// encryption but hasn't key-rotated unencrypted files away yet).
keys: Vec<ParquetEncryptionMode>,
}

impl ParquetEncryptionConfig {
pub fn new(keys: Vec<ParquetEncryptionKeyInfo>) -> Option<ParquetEncryptionConfig> {
pub fn new(keys: Vec<ParquetEncryptionMode>) -> Option<ParquetEncryptionConfig> {
if keys.is_empty() {
None
} else {
Some(ParquetEncryptionConfig { keys })
}
}

pub fn write_key(&self) -> &ParquetEncryptionKeyInfo {
pub fn write_key(&self) -> &ParquetEncryptionMode {
self.keys.last().unwrap()
}

pub fn read_keys(&self) -> &[ParquetEncryptionKeyInfo] {
pub fn read_keys(&self) -> &[ParquetEncryptionMode] {
self.keys.as_slice()
}
}
Expand Down
55 changes: 40 additions & 15 deletions parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::schema::types::{self, SchemaDescriptor};
use crate::file::{
encryption::{
decrypt_module, parquet_magic, ParquetEncryptionConfig, ParquetEncryptionKey,
ParquetEncryptionKeyInfo, RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE,
PARQUET_KEY_HASH_LENGTH,
ParquetEncryptionKeyInfo, ParquetEncryptionMode, RandomFileIdentifier,
AAD_FILE_UNIQUE_SIZE, PARQUET_KEY_HASH_LENGTH,
},
PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE, PARQUET_MAGIC_UNSUPPORTED_PARE,
};
Expand All @@ -59,10 +59,15 @@ fn select_key(
}
let mut key_id_arr = [0u8; PARQUET_KEY_HASH_LENGTH];
key_id_arr.copy_from_slice(&key_id);
let read_keys: &[ParquetEncryptionKeyInfo] = encryption_config.read_keys();
for key_info in read_keys {
if key_info.key.compute_key_hash() == key_id_arr {
return Ok(key_info.key);
let read_keys: &[ParquetEncryptionMode] = encryption_config.read_keys();
for mode in read_keys {
match mode {
ParquetEncryptionMode::Unencrypted => {}
ParquetEncryptionMode::EncryptedFooter(key_info) => {
if key_info.key.compute_key_hash() == key_id_arr {
return Ok(key_info.key);
}
}
}
}
return Err(general_err!(
Expand Down Expand Up @@ -103,20 +108,38 @@ pub fn parse_metadata<R: ChunkReader>(
default_end_reader.read_exact(&mut default_len_end_buf)?;

// check this is indeed a parquet file
let encrypted_footer: bool;
{
// and check that its encryption setting conceivably matches our encryption_config (but without yet checking keys)
let trailing_magic: &[u8] = &default_len_end_buf[default_end_len - 4..];
if trailing_magic != parquet_magic(encryption_config.is_some()) {
if trailing_magic == PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file in encrypted mode. File (or at least the Parquet footer) is not encrypted"));
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE {
if trailing_magic == PARQUET_MAGIC {
if let Some(config) = encryption_config {
if !config
.read_keys()
.iter()
.any(|m| matches!(m, ParquetEncryptionMode::Unencrypted))
{
return Err(general_err!("Invalid Parquet file in encrypted mode. File (or at least the Parquet footer) is not encrypted"));
}
}
encrypted_footer = false;
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE {
let has_keys = encryption_config.as_ref().map_or(false, |config| {
config
.read_keys()
.iter()
.any(|m| matches!(m, ParquetEncryptionMode::EncryptedFooter(_)))
});
if !has_keys {
return Err(general_err!(
"Invalid Parquet file in unencrypted mode. File is encrypted"
));
} else if trailing_magic == PARQUET_MAGIC_UNSUPPORTED_PARE {
return Err(general_err!("Unsupported Parquet file. File is encrypted with the standard PARE encryption format"));
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}
encrypted_footer = true;
} else if trailing_magic == PARQUET_MAGIC_UNSUPPORTED_PARE {
return Err(general_err!("Unsupported Parquet file. File is encrypted with the standard PARE encryption format"));
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}
}

Expand Down Expand Up @@ -159,7 +182,9 @@ pub fn parse_metadata<R: ChunkReader>(
let returned_encryption_key: Option<ParquetEncryptionKey>;

let random_file_identifier: Option<RandomFileIdentifier>;
if let Some(encryption_config) = encryption_config {
if encrypted_footer {
let encryption_config: &ParquetEncryptionConfig =
encryption_config.as_ref().unwrap();
let file_crypto_metadata = {
let mut prot = TCompactInputProtocol::new(&mut metadata_read);
TFileCryptoMetaData::read_from_in_protocol(&mut prot).map_err(|e| {
Expand Down
18 changes: 13 additions & 5 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ mod tests {
use crate::compression::{create_codec, Codec};
use crate::file::encryption::{
generate_random_file_identifier, ParquetEncryptionConfig,
ParquetEncryptionKeyInfo,
ParquetEncryptionKeyInfo, ParquetEncryptionMode,
};
use crate::file::reader::Length;
use crate::file::{
Expand Down Expand Up @@ -1367,8 +1367,12 @@ mod tests {

file_writer.close().unwrap();

let encryption_config = encryption_info
.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap());
let encryption_config = encryption_info.map(|(key_info, _)| {
ParquetEncryptionConfig::new(vec![ParquetEncryptionMode::EncryptedFooter(
key_info,
)])
.unwrap()
});
let reader = assert_send(
SerializedFileReader::new_maybe_encrypted(file, &encryption_config).unwrap(),
);
Expand Down Expand Up @@ -1479,8 +1483,12 @@ mod tests {
let buffer = cursor.into_inner().unwrap();

let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
let encryption_config = encryption_info
.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap());
let encryption_config = encryption_info.map(|(key_info, _)| {
ParquetEncryptionConfig::new(vec![ParquetEncryptionMode::EncryptedFooter(
key_info,
)])
.unwrap()
});
let reader =
SerializedFileReader::new_maybe_encrypted(reading_cursor, &encryption_config)
.unwrap();
Expand Down

0 comments on commit 1963799

Please sign in to comment.