This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6eb8ac0ebf [feature-wip][multi-catalog]Support caseSensitive field name in file scan node (#11310) 6eb8ac0ebf is described below commit 6eb8ac0ebf08467f9e2ea56cddd85528a0d2a124 Author: huangzhaowei <huangzhaowei....@bytedance.com> AuthorDate: Fri Aug 5 08:03:16 2022 +0800 [feature-wip][multi-catalog]Support caseSensitive field name in file scan node (#11310) * Impl case sensitive in file scan node --- be/src/exec/arrow/arrow_reader.cpp | 20 ++++++++++++++++++-- be/src/exec/arrow/arrow_reader.h | 5 ++++- be/src/exec/arrow/orc_reader.cpp | 10 +++++++--- be/src/exec/arrow/orc_reader.h | 2 +- be/src/exec/arrow/parquet_reader.cpp | 11 +++++++---- be/src/exec/arrow/parquet_reader.h | 2 +- be/src/vec/exec/file_arrow_scanner.cpp | 10 +++++++--- .../org/apache/doris/common/util/BrokerUtil.java | 11 ++++++++++- .../java/org/apache/doris/planner/HudiScanNode.java | 2 +- .../doris/planner/external/ExternalFileScanNode.java | 2 +- 10 files changed, 57 insertions(+), 18 deletions(-) diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index 5d1785f744..a2e5f7c33e 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -30,6 +30,7 @@ #include "runtime/exec_env.h" #include "runtime/mem_pool.h" #include "runtime/tuple.h" +#include "util/string_util.h" #include "util/thrift_util.h" namespace doris { @@ -37,8 +38,10 @@ namespace doris { // Broker ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) - : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file) { + int32_t num_of_columns_from_file, bool caseSensitive) + : _batch_size(batch_size), + _num_of_columns_from_file(num_of_columns_from_file), + _caseSensitive(caseSensitive) { _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader)); _rb_reader = nullptr; _total_groups = 0; @@ -81,6 +84,19 @@ Status
ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple return Status::OK(); } +int ArrowReaderWrap::get_cloumn_index(std::string column_name) { + std::string real_column_name = _caseSensitive ? column_name : to_lower(column_name); + auto iter = _map_column.find(real_column_name); + if (iter != _map_column.end()) { + return iter->second; + } else { + std::stringstream str_error; + str_error << "Invalid Column Name:" << real_column_name; + LOG(WARNING) << str_error.str(); + return -1; + } +} + Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) { std::unique_lock<std::mutex> lock(_mtx); while (!_closed && _queue.empty()) { diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index 704ca0750e..159377c480 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -79,7 +79,8 @@ private: // base of arrow reader class ArrowReaderWrap { public: - ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file); + ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, + bool caseSensitive); virtual ~ArrowReaderWrap(); virtual Status init_reader(const TupleDescriptor* tuple_desc, @@ -96,6 +97,7 @@ public: std::shared_ptr<Statistics>& statistics() { return _statistics; } void close(); virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); } + int get_cloumn_index(std::string column_name); void prefetch_batch(); @@ -124,6 +126,7 @@ protected: std::list<std::shared_ptr<arrow::RecordBatch>> _queue; const size_t _max_queue_size = config::parquet_reader_max_buffer_size; std::thread _thread; + bool _caseSensitive; }; } // namespace doris diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 0db5640369..0156355b39 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -24,13 +24,14 @@ #include 
"io/file_reader.h" #include "runtime/mem_pool.h" #include "runtime/tuple.h" +#include "util/string_util.h" namespace doris { ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) - : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file), + int64_t range_size, bool caseSensitive) + : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, caseSensitive), _range_start_offset(range_start_offset), _range_size(range_size) { _reader = nullptr; @@ -66,8 +67,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, } std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie(); for (size_t i = 0; i < schema->num_fields(); ++i) { + std::string schemaName = + _caseSensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name()); // orc index started from 1. - _map_column.emplace(schema->field(i)->name(), i + 1); + + _map_column.emplace(schemaName, i + 1); } RETURN_IF_ERROR(column_indices(tuple_slot_descs)); diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h index 1e6f0f83e6..392addfea9 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -33,7 +33,7 @@ namespace doris { class ORCReaderWrap final : public ArrowReaderWrap { public: ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size); + int64_t range_start_offset, int64_t range_size, bool caseSensitive = true); ~ORCReaderWrap() override = default; Status init_reader(const TupleDescriptor* tuple_desc, diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index 8d119146b4..03f6657586 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -32,14 +32,15 @@ #include "runtime/mem_pool.h" #include "runtime/string_value.h" #include "runtime/tuple.h" +#include 
"util/string_util.h" namespace doris { // Broker ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) - : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file), + int64_t range_size, bool caseSensitive) + : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, caseSensitive), _rows_of_group(0), _current_line_of_group(0), _current_line_of_batch(0), @@ -84,12 +85,14 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, // map auto* schemaDescriptor = _file_metadata->schema(); for (int i = 0; i < _file_metadata->num_columns(); ++i) { + std::string schemaName; // Get the Column Reader for the boolean column if (schemaDescriptor->Column(i)->max_definition_level() > 1) { - _map_column.emplace(schemaDescriptor->Column(i)->path()->ToDotVector()[0], i); + schemaName = schemaDescriptor->Column(i)->path()->ToDotVector()[0]; } else { - _map_column.emplace(schemaDescriptor->Column(i)->name(), i); + schemaName = schemaDescriptor->Column(i)->name(); } + _map_column.emplace(_caseSensitive ? 
schemaName : to_lower(schemaName), i); } _timezone = timezone; diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h index 95774f60b0..d4805f8d84 100644 --- a/be/src/exec/arrow/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -63,7 +63,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap { public: // batch_size is not use here ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size); + int64_t range_start_offset, int64_t range_size, bool caseSensitive = true); ~ParquetReaderWrap() override = default; // Read diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index e6c4fa7597..d416fb735c 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -186,7 +186,11 @@ Status FileArrowScanner::_append_batch_to_block(Block* block) { if (slot_desc == nullptr) { continue; } - auto* array = _batch->GetColumnByName(slot_desc->col_name()).get(); + int file_index = _cur_file_reader->get_cloumn_index(slot_desc->col_name()); + if (file_index == -1) { + continue; + } + auto* array = _batch->column(file_index).get(); auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); RETURN_IF_ERROR(arrow_column_to_doris_column( array, _arrow_batch_cur_idx, column_with_type_and_name.column, @@ -228,7 +232,7 @@ ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t range_start_offset, int64_t range_size) { return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, - range_start_offset, range_size); + range_start_offset, range_size, false); } void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) { @@ -252,7 +256,7 @@ ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int int64_t range_start_offset, int64_t range_size) { return new ORCReaderWrap(file_reader, 
batch_size, num_of_columns_from_file, range_start_offset, - range_size); + range_size, false); } } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index be7fa7c63f..0db7a16df0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -222,6 +222,14 @@ public class BrokerUtil { public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) throws UserException { + return parseColumnsFromPath(filePath, columnsFromPath, true); + } + + public static List<String> parseColumnsFromPath( + String filePath, + List<String> columnsFromPath, + boolean caseSensitive) + throws UserException { if (columnsFromPath == null || columnsFromPath.isEmpty()) { return Collections.emptyList(); } @@ -246,7 +254,8 @@ public class BrokerUtil { throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } - int index = columnsFromPath.indexOf(pair[0]); + String parsedColumnName = caseSensitive ? 
pair[0] : pair[0].toLowerCase(); + int index = columnsFromPath.indexOf(parsedColumnName); if (index == -1) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java index dab3a9bfcd..93ab58afe1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java @@ -274,7 +274,7 @@ public class HudiScanNode extends BrokerScanNode { TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - getPartitionKeys()); + getPartitionKeys(), false); int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size(); TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, fileFormatType, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 1ae05ff630..2984352066 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -309,7 +309,7 @@ public class ExternalFileScanNode extends ExternalScanNode { totalFileSize += split.getLength(); List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - partitionKeys); + partitionKeys, false); TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org