This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new efe7bfd8b3 Support decryption of Parquet column and offset indexes (#7399)
efe7bfd8b3 is described below
commit efe7bfd8b327ed0cef61908ce0875f1e6b489485
Author: Adam Reeve <[email protected]>
AuthorDate: Fri Apr 18 01:24:50 2025 +1200
Support decryption of Parquet column and offset indexes (#7399)
* Add initial test for applying decryption to page index.
* Add initial code for column_index read
* Handle non-uniform encryption
* Verify column and offset index are read
* Extend tests
* Tidy ups
---------
Co-authored-by: Corwin Joy <[email protected]>
---
parquet/src/encryption/decrypt.rs | 24 ++++++
parquet/src/file/metadata/reader.rs | 111 +++++++++++++++++++++++++--
parquet/tests/encryption/encryption.rs | 52 ++++++++++++-
parquet/tests/encryption/encryption_async.rs | 56 +++++++++++++-
parquet/tests/encryption/encryption_util.rs | 32 ++++++++
5 files changed, 266 insertions(+), 9 deletions(-)
diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs
index 0927421344..6a51f1a657 100644
--- a/parquet/src/encryption/decrypt.rs
+++ b/parquet/src/encryption/decrypt.rs
@@ -220,6 +220,26 @@ impl CryptoContext {
)
}
+ pub(crate) fn create_column_index_aad(&self) -> Result<Vec<u8>> {
+ create_module_aad(
+ self.file_aad(),
+ ModuleType::ColumnIndex,
+ self.row_group_idx,
+ self.column_ordinal,
+ self.page_ordinal,
+ )
+ }
+
+ pub(crate) fn create_offset_index_aad(&self) -> Result<Vec<u8>> {
+ create_module_aad(
+ self.file_aad(),
+ ModuleType::OffsetIndex,
+ self.row_group_idx,
+ self.column_ordinal,
+ self.page_ordinal,
+ )
+ }
+
pub(crate) fn for_dictionary_page(&self) -> Self {
Self {
row_group_idx: self.row_group_idx,
@@ -236,6 +256,10 @@ impl CryptoContext {
&self.data_decryptor
}
+ pub(crate) fn metadata_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+ &self.metadata_decryptor
+ }
+
pub(crate) fn file_aad(&self) -> &Vec<u8> {
&self.file_aad
}
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index aebf1a8906..00b6b2d4f5 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -27,7 +27,7 @@ use crate::encryption::{
};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
+use crate::file::metadata::{ColumnChunkMetaData, FileMetaData,
ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index,
decode_offset_index};
use crate::file::reader::ChunkReader;
@@ -41,6 +41,9 @@ use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
#[cfg(all(feature = "async", feature = "arrow"))]
use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::CryptoContext;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
/// Reads the [`ParquetMetaData`] from a byte stream.
///
@@ -511,14 +514,22 @@ impl ParquetMetaDataReader {
let index = metadata
.row_groups()
.iter()
- .map(|x| {
+ .enumerate()
+ .map(|(rg_idx, x)| {
x.columns()
.iter()
- .map(|c| match c.column_index_range() {
+ .enumerate()
+ .map(|(col_idx, c)| match c.column_index_range() {
Some(r) => {
let r_start = usize::try_from(r.start -
start_offset)?;
let r_end = usize::try_from(r.end -
start_offset)?;
- decode_column_index(&bytes[r_start..r_end],
c.column_type())
+ Self::parse_single_column_index(
+ &bytes[r_start..r_end],
+ metadata,
+ c,
+ rg_idx,
+ col_idx,
+ )
}
None => Ok(Index::NONE),
})
@@ -530,20 +541,67 @@ impl ParquetMetaDataReader {
Ok(())
}
+ #[cfg(feature = "encryption")]
+ fn parse_single_column_index(
+ bytes: &[u8],
+ metadata: &ParquetMetaData,
+ column: &ColumnChunkMetaData,
+ row_group_index: usize,
+ col_index: usize,
+ ) -> Result<Index> {
+ match &column.column_crypto_metadata {
+ Some(crypto_metadata) => {
+ let file_decryptor =
metadata.file_decryptor.as_ref().ok_or_else(|| {
+ general_err!("Cannot decrypt column index, no file
decryptor set")
+ })?;
+ let crypto_context = CryptoContext::for_column(
+ file_decryptor,
+ crypto_metadata,
+ row_group_index,
+ col_index,
+ )?;
+ let column_decryptor = crypto_context.metadata_decryptor();
+ let aad = crypto_context.create_column_index_aad()?;
+ let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+ decode_column_index(&plaintext, column.column_type())
+ }
+ None => decode_column_index(bytes, column.column_type()),
+ }
+ }
+
+ #[cfg(not(feature = "encryption"))]
+ fn parse_single_column_index(
+ bytes: &[u8],
+ _metadata: &ParquetMetaData,
+ column: &ColumnChunkMetaData,
+ _row_group_index: usize,
+ _col_index: usize,
+ ) -> Result<Index> {
+ decode_column_index(bytes, column.column_type())
+ }
+
fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) ->
Result<()> {
let metadata = self.metadata.as_mut().unwrap();
if self.offset_index {
let index = metadata
.row_groups()
.iter()
- .map(|x| {
+ .enumerate()
+ .map(|(rg_idx, x)| {
x.columns()
.iter()
- .map(|c| match c.offset_index_range() {
+ .enumerate()
+ .map(|(col_idx, c)| match c.offset_index_range() {
Some(r) => {
let r_start = usize::try_from(r.start -
start_offset)?;
let r_end = usize::try_from(r.end -
start_offset)?;
- decode_offset_index(&bytes[r_start..r_end])
+ Self::parse_single_offset_index(
+ &bytes[r_start..r_end],
+ metadata,
+ c,
+ rg_idx,
+ col_idx,
+ )
}
None => Err(general_err!("missing offset index")),
})
@@ -556,6 +614,45 @@ impl ParquetMetaDataReader {
Ok(())
}
+ #[cfg(feature = "encryption")]
+ fn parse_single_offset_index(
+ bytes: &[u8],
+ metadata: &ParquetMetaData,
+ column: &ColumnChunkMetaData,
+ row_group_index: usize,
+ col_index: usize,
+ ) -> Result<OffsetIndexMetaData> {
+ match &column.column_crypto_metadata {
+ Some(crypto_metadata) => {
+ let file_decryptor =
metadata.file_decryptor.as_ref().ok_or_else(|| {
+ general_err!("Cannot decrypt offset index, no file
decryptor set")
+ })?;
+ let crypto_context = CryptoContext::for_column(
+ file_decryptor,
+ crypto_metadata,
+ row_group_index,
+ col_index,
+ )?;
+ let column_decryptor = crypto_context.metadata_decryptor();
+ let aad = crypto_context.create_offset_index_aad()?;
+ let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+ decode_offset_index(&plaintext)
+ }
+ None => decode_offset_index(bytes),
+ }
+ }
+
+ #[cfg(not(feature = "encryption"))]
+ fn parse_single_offset_index(
+ bytes: &[u8],
+ _metadata: &ParquetMetaData,
+ _column: &ColumnChunkMetaData,
+ _row_group_index: usize,
+ _col_index: usize,
+ ) -> Result<OffsetIndexMetaData> {
+ decode_offset_index(bytes)
+ }
+
fn range_for_page_index(&self) -> Option<Range<u64>> {
// sanity check
self.metadata.as_ref()?;
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index 86a148be2b..664850e507 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -17,7 +17,9 @@
//! This module contains tests for reading encrypted Parquet files with the Arrow API
-use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
+use crate::encryption_util::{
+ verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+};
use arrow::array::*;
use arrow::error::Result as ArrowResult;
use arrow_array::{Int32Array, RecordBatch};
@@ -29,6 +31,7 @@ use parquet::arrow::ArrowWriter;
use parquet::data_type::{ByteArray, ByteArrayType};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
@@ -726,6 +729,53 @@ pub fn test_retrieve_row_group_statistics_after_encrypted_write() {
);
}
+#[test]
+fn test_decrypt_page_index_uniform() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+
+ let key_code: &[u8] = "0123456789012345".as_bytes();
+ let decryption_properties =
FileDecryptionProperties::builder(key_code.to_vec())
+ .build()
+ .unwrap();
+
+ test_decrypt_page_index(&path, decryption_properties).unwrap();
+}
+
+#[test]
+fn test_decrypt_page_index_non_uniform() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path =
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+
+ let footer_key = "0123456789012345".as_bytes().to_vec();
+ let column_1_key = "1234567890123450".as_bytes().to_vec();
+ let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+ let decryption_properties =
FileDecryptionProperties::builder(footer_key.to_vec())
+ .with_column_key("double_field", column_1_key)
+ .with_column_key("float_field", column_2_key)
+ .build()
+ .unwrap();
+
+ test_decrypt_page_index(&path, decryption_properties).unwrap();
+}
+
+fn test_decrypt_page_index(
+ path: &str,
+ decryption_properties: FileDecryptionProperties,
+) -> Result<(), ParquetError> {
+ let file = File::open(path)?;
+ let options = ArrowReaderOptions::default()
+ .with_file_decryption_properties(decryption_properties)
+ .with_page_index(true);
+
+ let arrow_metadata = ArrowReaderMetadata::load(&file, options)?;
+
+ verify_column_indexes(arrow_metadata.metadata());
+
+ Ok(())
+}
+
fn read_and_roundtrip_to_encrypted_file(
path: &str,
decryption_properties: FileDecryptionProperties,
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 11448207c6..e0fbbcdfaf 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -17,7 +17,9 @@
//! This module contains tests for reading encrypted Parquet files with the async Arrow API
-use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
+use crate::encryption_util::{
+ verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::arrow_writer::ArrowWriterOptions;
@@ -382,6 +384,58 @@ async fn test_uniform_encryption_with_key_retriever() {
.unwrap();
}
+#[tokio::test]
+async fn test_decrypt_page_index_uniform() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+
+ let key_code: &[u8] = "0123456789012345".as_bytes();
+ let decryption_properties =
FileDecryptionProperties::builder(key_code.to_vec())
+ .build()
+ .unwrap();
+
+ test_decrypt_page_index(&path, decryption_properties)
+ .await
+ .unwrap();
+}
+
+#[tokio::test]
+async fn test_decrypt_page_index_non_uniform() {
+ let test_data = arrow::util::test_util::parquet_test_data();
+ let path =
format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+
+ let footer_key = "0123456789012345".as_bytes().to_vec();
+ let column_1_key = "1234567890123450".as_bytes().to_vec();
+ let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+ let decryption_properties =
FileDecryptionProperties::builder(footer_key.to_vec())
+ .with_column_key("double_field", column_1_key)
+ .with_column_key("float_field", column_2_key)
+ .build()
+ .unwrap();
+
+ test_decrypt_page_index(&path, decryption_properties)
+ .await
+ .unwrap();
+}
+
+async fn test_decrypt_page_index(
+ path: &str,
+ decryption_properties: FileDecryptionProperties,
+) -> Result<(), ParquetError> {
+ let mut file = File::open(&path).await?;
+
+ let options = ArrowReaderOptions::new()
+ .with_file_decryption_properties(decryption_properties)
+ .with_page_index(true);
+
+ let arrow_metadata = ArrowReaderMetadata::load_async(&mut file,
options).await?;
+
+ verify_column_indexes(arrow_metadata.metadata());
+
+ Ok(())
+}
+
async fn verify_encryption_test_file_read_async(
file: &mut tokio::fs::File,
decryption_properties: FileDecryptionProperties,
diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs
index f0d1152fb0..382193d258 100644
--- a/parquet/tests/encryption/encryption_util.rs
+++ b/parquet/tests/encryption/encryption_util.rs
@@ -88,6 +88,38 @@ pub fn verify_encryption_test_data(record_batches: Vec<RecordBatch>, metadata: &
assert_eq!(row_count, file_metadata.num_rows() as usize);
}
+/// Verifies that the column and offset indexes were successfully read from an
+/// encrypted test file.
+pub fn verify_column_indexes(metadata: &ParquetMetaData) {
+ let offset_index = metadata.offset_index().unwrap();
+ // 1 row group, 8 columns
+ assert_eq!(offset_index.len(), 1);
+ assert_eq!(offset_index[0].len(), 8);
+ // Check float column, which is encrypted in the non-uniform test file
+ let float_col_idx = 4;
+ let offset_index = &offset_index[0][float_col_idx];
+ assert_eq!(offset_index.page_locations.len(), 1);
+ assert!(offset_index.page_locations[0].offset > 0);
+
+ let column_index = metadata.column_index().unwrap();
+ assert_eq!(column_index.len(), 1);
+ assert_eq!(column_index[0].len(), 8);
+ let column_index = &column_index[0][float_col_idx];
+
+ match column_index {
+ parquet::file::page_index::index::Index::FLOAT(float_index) => {
+ assert_eq!(float_index.indexes.len(), 1);
+ assert_eq!(float_index.indexes[0].min, Some(0.0f32));
+ assert!(float_index.indexes[0]
+ .max
+ .is_some_and(|max| (max - 53.9).abs() < 1e-6));
+ }
+ _ => {
+ panic!("Expected a float column index for column {}",
float_col_idx);
+ }
+ };
+}
+
/// A KeyRetriever to use in Parquet encryption tests,
/// which stores a map from key names/metadata to encryption key bytes.
pub struct TestKeyRetriever {