ZENOTME commented on code in PR #1657:
URL: https://github.com/apache/iceberg-rust/pull/1657#discussion_r2328690254
##########
crates/iceberg/src/writer/file_writer/rolling_writer.rs:
##########
@@ -17,65 +17,75 @@
use arrow_array::RecordBatch;
-use crate::spec::DataFileBuilder;
-use crate::writer::CurrentFileStatus;
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::io::FileIO;
+use crate::spec::{DataFile, PartitionKey};
+use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
+use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};
-/// Builder for creating a `RollingFileWriter` that rolls over to a new file
-/// when the data size exceeds a target threshold.
-#[derive(Clone)]
-pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
+/// A writer that automatically rolls over to a new file when the data size
+/// exceeds a target threshold.
+///
+/// This writer wraps another writer that tracks the amount of data written.
+/// When the data size exceeds the target size, it closes the current file and
+/// starts writing to a new one.
+pub struct RollingWriter<B, L, F>
+where
+ B: IcebergWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ inner: Option<B::R>,
inner_builder: B,
target_file_size: usize,
+ location_generator: L,
+ file_name_generator: F,
+ file_io: FileIO,
+ partition_key: Option<PartitionKey>,
+ data_files: Vec<DataFile>, // todo this should be B::R::O? DefaultOutput?
}
-impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
- /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
+impl<B, L, F> RollingWriter<B, L, F>
+where
+ B: IcebergWriterBuilder,
Review Comment:
One thing to note is that `IcebergWriterBuilder` currently looks like this:
```
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
    Send + Clone + 'static
{
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer.
    async fn build(self) -> Result<Self::R>;
}
```
A writer such as the position delete writer takes a different input type, as in the following implementation (see: https://github.com/apache/iceberg-rust/pull/704):
```
#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput>>
    for PositionDeleteWriterBuilder<B>
{
    type R = PositionDeleteWriter<B>;
    async fn build(self) -> Result<Self::R> {
        Ok(PositionDeleteWriter {
            inner_writer: Some(self.inner.build().await?),
            partition_value: self.partition_value.unwrap_or(Struct::empty()),
        })
    }
}
```
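Because `I` and `O` have defaults, a bare bound such as `B: IcebergWriterBuilder` (as in the `RollingWriter` diff above) is shorthand for `B: IcebergWriterBuilder<DefaultInput, DefaultOutput>`, so a builder that only implements `IcebergWriterBuilder<Vec<PositionDeleteInput>>` does not satisfy it. The minimal sketch below illustrates this Rust rule with a hypothetical, synchronous `Build<I>` trait and stand-in `DefaultInput`/`PosDeleteInput` types; none of these names are iceberg-rust APIs.

```rust
// Hypothetical stand-ins; these are NOT the iceberg-rust types.
struct DefaultInput; // plays the role of `RecordBatch`
struct PosDeleteInput; // plays the role of `Vec<PositionDeleteInput>`

// Simplified, synchronous analogue of `IcebergWriterBuilder<I = DefaultInput, ...>`.
trait Build<I = DefaultInput> {
    fn input_kind(&self) -> &'static str;
}

struct DataBuilder;
// `impl Build for ...` means `impl Build<DefaultInput> for ...`.
impl Build for DataBuilder {
    fn input_kind(&self) -> &'static str {
        "data"
    }
}

struct PosDeleteBuilder;
// Same trait, different input type, like `PositionDeleteWriterBuilder`.
impl Build<PosDeleteInput> for PosDeleteBuilder {
    fn input_kind(&self) -> &'static str {
        "position delete"
    }
}

// Bare bound: equivalent to `B: Build<DefaultInput>`, so only `DataBuilder`
// is accepted; passing `&PosDeleteBuilder` here fails to compile.
fn bare_bound<B: Build>(builder: &B) -> &'static str {
    builder.input_kind()
}

// Bound that is generic over the input type: accepts both builders.
fn generic_bound<I, B: Build<I>>(builder: &B) -> &'static str {
    builder.input_kind()
}
```

`generic_bound::<PosDeleteInput, _>(&PosDeleteBuilder)` compiles, whereas `bare_bound(&PosDeleteBuilder)` is rejected with a trait-bound error.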
That is why the rolling writer was a `FileWriter` in the first place. Once we adopt this design, how can we express something like the following?
```
RollingWriter<PositionDeleteWriter>
```
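One possible direction for the question above, sketched purely as an illustration (not the PR's design and not an iceberg-rust API): make the rolling writer generic over the input type `I` and bound the builder by `B: WriterBuilder<I, O>` instead of the bare bound. The sketch uses hypothetical synchronous stand-ins (`Writer`, `WriterBuilder`, `PosDelete`, and a `written_size` method playing roughly the role of the size reported through `CurrentFileStatus`). It omits `async`, `FileIO`, and the location/file-name generators, fixes the output type, and checks the size after each write.

```rust
use std::marker::PhantomData;

// Hypothetical, synchronous stand-ins; NOT the iceberg-rust definitions.
type DefaultInput = Vec<i64>; // plays the role of `RecordBatch`
type DefaultOutput = Vec<String>; // plays the role of `Vec<DataFile>`

trait Writer<I = DefaultInput, O = DefaultOutput> {
    fn write(&mut self, input: I);
    fn close(self) -> O;
    /// Assumed stand-in for the size reported via `CurrentFileStatus`.
    fn written_size(&self) -> usize;
}

trait WriterBuilder<I = DefaultInput, O = DefaultOutput>: Clone {
    type R: Writer<I, O>;
    fn build(self) -> Self::R;
}

/// Rolling wrapper generic over the input type `I`, so any builder that
/// produces `DefaultOutput` can be wrapped, whatever input it consumes.
struct RollingWriter<B, I>
where
    B: WriterBuilder<I, DefaultOutput>,
{
    inner_builder: B,
    inner: Option<B::R>,
    target_file_size: usize,
    closed_files: DefaultOutput,
    // `I` appears only in the bound, so it must be marked as used.
    _input: PhantomData<fn(I)>,
}

impl<B, I> RollingWriter<B, I>
where
    B: WriterBuilder<I, DefaultOutput>,
{
    fn new(inner_builder: B, target_file_size: usize) -> Self {
        Self {
            inner_builder,
            inner: None,
            target_file_size,
            closed_files: Vec::new(),
            _input: PhantomData,
        }
    }

    fn write(&mut self, input: I) {
        // Open a new file lazily (first write, or first write after a roll).
        if self.inner.is_none() {
            self.inner = Some(self.inner_builder.clone().build());
        }
        let writer = self.inner.as_mut().expect("writer was just opened");
        writer.write(input);
        // Roll once the current file reaches the target size.
        if writer.written_size() >= self.target_file_size {
            let full = self.inner.take().expect("writer is open");
            self.closed_files.extend(full.close());
        }
    }

    fn close(mut self) -> DefaultOutput {
        if let Some(writer) = self.inner.take() {
            self.closed_files.extend(writer.close());
        }
        self.closed_files
    }
}

// Stand-in for `PositionDeleteInput`: (data file path, row position).
type PosDelete = (String, i64);

#[derive(Clone)]
struct PosDeleteBuilder;

struct PosDeleteWriter {
    rows: usize,
}

impl Writer<Vec<PosDelete>> for PosDeleteWriter {
    fn write(&mut self, input: Vec<PosDelete>) {
        self.rows += input.len();
    }
    fn close(self) -> DefaultOutput {
        vec![format!("delete file: {} rows", self.rows)]
    }
    fn written_size(&self) -> usize {
        self.rows // assumption: size measured in rows for this sketch
    }
}

impl WriterBuilder<Vec<PosDelete>> for PosDeleteBuilder {
    type R = PosDeleteWriter;
    fn build(self) -> PosDeleteWriter {
        PosDeleteWriter { rows: 0 }
    }
}
```

With a target of two rows, `RollingWriter<PosDeleteBuilder, Vec<PosDelete>>` closes its first delete file after the second row and opens a new one on the next write; a builder for the default input type would fit `RollingWriter<B, DefaultInput>` unchanged. Whether the output type should also be generic (the `todo` on `data_files` in the diff) is left open here.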
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.