liurenjie1024 commented on code in PR #1769:
URL: https://github.com/apache/iceberg-rust/pull/1769#discussion_r2454561841
##########
crates/iceberg/src/arrow/record_batch_partition_splitter.rs:
##########
@@ -40,62 +43,83 @@ use crate::{Error, ErrorKind, Result};
pub struct RecordBatchPartitionSplitter {
schema: SchemaRef,
partition_spec: PartitionSpecRef,
- projector: RecordBatchProjector,
+ projector: Option<RecordBatchProjector>,
Review Comment:
This change is somewhat ugly. The splitter could be split into two parts:
1. Calculate the partition values.
2. Split the record batch according to the partition values.
We could abstract out the step that calculates the partition values.
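As a rough illustration of that two-part split (the trait and type names below, `PartitionValueCalculator` and `RecordBatchSplitter`, are hypothetical and not existing iceberg-rust APIs):
```rust
// Hypothetical sketch only: these traits do not exist in iceberg-rust today.
use arrow_array::{RecordBatch, StructArray};
use iceberg::Result;

/// Part 1: compute the partition values for a batch
/// (one partition struct row per input row).
pub trait PartitionValueCalculator: Send + Sync {
    fn calculate(&self, batch: &RecordBatch) -> Result<StructArray>;
}

/// Part 2: split a batch into per-partition sub-batches using the
/// partition values produced by part 1.
pub trait RecordBatchSplitter: Send + Sync {
    fn split(
        &self,
        batch: &RecordBatch,
        partition_values: &StructArray,
    ) -> Result<Vec<(StructArray, RecordBatch)>>;
}
```
With such a split, the unpartitioned case simply never constructs a calculator, so the splitter itself would likely not need an `Option<RecordBatchProjector>` field at all.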
##########
crates/integrations/datafusion/src/writer/task.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TaskWriter for DataFusion integration.
+//!
+//! This module provides a high-level writer that handles partitioning and routing
+//! of RecordBatch data to Iceberg tables.
+
+use datafusion::arrow::array::RecordBatch;
+use iceberg::Result;
+use iceberg::arrow::RecordBatchPartitionSplitter;
+use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef};
+use iceberg::writer::IcebergWriterBuilder;
+use iceberg::writer::partitioning::PartitioningWriter;
+use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
+use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
+use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
+
+/// High-level writer for DataFusion that handles partitioning and routing of RecordBatch data.
+///
+/// TaskWriter coordinates writing data to Iceberg tables by:
+/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered)
+/// - Lazily initializing the partition splitter on first write
+/// - Routing data to the underlying writer
+/// - Collecting all written data files
+///
+/// # Type Parameters
+///
+/// * `B` - The IcebergWriterBuilder type used to create underlying writers
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::spec::{PartitionSpec, Schema};
+/// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+/// use iceberg_datafusion::writer::task::TaskWriter;
+///
+/// // Create a TaskWriter for an unpartitioned table
+/// let mut task_writer = TaskWriter::new(
+/// data_file_writer_builder,
+/// false, // fanout_enabled
+/// schema,
+/// partition_spec,
+/// );
+///
+/// // Write data
+/// task_writer.write(record_batch).await?;
+///
+/// // Close and get data files
+/// let data_files = task_writer.close().await?;
+/// ```
+pub struct TaskWriter<B: IcebergWriterBuilder> {
+ /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
+ writer: SupportedWriter<B>,
+ /// Lazily initialized partition splitter for partitioned tables
+ partition_splitter: Option<RecordBatchPartitionSplitter>,
+ /// Iceberg schema reference
+ schema: SchemaRef,
+ /// Partition specification reference
+ partition_spec: PartitionSpecRef,
+}
+
+/// Internal enum to hold the different writer types.
+///
+/// This enum allows TaskWriter to work with different partitioning strategies
+/// while maintaining a unified interface.
+enum SupportedWriter<B: IcebergWriterBuilder> {
+ /// Writer for unpartitioned tables
+ Unpartitioned(UnpartitionedWriter<B>),
+ /// Writer for partitioned tables with unsorted data (maintains multiple active writers)
+ Fanout(FanoutWriter<B>),
+ /// Writer for partitioned tables with sorted data (maintains a single active writer)
+ Clustered(ClusteredWriter<B>),
Review Comment:
We could simplify this as
```rust
Partitioned {
    splitter: RecordBatchSplitter,
    partitioned_writer: Arc<dyn PartitionedWriter>,
}
```
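Spelling that out a little (purely as a hypothetical sketch, assuming some object-safe `PartitionedWriter` abstraction over `FanoutWriter` and `ClusteredWriter`, which is not part of the PR as written):
```rust
use std::sync::Arc;

use iceberg::arrow::RecordBatchPartitionSplitter;
use iceberg::writer::IcebergWriterBuilder;
use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;

/// Assumed object-safe abstraction over FanoutWriter and ClusteredWriter
/// (hypothetical; method set elided).
trait PartitionedWriter {}

enum SupportedWriter<B: IcebergWriterBuilder> {
    /// Unpartitioned tables need no splitter at all.
    Unpartitioned(UnpartitionedWriter<B>),
    /// Partitioned tables pair the splitter with whichever strategy
    /// (fanout or clustered) was chosen at construction time.
    Partitioned {
        splitter: RecordBatchPartitionSplitter,
        partitioned_writer: Arc<dyn PartitionedWriter>,
    },
}
```
Assuming the splitter can be built up front from the schema and partition spec, this would also let `TaskWriter` drop its separate lazily initialized `partition_splitter: Option<RecordBatchPartitionSplitter>` field, since unpartitioned writers never see a splitter and partitioned writers always own one.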
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]