github-actions[bot] commented on code in PR #61398:
URL: https://github.com/apache/doris/pull/61398#discussion_r3003488593


##########
be/src/format/orc/vorc_reader.cpp:
##########
@@ -1249,6 +1261,10 @@ Status OrcReader::set_fill_columns(
 
                 _lazy_read_ctx.predicate_orc_columns.emplace_back(
                         
_table_info_node_ptr->children_file_column_name(iter->first));
+                if (check_iceberg_row_lineage_column_idx(read_table_col) != 
-1) {
+                    // Todo : enable lazy mat where filter iceberg row lineage 
column.
+                    _enable_lazy_mat = false;

Review Comment:
   **[Medium] Same issue as in Parquet: `_last_updated_sequence_number` fill 
incorrectly gated behind `need_row_ids()` and `first_row_id >= 0`**
   
   See the comment on `vparquet_group_reader.cpp` for a detailed explanation. 
`last_updated_sequence_number` is a per-file constant that does not depend on 
row IDs, so the guard for filling that column should not require 
`first_row_id >= 0` or `need_row_ids()`.



##########
be/src/exec/sink/viceberg_delete_sink.cpp:
##########
@@ -475,6 +568,160 @@ std::string VIcebergDeleteSink::_get_file_extension() 
const {
     return fmt::format("{}{}", compress_name, file_format_name);
 }
 
+Status VIcebergDeleteSink::_write_deletion_vector_files(
+        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
+    std::vector<DeletionVectorBlob> blobs;
+    for (const auto& [data_file_path, deletion] : file_deletions) {
+        if (deletion.rows_to_delete.isEmpty()) {
+            continue;
+        }
+        roaring::Roaring64Map merged_rows = deletion.rows_to_delete;
+        DeletionVectorBlob blob;
+        blob.delete_count = static_cast<int64_t>(merged_rows.cardinality());
+        auto previous_delete_it = 
_rewritable_delete_files.find(data_file_path);
+        if (previous_delete_it != _rewritable_delete_files.end()) {
+            roaring::Roaring64Map previous_rows;
+            RETURN_IF_ERROR(load_rewritable_delete_rows(
+                    _state, _state->runtime_profile(), data_file_path, 
previous_delete_it->second,
+                    _hadoop_conf, _file_type, _broker_addresses, 
&previous_rows));
+            merged_rows |= previous_rows;
+        }
+
+        size_t bitmap_size = merged_rows.getSizeInBytes();
+        blob.referenced_data_file = data_file_path;
+        blob.partition_spec_id = deletion.partition_spec_id;
+        blob.partition_data_json = deletion.partition_data_json;
+        blob.merged_count = static_cast<int64_t>(merged_rows.cardinality());
+        blob.content_size_in_bytes = static_cast<int64_t>(4 + 4 + bitmap_size 
+ 4);
+        blob.blob_data.resize(static_cast<size_t>(blob.content_size_in_bytes));
+        merged_rows.write(blob.blob_data.data() + 8);
+
+        uint32_t total_length = static_cast<uint32_t>(4 + bitmap_size);
+        BigEndian::Store32(blob.blob_data.data(), total_length);
+
+        constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
+        memcpy(blob.blob_data.data() + 4, DV_MAGIC, 4);
+
+        uint32_t crc = static_cast<uint32_t>(
+                ::crc32(0, reinterpret_cast<const 
Bytef*>(blob.blob_data.data() + 4),
+                        4 + (uInt)bitmap_size));
+        BigEndian::Store32(blob.blob_data.data() + 8 + bitmap_size, crc);
+        blobs.emplace_back(std::move(blob));
+    }
+
+    if (blobs.empty()) {
+        return Status::OK();
+    }
+
+    std::string puffin_path = _generate_puffin_file_path();
+    int64_t puffin_file_size = 0;
+    RETURN_IF_ERROR(_write_puffin_file(puffin_path, &blobs, 
&puffin_file_size));
+
+    for (const auto& blob : blobs) {
+        TIcebergCommitData commit_data;
+        commit_data.__set_file_path(puffin_path);
+        commit_data.__set_row_count(blob.delete_count);
+        commit_data.__set_file_size(puffin_file_size);
+        commit_data.__set_file_content(TFileContent::DELETION_VECTOR);
+        commit_data.__set_content_offset(blob.content_offset);
+        commit_data.__set_content_size_in_bytes(blob.content_size_in_bytes);
+        commit_data.__set_referenced_data_file_path(blob.referenced_data_file);
+        if (blob.partition_spec_id != 0 || !blob.partition_data_json.empty()) {
+            commit_data.__set_partition_spec_id(blob.partition_spec_id);
+            commit_data.__set_partition_data_json(blob.partition_data_json);
+        }
+
+        _commit_data_list.push_back(commit_data);
+        _delete_file_count++;
+    }
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::_write_puffin_file(const std::string& puffin_path,
+                                              std::vector<DeletionVectorBlob>* 
blobs,
+                                              int64_t* out_file_size) {
+    DCHECK(blobs != nullptr);
+    DCHECK(!blobs->empty());
+
+    io::FSPropertiesRef fs_properties(_file_type);
+    fs_properties.properties = &_hadoop_conf;
+    if (!_broker_addresses.empty()) {
+        fs_properties.broker_addresses = &_broker_addresses;
+    }
+    io::FileDescription file_description = {.path = puffin_path, .fs_name {}};
+    auto fs = DORIS_TRY(FileFactory::create_fs(fs_properties, 
file_description));
+    io::FileWriterOptions file_writer_options = {.used_by_s3_committer = 
false};
+    io::FileWriterPtr file_writer;
+    RETURN_IF_ERROR(fs->create_file(file_description.path, &file_writer, 
&file_writer_options));
+
+    constexpr char PUFFIN_MAGIC[] = {'\x50', '\x46', '\x41', '\x31'};
+    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const 
uint8_t*>(PUFFIN_MAGIC), 4)));
+    int64_t current_offset = 4;
+    for (auto& blob : *blobs) {
+        blob.content_offset = current_offset;
+        RETURN_IF_ERROR(file_writer->append(Slice(
+                reinterpret_cast<const uint8_t*>(blob.blob_data.data()), 
blob.blob_data.size())));
+        current_offset += static_cast<int64_t>(blob.blob_data.size());
+    }
+    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const 
uint8_t*>(PUFFIN_MAGIC), 4)));
+
+    std::string footer_json = _build_puffin_footer_json(*blobs);
+    RETURN_IF_ERROR(file_writer->append(
+            Slice(reinterpret_cast<const uint8_t*>(footer_json.data()), 
footer_json.size())));
+
+    char footer_size_buf[4];
+    LittleEndian::Store32(footer_size_buf, 
static_cast<uint32_t>(footer_json.size()));
+    RETURN_IF_ERROR(file_writer->append(
+            Slice(reinterpret_cast<const uint8_t*>(footer_size_buf), 
sizeof(footer_size_buf))));
+
+    char flags[4] = {0, 0, 0, 0};
+    RETURN_IF_ERROR(
+            file_writer->append(Slice(reinterpret_cast<const uint8_t*>(flags), 
sizeof(flags))));
+    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const 
uint8_t*>(PUFFIN_MAGIC), 4)));
+    RETURN_IF_ERROR(file_writer->close());
+
+    *out_file_size = current_offset + 4 + 
static_cast<int64_t>(footer_json.size()) + 4 + 4 + 4;
+    return Status::OK();
+}
+
+std::string VIcebergDeleteSink::_build_puffin_footer_json(
+        const std::vector<DeletionVectorBlob>& blobs) {
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    writer.StartObject();
+    writer.Key("blobs");
+    writer.StartArray();
+    for (const auto& blob : blobs) {
+        writer.StartObject();
+        writer.Key("type");
+        writer.String("deletion-vector-v1");
+        writer.Key("fields");
+        writer.StartArray();
+        writer.EndArray();
+        writer.Key("snapshot-id");
+        writer.Int64(-1);
+        writer.Key("sequence-number");
+        writer.Int64(-1);
+        writer.Key("offset");
+        writer.Int64(blob.content_offset);
+        writer.Key("length");
+        writer.Int64(blob.content_size_in_bytes);
+        writer.Key("properties");
+        writer.StartObject();
+        writer.Key("referenced-data-file");
+        writer.String(blob.referenced_data_file.c_str(),
+                      
static_cast<rapidjson::SizeType>(blob.referenced_data_file.size()));
+        std::string cardinality = std::to_string(blob.merged_count);
+        writer.Key("cardinality");
+        writer.String(cardinality.c_str(), 
static_cast<rapidjson::SizeType>(cardinality.size()));
+        writer.EndObject();
+        writer.EndObject();
+    }
+    writer.EndArray();
+    writer.EndObject();
+    return {buffer.GetString(), buffer.GetSize()};
+}

Review Comment:
   **[Medium] Puffin footer JSON missing top-level `"properties"` field**
   
   The Puffin spec v1 defines the footer payload as:
   ```json
   {
     "blobs": [ ... ],
     "properties": { }
   }
   ```
   
   The current code only emits `"blobs"` at the top level. While some 
implementations tolerate a missing `"properties"` field, strict Puffin readers 
(e.g., Spark, Trino) could reject the file.
   
   **Fix:** Add before the top-level `EndObject()`:
   ```cpp
   writer.Key("properties");
   writer.StartObject();
   writer.EndObject();
   ```



##########
be/src/format/parquet/vparquet_group_reader.cpp:
##########
@@ -1067,16 +1075,55 @@ Status RowGroupReader::_get_current_batch_row_id(size_t 
read_rows) {
 
 Status RowGroupReader::_fill_row_id_columns(Block* block, size_t read_rows,
                                             bool is_current_row_ids) {
+    const bool need_row_ids =
+            _row_id_column_iterator_pair.first != nullptr ||
+            (_row_lineage_columns != nullptr && 
_row_lineage_columns->need_row_ids());
+    if (need_row_ids && !is_current_row_ids) {
+        RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
+    }
     if (_row_id_column_iterator_pair.first != nullptr) {
-        if (!is_current_row_ids) {
-            RETURN_IF_ERROR(_get_current_batch_row_id(read_rows));
-        }
         auto col = block->get_by_position(_row_id_column_iterator_pair.second)
                            .column->assume_mutable();
         RETURN_IF_ERROR(_row_id_column_iterator_pair.first->read_by_rowids(

Review Comment:
   **[Medium] `_last_updated_sequence_number` fill incorrectly gated behind 
`need_row_ids()` and `first_row_id >= 0`**
   
   `last_updated_sequence_number` is a per-file constant (the sequence number 
of the operation that last wrote this file). It has no logical dependency on 
`_row_id` or whether row IDs are being computed.
   
   If a query selects only `_last_updated_sequence_number` without `_row_id`, 
then `need_row_ids()` returns false (since `row_id_column_idx` would be -1), 
and the sequence number column would incorrectly remain null.
   
   The same issue exists in the ORC reader at `vorc_reader.cpp:1576-1579`.
   
   **Fix:** The guard for `_last_updated_sequence_number` should only check:
   ```cpp
   if (_row_lineage_columns != nullptr &&
       _row_lineage_columns->has_last_updated_sequence_number_column() &&
       _row_lineage_columns->last_updated_sequence_number >= 0) {
   ```
   
   Remove the `first_row_id >= 0` and `need_row_ids()` conditions.



##########
be/src/exec/sink/viceberg_delete_sink.cpp:
##########
@@ -30,14 +33,79 @@
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/data_type_string.h"
 #include "core/data_type/data_type_struct.h"
+#include "exec/common/endian.h"
 #include "exprs/vexpr.h"
+#include "format/table/iceberg_delete_file_reader_helper.h"
 #include "format/transformer/vfile_format_transformer.h"
+#include "io/file_factory.h"
 #include "runtime/runtime_state.h"
+#include "util/slice.h"
 #include "util/string_util.h"
 #include "util/uid_util.h"
 
 namespace doris {
 
+namespace {
+
+class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor {
+public:
+    RewriteBitmapVisitor(const std::string& referenced_data_file_path,
+                         roaring::Roaring64Map* rows_to_delete)

Review Comment:
   **[Critical] `row_count` uses pre-merge `delete_count` instead of post-merge 
`merged_count`**
   
   In `_write_deletion_vector_files`, `blob.delete_count` is assigned at line 
580 **before** the merge with old deletion vectors (line 587: `merged_rows |= 
previous_rows`). The DV blob physically written to disk contains the **merged** 
bitmap, but `commit_data.__set_row_count(blob.delete_count)` reports only the 
new deletes.
   
   This creates an inconsistency:
   - Puffin footer JSON correctly uses `merged_count` (line 714: 
`std::to_string(blob.merged_count)`)
   - But the thrift commit data claims a smaller count
   
   In FE, `IcebergWriterHelper.convertToDeleteFiles()` uses `getRowCount()` as 
the manifest `recordCount` for the `DeleteFile` (line 218: 
`withRecordCount(recordCount)`). The Iceberg manifest will thus contain an 
incorrect record count for the DV, which can cause issues with Iceberg readers 
(Spark, Trino, etc.) that validate or use this count.
   
   Also, `IcebergTransaction.getUpdateCnt()` (line 569) accumulates 
`getRowCount()` for delete rows — this would undercount affected rows.
   
   **Fix:**
   ```cpp
   commit_data.__set_row_count(blob.merged_count);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to