Copilot commented on code in PR #62306:
URL: https://github.com/apache/doris/pull/62306#discussion_r3061798109
##########
be/src/format/table/equality_delete.cpp:
##########
@@ -52,8 +52,14 @@ Status SimpleEqualityDelete::filter_data_block(
DCHECK(_delete_col_ids.size() == 1);
auto column_field_id = _delete_col_ids[0];
- auto column_and_type = data_block->get_by_position(
-
col_name_to_block_idx->at(id_to_block_column_name.at(column_field_id)));
+ const auto& block_col_name = id_to_block_column_name.at(column_field_id);
+ auto block_idx = col_name_to_block_idx->at(block_col_name);
+ LOG(INFO) << "[EqDeleteDebug] SimpleEqualityDelete::filter_data_block:
field_id="
+ << column_field_id << ", block_col_name=" << block_col_name
+ << ", block_idx=" << block_idx << ", delete_block_rows=" <<
_delete_block->rows()
+ << ", data_block_rows=" << data_block->rows();
Review Comment:
This LOG(INFO) statement inside SimpleEqualityDelete::filter_data_block()
will run for every data block filtered, which can significantly increase log
volume and slow down scans. Please remove it or change it to a low-verbosity
debug log (e.g. VLOG) behind a feature flag.
```suggestion
```
##########
be/src/format/table/transactional_hive_reader.cpp:
##########
@@ -21,8 +21,8 @@
#include "core/data_type/data_type_factory.hpp"
#include "format/orc/vorc_reader.h"
-#include "format/table/table_format_reader.h"
-#include "format/table/transactional_hive_common.h"
+#include "format/table/table_schema_change_helper.h"
+#include "transactional_hive_common.h"
Review Comment:
This file now includes the header as "transactional_hive_common.h", but
other translation units include it via the full path (e.g.
"format/table/transactional_hive_common.h"). Using the short include path is
likely to break compilation depending on the configured include directories.
Please use the full include path here as well, for consistency.
##########
be/src/format/table/iceberg_reader.h:
##########
@@ -58,203 +61,88 @@ class GenericReader;
class ShardedKVCache;
class VExprContext;
-struct RowLineageColumns {
- int row_id_column_idx = -1;
- int last_updated_sequence_number_column_idx = -1;
- int64_t first_row_id = -1;
- int64_t last_updated_sequence_number = -1;
-
- bool need_row_ids() const { return row_id_column_idx >= 0; }
- bool has_last_updated_sequence_number_column() const {
- return last_updated_sequence_number_column_idx >= 0;
- }
-};
-
-class IcebergTableReader : public TableFormatReader, public
TableSchemaChangeHelper {
-public:
- static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
- static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
- "_last_updated_sequence_number";
-
- IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile,
- RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx,
- FileMetaCache* meta_cache);
- ~IcebergTableReader() override = default;
-
- void set_need_row_id_column(bool need) { _need_row_id_column = need; }
- bool need_row_id_column() const { return _need_row_id_column; }
- void set_row_id_column_position(int position) { _row_id_column_position =
position; }
-
- Status init_row_filters() final;
-
- Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof)
final;
-
- enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR };
- enum Fileformat { NONE, PARQUET, ORC, AVRO };
-
- virtual void set_delete_rows() = 0;
-
- bool has_delete_operations() const override {
- return _equality_delete_impls.size() > 0 ||
TableFormatReader::has_delete_operations();
- }
-
- Status read_deletion_vector(const std::string& data_file_path,
- const TIcebergDeleteFileDesc&
delete_file_desc);
-
- void set_row_lineage_columns(std::shared_ptr<RowLineageColumns>
row_lineage_columns) {
- _row_lineage_columns = std::move(row_lineage_columns);
- }
-
+struct IcebergTableReader {
static bool _is_fully_dictionary_encoded(const tparquet::ColumnMetaData&
column_metadata);
-
-protected:
- struct IcebergProfile {
- RuntimeProfile::Counter* num_delete_files;
- RuntimeProfile::Counter* num_delete_rows;
- RuntimeProfile::Counter* delete_files_read_time;
- RuntimeProfile::Counter* delete_rows_sort_time;
- RuntimeProfile::Counter* parse_delete_file_time;
- };
- using DeleteRows = std::vector<int64_t>;
- using DeleteFile = phmap::parallel_flat_hash_map<
- std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>,
std::equal_to<>,
- std::allocator<std::pair<const std::string,
std::unique_ptr<DeleteRows>>>, 8,
- std::mutex>;
-
- // $row_id metadata column generation state
- bool _need_row_id_column = false;
- int _row_id_column_position = -1;
- /**
- * https://iceberg.apache.org/spec/#position-delete-files
- * The rows in the delete file must be sorted by file_path then position
to optimize filtering rows while scanning.
- * Sorting by file_path allows filter pushdown by file in columnar storage
formats.
- * Sorting by position allows filtering rows while scanning, to avoid
keeping deletes in memory.
- */
- static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>&
delete_rows_array,
- int64_t num_delete_rows,
std::vector<int64_t>& result);
-
- static std::string _delet_file_cache_key(const std::string& path) { return
"delete_" + path; }
-
- Status _position_delete_base(const std::string data_file_path,
- const std::vector<TIcebergDeleteFileDesc>&
delete_files);
- virtual Status _process_equality_delete(
- const std::vector<TIcebergDeleteFileDesc>& delete_files) = 0;
- void _generate_equality_delete_block(Block* block,
- const std::vector<std::string>&
equality_delete_col_names,
- const std::vector<DataTypePtr>&
equality_delete_col_types);
- // Equality delete should read the primary columns. Add the missing columns
- Status _expand_block_if_need(Block* block);
- // Remove the added delete columns
- Status _shrink_block_if_need(Block* block);
-
- // owned by scan node
- ShardedKVCache* _kv_cache;
- IcebergProfile _iceberg_profile;
- // _iceberg_delete_rows from kv_cache
- const std::vector<int64_t>* _iceberg_delete_rows = nullptr;
-
- // Pointer to external column name to block index mapping (from
FileScanner)
- // Used to dynamically add expand columns for equality delete
- std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx =
nullptr;
-
- Fileformat _file_format = Fileformat::NONE;
-
- const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
- const int READ_DELETE_FILE_BATCH_SIZE = 102400;
-
- // Read a position delete file from the full Iceberg delete descriptor.
- Status _read_position_delete_file(const TIcebergDeleteFileDesc&,
DeleteFile*);
-
- // read table colummn + extra equality delete columns
- std::vector<std::string> _all_required_col_names;
-
- // extra equality delete name and type
- std::vector<std::string> _expand_col_names;
- std::vector<ColumnWithTypeAndName> _expand_columns;
-
- // all ids that need read for eq delete (from all qe delte file.)
- std::set<int> _equality_delete_col_ids;
- // eq delete column ids -> location of _equality_delete_blocks /
_equality_delete_impls
- std::map<std::vector<int>, int> _equality_delete_block_map;
- // EqualityDeleteBase stores raw pointers to these blocks, so do not
modify this vector after
- // creating entries in _equality_delete_impls.
- std::vector<Block> _equality_delete_blocks;
- std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls;
-
- // id -> block column name.
- std::unordered_map<int, std::string> _id_to_block_column_name;
-
- std::shared_ptr<RowLineageColumns> _row_lineage_columns;
};
-class IcebergParquetReader final : public IcebergTableReader {
+// IcebergParquetReader: inherits ParquetReader via IcebergReaderMixin CRTP
+class IcebergParquetReader final : public IcebergReaderMixin<ParquetReader> {
public:
ENABLE_FACTORY_CREATOR(IcebergParquetReader);
- IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile,
- RuntimeState* state, const TFileScanRangeParams&
params,
- const TFileRangeDesc& range, ShardedKVCache* kv_cache,
- io::IOContext* io_ctx, FileMetaCache* meta_cache)
- : IcebergTableReader(std::move(file_format_reader), profile,
state, params, range,
- kv_cache, io_ctx, meta_cache) {}
- Status init_reader(
- const std::vector<std::string>& file_col_names,
- std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
- const VExprContextSPtrs& conjuncts,
- phmap::flat_hash_map<int,
std::vector<std::shared_ptr<ColumnPredicate>>>&
- slot_id_to_predicates,
- const TupleDescriptor* tuple_descriptor, const RowDescriptor*
row_descriptor,
- const std::unordered_map<std::string, int>* colname_to_slot_id,
- const VExprContextSPtrs* not_single_slot_filter_conjuncts,
- const std::unordered_map<int, VExprContextSPtrs>*
slot_id_to_filter_conjuncts);
+ IcebergParquetReader(ShardedKVCache* kv_cache, RuntimeProfile* profile,
+ const TFileScanRangeParams& params, const
TFileRangeDesc& range,
+ size_t batch_size, const cctz::time_zone* ctz,
io::IOContext* io_ctx,
+ RuntimeState* state, FileMetaCache* meta_cache)
+ : IcebergReaderMixin<ParquetReader>(kv_cache, profile, params,
range, batch_size, ctz,
+ io_ctx, state, meta_cache) {}
void set_delete_rows() final {
- auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
- parquet_reader->set_delete_rows(_iceberg_delete_rows);
+ LOG(INFO) << "[PosDeleteDebug] IcebergParquetReader::set_delete_rows:
_iceberg_delete_rows="
+ << (_iceberg_delete_rows
+ ? "set(" +
std::to_string(_iceberg_delete_rows->size()) + ")"
+ : "null");
+ // Call ParquetReader's set_delete_rows(const vector<int64_t>*)
+ ParquetReader::set_delete_rows(_iceberg_delete_rows);
+ }
Review Comment:
The added LOG(INFO) debug output in set_delete_rows() will execute in
production and can be very noisy (potentially per file/scan), impacting
performance and log volume. Please remove these logs or downgrade them to a
conditional debug log (e.g. VLOG) guarded by a debug flag.
##########
be/src/format/parquet/vparquet_reader.cpp:
##########
@@ -777,7 +827,13 @@ Status ParquetReader::get_next_block(Block* block, size_t*
read_rows, bool* eof)
RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx(
const tparquet::RowGroup& row_group, const
RowGroupReader::RowGroupIndex& row_group_index) {
+ LOG(INFO) << "[PosDeleteDebug] _get_position_delete_ctx: _delete_rows="
+ << (_delete_rows ? "set(" + std::to_string(_delete_rows->size())
+ ")" : "null")
+ << " row_group.num_rows=" << row_group.num_rows
+ << " first_row=" << row_group_index.first_row;
if (_delete_rows == nullptr) {
+ LOG(INFO) << "[PosDeleteDebug] _get_position_delete_ctx: NO delete
rows, returning "
+ "no-filter ctx";
Review Comment:
ParquetReader::_get_position_delete_ctx() now logs with LOG(INFO) on every
row group. This will be extremely noisy on large files and adds overhead in hot
path. Please remove these logs or convert them to VLOG/TRACE guarded by a debug
setting.
```suggestion
if (_delete_rows == nullptr) {
```
##########
be/src/format/count_reader.h:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "core/block/block.h"
+#include "format/generic_reader.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+/// A lightweight reader that emits row counts without reading any actual data.
+/// Used as a decorator to replace the real reader when COUNT(*) push down is
active.
+///
+/// Instead of duplicating the COUNT short-circuit logic in every format reader
+/// (ORC, Parquet, etc.), FileScanner creates a CountReader after the real
reader
+/// is initialized and the total row count is known. The CountReader then
serves
+/// all subsequent get_next_block calls by simply resizing columns.
+///
+/// This cleanly separates the "how many rows" concern from the actual data
reading,
+/// eliminating duplicated COUNT blocks across format readers.
+class CountReader : public GenericReader {
+public:
+ /// @param total_rows Total number of rows to emit (post-filter).
+ /// @param batch_size Maximum rows per batch.
+ /// @param inner_reader The original reader, kept alive for profile
collection
+ /// and lifecycle management. Ownership is transferred.
+ CountReader(int64_t total_rows, size_t batch_size,
+ std::unique_ptr<GenericReader> inner_reader = nullptr)
+ : _remaining_rows(total_rows),
+ _batch_size(batch_size),
+ _inner_reader(std::move(inner_reader)) {
+ set_push_down_agg_type(TPushAggOp::type::COUNT);
+ }
+
+ ~CountReader() override = default;
+
+ Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof)
override {
+ auto rows = std::min(_remaining_rows,
static_cast<int64_t>(_batch_size));
+ _remaining_rows -= rows;
+
+ auto mutate_columns = block->mutate_columns();
+ for (auto& col : mutate_columns) {
+ col->resize(rows);
+ }
+ block->set_columns(std::move(mutate_columns));
+
+ *read_rows = rows;
+ *eof = (_remaining_rows == 0);
+ return Status::OK();
+ }
+
+ /// CountReader counts rows by definition.
+ bool count_read_rows() override { return true; }
+
+ /// Delegate to inner reader if available, otherwise return our total.
+ int64_t get_total_rows() const override {
+ return _inner_reader ? _inner_reader->get_total_rows() :
_initial_total_rows();
+ }
+
+ Status close() override {
+ if (_inner_reader) {
+ return _inner_reader->close();
+ }
+ return Status::OK();
+ }
+
+ /// Access the inner reader for profile collection or other lifecycle
needs.
+ GenericReader* inner_reader() const { return _inner_reader.get(); }
+
+protected:
+ void _collect_profile_before_close() override {
+ if (_inner_reader) {
+ _inner_reader->collect_profile_before_close();
+ }
+ }
+
+private:
+ int64_t _initial_total_rows() const { return _remaining_rows; }
+
+ int64_t _remaining_rows;
+ size_t _batch_size;
+ std::unique_ptr<GenericReader> _inner_reader;
Review Comment:
CountReader::get_total_rows() falls back to _initial_total_rows(), but
_initial_total_rows() returns _remaining_rows, which decreases as batches are
emitted. This makes get_total_rows() time-dependent and can underreport after
reading starts. Store the initial total rows in a separate member (e.g.,
_total_rows) and return that when there is no inner reader.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]