This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this push:
new aa26c0cfb1 [thrift-remodel] Use new writer to write Parquet file metadata (#8445)
aa26c0cfb1 is described below
commit aa26c0cfb1493ce112cf65144783bfef42bf8c94
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Sep 26 09:23:33 2025 -0700
[thrift-remodel] Use new writer to write Parquet file metadata (#8445)
# Which issue does this PR close?
**Note: this targets a feature branch, not main**
- Part of #5854.
# Rationale for this change
This PR closes the loop: Parquet metadata is now handled entirely by the new code.
# What changes are included in this PR?
Changes the metadata builders to use the new structs rather than those
from `format`. As a consequence, the `close` methods no longer return a
`format::FileMetaData` but instead return a `ParquetMetaData`.
# Are these changes tested?
Covered by existing tests, but many tests were modified to deal with the
switch to `ParquetMetaData` mentioned above.
# Are there any user-facing changes?
Yes
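For downstream code the migration looks roughly like the sketch below (the `inspect` helper is hypothetical; the accessors are the ones exercised in the updated tests in this diff):

```rust
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

// Before this change, `writer.close()` returned `format::FileMetaData` with
// public fields (`meta.num_rows`, `meta.row_groups`). It now returns
// `ParquetMetaData`, which is traversed through accessor methods instead.
fn inspect(metadata: ParquetMetaData) -> Result<()> {
    println!("rows: {}", metadata.file_metadata().num_rows());
    for rg in metadata.row_groups() {
        for col in rg.columns() {
            // offsets now sit behind getters rather than public thrift fields
            let _ = col.data_page_offset();
        }
    }
    Ok(())
}
```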
---
parquet/benches/metadata.rs | 46 ++--
parquet/src/arrow/arrow_reader/mod.rs | 3 +-
parquet/src/arrow/arrow_writer/mod.rs | 87 ++++---
parquet/src/arrow/async_writer/mod.rs | 6 +-
parquet/src/column/writer/mod.rs | 13 +-
parquet/src/encryption/encrypt.rs | 35 +--
parquet/src/file/column_crypto_metadata.rs | 16 ++
parquet/src/file/metadata/memory.rs | 7 +
parquet/src/file/metadata/mod.rs | 10 +-
parquet/src/file/metadata/thrift_gen.rs | 373 +++++++++++++++++++++++++--
parquet/src/file/metadata/writer.rs | 328 +++++++++++------------
parquet/src/file/serialized_reader.rs | 4 +-
parquet/src/file/writer.rs | 116 ++++-----
parquet/src/schema/types.rs | 18 ++
parquet/tests/encryption/encryption.rs | 20 +-
parquet/tests/encryption/encryption_async.rs | 10 +-
16 files changed, 711 insertions(+), 381 deletions(-)
diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index ced0175da8..d05f1e09cb 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#[cfg(feature = "arrow")]
+use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::ParquetMetaDataReader;
use rand::Rng;
use thrift::protocol::TCompactOutputProtocol;
@@ -164,7 +166,7 @@ fn get_footer_bytes(data: Bytes) -> Bytes {
}
#[cfg(feature = "arrow")]
-fn rewrite_file(bytes: Bytes) -> (Bytes, FileMetaData) {
+fn rewrite_file(bytes: Bytes) -> (Bytes, ParquetMetaData) {
use arrow::array::RecordBatchReader;
    use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
@@ -217,6 +219,7 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});
+ // FIXME(ets): remove benches of private APIs
c.bench_function("decode thrift file metadata", |b| {
b.iter(|| {
parquet::thrift::bench_file_metadata(&meta_data);
@@ -237,45 +240,42 @@ fn criterion_benchmark(c: &mut Criterion) {
});
// rewrite file with page statistics. then read page headers.
+ // FIXME(ets): remove the page header benches when remodel is complete
#[cfg(feature = "arrow")]
let (file_bytes, metadata) = rewrite_file(data.clone());
#[cfg(feature = "arrow")]
c.bench_function("page headers", |b| {
b.iter(|| {
- metadata.row_groups.iter().for_each(|rg| {
- rg.columns.iter().for_each(|col| {
- if let Some(col_meta) = &col.meta_data {
-                        if let Some(dict_offset) = col_meta.dictionary_page_offset {
- parquet::thrift::bench_page_header(
- &file_bytes.slice(dict_offset as usize..),
- );
- }
+ for rg in metadata.row_groups() {
+ for col in rg.columns() {
+ if let Some(dict_offset) = col.dictionary_page_offset() {
parquet::thrift::bench_page_header(
-                            &file_bytes.slice(col_meta.data_page_offset as usize..),
+ &file_bytes.slice(dict_offset as usize..),
);
}
- });
- });
+ parquet::thrift::bench_page_header(
+ &file_bytes.slice(col.data_page_offset() as usize..),
+ );
+ }
+ }
})
});
#[cfg(feature = "arrow")]
c.bench_function("page headers (no stats)", |b| {
b.iter(|| {
- metadata.row_groups.iter().for_each(|rg| {
- rg.columns.iter().for_each(|col| {
- if let Some(col_meta) = &col.meta_data {
-                        if let Some(dict_offset) = col_meta.dictionary_page_offset {
- parquet::thrift::bench_page_header_no_stats(
- &file_bytes.slice(dict_offset as usize..),
- );
- }
+ for rg in metadata.row_groups() {
+ for col in rg.columns() {
+ if let Some(dict_offset) = col.dictionary_page_offset() {
parquet::thrift::bench_page_header_no_stats(
-                            &file_bytes.slice(col_meta.data_page_offset as usize..),
+ &file_bytes.slice(dict_offset as usize..),
);
}
- });
- });
+ parquet::thrift::bench_page_header_no_stats(
+ &file_bytes.slice(col.data_page_offset() as usize..),
+ );
+ }
+ }
})
});
}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 00beaa364f..8d5b3b55c1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -1185,6 +1185,7 @@ mod tests {
FloatType, Int32Type, Int64Type, Int96, Int96Type,
};
use crate::errors::Result;
+ use crate::file::metadata::ParquetMetaData;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
@@ -2913,7 +2914,7 @@ mod tests {
schema: TypePtr,
field: Option<Field>,
opts: &TestOptions,
- ) -> Result<crate::format::FileMetaData> {
+ ) -> Result<ParquetMetaData> {
let mut writer_props = opts.writer_props();
if let Some(field) = field {
let arrow_schema = Schema::new(vec![field]);
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 4fae03affa..6b4dc87abb 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{KeyValue, RowGroupMetaData};
+use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -397,13 +397,13 @@ impl<W: Write + Send> ArrowWriter<W> {
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
- pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+ pub fn finish(&mut self) -> Result<ParquetMetaData> {
self.flush()?;
self.writer.finish()
}
/// Close and finalize the underlying Parquet writer
- pub fn close(mut self) -> Result<crate::format::FileMetaData> {
+ pub fn close(mut self) -> Result<ParquetMetaData> {
self.finish()
}
@@ -754,7 +754,7 @@ impl ArrowColumnChunk {
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
-/// assert_eq!(metadata.num_rows, 3);
+/// assert_eq!(metadata.file_metadata().num_rows(), 3);
/// ```
pub struct ArrowColumnWriter {
writer: ArrowColumnWriterImpl,
@@ -1510,7 +1510,6 @@ mod tests {
use crate::arrow::ARROW_SCHEMA_META_KEY;
use crate::column::page::{Page, PageReader};
use crate::file::metadata::thrift_gen::PageHeader;
- use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::reader::SerializedPageReader;
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
@@ -2579,12 +2578,12 @@ mod tests {
            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
writer.write(&batch).unwrap();
let file_meta_data = writer.close().unwrap();
- for row_group in file_meta_data.row_groups {
- for column in row_group.columns {
- assert!(column.offset_index_offset.is_some());
- assert!(column.offset_index_length.is_some());
- assert!(column.column_index_offset.is_none());
- assert!(column.column_index_length.is_none());
+ for row_group in file_meta_data.row_groups() {
+ for column in row_group.columns() {
+ assert!(column.offset_index_offset().is_some());
+ assert!(column.offset_index_length().is_some());
+ assert!(column.column_index_offset().is_none());
+ assert!(column.column_index_length().is_none());
}
}
}
@@ -3033,14 +3032,18 @@ mod tests {
writer.write(&batch).unwrap();
let file_metadata = writer.close().unwrap();
+ let schema = file_metadata.file_metadata().schema();
// Coerced name of "item" should be "element"
- assert_eq!(file_metadata.schema[3].name, "element");
+ let list_field = &schema.get_fields()[0].get_fields()[0];
+ assert_eq!(list_field.get_fields()[0].name(), "element");
+
+ let map_field = &schema.get_fields()[1].get_fields()[0];
// Coerced name of "entries" should be "key_value"
- assert_eq!(file_metadata.schema[5].name, "key_value");
+ assert_eq!(map_field.name(), "key_value");
// Coerced name of "keys" should be "key"
- assert_eq!(file_metadata.schema[6].name, "key");
+ assert_eq!(map_field.get_fields()[0].name(), "key");
// Coerced name of "values" should be "value"
- assert_eq!(file_metadata.schema[7].name, "value");
+ assert_eq!(map_field.get_fields()[1].name(), "value");
// Double check schema after reading from the file
let reader = SerializedFileReader::new(file).unwrap();
@@ -3984,15 +3987,15 @@ mod tests {
writer.write(&batch).unwrap();
let metadata = writer.close().unwrap();
- assert_eq!(metadata.row_groups.len(), 1);
- let row_group = &metadata.row_groups[0];
- assert_eq!(row_group.columns.len(), 2);
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 2);
// Column "a" has both offset and column index, as requested
- assert!(row_group.columns[0].offset_index_offset.is_some());
- assert!(row_group.columns[0].column_index_offset.is_some());
+ assert!(row_group.column(0).offset_index_offset().is_some());
+ assert!(row_group.column(0).column_index_offset().is_some());
// Column "b" should only have offset index
- assert!(row_group.columns[1].offset_index_offset.is_some());
- assert!(row_group.columns[1].column_index_offset.is_none());
+ assert!(row_group.column(1).offset_index_offset().is_some());
+ assert!(row_group.column(1).column_index_offset().is_none());
let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
@@ -4059,15 +4062,15 @@ mod tests {
writer.write(&batch).unwrap();
let metadata = writer.close().unwrap();
- assert_eq!(metadata.row_groups.len(), 1);
- let row_group = &metadata.row_groups[0];
- assert_eq!(row_group.columns.len(), 2);
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 2);
// Column "a" should only have offset index
- assert!(row_group.columns[0].offset_index_offset.is_some());
- assert!(row_group.columns[0].column_index_offset.is_none());
+ assert!(row_group.column(0).offset_index_offset().is_some());
+ assert!(row_group.column(0).column_index_offset().is_none());
// Column "b" should only have offset index
- assert!(row_group.columns[1].offset_index_offset.is_some());
- assert!(row_group.columns[1].column_index_offset.is_none());
+ assert!(row_group.column(1).offset_index_offset().is_some());
+ assert!(row_group.column(1).column_index_offset().is_none());
let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
@@ -4331,14 +4334,18 @@ mod tests {
writer.write(&batch).unwrap();
let file_metadata = writer.close().unwrap();
- assert_eq!(file_metadata.row_groups.len(), 1);
- assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
- let chunk_meta = file_metadata.row_groups[0].columns[0]
- .meta_data
- .as_ref()
- .expect("column metadata missing");
- assert!(chunk_meta.encoding_stats.is_some());
- let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();
+ assert_eq!(file_metadata.num_row_groups(), 1);
+ assert_eq!(file_metadata.row_group(0).num_columns(), 1);
+ assert!(file_metadata
+ .row_group(0)
+ .column(0)
+ .page_encoding_stats()
+ .is_some());
+ let chunk_page_stats = file_metadata
+ .row_group(0)
+ .column(0)
+ .page_encoding_stats()
+ .unwrap();
// check that the read metadata is also correct
let options = ReadOptionsBuilder::new().with_page_index().build();
@@ -4349,11 +4356,7 @@ mod tests {
let column = rowgroup.metadata().column(0);
assert!(column.page_encoding_stats().is_some());
let file_page_stats = column.page_encoding_stats().unwrap();
- let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
- .iter()
-            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
- .collect();
- assert_eq!(&chunk_stats, file_page_stats);
+ assert_eq!(chunk_page_stats, file_page_stats);
}
#[test]
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index e61b8f47c3..232333a1b4 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -65,7 +65,7 @@ use crate::{
arrow::ArrowWriter,
errors::{ParquetError, Result},
file::{
- metadata::{KeyValue, RowGroupMetaData},
+ metadata::{KeyValue, ParquetMetaData, RowGroupMetaData},
properties::WriterProperties,
},
};
@@ -247,7 +247,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
- pub async fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+ pub async fn finish(&mut self) -> Result<ParquetMetaData> {
let metadata = self.sync_writer.finish()?;
// Force to flush the remaining data.
@@ -260,7 +260,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
/// Close and finalize the writer.
///
/// All the data in the inner buffer will be force flushed.
- pub async fn close(mut self) -> Result<crate::format::FileMetaData> {
+ pub async fn close(mut self) -> Result<ParquetMetaData> {
self.finish().await
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3f516462f2..ee400f200e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -3602,19 +3602,12 @@ mod tests {
col_writer.close().unwrap();
row_group_writer.close().unwrap();
let file_metadata = writer.close().unwrap();
- assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
- let stats = file_metadata.row_groups[0].columns[0]
- .meta_data
- .as_ref()
- .unwrap()
- .statistics
- .as_ref()
- .unwrap();
- assert!(!stats.is_max_value_exact.unwrap());
+ let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
+ assert!(!stats.max_is_exact());
        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
// be incremented by 1.
assert_eq!(
- stats.max_value,
+ stats.max_bytes_opt().map(|v| v.to_vec()),
Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
);
}
diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs
index 9789302169..1a22abff56 100644
--- a/parquet/src/encryption/encrypt.rs
+++ b/parquet/src/encryption/encrypt.rs
@@ -24,11 +24,9 @@ use crate::errors::{ParquetError, Result};
use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-use crate::thrift::TSerializable;
use ring::rand::{SecureRandom, SystemRandom};
use std::collections::{HashMap, HashSet};
use std::io::Write;
-use thrift::protocol::TCompactOutputProtocol;
#[derive(Debug, Clone, PartialEq)]
struct EncryptionKey {
@@ -365,18 +363,6 @@ impl FileEncryptor {
}
}
-/// Write an encrypted Thrift serializable object
-pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
- object: &T,
- encryptor: &mut Box<dyn BlockEncryptor>,
- sink: &mut W,
- module_aad: &[u8],
-) -> Result<()> {
-    let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
- sink.write_all(&encrypted_buffer)?;
- Ok(())
-}
-
/// Write an encrypted Thrift serializable object
pub(crate) fn encrypt_thrift_object<T: WriteThrift, W: Write>(
object: &T,
@@ -389,7 +375,7 @@ pub(crate) fn encrypt_thrift_object<T: WriteThrift, W: Write>(
Ok(())
}
-pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
+pub(crate) fn write_signed_plaintext_thrift_object<T: WriteThrift, W: Write>(
object: &T,
encryptor: &mut Box<dyn BlockEncryptor>,
sink: &mut W,
@@ -397,8 +383,8 @@ pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
) -> Result<()> {
let mut buffer: Vec<u8> = vec![];
{
- let mut protocol = TCompactOutputProtocol::new(&mut buffer);
- object.write_to_out_protocol(&mut protocol)?;
+ let mut protocol = ThriftCompactOutputProtocol::new(&mut buffer);
+ object.write_thrift(&mut protocol)?;
}
sink.write_all(&buffer)?;
buffer = encryptor.encrypt(buffer.as_ref(), module_aad)?;
@@ -412,21 +398,6 @@ pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
Ok(())
}
-/// Encrypt a Thrift serializable object to a byte vector
-pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
- object: &T,
- encryptor: &mut Box<dyn BlockEncryptor>,
- module_aad: &[u8],
-) -> Result<Vec<u8>> {
- let mut buffer: Vec<u8> = vec![];
- {
-        let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
- object.write_to_out_protocol(&mut unencrypted_protocol)?;
- }
-
- encryptor.encrypt(buffer.as_ref(), module_aad)
-}
-
/// Encrypt a Thrift serializable object to a byte vector
pub(crate) fn encrypt_thrift_object_to_vec<T: WriteThrift>(
object: &T,
diff --git a/parquet/src/file/column_crypto_metadata.rs b/parquet/src/file/column_crypto_metadata.rs
index 6a538bd42b..429e7946dd 100644
--- a/parquet/src/file/column_crypto_metadata.rs
+++ b/parquet/src/file/column_crypto_metadata.rs
@@ -20,6 +20,7 @@
use std::io::Write;
use crate::errors::{ParquetError, Result};
+use crate::file::metadata::HeapSize;
use crate::format::{
ColumnCryptoMetaData as TColumnCryptoMetaData,
EncryptionWithColumnKey as TEncryptionWithColumnKey,
@@ -45,6 +46,12 @@ pub struct EncryptionWithColumnKey {
}
);
+impl HeapSize for EncryptionWithColumnKey {
+ fn heap_size(&self) -> usize {
+ self.path_in_schema.heap_size() + self.key_metadata.heap_size()
+ }
+}
+
thrift_union!(
/// ColumnCryptoMetadata for a column chunk
union ColumnCryptoMetaData {
@@ -53,6 +60,15 @@ union ColumnCryptoMetaData {
}
);
+impl HeapSize for ColumnCryptoMetaData {
+ fn heap_size(&self) -> usize {
+ match self {
+ Self::ENCRYPTION_WITH_FOOTER_KEY => 0,
+ Self::ENCRYPTION_WITH_COLUMN_KEY(path) => path.heap_size(),
+ }
+ }
+}
+
/// Converts Thrift definition into `ColumnCryptoMetadata`.
pub fn try_from_thrift(
thrift_column_crypto_metadata: &TColumnCryptoMetaData,
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 19122a1b55..bfe6b0255c 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -94,6 +94,12 @@ impl HeapSize for RowGroupMetaData {
impl HeapSize for ColumnChunkMetaData {
fn heap_size(&self) -> usize {
+ #[cfg(feature = "encryption")]
+ let encryption_heap_size =
+            self.column_crypto_metadata.heap_size() + self.encrypted_column_metadata.heap_size();
+ #[cfg(not(feature = "encryption"))]
+ let encryption_heap_size = 0;
+
// don't count column_descr here because it is already counted in
// FileMetaData
self.encodings.heap_size()
@@ -104,6 +110,7 @@ impl HeapSize for ColumnChunkMetaData {
+ self.unencoded_byte_array_data_bytes.heap_size()
+ self.repetition_level_histogram.heap_size()
+ self.definition_level_histogram.heap_size()
+ + encryption_heap_size
}
}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 94e289ae81..22c2f8fb44 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -848,6 +848,8 @@ pub struct ColumnChunkMetaData {
definition_level_histogram: Option<LevelHistogram>,
#[cfg(feature = "encryption")]
column_crypto_metadata: Option<ColumnCryptoMetaData>,
+ #[cfg(feature = "encryption")]
+ encrypted_column_metadata: Option<Vec<u8>>,
}
/// Histograms for repetition and definition levels.
@@ -1232,6 +1234,8 @@ impl ColumnChunkMetaData {
definition_level_histogram,
#[cfg(feature = "encryption")]
column_crypto_metadata,
+ #[cfg(feature = "encryption")]
+ encrypted_column_metadata: None,
};
Ok(result)
}
@@ -1370,6 +1374,8 @@ impl ColumnChunkMetaDataBuilder {
definition_level_histogram: None,
#[cfg(feature = "encryption")]
column_crypto_metadata: None,
+ #[cfg(feature = "encryption")]
+ encrypted_column_metadata: None,
})
}
@@ -2067,7 +2073,7 @@ mod tests {
#[cfg(not(feature = "encryption"))]
let base_expected_size = 2280;
#[cfg(feature = "encryption")]
- let base_expected_size = 2616;
+ let base_expected_size = 2712;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -2107,7 +2113,7 @@ mod tests {
#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 2704;
#[cfg(feature = "encryption")]
- let bigger_expected_size = 3040;
+ let bigger_expected_size = 3136;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs
index 7515a70a63..5665ad2ce9 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -38,7 +38,9 @@ use crate::{
read_thrift_vec, ElementType, FieldType, ReadThrift,
ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
},
-    schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor},
+ schema::types::{
+        num_nodes, parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor, TypePtr,
+ },
thrift_struct, thrift_union,
util::bit_util::FromBytes,
};
@@ -68,12 +70,12 @@ pub(crate) struct SchemaElement<'a> {
);
thrift_struct!(
-pub(crate) struct AesGcmV1<'a> {
+pub(crate) struct AesGcmV1 {
/// AAD prefix
- 1: optional binary<'a> aad_prefix
+ 1: optional binary aad_prefix
/// Unique file identifier part of AAD suffix
- 2: optional binary<'a> aad_file_unique
+ 2: optional binary aad_file_unique
/// In files encrypted with AAD prefix without storing it,
/// readers must supply the prefix
@@ -82,12 +84,12 @@ pub(crate) struct AesGcmV1<'a> {
);
thrift_struct!(
-pub(crate) struct AesGcmCtrV1<'a> {
+pub(crate) struct AesGcmCtrV1 {
/// AAD prefix
- 1: optional binary<'a> aad_prefix
+ 1: optional binary aad_prefix
/// Unique file identifier part of AAD suffix
- 2: optional binary<'a> aad_file_unique
+ 2: optional binary aad_file_unique
/// In files encrypted with AAD prefix without storing it,
/// readers must supply the prefix
@@ -96,24 +98,24 @@ pub(crate) struct AesGcmCtrV1<'a> {
);
thrift_union!(
-union EncryptionAlgorithm<'a> {
- 1: (AesGcmV1<'a>) AES_GCM_V1
- 2: (AesGcmCtrV1<'a>) AES_GCM_CTR_V1
+union EncryptionAlgorithm {
+ 1: (AesGcmV1) AES_GCM_V1
+ 2: (AesGcmCtrV1) AES_GCM_CTR_V1
}
);
#[cfg(feature = "encryption")]
thrift_struct!(
/// Crypto metadata for files with encrypted footer
-pub(crate) struct FileCryptoMetaData<'a> {
+pub(crate) struct FileCryptoMetaData {
/// Encryption algorithm. This field is only used for files
/// with encrypted footer. Files with plaintext footer store algorithm id
/// inside footer (FileMetaData structure).
- 1: required EncryptionAlgorithm<'a> encryption_algorithm
+ 1: required EncryptionAlgorithm encryption_algorithm
/** Retrieval metadata of key used for encryption of footer,
* and (possibly) columns **/
- 2: optional binary<'a> key_metadata
+ 2: optional binary key_metadata
}
);
@@ -135,8 +137,8 @@ struct FileMetaData<'a> {
5: optional list<KeyValue> key_value_metadata
6: optional string created_by
7: optional list<ColumnOrder> column_orders;
- 8: optional EncryptionAlgorithm<'a> encryption_algorithm
- 9: optional binary<'a> footer_signing_key_metadata
+ 8: optional EncryptionAlgorithm encryption_algorithm
+ 9: optional binary footer_signing_key_metadata
}
);
@@ -337,8 +339,6 @@ fn convert_column(
    let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
    let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
- // FIXME: need column crypto
-
let result = ColumnChunkMetaData {
column_descr,
encodings,
@@ -364,6 +364,8 @@ fn convert_column(
definition_level_histogram,
#[cfg(feature = "encryption")]
column_crypto_metadata: column.crypto_metadata,
+ #[cfg(feature = "encryption")]
+ encrypted_column_metadata: None,
};
Ok(result)
}
@@ -632,7 +634,7 @@ pub(crate) fn parquet_metadata_with_encryption(
}
let decryptor = get_file_decryptor(
t_file_crypto_metadata.encryption_algorithm,
- t_file_crypto_metadata.key_metadata,
+ t_file_crypto_metadata.key_metadata.as_ref(),
file_decryption_properties,
)?;
let footer_decryptor = decryptor.get_footer_decryptor();
@@ -672,7 +674,7 @@ pub(crate) fn parquet_metadata_with_encryption(
// File has a plaintext footer but encryption algorithm is set
let file_decryptor_value = get_file_decryptor(
algo,
- file_meta.footer_signing_key_metadata,
+ file_meta.footer_signing_key_metadata.as_ref(),
file_decryption_properties,
)?;
    if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
@@ -733,7 +735,7 @@ pub(crate) fn parquet_metadata_with_encryption(
#[cfg(feature = "encryption")]
pub(super) fn get_file_decryptor(
encryption_algorithm: EncryptionAlgorithm,
- footer_key_metadata: Option<&[u8]>,
+ footer_key_metadata: Option<&Vec<u8>>,
file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
match encryption_algorithm {
@@ -750,7 +752,7 @@ pub(super) fn get_file_decryptor(
FileDecryptor::new(
file_decryption_properties,
- footer_key_metadata,
+ footer_key_metadata.map(|v| v.as_slice()),
aad_file_unique,
aad_prefix,
)
@@ -1158,6 +1160,335 @@ impl PageHeader {
}
}
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+// 1: required Type type
+// 2: required list<Encoding> encodings
+// 3: required list<string> path_in_schema
+// 4: required CompressionCodec codec
+// 5: required i64 num_values
+// 6: required i64 total_uncompressed_size
+// 7: required i64 total_compressed_size
+// 8: optional list<KeyValue> key_value_metadata
+// 9: required i64 data_page_offset
+// 10: optional i64 index_page_offset
+// 11: optional i64 dictionary_page_offset
+// 12: optional Statistics statistics;
+// 13: optional list<PageEncodingStats> encoding_stats;
+// 14: optional i64 bloom_filter_offset;
+// 15: optional i32 bloom_filter_length;
+// 16: optional SizeStatistics size_statistics;
+// 17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+ column_chunk: &ColumnChunkMetaData,
+ w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+ use crate::file::statistics::page_stats_to_thrift;
+
+ column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+ column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+ let path = column_chunk.column_descr.path().parts();
+ let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+ path.write_thrift_field(w, 3, 2)?;
+ column_chunk.compression.write_thrift_field(w, 4, 3)?;
+ column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+ column_chunk
+ .total_uncompressed_size
+ .write_thrift_field(w, 6, 5)?;
+ column_chunk
+ .total_compressed_size
+ .write_thrift_field(w, 7, 6)?;
+ // no key_value_metadata here
+    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+ if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
+ }
+ if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
+ }
+ // PageStatistics is the same as thrift Statistics, but writable
+ let stats = page_stats_to_thrift(column_chunk.statistics());
+ if let Some(stats) = stats {
+ last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+ }
+ if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
+ }
+ if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
+ }
+ if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
+ }
+
+ // SizeStatistics
+ let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+ || column_chunk.repetition_level_histogram.is_some()
+ || column_chunk.definition_level_histogram.is_some()
+ {
+ let repetition_level_histogram = column_chunk
+ .repetition_level_histogram()
+ .map(|hist| hist.clone().into_inner());
+
+ let definition_level_histogram = column_chunk
+ .definition_level_histogram()
+ .map(|hist| hist.clone().into_inner());
+
+ Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
+ repetition_level_histogram,
+ definition_level_histogram,
+ })
+ } else {
+ None
+ };
+ if let Some(size_stats) = size_stats {
+ size_stats.write_thrift_field(w, 16, last_field_id)?;
+ }
+
+ // TODO: field 17 geo spatial stats here
+ w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+ pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+ pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+ pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+ pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+ self.file_metadata
+ .version
+ .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
+ // writing along the way.
+ let root = self.file_metadata.schema_descr().root_schema_ptr();
+ let schema_len = num_nodes(&root);
+ writer.write_field_begin(FieldType::List, 2, 1)?;
+ writer.write_list_begin(ElementType::Struct, schema_len)?;
+ // recursively write Type nodes as SchemaElements
+ write_schema(&root, writer)?;
+
+ self.file_metadata
+ .num_rows
+ .write_thrift_field(writer, 3, 2)?;
+
+ // this will call RowGroupMetaData::write_thrift
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
+
+ if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
+ }
+ if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
+ }
+ if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
+ }
+ if let Some(algo) = self.encryption_algorithm.as_ref() {
+ last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
+ }
+ if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+ key.as_slice()
+ .write_thrift_field(writer, 9, last_field_id)?;
+ }
+
+ writer.write_struct_end()
+ }
+}
+
+fn write_schema<W: Write>(
+ node: &TypePtr,
+ writer: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+ match node.as_ref() {
+ crate::schema::types::Type::PrimitiveType {
+ basic_info,
+ physical_type,
+ type_length,
+ scale,
+ precision,
+ } => {
+ let element = SchemaElement {
+ type_: Some(*physical_type),
+ type_length: if *type_length >= 0 {
+ Some(*type_length)
+ } else {
+ None
+ },
+ repetition_type: Some(basic_info.repetition()),
+ name: basic_info.name(),
+ num_children: None,
+ converted_type: match basic_info.converted_type() {
+ ConvertedType::NONE => None,
+ other => Some(other),
+ },
+ scale: if *scale >= 0 { Some(*scale) } else { None },
+ precision: if *precision >= 0 {
+ Some(*precision)
+ } else {
+ None
+ },
+ field_id: if basic_info.has_id() {
+ Some(basic_info.id())
+ } else {
+ None
+ },
+ logical_type: basic_info.logical_type(),
+ };
+ element.write_thrift(writer)
+ }
+ crate::schema::types::Type::GroupType { basic_info, fields } => {
+ let repetition = if basic_info.has_repetition() {
+ Some(basic_info.repetition())
+ } else {
+ None
+ };
+
+ let element = SchemaElement {
+ type_: None,
+ type_length: None,
+ repetition_type: repetition,
+ name: basic_info.name(),
+ num_children: Some(fields.len().try_into()?),
+ converted_type: match basic_info.converted_type() {
+ ConvertedType::NONE => None,
+ other => Some(other),
+ },
+ scale: None,
+ precision: None,
+ field_id: if basic_info.has_id() {
+ Some(basic_info.id())
+ } else {
+ None
+ },
+ logical_type: basic_info.logical_type(),
+ };
+
+ element.write_thrift(writer)?;
+
+ // Add child elements for a group
+ for field in fields {
+ write_schema(field, writer)?;
+ }
+ Ok(())
+ }
+ }
+}
+
+// struct RowGroup {
+// 1: required list<ColumnChunk> columns
+// 2: required i64 total_byte_size
+// 3: required i64 num_rows
+// 4: optional list<SortingColumn> sorting_columns
+// 5: optional i64 file_offset
+// 6: optional i64 total_compressed_size
+// 7: optional i16 ordinal
+// }
+impl WriteThrift for RowGroupMetaData {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+ // this will call ColumnChunkMetaData::write_thrift
+ self.columns.write_thrift_field(writer, 1, 0)?;
+ self.total_byte_size.write_thrift_field(writer, 2, 1)?;
+        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
+ if let Some(sorting_columns) = self.sorting_columns() {
+            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
+ }
+ if let Some(file_offset) = self.file_offset() {
+            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
+ }
+ // this is optional, but we'll always write it
+ last_field_id = self
+ .compressed_size()
+ .write_thrift_field(writer, 6, last_field_id)?;
+ if let Some(ordinal) = self.ordinal() {
+ ordinal.write_thrift_field(writer, 7, last_field_id)?;
+ }
+ writer.write_struct_end()
+ }
+}
+
+// struct ColumnChunk {
+// 1: optional string file_path
+// 2: required i64 file_offset = 0
+// 3: optional ColumnMetaData meta_data
+// 4: optional i64 offset_index_offset
+// 5: optional i32 offset_index_length
+// 6: optional i64 column_index_offset
+// 7: optional i32 column_index_length
+// 8: optional ColumnCryptoMetaData crypto_metadata
+// 9: optional binary encrypted_column_metadata
+// }
+impl WriteThrift for ColumnChunkMetaData {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+ #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+ let mut last_field_id = 0i16;
+ if let Some(file_path) = self.file_path() {
+            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
+ }
+ last_field_id = self
+ .file_offset()
+ .write_thrift_field(writer, 2, last_field_id)?;
+
+ #[cfg(feature = "encryption")]
+ {
+ // only write the ColumnMetaData if we haven't already encrypted it
+ if self.encrypted_column_metadata.is_none() {
+ writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
+ serialize_column_meta_data(self, writer)?;
+ last_field_id = 3;
+ }
+ }
+ #[cfg(not(feature = "encryption"))]
+ {
+ // always write the ColumnMetaData
+ writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
+ serialize_column_meta_data(self, writer)?;
+ last_field_id = 3;
+ }
+
+ if let Some(offset_idx_off) = self.offset_index_offset() {
+            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
+ }
+ if let Some(offset_idx_len) = self.offset_index_length() {
+            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
+ }
+ if let Some(column_idx_off) = self.column_index_offset() {
+            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
+ }
+ if let Some(column_idx_len) = self.column_index_length() {
+            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
+ }
+ #[cfg(feature = "encryption")]
+ {
+ if let Some(crypto_metadata) = self.crypto_metadata() {
+                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
+ }
+            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
+ encrypted_meta
+ .as_slice()
+ .write_thrift_field(writer, 9, last_field_id)?;
+ }
+ }
+
+ writer.write_struct_end()
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::file::metadata::thrift_gen::BoundingBox;
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index a09a703ade..6396e454fb 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -15,22 +15,26 @@
// specific language governing permissions and limitations
// under the License.
+use crate::file::metadata::thrift_gen::{EncryptionAlgorithm, FileMeta};
+use crate::file::metadata::{
+    ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
+};
+use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
+use crate::{
+ basic::ColumnOrder,
+ file::metadata::{FileMetaData, ParquetMetaDataBuilder},
+};
#[cfg(feature = "encryption")]
-use crate::encryption::{
- encrypt::{
-        encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
+use crate::{
+ encryption::{
+        encrypt::{encrypt_thrift_object, write_signed_plaintext_thrift_object, FileEncryptor},
+ modules::{create_footer_aad, create_module_aad, ModuleType},
},
- modules::{create_footer_aad, create_module_aad, ModuleType},
+ file::column_crypto_metadata::ColumnCryptoMetaData,
+ file::metadata::thrift_gen::{AesGcmV1, FileCryptoMetaData},
};
-#[cfg(feature = "encryption")]
-use crate::errors::ParquetError;
-use crate::format::EncryptionAlgorithm;
-#[cfg(feature = "encryption")]
-use crate::format::{AesGcmV1, ColumnCryptoMetaData};
-use crate::schema::types;
-use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
-use crate::thrift::TSerializable;
use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
+
use crate::{
file::writer::{get_file_magic, TrackedWrite},
parquet_thrift::WriteThrift,
@@ -44,18 +48,16 @@ use crate::{
};
use std::io::Write;
use std::sync::Arc;
-use thrift::protocol::TCompactOutputProtocol;
/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
///
/// See [`ParquetMetaDataWriter`] for background and example.
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
buf: &'a mut TrackedWrite<W>,
- schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
- row_groups: Vec<crate::format::RowGroup>,
- column_indexes: Option<&'a [Vec<Option<ColumnIndexMetaData>>]>,
- offset_indexes: Option<&'a [Vec<Option<OffsetIndexMetaData>>]>,
+ row_groups: Vec<RowGroupMetaData>,
+ column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
+ offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
key_value_metadata: Option<Vec<KeyValue>>,
created_by: Option<String>,
object_writer: MetadataObjectWriter,
@@ -130,14 +132,17 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
}
/// Assembles and writes the final metadata to self.buf
- pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
+ pub fn finish(mut self) -> Result<ParquetMetaData> {
let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
+ let column_indexes = std::mem::take(&mut self.column_indexes);
+ let offset_indexes = std::mem::take(&mut self.offset_indexes);
+
// Write column indexes and offset indexes
- if let Some(column_indexes) = self.column_indexes {
+ if let Some(column_indexes) = column_indexes.as_ref() {
self.write_column_indexes(column_indexes)?;
}
- if let Some(offset_indexes) = self.offset_indexes {
+ if let Some(offset_indexes) = offset_indexes.as_ref() {
self.write_offset_indexes(offset_indexes)?;
}
@@ -146,32 +151,44 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
// for all leaf nodes.
        // Even if the column has an undefined sort order, such as INTERVAL, this
        // is still technically the defined TYPEORDER so it should still be set.
- let column_orders = (0..self.schema_descr.num_columns())
-            .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
+ let column_orders = self
+ .schema_descr
+ .columns()
+ .iter()
+ .map(|col| {
+ let sort_order = ColumnOrder::get_sort_order(
+ col.logical_type(),
+ col.converted_type(),
+ col.physical_type(),
+ );
+ ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
+ })
.collect();
+
        // This field is optional, perhaps in cases where no min/max fields are set
// in any Statistics or ColumnIndex object in the whole file.
// But for simplicity we always set this field.
let column_orders = Some(column_orders);
+
let (row_groups, unencrypted_row_groups) = self
.object_writer
.apply_row_group_encryption(self.row_groups)?;
let (encryption_algorithm, footer_signing_key_metadata) =
self.object_writer.get_plaintext_footer_crypto_metadata();
- let key_value_metadata = self.key_value_metadata.map(|vkv| {
- vkv.into_iter()
- .map(|kv| crate::format::KeyValue::new(kv.key, kv.value))
- .collect::<Vec<crate::format::KeyValue>>()
- });
- let mut file_metadata = crate::format::FileMetaData {
+
+ let file_metadata = FileMetaData::new(
+ self.writer_version,
num_rows,
- row_groups,
- key_value_metadata,
- version: self.writer_version,
- schema: types::to_thrift(self.schema.as_ref())?,
- created_by: self.created_by.clone(),
+ self.created_by,
+ self.key_value_metadata,
+ self.schema_descr.clone(),
column_orders,
+ );
+
+ let file_meta = FileMeta {
+ file_metadata: &file_metadata,
+ row_groups: &row_groups,
encryption_algorithm,
footer_signing_key_metadata,
};
@@ -179,7 +196,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
// Write file metadata
let start_pos = self.buf.bytes_written();
self.object_writer
- .write_file_metadata(&file_metadata, &mut self.buf)?;
+ .write_file_metadata(&file_meta, &mut self.buf)?;
let end_pos = self.buf.bytes_written();
// Write footer
@@ -188,28 +205,49 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
self.buf.write_all(&metadata_len.to_le_bytes())?;
self.buf.write_all(self.object_writer.get_file_magic())?;
- if let Some(row_groups) = unencrypted_row_groups {
-            // If row group metadata was encrypted, we replace the encrypted row groups with
-            // unencrypted metadata before it is returned to users. This allows the metadata
-            // to be usable for retrieving the row group statistics for example, without users
- // needing to decrypt the metadata.
- file_metadata.row_groups = row_groups;
- }
+        // If row group metadata was encrypted, we replace the encrypted row groups with
+        // unencrypted metadata before it is returned to users. This allows the metadata
+        // to be usable for retrieving the row group statistics for example, without users
+ // needing to decrypt the metadata.
+ let mut builder = ParquetMetaDataBuilder::new(file_metadata);
+
+ builder = match unencrypted_row_groups {
+ Some(rg) => builder.set_row_groups(rg),
+ None => builder.set_row_groups(row_groups),
+ };
+
+        let column_indexes: Option<ParquetColumnIndex> = column_indexes.map(|ovvi| {
+ ovvi.into_iter()
+ .map(|vi| {
+ vi.into_iter()
+ .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
+ .collect()
+ })
+ .collect()
+ });
+
+ // FIXME(ets): this will panic if there's a missing index.
+        let offset_indexes: Option<ParquetOffsetIndex> = offset_indexes.map(|ovvi| {
+ ovvi.into_iter()
+ .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
+ .collect()
+ });
- Ok(file_metadata)
+ builder = builder.set_column_index(column_indexes);
+ builder = builder.set_offset_index(offset_indexes);
+
+ Ok(builder.build())
}
pub fn new(
buf: &'a mut TrackedWrite<W>,
- schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
- row_groups: Vec<crate::format::RowGroup>,
+ row_groups: Vec<RowGroupMetaData>,
created_by: Option<String>,
writer_version: i32,
) -> Self {
Self {
buf,
- schema,
schema_descr,
row_groups,
column_indexes: None,
@@ -223,7 +261,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
pub fn with_column_indexes(
mut self,
- column_indexes: &'a [Vec<Option<ColumnIndexMetaData>>],
+ column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
) -> Self {
self.column_indexes = Some(column_indexes);
self
@@ -231,7 +269,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
pub fn with_offset_indexes(
mut self,
- offset_indexes: &'a [Vec<Option<OffsetIndexMetaData>>],
+ offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
) -> Self {
self.offset_indexes = Some(offset_indexes);
self
@@ -361,12 +399,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
let created_by = file_metadata.created_by().map(str::to_string);
- let row_groups = self
- .metadata
- .row_groups()
- .iter()
- .map(|rg| rg.to_thrift())
- .collect::<Vec<_>>();
+ let row_groups = self.metadata.row_groups.clone();
let key_value_metadata = file_metadata.key_value_metadata().cloned();
@@ -375,14 +408,20 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
- &schema,
&schema_descr,
row_groups,
created_by,
file_metadata.version(),
);
- encoder = encoder.with_column_indexes(&column_indexes);
- encoder = encoder.with_offset_indexes(&offset_indexes);
+
+ if let Some(column_indexes) = column_indexes {
+ encoder = encoder.with_column_indexes(column_indexes);
+ }
+
+ if let Some(offset_indexes) = offset_indexes {
+ encoder = encoder.with_offset_indexes(offset_indexes);
+ }
+
if let Some(key_value_metadata) = key_value_metadata {
encoder = encoder.with_key_value_metadata(key_value_metadata);
}
@@ -391,46 +430,38 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
Ok(())
}
- fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndexMetaData>>> {
- if let Some(row_group_column_indexes) = self.metadata.column_index() {
- (0..self.metadata.row_groups().len())
- .map(|rg_idx| {
- let column_indexes = &row_group_column_indexes[rg_idx];
- column_indexes
- .iter()
- .map(|column_index| Some(column_index.clone()))
- .collect()
- })
- .collect()
- } else {
- // make a None for each row group, for each column
- self.metadata
- .row_groups()
- .iter()
-            .map(|rg| std::iter::repeat_n(None, rg.columns().len()).collect())
- .collect()
- }
+    fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
+        // FIXME(ets): we're converting from ParquetColumnIndex to vec<vec<option>>,
+        // but then converting back to ParquetColumnIndex in the end. need to unify this.
+ self.metadata
+ .column_index()
+ .map(|row_group_column_indexes| {
+ (0..self.metadata.row_groups().len())
+ .map(|rg_idx| {
+ let column_indexes = &row_group_column_indexes[rg_idx];
+ column_indexes
+ .iter()
+ .map(|column_index| Some(column_index.clone()))
+ .collect()
+ })
+ .collect()
+ })
}
- fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndexMetaData>>> {
- if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
- (0..self.metadata.row_groups().len())
- .map(|rg_idx| {
- let offset_indexes = &row_group_offset_indexes[rg_idx];
- offset_indexes
- .iter()
- .map(|offset_index| Some(offset_index.clone()))
- .collect()
- })
- .collect()
- } else {
- // make a None for each row group, for each column
- self.metadata
- .row_groups()
- .iter()
-            .map(|rg| std::iter::repeat_n(None, rg.columns().len()).collect())
- .collect()
- }
+    fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
+ self.metadata
+ .offset_index()
+ .map(|row_group_offset_indexes| {
+ (0..self.metadata.row_groups().len())
+ .map(|rg_idx| {
+ let offset_indexes = &row_group_offset_indexes[rg_idx];
+ offset_indexes
+ .iter()
+ .map(|offset_index| Some(offset_index.clone()))
+ .collect()
+ })
+ .collect()
+ })
}
}
@@ -441,13 +472,6 @@ struct MetadataObjectWriter {
}
impl MetadataObjectWriter {
- #[inline]
-    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
- let mut protocol = TCompactOutputProtocol::new(sink);
- object.write_to_out_protocol(&mut protocol)?;
- Ok(())
- }
-
#[inline]
    fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
let mut protocol = ThriftCompactOutputProtocol::new(sink);
@@ -460,19 +484,15 @@ impl MetadataObjectWriter {
#[cfg(not(feature = "encryption"))]
impl MetadataObjectWriter {
/// Write [`FileMetaData`] in Thrift format
- fn write_file_metadata(
- &self,
- file_metadata: &crate::format::FileMetaData,
- sink: impl Write,
- ) -> Result<()> {
- Self::write_object(file_metadata, sink)
+    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
+ Self::write_thrift_object(file_metadata, sink)
}
/// Write a column [`OffsetIndex`] in Thrift format
fn write_offset_index(
&self,
offset_index: &OffsetIndexMetaData,
- _column_chunk: &crate::format::ColumnChunk,
+ _column_chunk: &ColumnChunkMetaData,
_row_group_idx: usize,
_column_idx: usize,
sink: impl Write,
@@ -484,7 +504,7 @@ impl MetadataObjectWriter {
fn write_column_index(
&self,
column_index: &ColumnIndexMetaData,
- _column_chunk: &crate::format::ColumnChunk,
+ _column_chunk: &ColumnChunkMetaData,
_row_group_idx: usize,
_column_idx: usize,
sink: impl Write,
@@ -495,11 +515,8 @@ impl MetadataObjectWriter {
/// No-op implementation of row-group metadata encryption
fn apply_row_group_encryption(
&self,
- row_groups: Vec<crate::format::RowGroup>,
- ) -> Result<(
- Vec<crate::format::RowGroup>,
- Option<Vec<crate::format::RowGroup>>,
- )> {
+ row_groups: Vec<RowGroupMetaData>,
+ ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
Ok((row_groups, None))
}
@@ -527,29 +544,25 @@ impl MetadataObjectWriter {
    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
///
/// [`FileMetaData`]: crate::format::FileMetaData
- fn write_file_metadata(
- &self,
- file_metadata: &crate::format::FileMetaData,
- mut sink: impl Write,
- ) -> Result<()> {
+    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
match self.file_encryptor.as_ref() {
            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
// First write FileCryptoMetadata
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
- let mut protocol = TCompactOutputProtocol::new(&mut sink);
- crypto_metadata.write_to_out_protocol(&mut protocol)?;
+ let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
+ crypto_metadata.write_thrift(&mut protocol)?;
// Then write encrypted footer
let aad = create_footer_aad(file_encryptor.file_aad())?;
let mut encryptor = file_encryptor.get_footer_encryptor()?;
- encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
+                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
}
            Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
let aad = create_footer_aad(file_encryptor.file_aad())?;
let mut encryptor = file_encryptor.get_footer_encryptor()?;
-                write_signed_plaintext_object(file_metadata, &mut encryptor, &mut sink, &aad)
+                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
}
- _ => Self::write_object(file_metadata, &mut sink),
+ _ => Self::write_thrift_object(file_metadata, &mut sink),
}
}
@@ -559,7 +572,7 @@ impl MetadataObjectWriter {
fn write_offset_index(
&self,
offset_index: &OffsetIndexMetaData,
- column_chunk: &crate::format::ColumnChunk,
+ column_chunk: &ColumnChunkMetaData,
row_group_idx: usize,
column_idx: usize,
sink: impl Write,
@@ -584,7 +597,7 @@ impl MetadataObjectWriter {
fn write_column_index(
&self,
column_index: &ColumnIndexMetaData,
- column_chunk: &crate::format::ColumnChunk,
+ column_chunk: &ColumnChunkMetaData,
row_group_idx: usize,
column_idx: usize,
sink: impl Write,
@@ -608,11 +621,8 @@ impl MetadataObjectWriter {
    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
fn apply_row_group_encryption(
&self,
- row_groups: Vec<crate::format::RowGroup>,
- ) -> Result<(
- Vec<crate::format::RowGroup>,
- Option<Vec<crate::format::RowGroup>>,
- )> {
+ row_groups: Vec<RowGroupMetaData>,
+ ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
match &self.file_encryptor {
Some(file_encryptor) => {
let unencrypted_row_groups = row_groups.clone();
@@ -636,21 +646,12 @@ impl MetadataObjectWriter {
object: &impl WriteThrift,
mut sink: impl Write,
file_encryptor: &FileEncryptor,
- column_metadata: &crate::format::ColumnChunk,
+ column_metadata: &ColumnChunkMetaData,
module_type: ModuleType,
row_group_index: usize,
column_index: usize,
) -> Result<()> {
- let column_path_vec = &column_metadata
- .meta_data
- .as_ref()
- .ok_or_else(|| {
- general_err!(
-                    "Column metadata not set for column {} when encrypting object",
- column_index
- )
- })?
- .path_in_schema;
+ let column_path_vec = column_metadata.column_path().as_ref();
let joined_column_path;
let column_path = if column_path_vec.len() == 1 {
@@ -699,36 +700,34 @@ impl MetadataObjectWriter {
.aad_prefix()
.map(|_| !file_encryptor.properties().store_aad_prefix());
let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
- file_encryptor.properties().aad_prefix().cloned()
+ file_encryptor.properties().aad_prefix()
} else {
None
};
- EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
- aad_prefix,
+ EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
+ aad_prefix: aad_prefix.cloned(),
aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
supply_aad_prefix,
})
}
- fn file_crypto_metadata(
- file_encryptor: &FileEncryptor,
- ) -> Result<crate::format::FileCryptoMetaData> {
+    fn file_crypto_metadata(file_encryptor: &FileEncryptor) -> Result<FileCryptoMetaData> {
let properties = file_encryptor.properties();
- Ok(crate::format::FileCryptoMetaData {
+ Ok(FileCryptoMetaData {
            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
key_metadata: properties.footer_key_metadata().cloned(),
})
}
fn encrypt_row_groups(
- row_groups: Vec<crate::format::RowGroup>,
+ row_groups: Vec<RowGroupMetaData>,
file_encryptor: &Arc<FileEncryptor>,
- ) -> Result<Vec<crate::format::RowGroup>> {
+ ) -> Result<Vec<RowGroupMetaData>> {
row_groups
.into_iter()
.enumerate()
.map(|(rg_idx, mut rg)| {
- let cols: Result<Vec<crate::format::ColumnChunk>> = rg
+ let cols: Result<Vec<ColumnChunkMetaData>> = rg
.columns
.into_iter()
.enumerate()
@@ -744,26 +743,24 @@ impl MetadataObjectWriter {
/// Apply column encryption to column chunk metadata
fn encrypt_column_chunk(
- mut column_chunk: crate::format::ColumnChunk,
+ mut column_chunk: ColumnChunkMetaData,
file_encryptor: &Arc<FileEncryptor>,
row_group_index: usize,
column_index: usize,
- ) -> Result<crate::format::ColumnChunk> {
+ ) -> Result<ColumnChunkMetaData> {
// Column crypto metadata should have already been set when the column
was created.
// Here we apply the encryption by encrypting the column metadata if
required.
- match column_chunk.crypto_metadata.as_ref() {
+ match column_chunk.column_crypto_metadata.as_ref() {
None => {}
- Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+ Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                // When uniform encryption is used the footer is already encrypted,
// so the column chunk does not need additional encryption.
}
- Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
+            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
+                use crate::file::metadata::thrift_gen::serialize_column_meta_data;
+
let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
- let meta_data = column_chunk
- .meta_data
- .take()
-                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
let aad = create_module_aad(
file_encryptor.file_aad(),
ModuleType::ColumnMetaData,
@@ -771,10 +768,15 @@ impl MetadataObjectWriter {
column_index,
None,
)?;
-                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;
+ // create temp ColumnMetaData that we can encrypt
+ let mut buffer: Vec<u8> = vec![];
+ {
+ let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
+ serialize_column_meta_data(&column_chunk, &mut prot)?;
+ }
+ let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;
column_chunk.encrypted_column_metadata = Some(ciphertext);
- debug_assert!(column_chunk.meta_data.is_none());
}
}
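For context, the new encryption path above is a serialize-then-encrypt pattern: the column metadata is written into a scratch buffer via the thrift compact protocol, and those plaintext bytes are then encrypted under the module AAD. A minimal sketch of the shape only; the real encryptor and protocol types are crate-internal, so they are stubbed out here as closures:

    // Sketch only: mirrors encrypt_column_chunk's buffer handling, not the
    // actual crate-internal types.
    fn encrypt_via_scratch_buffer(
        serialize: impl FnOnce(&mut Vec<u8>),
        encrypt: impl FnOnce(&[u8]) -> Vec<u8>,
    ) -> Vec<u8> {
        // 1. serialize the metadata into a scratch buffer
        let mut buffer: Vec<u8> = vec![];
        serialize(&mut buffer);
        // 2. encrypt the plaintext bytes (the real code also supplies the AAD)
        encrypt(&buffer)
    }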
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 1442f0f67c..b0d64ea760 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -2546,8 +2546,8 @@ mod tests {
}
let file_metadata = file_writer.close().unwrap();
- assert_eq!(file_metadata.num_rows, 25);
- assert_eq!(file_metadata.row_groups.len(), 5);
+ assert_eq!(file_metadata.file_metadata().num_rows(), 25);
+ assert_eq!(file_metadata.num_row_groups(), 5);
// read only the 3rd row group
let read_options = ReadOptionsBuilder::new()
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index d6f742c137..1ce7ad2912 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -155,7 +155,6 @@ pub type OnCloseRowGroup<'a, W> = Box<
/// - After all row groups have been written, close the file writer using `close` method.
pub struct SerializedFileWriter<W: Write> {
buf: TrackedWrite<W>,
- schema: TypePtr,
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaData>,
@@ -195,7 +194,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Self::start_file(&properties, &mut buf)?;
Ok(Self {
buf,
- schema,
descr: Arc::new(schema_descriptor),
props: properties,
row_groups: vec![],
@@ -298,7 +296,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
- pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+ pub fn finish(&mut self) -> Result<ParquetMetaData> {
self.assert_previous_writer_closed()?;
let metadata = self.write_metadata()?;
self.buf.flush()?;
@@ -306,7 +304,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
}
/// Closes and finalises file writer, returning the file metadata.
- pub fn close(mut self) -> Result<crate::format::FileMetaData> {
+ pub fn close(mut self) -> Result<ParquetMetaData> {
self.finish()
}
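Since `finish` and `close` now return `ParquetMetaData`, callers read values through accessors rather than raw thrift fields. A caller-side sketch (the `summarize` helper is hypothetical; the accessors are the ones exercised by the tests below):

    use parquet::file::metadata::ParquetMetaData;

    // Hypothetical helper showing the accessor-based API.
    fn summarize(metadata: &ParquetMetaData) {
        // `num_rows` now lives behind FileMetaData::num_rows()
        let rows = metadata.file_metadata().num_rows();
        // row groups are counted and indexed rather than exposed as a Vec
        for i in 0..metadata.num_row_groups() {
            let rg = metadata.row_group(i);
            println!("row group {i}: {} columns ({rows} rows total)", rg.num_columns());
        }
    }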
@@ -326,8 +324,9 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Ok(())
}
- /// Assembles and writes metadata at the end of the file.
- fn write_metadata(&mut self) -> Result<crate::format::FileMetaData> {
+ /// Assembles and writes metadata at the end of the file. This will take ownership
+ /// of `row_groups` and the page index structures.
+ fn write_metadata(&mut self) -> Result<ParquetMetaData> {
self.finished = true;
// write out any remaining bloom filters after all row groups
@@ -341,15 +340,13 @@ impl<W: Write + Send> SerializedFileWriter<W> {
None => Some(self.kv_metadatas.clone()),
};
- let row_groups = self
- .row_groups
- .iter()
- .map(|v| v.to_thrift())
- .collect::<Vec<_>>();
+ // take ownership of metadata
+ let row_groups = std::mem::take(&mut self.row_groups);
+ let column_indexes = std::mem::take(&mut self.column_indexes);
+ let offset_indexes = std::mem::take(&mut self.offset_indexes);
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
- &self.schema,
&self.descr,
row_groups,
Some(self.props.created_by().to_string()),
@@ -365,8 +362,10 @@ impl<W: Write + Send> SerializedFileWriter<W> {
encoder = encoder.with_key_value_metadata(key_value_metadata)
}
- encoder = encoder.with_column_indexes(&self.column_indexes);
- encoder = encoder.with_offset_indexes(&self.offset_indexes);
+ encoder = encoder.with_column_indexes(column_indexes);
+ if !self.props.offset_index_disabled() {
+ encoder = encoder.with_offset_indexes(offset_indexes);
+ }
encoder.finish()
}
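The ownership transfer above uses `std::mem::take`, which swaps the field with its `Default` value and returns the previous contents, so `write_metadata` can move the accumulated vectors out of `&mut self` without cloning. A generic illustration under placeholder types (not the writer's actual fields):

    // Placeholder struct; the real code takes row_groups, column_indexes,
    // and offset_indexes out of SerializedFileWriter the same way.
    struct Accumulator {
        items: Vec<i32>,
    }

    impl Accumulator {
        fn drain_all(&mut self) -> Vec<i32> {
            // leaves `self.items` as an empty Vec and returns the old one
            std::mem::take(&mut self.items)
        }
    }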
@@ -1629,7 +1628,7 @@ mod tests {
file: W,
data: Vec<Vec<i32>>,
compression: Compression,
- ) -> crate::format::FileMetaData
+ ) -> ParquetMetaData
where
W: Write + Send,
R: ChunkReader + From<W> + 'static,
@@ -1644,7 +1643,7 @@ mod tests {
data: Vec<Vec<D::T>>,
value: F,
compression: Compression,
- ) -> crate::format::FileMetaData
+ ) -> ParquetMetaData
where
W: Write + Send,
R: ChunkReader + From<W> + 'static,
@@ -1715,7 +1714,7 @@ mod tests {
/// File write-read roundtrip.
/// `data` consists of arrays of values for each row group.
- fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
+ fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
}
@@ -1790,13 +1789,12 @@ mod tests {
fn test_column_offset_index_file() {
let file = tempfile::tempfile().unwrap();
let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
- file_metadata.row_groups.iter().for_each(|row_group| {
- row_group.columns.iter().for_each(|column_chunk| {
- assert_ne!(None, column_chunk.column_index_offset);
- assert_ne!(None, column_chunk.column_index_length);
-
- assert_ne!(None, column_chunk.offset_index_offset);
- assert_ne!(None, column_chunk.offset_index_length);
+ file_metadata.row_groups().iter().for_each(|row_group| {
+ row_group.columns().iter().for_each(|column_chunk| {
+ assert!(column_chunk.column_index_offset().is_some());
+ assert!(column_chunk.column_index_length().is_some());
+ assert!(column_chunk.offset_index_offset().is_some());
+ assert!(column_chunk.offset_index_length().is_some());
})
});
}
@@ -2037,15 +2035,15 @@ mod tests {
row_group_writer.close().unwrap();
let metadata = file_writer.finish().unwrap();
- assert_eq!(metadata.row_groups.len(), 1);
- let row_group = &metadata.row_groups[0];
- assert_eq!(row_group.columns.len(), 2);
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 2);
// Column "a" has both offset and column index, as requested
- assert!(row_group.columns[0].offset_index_offset.is_some());
- assert!(row_group.columns[0].column_index_offset.is_some());
+ assert!(row_group.column(0).offset_index_offset().is_some());
+ assert!(row_group.column(0).column_index_offset().is_some());
// Column "b" should only have offset index
- assert!(row_group.columns[1].offset_index_offset.is_some());
- assert!(row_group.columns[1].column_index_offset.is_none());
+ assert!(row_group.column(1).offset_index_offset().is_some());
+ assert!(row_group.column(1).column_index_offset().is_none());
let err = file_writer.next_row_group().err().unwrap().to_string();
assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
@@ -2099,9 +2097,8 @@ mod tests {
row_group_writer.close().unwrap();
let file_metadata = writer.close().unwrap();
- assert_eq!(file_metadata.row_groups.len(), 1);
- assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
- assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+ assert_eq!(file_metadata.num_row_groups(), 1);
+ assert_eq!(file_metadata.row_group(0).num_columns(), 1);
let check_def_hist = |def_hist: &[i64]| {
assert_eq!(def_hist.len(), 2);
@@ -2109,29 +2106,26 @@ mod tests {
assert_eq!(def_hist[1], 7);
};
- assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
- let meta_data = file_metadata.row_groups[0].columns[0]
- .meta_data
- .as_ref()
- .unwrap();
- assert!(meta_data.size_statistics.is_some());
- let size_stats = meta_data.size_statistics.as_ref().unwrap();
+ let meta_data = file_metadata.row_group(0).column(0);
- assert!(size_stats.repetition_level_histogram.is_none());
- assert!(size_stats.definition_level_histogram.is_some());
- assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
+ assert!(meta_data.repetition_level_histogram().is_none());
+ assert!(meta_data.definition_level_histogram().is_some());
+ assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
assert_eq!(
unenc_size,
- size_stats.unencoded_byte_array_data_bytes.unwrap()
+ meta_data.unencoded_byte_array_data_bytes().unwrap()
);
- check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
+ check_def_hist(meta_data.definition_level_histogram().unwrap().values());
// check that the read metadata is also correct
let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(file, options).unwrap();
let rfile_metadata = reader.metadata().file_metadata();
- assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+ assert_eq!(
+ rfile_metadata.num_rows(),
+ file_metadata.file_metadata().num_rows()
+ );
assert_eq!(reader.num_row_groups(), 1);
let rowgroup = reader.get_row_group(0).unwrap();
assert_eq!(rowgroup.num_columns(), 1);
@@ -2251,9 +2245,8 @@ mod tests {
row_group_writer.close().unwrap();
let file_metadata = writer.close().unwrap();
- assert_eq!(file_metadata.row_groups.len(), 1);
- assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
- assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+ assert_eq!(file_metadata.num_row_groups(), 1);
+ assert_eq!(file_metadata.row_group(0).num_columns(), 1);
let check_def_hist = |def_hist: &[i64]| {
assert_eq!(def_hist.len(), 4);
@@ -2271,25 +2264,22 @@ mod tests {
// check that histograms are set properly in the write and read metadata
// also check that unencoded_byte_array_data_bytes is not set
- assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
- let meta_data = file_metadata.row_groups[0].columns[0]
- .meta_data
- .as_ref()
- .unwrap();
- assert!(meta_data.size_statistics.is_some());
- let size_stats = meta_data.size_statistics.as_ref().unwrap();
- assert!(size_stats.repetition_level_histogram.is_some());
- assert!(size_stats.definition_level_histogram.is_some());
- assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
- check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
- check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
+ let meta_data = file_metadata.row_group(0).column(0);
+ assert!(meta_data.repetition_level_histogram().is_some());
+ assert!(meta_data.definition_level_histogram().is_some());
+ assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
+ check_def_hist(meta_data.definition_level_histogram().unwrap().values());
+ check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());
// check that the read metadata is also correct
let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(file, options).unwrap();
let rfile_metadata = reader.metadata().file_metadata();
- assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+ assert_eq!(
+ rfile_metadata.num_rows(),
+ file_metadata.file_metadata().num_rows()
+ );
assert_eq!(reader.num_row_groups(), 1);
let rowgroup = reader.get_row_group(0).unwrap();
assert_eq!(rowgroup.num_columns(), 1);
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 1406295c3a..dca0f84179 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -1114,6 +1114,24 @@ impl SchemaDescriptor {
}
}
+// walk tree and count nodes
+pub(crate) fn num_nodes(tp: &TypePtr) -> usize {
+ let mut n_nodes = 1usize; // count root
+ for f in tp.get_fields().iter() {
+ count_nodes(f, &mut n_nodes);
+ }
+ n_nodes
+}
+
+pub(crate) fn count_nodes(tp: &TypePtr, n_nodes: &mut usize) {
+ *n_nodes += 1;
+ if let Type::GroupType { ref fields, .. } = tp.as_ref() {
+ for f in fields {
+ count_nodes(f, n_nodes);
+ }
+ }
+}
+
// do a quick walk of the tree to get proper sizing for SchemaDescriptor arrays
fn num_leaves(tp: &TypePtr) -> usize {
let mut n_leaves = 0usize;
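The two counters added above do a depth-first walk of the schema tree; `num_nodes` returns the total node count including the root, matching how Parquet serializes a schema as a flattened, depth-first element list with one entry per node. For a root group holding two primitive fields it would return 3. A hypothetical in-crate call site (both `SchemaElement` and `root_type_ptr` are placeholders here):

    // Hypothetical caller: pre-size the flattened, depth-first element list
    // before serializing the schema, avoiding reallocation.
    let mut elements: Vec<SchemaElement> = Vec::with_capacity(num_nodes(&root_type_ptr));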
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index 96dd8654cd..0261c22c2c 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -982,23 +982,17 @@ pub fn test_retrieve_row_group_statistics_after_encrypted_write() {
}
let file_metadata = writer.close().unwrap();
- assert_eq!(file_metadata.row_groups.len(), 1);
- let row_group = &file_metadata.row_groups[0];
- assert_eq!(row_group.columns.len(), 1);
- let column = &row_group.columns[0];
- let column_stats = column
- .meta_data
- .as_ref()
- .unwrap()
- .statistics
- .as_ref()
- .unwrap();
+ assert_eq!(file_metadata.num_row_groups(), 1);
+ let row_group = file_metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 1);
+ let column = row_group.column(0);
+ let column_stats = column.statistics().unwrap();
assert_eq!(
- column_stats.min_value.as_deref(),
+ column_stats.min_bytes_opt(),
Some(3i32.to_le_bytes().as_slice())
);
assert_eq!(
- column_stats.max_value.as_deref(),
+ column_stats.max_bytes_opt(),
Some(19i32.to_le_bytes().as_slice())
);
}
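The statistics assertions now go through `Statistics::min_bytes_opt`/`max_bytes_opt`, which return the raw little-endian value bytes as `Option<&[u8]>` instead of reaching into the thrift struct's `min_value`/`max_value` fields. A small sketch of decoding such a value back to an `i32`, assuming a `stats` binding obtained via `column.statistics().unwrap()` as in the test above:

    // Assumed: `stats` came from column.statistics().unwrap()
    if let Some(bytes) = stats.min_bytes_opt() {
        let min = i32::from_le_bytes(bytes.try_into().expect("4 bytes for i32"));
        assert_eq!(min, 3);
    }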
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 9c1e0c00a3..6999b1a931 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -34,9 +34,9 @@ use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
+use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
-use parquet::format::FileMetaData;
use std::io::Write;
use std::sync::Arc;
use tokio::fs::File;
@@ -647,7 +647,7 @@ fn spawn_column_parallel_row_group_writer(
async fn concatenate_parallel_row_groups<W: Write + Send>(
mut parquet_writer: SerializedFileWriter<W>,
mut serialize_rx: Receiver<JoinHandle<RBStreamSerializeResult>>,
-) -> Result<FileMetaData, ParquetError> {
+) -> Result<ParquetMetaData, ParquetError> {
while let Some(task) = serialize_rx.recv().await {
let result = task.await;
let mut rg_out = parquet_writer.next_row_group()?;
@@ -818,8 +818,7 @@ async fn test_multi_threaded_encrypted_writing() {
let metadata = serialized_file_writer.close().unwrap();
// Close the file writer which writes the footer
- assert_eq!(metadata.num_rows, 50);
- assert_eq!(metadata.schema, metadata.schema);
+ assert_eq!(metadata.file_metadata().num_rows(), 50);
// Check that the file was written correctly
let (read_record_batches, read_metadata) =
@@ -909,8 +908,7 @@ async fn test_multi_threaded_encrypted_writing_deprecated() {
// Close the file writer which writes the footer
let metadata = writer.finish().unwrap();
- assert_eq!(metadata.num_rows, 100);
- assert_eq!(metadata.schema, metadata.schema);
+ assert_eq!(metadata.file_metadata().num_rows(), 100);
// Check that the file was written correctly
let (read_record_batches, read_metadata) =