evindj commented on code in PR #582:
URL: https://github.com/apache/iceberg-cpp/pull/582#discussion_r2918437396


##########
src/iceberg/test/data_writer_test.cc:
##########
@@ -264,4 +266,134 @@ TEST_F(DataWriterTest, WriteMultipleBatches) {
   EXPECT_GT(data_file->file_size_in_bytes, 0);
 }
 
+class PositionDeleteWriterTest : public DataWriterTest {
+ protected:
+  PositionDeleteWriterOptions MakeDeleteOptions() {
+    return PositionDeleteWriterOptions{
+        .path = "test_deletes.parquet",
+        .schema = schema_,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+  }
+
+  std::shared_ptr<::arrow::Array> CreatePositionDeleteData() {
+    auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+        MetadataColumns::kDeleteFilePath, MetadataColumns::kDeleteFilePos});
+
+    ArrowSchema arrow_c_schema;
+    ICEBERG_THROW_NOT_OK(ToArrowSchema(*delete_schema, &arrow_c_schema));
+    auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    return ::arrow::json::ArrayFromJSONString(
+               ::arrow::struct_(arrow_type->fields()),
+               R"([["data_file_1.parquet", 0], ["data_file_1.parquet", 5], 
["data_file_1.parquet", 10]])")
+        .ValueOrDie();
+  }
+};
+
+TEST_F(PositionDeleteWriterTest, WriteDeleteAndClose) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 10), IsOk());
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto length_result = writer->Length();
+  ASSERT_THAT(length_result, IsOk());
+  EXPECT_GT(length_result.value(), 0);
+}
+
+TEST_F(PositionDeleteWriterTest, MetadataAfterClose) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  const auto& write_result = metadata_result.value();
+  ASSERT_EQ(write_result.data_files.size(), 1);
+
+  const auto& data_file = write_result.data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_EQ(data_file->file_path, "test_deletes.parquet");
+  EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+  EXPECT_FALSE(data_file->sort_order_id.has_value());
+}
+
+TEST_F(PositionDeleteWriterTest, MetadataBeforeCloseReturnsError) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(metadata_result,
+              HasErrorMessage("Cannot get metadata before closing the writer"));
+}
+
+TEST_F(PositionDeleteWriterTest, CloseIsIdempotent) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+
+  ASSERT_THAT(writer->Close(), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(PositionDeleteWriterTest, WriteMultipleDeletes) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  for (int64_t i = 0; i < 100; ++i) {
+    ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
+  }
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  const auto& data_file = metadata_result.value().data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
+TEST_F(PositionDeleteWriterTest, WriteBatchData) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  auto test_data = CreatePositionDeleteData();
+  ArrowArray arrow_array;
+  ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+  ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  const auto& data_file = metadata_result.value().data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+}

Review Comment:
   It would be nice if you could test the automatic flush logic.



##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,203 @@
 
 #include "iceberg/data/position_delete_writer.h"
 
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class PositionDeleteWriter::Impl {
  public:
+  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
+    // Build the position delete schema with file_path and pos columns
+    std::vector<SchemaField> fields;
+    fields.push_back(MetadataColumns::kDeleteFilePath);
+    fields.push_back(MetadataColumns::kDeleteFilePos);
+
+    auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+    WriterOptions writer_options{
+        .path = options.path,
+        .schema = delete_schema,
+        .io = options.io,
+        .properties = WriterProperties::FromMap(options.properties),
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, writer_options));
+
+    return std::unique_ptr<Impl>(
+        new Impl(std::move(options), std::move(delete_schema), std::move(writer)));
+  }
+
+  Status Write(ArrowArray* data) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->Write(data);
+  }
+
+  Status WriteDelete(std::string_view file_path, int64_t pos) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    buffered_paths_.emplace_back(file_path);
+    buffered_positions_.push_back(pos);
+    referenced_paths_.emplace(file_path);
+
+    if (static_cast<int64_t>(buffered_paths_.size()) >= kFlushThreshold) {

Review Comment:
   Do we want to make ```kFlushThreshold``` configurable via options?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to