This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new d286aa7bf7 [fix](spark-load) no need to filter row group when doing spark load (#13116)
d286aa7bf7 is described below

commit d286aa7bf7f0585fc1e4e9464e69eb8f378d6d40
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Oct 5 23:00:56 2022 +0800

    [fix](spark-load) no need to filter row group when doing spark load (#13116)

    1. Fix issue #13115
    2. Modify the method `get_next_block` of `GenericReader` to return "read_rows" explicitly.
       Some columns in the block may not be filled by the reader; if the first column is not
       filled, `block->rows()` cannot return the real row count.
    3. Add more checks for broker load test cases.
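    To illustrate point 2, here is a minimal, self-contained sketch. It is hypothetical
    code, not Doris classes: `DemoBlock` and `read_partial` exist only for this example,
    and `DemoBlock::rows()` mimics the failure mode the commit message describes (the row
    count is derived from the first column's size). When a reader fills only the columns
    that actually exist in the file, such a block can report 0 rows even though rows were
    read, so the reader must hand the row count back explicitly.

        // Hypothetical illustration only; compiles with any C++11 compiler.
        #include <cstddef>
        #include <iostream>
        #include <string>
        #include <vector>

        // rows() is derived from one column's size, as described in the commit message.
        struct DemoBlock {
            std::vector<std::vector<std::string>> columns;
            size_t rows() const { return columns.empty() ? 0 : columns[0].size(); }
        };

        // A reader that fills only the columns present in the file and reports
        // the produced row count through an explicit out-parameter.
        void read_partial(DemoBlock* block, size_t* read_rows) {
            block->columns[1] = {"a", "b", "c"}; // column 0 is absent from the file
            *read_rows = 3;                      // the reader knows the real count
        }

        int main() {
            DemoBlock block;
            block.columns.resize(2); // column 0 is never filled; column 1 is filled below
            size_t read_rows = 0;
            read_partial(&block, &read_rows);
            std::cout << "block.rows() = " << block.rows() << '\n'; // 0 -- misleading
            std::cout << "read_rows    = " << read_rows << '\n';    // 3 -- correct
            return 0;
        }

    This is why `VFileScanner::_get_block_impl` in the diff below tests `read_rows > 0`
    instead of `_src_block_ptr->rows() > 0`.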
---
 be/src/common/config.h                                 |  1 -
 be/src/exec/arrow/arrow_reader.cpp                     |  5 +++--
 be/src/exec/arrow/arrow_reader.h                       |  2 +-
 be/src/exec/arrow/parquet_reader.cpp                   |  5 +++--
 be/src/exec/arrow/parquet_reader.h                     |  1 +
 be/src/exec/base_scanner.h                             |  2 +-
 be/src/vec/exec/format/generic_reader.h                |  2 +-
 .../vec/exec/format/parquet/vparquet_group_reader.cpp  |  4 +++-
 .../vec/exec/format/parquet/vparquet_group_reader.h    |  2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp     |  5 +++--
 be/src/vec/exec/format/parquet/vparquet_reader.h       |  2 +-
 be/src/vec/exec/scan/vfile_scanner.cpp                 | 10 ++++++----
 be/test/vec/exec/parquet/parquet_reader_test.cpp       |  3 ++-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp       |  3 ++-
 regression-test/conf/regression-conf.groovy            |  2 ++
 .../load_p0/broker_load/test_broker_load.groovy        | 19 ++++++++++++++++++-
 16 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8ee1649378..736381873b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -819,7 +819,6 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
-CONF_Bool(parquet_predicate_push_down, "true");
 // Max size of parquet page header in bytes
 CONF_mInt32(parquet_header_max_size_mb, "1");
 // Max buffer size for parquet row group
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 72d4960a43..d4b8f11eaa 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -98,7 +98,7 @@ int ArrowReaderWrap::get_column_index(std::string column_name) {
     }
 }
 
-Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
+Status ArrowReaderWrap::get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) {
     size_t rows = 0;
     bool tmp_eof = false;
     do {
@@ -107,7 +107,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
             // We need to make sure the eof is set to true iff block is empty.
             if (tmp_eof) {
                 *eof = (rows == 0);
-                return Status::OK();
+                break;
             }
         }
@@ -129,6 +129,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
         rows += num_elements;
         _arrow_batch_cur_idx += num_elements;
     } while (!tmp_eof && rows < _state->batch_size());
+    *read_row = rows;
     return Status::OK();
 }
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 2d83a1be01..561f67fe94 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -92,7 +92,7 @@ public:
         return Status::NotSupported("Not Implemented read");
     }
     // for vec
-    Status get_next_block(vectorized::Block* block, bool* eof) override;
+    Status get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) override;
     // This method should be deprecated once the old scanner is removed.
     // And user should use "get_next_block" instead.
     Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index fc8c1ca5f5..f4d5aa5342 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -100,7 +100,8 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
     _timezone = timezone;
 
     RETURN_IF_ERROR(column_indices());
-    if (config::parquet_predicate_push_down) {
+    _need_filter_row_group = (tuple_desc != nullptr);
+    if (_need_filter_row_group) {
         int64_t file_size = 0;
         size(&file_size);
         _row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size,
@@ -551,7 +552,7 @@ void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int curr
 }
 
 bool ParquetReaderWrap::filter_row_group(int current_group) {
-    if (config::parquet_predicate_push_down) {
+    if (_need_filter_row_group) {
         auto filter_group_set = _row_group_reader->filter_groups();
         if (filter_group_set.end() != filter_group_set.find(current_group)) {
             // find filter group, skip
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index a2a80a1966..fcb96d46d2 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -101,6 +101,7 @@ private:
     std::string _timezone;
    int64_t _range_start_offset;
    int64_t _range_size;
+    bool _need_filter_row_group = false;
 
 private:
     std::unique_ptr<doris::RowGroupReader> _row_group_reader;
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 6711836e53..c6bcde2f67 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -153,7 +153,7 @@ protected:
     int _num_of_columns_from_file;
     // slot_ids for parquet predicate push down are in tuple desc
-    TupleId _tupleId;
+    TupleId _tupleId = -1;
     std::vector<ExprContext*> _conjunct_ctxs;
 
 private:
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index d838f4dac1..ea13a62627 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -28,7 +28,7 @@ class Block;
 // a set of blocks with specified schema,
 class GenericReader {
 public:
-    virtual Status get_next_block(Block* block, bool* eof) = 0;
+    virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
     virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
         std::unordered_map<std::string, TypeDescriptor> map;
         return map;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index ddcc6494d0..a46813cbf2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -60,7 +60,8 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
     return Status::OK();
 }
 
-Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
+                                  bool* _batch_eof) {
     size_t batch_read_rows = 0;
     bool has_eof = false;
     int col_idx = 0;
@@ -86,6 +87,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_
         has_eof = col_eof;
         col_idx++;
     }
+    *read_rows = batch_read_rows;
     _read_rows += batch_read_rows;
     *_batch_eof = has_eof;
     // use data fill utils read column data to column ptr
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 27daffe6f7..e1b54bb529 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -31,7 +31,7 @@ public:
     ~RowGroupReader();
     Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
-    Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
+    Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
 
     ParquetColumnReader::Statistics statistics();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6b8a01c83c..4252402ebd 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -177,7 +177,7 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-Status ParquetReader::get_next_block(Block* block, bool* eof) {
+Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     int32_t num_of_readers = _row_group_readers.size();
     DCHECK(num_of_readers <= _read_row_groups.size());
     if (_read_row_groups.empty()) {
@@ -187,7 +187,8 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
     bool _batch_eof = false;
     {
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
-        RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+        RETURN_IF_ERROR(
+                _current_group_reader->next_batch(block, _batch_size, read_rows, &_batch_eof));
     }
     if (_batch_eof) {
         auto column_st = _current_group_reader->statistics();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index ab44c31517..e495f7089f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -61,7 +61,7 @@ public:
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
 
-    Status get_next_block(Block* block, bool* eof) override;
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
     void close();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index f6f8127146..b37ca30290 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -99,7 +99,7 @@ Status VFileScanner::open(RuntimeState* state) {
 }
 
 // For query:
-//                          [exist cols]  [non-exist cols]  [col from path]  input  ouput
+//                          [exist cols]  [non-exist cols]  [col from path]  input  output
 //                          A      B      C      D          E
 // _init_src_block          x      x      x      x          x                -      x
 // get_next_block           x      x      x      -          -                -      x
@@ -109,7 +109,7 @@ Status VFileScanner::open(RuntimeState* state) {
 // _convert_to_output_block -      -      -      -          -                -      -
 //
 // For load:
-//                          [exist cols]  [non-exist cols]  [col from path]  input  ouput
+//                          [exist cols]  [non-exist cols]  [col from path]  input  output
 //                          A      B      C      D          E
 // _init_src_block          x      x      x      x          x                x      -
 // get_next_block           x      x      x      -          -                x      -
@@ -130,15 +130,17 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
 
     // Init src block for load job based on the data file schema (e.g. parquet)
     // For query job, simply set _src_block_ptr to block.
+    size_t read_rows = 0;
     RETURN_IF_ERROR(_init_src_block(block));
     {
         SCOPED_TIMER(_get_block_timer);
         // Read next block.
         // Some of column in block may not be filled (column not exist in file)
-        RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+        RETURN_IF_ERROR(
+                _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
     }
-    if (_src_block_ptr->rows() > 0) {
+    if (read_rows > 0) {
         // Convert the src block columns type to string in-place.
         RETURN_IF_ERROR(_cast_to_input_block(block));
         // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 68a2043d66..b2288338b3 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -122,7 +122,8 @@ TEST_F(ParquetReaderTest, normal) {
                 ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
     }
     bool eof = false;
-    p_reader->get_next_block(block, &eof);
+    size_t read_row = 0;
+    p_reader->get_next_block(block, &read_row, &eof);
     for (auto& col : block->get_columns_with_type_and_name()) {
         ASSERT_EQ(col.column->size(), 10);
     }
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c18d3099d7..1e530d4410 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -411,7 +411,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
                 ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
     }
     bool batch_eof = false;
-    auto stb = row_group_reader->next_batch(&block, 1024, &batch_eof);
+    size_t read_rows = 0;
+    auto stb = row_group_reader->next_batch(&block, 1024, &read_rows, &batch_eof);
     EXPECT_TRUE(stb.ok());
 
     LocalFileReader result("./be/test/exec/test_data/parquet_scanner/group-reader.txt", 0);
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 170ae8cca5..506cf89bf7 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -63,3 +63,5 @@ brokerName = "broker_name"
 
 // broker load test config
 enableBrokerLoad=false
+ak=""
+sk=""
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index 8cfc912a40..0a9a6739df 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -60,6 +60,20 @@ suite("test_broker_load", "p0") {
               ]
     def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10", ""]
 
+    def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=163703; dpp.abnorm.ALL=0; dpp.norm.ALL=36294",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000"]
+
     String ak = getS3AK()
     String sk = getS3SK()
     String enabled = context.config.otherConfigs.get("enableBrokerLoad")
@@ -126,12 +140,14 @@ suite("test_broker_load", "p0") {
             i++
         }
 
+        i = 0
         for (String label in uuids) {
             max_try_milli_secs = 600000
             while (max_try_milli_secs > 0) {
-                String[][] result = sql """ show load where label="$label"; """
+                String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
                 if (result[0][2].equals("FINISHED")) {
                     logger.info("Load FINISHED " + label)
+                    assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5])
                     break;
                 }
                 if (result[0][2].equals("CANCELLED")) {
@@ -143,6 +159,7 @@ suite("test_broker_load", "p0") {
                     assertTrue(1 == 2, "Load Timeout.")
                 }
             }
+            i++
         }
     } finally {
         for (String table in tables) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org