This is an automated email from the ASF dual-hosted git repository.

hubgeter pushed a commit to branch iceberg-v3-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/iceberg-v3-test-branch-4.1 by this push:
     new 582d219d855 [fix](iceberg) Add missing Iceberg field IDs for position delete files
582d219d855 is described below

commit 582d219d8555b736650b2dea9c34cc2d172c1a0e
Author: daidai <[email protected]>
AuthorDate: Thu May 21 17:38:08 2026 +0800

    [fix](iceberg) Add missing Iceberg field IDs for position delete files
---
 .../writer/iceberg/viceberg_delete_file_writer.cpp | 25 ++++++++++--
 .../writer/iceberg/viceberg_delete_file_writer.h   |  4 ++
 be/test/exec/sink/viceberg_delete_sink_test.cpp    | 47 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
index 8d1724a4396..968ac987e9a 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
@@ -19,6 +19,8 @@
 
 #include <fmt/format.h>
 
+#include "format/table/iceberg/schema.h"
+#include "format/table/iceberg/types.h"
 #include "format/transformer/vorc_transformer.h"
 #include "format/transformer/vparquet_transformer.h"
 #include "io/file_factory.h"
@@ -26,6 +28,20 @@
 
 namespace doris {
 
+// Iceberg reserved field IDs for position delete files (INT32_MAX - 101 and INT32_MAX - 102).
+constexpr int POSITION_DELETE_FILE_PATH_ID = 2147483546; // "file_path" column
+constexpr int POSITION_DELETE_POS_ID = 2147483545; // "pos" column
+
+static std::unique_ptr<iceberg::Schema> build_position_delete_schema() { // file-local helper
+    std::vector<iceberg::NestedField> fields;
+    fields.reserve(2); // two columns: "file_path" (string) and "pos" (long)
+    fields.emplace_back(false, POSITION_DELETE_FILE_PATH_ID, "file_path",
+                        std::make_unique<iceberg::StringType>(), std::nullopt);
+    fields.emplace_back(false, POSITION_DELETE_POS_ID, "pos", std::make_unique<iceberg::LongType>(),
+                        std::nullopt);
+    return std::make_unique<iceberg::Schema>(std::move(fields)); // fields are moved into the schema
+}
+
 VIcebergDeleteFileWriter::VIcebergDeleteFileWriter(TFileContent::type 
delete_type,
                                                    const std::string& 
output_path,
                                                    TFileFormatType::type 
file_format,
@@ -46,6 +62,7 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state, RuntimeProfile* profi
     if (_delete_type != TFileContent::POSITION_DELETES) {
         return Status::NotSupported("Iceberg delete file writer only supports 
position deletes");
     }
+    _position_delete_schema = build_position_delete_schema();
 
     _state = state;
 
@@ -83,15 +100,15 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state, RuntimeProfile* profi
 
         ParquetFileOptions parquet_options = {parquet_compression_type,
                                               TParquetVersion::PARQUET_1_0, 
false, false};
-        _file_format_transformer.reset(new VParquetTransformer(state, 
_file_writer.get(),
-                                                               output_exprs, 
column_names, false,
-                                                               
parquet_options, nullptr, nullptr));
+        _file_format_transformer.reset(new VParquetTransformer(
+                state, _file_writer.get(), output_exprs, column_names, false, 
parquet_options,
+                nullptr, _position_delete_schema.get()));
         return _file_format_transformer->open();
     }
     case TFileFormatType::FORMAT_ORC: {
         _file_format_transformer.reset(new VOrcTransformer(state, 
_file_writer.get(), output_exprs,
                                                            "", column_names, 
false, _compress_type,
-                                                           nullptr));
+                                                           
_position_delete_schema.get()));
         return _file_format_transformer->open();
     }
     default:
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h
index e5de7143f2b..c242731dc15 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h
@@ -32,6 +32,9 @@ namespace doris {
 class RuntimeState;
 class RuntimeProfile;
 class ObjectPool;
+namespace iceberg {
+class Schema; // forward declaration for the std::unique_ptr<iceberg::Schema> member below
+} // namespace iceberg
 
 namespace io {
 class FileSystem;
@@ -103,6 +106,7 @@ private:
     RuntimeState* _state = nullptr;
     std::shared_ptr<io::FileSystem> _fs;
     io::FileWriterPtr _file_writer;
+    std::unique_ptr<iceberg::Schema> _position_delete_schema;
     std::unique_ptr<VFileFormatTransformer> _file_format_transformer;
 
     int32_t _partition_spec_id = 0;
diff --git a/be/test/exec/sink/viceberg_delete_sink_test.cpp b/be/test/exec/sink/viceberg_delete_sink_test.cpp
index d9fc5086503..5af028b6b70 100644
--- a/be/test/exec/sink/viceberg_delete_sink_test.cpp
+++ b/be/test/exec/sink/viceberg_delete_sink_test.cpp
@@ -18,6 +18,8 @@
 #include "exec/sink/viceberg_delete_sink.h"
 
 #include <gtest/gtest.h>
+#include <parquet/api/reader.h>
+#include <parquet/schema.h>
 #include <rapidjson/document.h>
 
 #include <filesystem>
@@ -35,7 +37,9 @@
 #include "exec/common/endian.h"
 #include "gen_cpp/DataSinks_types.h"
 #include "gen_cpp/Types_types.h"
+#include "runtime/runtime_profile.h"
 #include "runtime/runtime_state.h"
+#include "testutil/mock/mock_runtime_state.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -480,6 +484,49 @@ TEST_F(VIcebergDeleteSinkTest, TestGenerateDeleteFilePath) {
     ASSERT_NE(std::string::npos, delete_file_path.find("delete_pos_"));
 }
 
+TEST_F(VIcebergDeleteSinkTest, TestWritePositionDeleteParquetFieldIds) {
+    std::filesystem::path temp_dir = std::filesystem::temp_directory_path() /
+                                     ("iceberg_position_delete_test_" + generate_uuid_string());
+    ASSERT_TRUE(std::filesystem::create_directories(temp_dir));
+    // Create and initialise the sink from a TDataSink built for temp_dir.
+    TDataSink t_data_sink = build_local_delete_sink(temp_dir.string(), 2);
+    VExprContextSPtrs output_exprs;
+    auto sink = std::make_shared<VIcebergDeleteSink>(t_data_sink, output_exprs, nullptr, nullptr);
+    ObjectPool pool;
+    ASSERT_TRUE(sink->init_properties(&pool).ok());
+    // Open the sink with a mock runtime state and a fresh profile.
+    MockRuntimeState state;
+    RuntimeProfile profile("iceberg_delete_sink");
+    ASSERT_TRUE(sink->open(&state, &profile).ok());
+    // Queue two deleted row positions (10 and 20) for a single data file.
+    std::map<std::string, IcebergFileDeletion> file_deletions;
+    auto [file_it, inserted] =
+            file_deletions.emplace("file1.parquet", IcebergFileDeletion(1, "[\"p=1\"]"));
+    ASSERT_TRUE(inserted);
+    file_it->second.rows_to_delete.add(static_cast<uint32_t>(10));
+    file_it->second.rows_to_delete.add(static_cast<uint32_t>(20));
+    // Write the delete file; exactly one commit entry is expected.
+    ASSERT_TRUE(sink->_write_position_delete_files(file_deletions).ok());
+    ASSERT_EQ(1, sink->_commit_data_list.size());
+    // Open the file recorded in the commit data with the Parquet reader and inspect its schema.
+    const auto& commit_data = sink->_commit_data_list[0];
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+            parquet::ParquetFileReader::OpenFile(commit_data.file_path, false);
+    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+    const auto& group_node =
+            static_cast<const parquet::schema::GroupNode&>(*file_metadata->schema()->group_node());
+    // Both columns must carry the expected names and position-delete field IDs.
+    ASSERT_EQ(2, group_node.field_count());
+    auto file_path_field = group_node.field(0);
+    auto pos_field = group_node.field(1);
+    EXPECT_EQ("file_path", file_path_field->name());
+    EXPECT_EQ(2147483546, file_path_field->field_id());
+    EXPECT_EQ("pos", pos_field->name());
+    EXPECT_EQ(2147483545, pos_field->field_id());
+    // NOTE(review): cleanup is skipped if an ASSERT above fails, leaving temp_dir behind.
+    ASSERT_TRUE(std::filesystem::remove_all(temp_dir) > 0);
+}
+
 TEST_F(VIcebergDeleteSinkTest, TestUnsupportedDeleteType) {
     // Create a TDataSink for an unsupported delete type
     TDataSink t_eq_delete_sink;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to