This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eb8ac0ebf [feature-wip][multi-catalog]Support caseSensitive field 
name in file scan node (#11310)
6eb8ac0ebf is described below

commit 6eb8ac0ebf08467f9e2ea56cddd85528a0d2a124
Author: huangzhaowei <huangzhaowei....@bytedance.com>
AuthorDate: Fri Aug 5 08:03:16 2022 +0800

    [feature-wip][multi-catalog]Support caseSensitive field name in file scan 
node (#11310)
    
    * Impl case sensitive in file scan node
---
 be/src/exec/arrow/arrow_reader.cpp                   | 20 ++++++++++++++++++--
 be/src/exec/arrow/arrow_reader.h                     |  5 ++++-
 be/src/exec/arrow/orc_reader.cpp                     | 10 +++++++---
 be/src/exec/arrow/orc_reader.h                       |  2 +-
 be/src/exec/arrow/parquet_reader.cpp                 | 11 +++++++----
 be/src/exec/arrow/parquet_reader.h                   |  2 +-
 be/src/vec/exec/file_arrow_scanner.cpp               | 10 +++++++---
 .../org/apache/doris/common/util/BrokerUtil.java     | 11 ++++++++++-
 .../java/org/apache/doris/planner/HudiScanNode.java  |  2 +-
 .../doris/planner/external/ExternalFileScanNode.java |  2 +-
 10 files changed, 57 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/arrow/arrow_reader.cpp 
b/be/src/exec/arrow/arrow_reader.cpp
index 5d1785f744..a2e5f7c33e 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -30,6 +30,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"
 #include "util/thrift_util.h"
 
 namespace doris {
@@ -37,8 +38,10 @@ namespace doris {
 // Broker
 
 ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
-                                 int32_t num_of_columns_from_file)
-        : _batch_size(batch_size), 
_num_of_columns_from_file(num_of_columns_from_file) {
+                                 int32_t num_of_columns_from_file, bool 
caseSensitive)
+        : _batch_size(batch_size),
+          _num_of_columns_from_file(num_of_columns_from_file),
+          _caseSensitive(caseSensitive) {
     _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader));
     _rb_reader = nullptr;
     _total_groups = 0;
@@ -81,6 +84,19 @@ Status ArrowReaderWrap::column_indices(const 
std::vector<SlotDescriptor*>& tuple
     return Status::OK();
 }
 
+int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
+    std::string real_column_name = _caseSensitive ? column_name : 
to_lower(column_name);
+    auto iter = _map_column.find(real_column_name);
+    if (iter != _map_column.end()) {
+        return iter->second;
+    } else {
+        std::stringstream str_error;
+        str_error << "Invalid Column Name:" << real_column_name;
+        LOG(WARNING) << str_error.str();
+        return -1;
+    }
+}
+
 Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, 
bool* eof) {
     std::unique_lock<std::mutex> lock(_mtx);
     while (!_closed && _queue.empty()) {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 704ca0750e..159377c480 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -79,7 +79,8 @@ private:
 // base of arrow reader
 class ArrowReaderWrap {
 public:
-    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file);
+    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
+                    bool caseSensitive);
     virtual ~ArrowReaderWrap();
 
     virtual Status init_reader(const TupleDescriptor* tuple_desc,
@@ -96,6 +97,7 @@ public:
     std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
     virtual Status size(int64_t* size) { return Status::NotSupported("Not 
Implemented size"); }
+    int get_cloumn_index(std::string column_name);
 
     void prefetch_batch();
 
@@ -124,6 +126,7 @@ protected:
     std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
     const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
     std::thread _thread;
+    bool _caseSensitive;
 };
 
 } // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 0db5640369..0156355b39 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -24,13 +24,14 @@
 #include "io/file_reader.h"
 #include "runtime/mem_pool.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"
 
 namespace doris {
 
 ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
                              int32_t num_of_columns_from_file, int64_t 
range_start_offset,
-                             int64_t range_size)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+                             int64_t range_size, bool caseSensitive)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, 
caseSensitive),
           _range_start_offset(range_start_offset),
           _range_size(range_size) {
     _reader = nullptr;
@@ -66,8 +67,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* 
tuple_desc,
     }
     std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
     for (size_t i = 0; i < schema->num_fields(); ++i) {
+        std::string schemaName =
+                _caseSensitive ? schema->field(i)->name() : 
to_lower(schema->field(i)->name());
         // orc index started from 1.
-        _map_column.emplace(schema->field(i)->name(), i + 1);
+
+        _map_column.emplace(schemaName, i + 1);
     }
     RETURN_IF_ERROR(column_indices(tuple_slot_descs));
 
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index 1e6f0f83e6..392addfea9 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -33,7 +33,7 @@ namespace doris {
 class ORCReaderWrap final : public ArrowReaderWrap {
 public:
     ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
-                  int64_t range_start_offset, int64_t range_size);
+                  int64_t range_start_offset, int64_t range_size, bool 
caseSensitive = true);
     ~ORCReaderWrap() override = default;
 
     Status init_reader(const TupleDescriptor* tuple_desc,
diff --git a/be/src/exec/arrow/parquet_reader.cpp 
b/be/src/exec/arrow/parquet_reader.cpp
index 8d119146b4..03f6657586 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -32,14 +32,15 @@
 #include "runtime/mem_pool.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"
 
 namespace doris {
 
 // Broker
 ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t 
batch_size,
                                      int32_t num_of_columns_from_file, int64_t 
range_start_offset,
-                                     int64_t range_size)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+                                     int64_t range_size, bool caseSensitive)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, 
caseSensitive),
           _rows_of_group(0),
           _current_line_of_group(0),
           _current_line_of_batch(0),
@@ -84,12 +85,14 @@ Status ParquetReaderWrap::init_reader(const 
TupleDescriptor* tuple_desc,
         // map
         auto* schemaDescriptor = _file_metadata->schema();
         for (int i = 0; i < _file_metadata->num_columns(); ++i) {
+            std::string schemaName;
             // Get the Column Reader for the boolean column
             if (schemaDescriptor->Column(i)->max_definition_level() > 1) {
-                
_map_column.emplace(schemaDescriptor->Column(i)->path()->ToDotVector()[0], i);
+                schemaName = 
schemaDescriptor->Column(i)->path()->ToDotVector()[0];
             } else {
-                _map_column.emplace(schemaDescriptor->Column(i)->name(), i);
+                schemaName = schemaDescriptor->Column(i)->name();
             }
+            _map_column.emplace(_caseSensitive ? schemaName : 
to_lower(schemaName), i);
         }
 
         _timezone = timezone;
diff --git a/be/src/exec/arrow/parquet_reader.h 
b/be/src/exec/arrow/parquet_reader.h
index 95774f60b0..d4805f8d84 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -63,7 +63,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
     // batch_size is not use here
     ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
-                      int64_t range_start_offset, int64_t range_size);
+                      int64_t range_start_offset, int64_t range_size, bool 
caseSensitive = true);
     ~ParquetReaderWrap() override = default;
 
     // Read
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp 
b/be/src/vec/exec/file_arrow_scanner.cpp
index e6c4fa7597..d416fb735c 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -186,7 +186,11 @@ Status FileArrowScanner::_append_batch_to_block(Block* 
block) {
         if (slot_desc == nullptr) {
             continue;
         }
-        auto* array = _batch->GetColumnByName(slot_desc->col_name()).get();
+        int file_index = 
_cur_file_reader->get_cloumn_index(slot_desc->col_name());
+        if (file_index == -1) {
+            continue;
+        }
+        auto* array = _batch->column(file_index).get();
         auto& column_with_type_and_name = 
block->get_by_name(slot_desc->col_name());
         RETURN_IF_ERROR(arrow_column_to_doris_column(
                 array, _arrow_batch_cur_idx, column_with_type_and_name.column,
@@ -228,7 +232,7 @@ ArrowReaderWrap* 
VFileParquetScanner::_new_arrow_reader(FileReader* file_reader,
                                                         int64_t 
range_start_offset,
                                                         int64_t range_size) {
     return new ParquetReaderWrap(file_reader, batch_size, 
num_of_columns_from_file,
-                                 range_start_offset, range_size);
+                                 range_start_offset, range_size, false);
 }
 
 void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
@@ -252,7 +256,7 @@ ArrowReaderWrap* 
VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int
                                                     int64_t range_start_offset,
                                                     int64_t range_size) {
     return new ORCReaderWrap(file_reader, batch_size, 
num_of_columns_from_file, range_start_offset,
-                             range_size);
+                             range_size, false);
 }
 
 } // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index be7fa7c63f..0db7a16df0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -222,6 +222,14 @@ public class BrokerUtil {
 
     public static List<String> parseColumnsFromPath(String filePath, 
List<String> columnsFromPath)
             throws UserException {
+        return parseColumnsFromPath(filePath, columnsFromPath, true);
+    }
+
+    public static List<String> parseColumnsFromPath(
+            String filePath,
+            List<String> columnsFromPath,
+            boolean caseSensitive)
+            throws UserException {
         if (columnsFromPath == null || columnsFromPath.isEmpty()) {
             return Collections.emptyList();
         }
@@ -246,7 +254,8 @@ public class BrokerUtil {
                 throw new UserException("Fail to parse columnsFromPath, 
expected: "
                         + columnsFromPath + ", filePath: " + filePath);
             }
-            int index = columnsFromPath.indexOf(pair[0]);
+            String parsedColumnName = caseSensitive ? pair[0] : 
pair[0].toLowerCase();
+            int index = columnsFromPath.indexOf(parsedColumnName);
             if (index == -1) {
                 continue;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
index dab3a9bfcd..93ab58afe1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -274,7 +274,7 @@ public class HudiScanNode extends BrokerScanNode {
 
             TScanRangeLocations curLocations = newLocations(context.params, 
brokerDesc);
             List<String> partitionValuesFromPath = 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                    getPartitionKeys());
+                    getPartitionKeys(), false);
             int numberOfColumnsFromFile = context.slotDescByName.size() - 
partitionValuesFromPath.size();
 
             TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, 
fileFormatType,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 1ae05ff630..2984352066 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -309,7 +309,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
             totalFileSize += split.getLength();
 
             List<String> partitionValuesFromPath = 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                    partitionKeys);
+                    partitionKeys, false);
 
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to