This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d80b7b9689 [feature-wip](new-scan) support more load situation (#12953)
d80b7b9689 is described below

commit d80b7b9689da6da4042ed25333a02386c9238f9b
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Sep 27 21:48:32 2022 +0800

    [feature-wip](new-scan) support more load situation (#12953)
---
 be/src/exec/arrow/arrow_reader.cpp                 |  15 +-
 be/src/exec/arrow/arrow_reader.h                   |   2 +
 be/src/exec/arrow/orc_reader.cpp                   |  25 +-
 be/src/exec/arrow/orc_reader.h                     |   5 +
 be/src/vec/CMakeLists.txt                          |   1 -
 be/src/vec/columns/column_const.h                  |   2 +-
 be/src/vec/exec/file_hdfs_scanner.cpp              |  98 -------
 be/src/vec/exec/file_hdfs_scanner.h                |  57 ----
 be/src/vec/exec/file_scan_node.cpp                 |  10 +-
 be/src/vec/exec/format/generic_reader.h            |   4 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  57 ++--
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  16 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             | 300 +++++++++++++++++----
 be/src/vec/exec/scan/vfile_scanner.h               |  61 +++--
 be/src/vec/exec/scan/vscan_node.h                  |   1 +
 be/src/vec/exec/scan/vscanner.h                    |   4 -
 be/src/vec/exprs/vexpr_context.cpp                 |   2 +-
 be/src/vec/exprs/vliteral.cpp                      |   3 +-
 be/src/vec/utils/arrow_column_to_doris_column.cpp  |  56 ++++
 be/src/vec/utils/arrow_column_to_doris_column.h    |   4 +-
 be/test/vec/exec/parquet/parquet_reader_test.cpp   | 120 +--------
 .../planner/external/ExternalFileScanNode.java     |  74 ++++-
 .../doris/planner/external/FileScanProviderIf.java |   3 +
 .../doris/planner/external/HiveScanProvider.java   |   6 +
 .../doris/planner/external/LoadScanProvider.java   |  18 +-
 gensrc/thrift/PlanNodes.thrift                     |  11 +-
 26 files changed, 535 insertions(+), 420 deletions(-)

diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index d26efd32aa..72d4960a43 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -79,10 +79,7 @@ Status ArrowReaderWrap::column_indices() {
         if (iter != _map_column.end()) {
             _include_column_ids.emplace_back(iter->second);
         } else {
-            std::stringstream str_error;
-            str_error << "Invalid Column Name:" << slot_desc->col_name();
-            LOG(WARNING) << str_error.str();
-            return Status::InvalidArgument(str_error.str());
+            _missing_cols.push_back(slot_desc->col_name());
         }
     }
     return Status::OK();
@@ -103,10 +100,13 @@ int ArrowReaderWrap::get_column_index(std::string column_name) {
 
 Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
     size_t rows = 0;
+    bool tmp_eof = false;
     do {
         if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
-            RETURN_IF_ERROR(next_batch(&_batch, eof));
-            if (*eof) {
+            RETURN_IF_ERROR(next_batch(&_batch, &tmp_eof));
+            // We need to make sure that eof is set to true iff the block is empty.
+            if (tmp_eof) {
+                *eof = (rows == 0);
                 return Status::OK();
             }
         }
@@ -128,7 +128,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
         }
         rows += num_elements;
         _arrow_batch_cur_idx += num_elements;
-    } while (!(*eof) && rows < _state->batch_size());
+    } while (!tmp_eof && rows < _state->batch_size());
     return Status::OK();
 }
 
@@ -138,7 +138,6 @@ Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, b
         if (_batch_eof) {
             _include_column_ids.clear();
             *eof = true;
-            _batch_eof = false;
             return Status::OK();
         }
         _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
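
For context, the hunks above change the get_next_block() contract so that *eof
is reported only together with an empty block. A minimal sketch of a caller
relying on that contract (hypothetical function, not part of this commit):

    // Hypothetical caller: drain the reader. Since eof now implies
    // rows == 0, a partially-filled final block can never be dropped.
    Status read_all(ArrowReaderWrap* reader) {
        bool eof = false;
        while (!eof) {
            vectorized::Block block;
            RETURN_IF_ERROR(reader->get_next_block(&block, &eof));
            if (block.rows() > 0) {
                // ... process the block ...
            }
        }
        return Status::OK();
    }
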
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 35703e4bbd..2d83a1be01 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -137,6 +137,8 @@ protected:
     // The following fields are only valid when using "get_block()" interface.
     std::shared_ptr<arrow::RecordBatch> _batch;
     size_t _arrow_batch_cur_idx = 0;
+    // Save column names which need to be read but do not exist in the file
+    std::vector<std::string> _missing_cols;
 };
 
 } // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 65a67909ba..8f46a9bf21 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -26,6 +26,7 @@
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "util/string_util.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
 
 namespace doris {
 
@@ -67,12 +68,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
         LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status();
         return Status::InternalError("Failed to create orc file reader");
     }
-    std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
-    for (size_t i = 0; i < schema->num_fields(); ++i) {
+    _schema = maybe_schema.ValueOrDie();
+    for (size_t i = 0; i < _schema->num_fields(); ++i) {
         std::string schemaName =
-                _case_sensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name());
+                _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name());
         // orc index started from 1.
-
         _map_column.emplace(schemaName, i + 1);
     }
     RETURN_IF_ERROR(column_indices());
@@ -82,6 +82,23 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
     return Status::OK();
 }
 
+Status ORCReaderWrap::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                  std::unordered_set<std::string>* missing_cols) {
+    for (size_t i = 0; i < _schema->num_fields(); ++i) {
+        std::string schema_name =
+                _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name());
+        TypeDescriptor type;
+        RETURN_IF_ERROR(
+                vectorized::arrow_type_to_doris_type(_schema->field(i)->type()->id(), &type));
+        name_to_type->emplace(schema_name, type);
+    }
+
+    for (auto& col : _missing_cols) {
+        missing_cols->insert(col);
+    }
+    return Status::OK();
+}
+
 Status ORCReaderWrap::_seek_start_stripe() {
    // If file was from Hms table, _range_start_offset is started from 3 (magic word).
     // And if file was from load, _range_start_offset is always set to zero.
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index a6455e8400..2d394ccf7d 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -27,6 +27,7 @@
 
 #include "common/status.h"
 #include "exec/arrow/arrow_reader.h"
+
 namespace doris {
 
 // Reader of ORC file
@@ -41,6 +42,9 @@ public:
                        const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
 
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
 private:
     Status _next_stripe_reader(bool* eof);
     Status _seek_start_stripe();
@@ -50,6 +54,7 @@ private:
 private:
     // orc file reader object
     std::unique_ptr<arrow::adapters::orc::ORCFileReader> _reader;
+    std::shared_ptr<arrow::Schema> _schema;
     bool _cur_file_eof; // is read over?
     int64_t _range_start_offset;
     int64_t _range_size;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 16eabe1e45..9632fa13d3 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -231,7 +231,6 @@ set(VEC_FILES
   exec/file_scanner.cpp
   exec/file_scan_node.cpp
   exec/file_text_scanner.cpp
-  exec/file_hdfs_scanner.cpp
   exec/format/parquet/vparquet_column_chunk_reader.cpp
   exec/format/parquet/vparquet_group_reader.cpp
   exec/format/parquet/vparquet_page_index.cpp
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index f001150bee..75561ae3e8 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -184,7 +184,7 @@ public:
         return false;
     }
 
-    //    bool is_nullable() const override { return is_column_nullable(*data); }
+    // bool is_nullable() const override { return is_column_nullable(*data); }
     bool only_null() const override { return data->is_null_at(0); }
     bool is_numeric() const override { return data->is_numeric(); }
    bool is_fixed_and_contiguous() const override { return data->is_fixed_and_contiguous(); }
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp
deleted file mode 100644
index ec891730c8..0000000000
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "file_hdfs_scanner.h"
-
-#include "io/file_factory.h"
-
-namespace doris::vectorized {
-
-ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
-                                               const TFileScanRangeParams& params,
-                                               const std::vector<TFileRangeDesc>& ranges,
-                                               const std::vector<TExpr>& pre_filter_texprs,
-                                               ScannerCounter* counter)
-        : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
-
-ParquetFileHdfsScanner::~ParquetFileHdfsScanner() {
-    ParquetFileHdfsScanner::close();
-}
-
-Status ParquetFileHdfsScanner::open() {
-    RETURN_IF_ERROR(FileScanner::open());
-    if (_ranges.empty()) {
-        return Status::OK();
-    }
-    RETURN_IF_ERROR(_get_next_reader());
-    return Status::OK();
-}
-
-void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {}
-
-Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
-    if (_scanner_eof) {
-        *eof = true;
-        return Status::OK();
-    }
-    RETURN_IF_ERROR(init_block(block));
-    bool range_eof = false;
-    RETURN_IF_ERROR(_reader->get_next_block(block, &range_eof));
-    if (block->rows() > 0) {
-        _fill_columns_from_path(block, block->rows());
-    }
-    if (range_eof) {
-        RETURN_IF_ERROR(_get_next_reader());
-        *eof = _scanner_eof;
-    }
-    return Status::OK();
-}
-
-Status ParquetFileHdfsScanner::_get_next_reader() {
-    if (_next_range >= _ranges.size()) {
-        _scanner_eof = true;
-        return Status::OK();
-    }
-    const TFileRangeDesc& range = _ranges[_next_range++];
-    std::unique_ptr<FileReader> file_reader;
-    RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
-                                                    file_reader));
-    auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
-    if (tuple_desc->slots().empty()) {
-        return Status::EndOfFile("No Parquet column need load");
-    }
-    std::vector<std::string> column_names;
-    for (int i = 0; i < _file_slot_descs.size(); i++) {
-        column_names.push_back(_file_slot_descs[i]->col_name());
-    }
-    _reader.reset(new ParquetReader(_profile, _params, range, column_names,
-                                    _state->query_options().batch_size,
-                                    const_cast<cctz::time_zone*>(&_state->timezone_obj())));
-    Status status = _reader->init_reader(_conjunct_ctxs);
-    if (!status.ok()) {
-        if (status.is_end_of_file()) {
-            return _get_next_reader();
-        }
-        return status;
-    }
-    return Status::OK();
-}
-
-void ParquetFileHdfsScanner::close() {
-    FileScanner::close();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h
deleted file mode 100644
index b9883b88b5..0000000000
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "file_scanner.h"
-#include "vec/core/block.h"
-#include "vec/exec/format/parquet/vparquet_reader.h"
-
-namespace doris::vectorized {
-
-class HdfsFileScanner : public FileScanner {
-public:
-    HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile,
-                    const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
-                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
-            : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {};
-
-class ParquetFileHdfsScanner : public HdfsFileScanner {
-public:
-    ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
-                           const TFileScanRangeParams& params,
-                           const std::vector<TFileRangeDesc>& ranges,
-                           const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-    ~ParquetFileHdfsScanner();
-    Status open() override;
-
-    Status get_next(vectorized::Block* block, bool* eof) override;
-    void close() override;
-
-protected:
-    void _init_profiles(RuntimeProfile* profile) override;
-
-private:
-    Status _get_next_reader();
-
-private:
-    std::shared_ptr<ParquetReader> _reader;
-};
-
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index dc164f8927..3a3f9634e9 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -30,7 +30,6 @@
 #include "util/thread.h"
 #include "util/types.h"
 #include "vec/exec/file_arrow_scanner.h"
-#include "vec/exec/file_hdfs_scanner.h"
 #include "vec/exec/file_text_scanner.h"
 #include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vexpr.h"
@@ -459,13 +458,8 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
     FileScanner* scan = nullptr;
     switch (scan_range.params.format_type) {
     case TFileFormatType::FORMAT_PARQUET:
-        if (config::parquet_reader_using_internal) {
-            scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params,
-                                              scan_range.ranges, _pre_filter_texprs, counter);
-        } else {
-            scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
-                                           scan_range.ranges, _pre_filter_texprs, counter);
-        }
+        scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                       scan_range.ranges, _pre_filter_texprs, counter);
         break;
     case TFileFormatType::FORMAT_ORC:
         scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params,
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index a98d678fde..d838f4dac1 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -33,6 +33,10 @@ public:
         std::unordered_map<std::string, TypeDescriptor> map;
         return map;
     }
+    virtual Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                               std::unordered_set<std::string>* missing_cols) {
+        return Status::NotSupported("get_columns is not implemented");
+    }
     virtual ~GenericReader() {}
 };
 
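The base-class implementation above returns NotSupported; the Parquet and ORC
readers in this commit override it. A hedged sketch of the intended call
pattern (hypothetical caller, mirroring what VFileScanner does later in this
patch):

    // Split the requested columns into those readable from the file and
    // those that must be filled with a default value or null.
    std::unordered_map<std::string, TypeDescriptor> name_to_type;
    std::unordered_set<std::string> missing_cols;
    RETURN_IF_ERROR(reader->get_columns(&name_to_type, &missing_cols));
    for (const auto& [name, type] : name_to_type) {
        // column exists in the file; read it with the file's native type
    }
    for (const auto& name : missing_cols) {
        // column required by the scan but absent from the file
    }
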
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6d5718d181..5f595fec75 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -23,11 +23,12 @@
 #include "parquet_thrift_util.h"
 
 namespace doris::vectorized {
-ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                             const TFileRangeDesc& range,
+ParquetReader::ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
+                             const TFileScanRangeParams& params, const TFileRangeDesc& range,
                              const std::vector<std::string>& column_names, size_t batch_size,
                              cctz::time_zone* ctz)
         : _profile(profile),
+          _file_reader(file_reader),
           _scan_params(params),
           _scan_range(range),
           _batch_size(batch_size),
@@ -47,14 +48,15 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
 
 ParquetReader::~ParquetReader() {
     close();
-    if (_group_file_reader != _file_reader.get()) {
-        delete _group_file_reader;
-        _group_file_reader = nullptr;
-    }
 }
 
 void ParquetReader::close() {
     if (!_closed) {
+        if (_file_reader != nullptr) {
+            _file_reader->close();
+            delete _file_reader;
+        }
+
         if (_profile != nullptr) {
             COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups);
             COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups);
@@ -68,26 +70,8 @@ void ParquetReader::close() {
 }
 
 Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
-    if (_file_reader == nullptr) {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
-                                                        _file_reader, 2048));
-        // RowGroupReader has its own underlying buffer, so we should return file reader directly
-        // If RowGroupReaders use the same file reader with ParquetReader, the file position will change
-        // when ParquetReader try to read ColumnIndex meta, which causes performance cost
-        std::unique_ptr<FileReader> group_file_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
-                                                        group_file_reader, 0));
-        _group_file_reader = group_file_reader.release();
-        RETURN_IF_ERROR(_group_file_reader->open());
-    } else {
-        // test only
-        _group_file_reader = _file_reader.get();
-    }
-    RETURN_IF_ERROR(_file_reader->open());
-    if (_file_reader->size() == 0) {
-        return Status::EndOfFile("Empty Parquet File");
-    }
-    RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
+    CHECK(_file_reader != nullptr);
+    RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
     _t_metadata = &_file_metadata->to_thrift();
     _total_groups = _t_metadata->row_groups.size();
     if (_total_groups == 0) {
@@ -109,6 +93,8 @@ Status ParquetReader::_init_read_columns() {
         auto iter = _map_column.find(file_col_name);
         if (iter != _map_column.end()) {
             _include_column_ids.emplace_back(iter->second);
+        } else {
+            _missing_cols.push_back(file_col_name);
         }
     }
     // The same order as physical columns
@@ -133,6 +119,21 @@ std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type(
     return map;
 }
 
+Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                  std::unordered_set<std::string>* missing_cols) {
+    auto schema_desc = _file_metadata->schema();
+    std::unordered_set<std::string> column_names;
+    schema_desc.get_column_names(&column_names);
+    for (auto name : column_names) {
+        auto field = schema_desc.get_column(name);
+        name_to_type->emplace(name, field->type);
+    }
+    for (auto& col : _missing_cols) {
+        missing_cols->insert(col);
+    }
+    return Status::OK();
+}
+
 Status ParquetReader::get_next_block(Block* block, bool* eof) {
     int32_t num_of_readers = _row_group_readers.size();
     DCHECK(num_of_readers <= _read_row_groups.size());
@@ -166,8 +167,8 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
     for (auto row_group_id : _read_row_groups) {
         auto& row_group = _t_metadata->row_groups[row_group_id];
         std::shared_ptr<RowGroupReader> row_group_reader;
-        row_group_reader.reset(new RowGroupReader(_group_file_reader, _read_columns, row_group_id,
-                                                  row_group, _ctz));
+        row_group_reader.reset(
+                new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz));
         std::vector<RowRange> candidate_row_ranges;
         RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
         if (candidate_row_ranges.empty()) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c91bc08059..73848ccd48 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -70,13 +70,14 @@ private:
 
 class ParquetReader : public GenericReader {
 public:
-    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                  const TFileRangeDesc& range, const std::vector<std::string>& column_names,
-                  size_t batch_size, cctz::time_zone* ctz);
+    ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
+                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                  const std::vector<std::string>& column_names, size_t batch_size,
+                  cctz::time_zone* ctz);
 
     virtual ~ParquetReader();
     // for test
-    void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); }
+    void set_file_reader(FileReader* file_reader) { _file_reader = file_reader; }
 
     Status init_reader(std::vector<ExprContext*>& conjunct_ctxs);
 
@@ -87,6 +88,8 @@ public:
     int64_t size() const { return _file_reader->size(); }
 
     std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override;
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
 
     ParquetStatistics& statistics() { return _statistics; }
 
@@ -120,10 +123,10 @@ private:
 
 private:
     RuntimeProfile* _profile;
+    // The file reader is passed from the file scanner and owned by this parquet reader.
+    FileReader* _file_reader = nullptr;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
-    std::unique_ptr<FileReader> _file_reader = nullptr;
-    FileReader* _group_file_reader = nullptr;
 
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
@@ -144,6 +147,7 @@ private:
     std::unordered_map<int, tparquet::OffsetIndex> _col_offsets;
     const std::vector<std::string> _column_names;
 
+    std::vector<std::string> _missing_cols;
     ParquetStatistics _statistics;
     bool _closed = false;
 
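The ownership comment above is the central contract of this header change:
the scanner creates and opens the FileReader, then hands a raw pointer to
ParquetReader, which deletes it in close(). A sketch of the handoff as
_get_next_reader() performs it below (variable names are illustrative):

    std::unique_ptr<FileReader> file_reader;
    RETURN_IF_ERROR(FileFactory::create_file_reader(state->exec_env(), profile,
                                                    params, range, file_reader));
    RETURN_IF_ERROR(file_reader->open());
    // release(): ParquetReader takes ownership and frees the reader in close()
    auto reader = std::make_unique<ParquetReader>(
            profile, file_reader.release(), params, range, column_names,
            state->query_options().batch_size,
            const_cast<cctz::time_zone*>(&state->timezone_obj()));
    RETURN_IF_ERROR(reader->init_reader(conjunct_ctxs));
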
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 985676eb48..ffc44775e5 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -18,11 +18,13 @@
 #include "vec/exec/scan/vfile_scanner.h"
 
 #include <fmt/format.h>
+#include <thrift/protocol/TDebugProtocol.h>
 
 #include <vec/data_types/data_type_factory.hpp>
 
 #include "common/logging.h"
 #include "common/utils.h"
+#include "exec/arrow/orc_reader.h"
 #include "exec/text_converter.hpp"
 #include "exprs/expr_context.h"
 #include "runtime/descriptors.h"
@@ -49,6 +51,17 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
 Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
 
+    _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
+    _cast_to_input_block_timer =
+            ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime");
+    _fill_path_columns_timer =
+            ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime");
+    _fill_missing_columns_timer =
+            ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime");
+    _pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer");
+    _convert_to_output_block_timer =
+            ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
+
     if (vconjunct_ctx_ptr != nullptr) {
         // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
         RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
@@ -64,12 +77,15 @@ Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
             _pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
             RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
                     _state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get()));
-
             RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc));
             RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state));
         }
     }
 
+    _default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
+                                                  std::vector<TupleId>({_real_tuple_desc->id()}),
+                                                  std::vector<bool>({false})));
+
     return Status::OK();
 }
 
@@ -79,6 +95,25 @@ Status VFileScanner::open(RuntimeState* state) {
     return Status::OK();
 }
 
+// For query:
+//                              [exist cols]  [non-exist cols]  [col from path]  input  output
+//                              A     B    C  D                 E
+// _init_src_block              x     x    x  x                 x                -      x
+// get_next_block               x     x    x  -                 -                -      x
+// _cast_to_input_block         -     -    -  -                 -                -      -
+// _fill_columns_from_path      -     -    -  -                 x                -      x
+// _fill_missing_columns        -     -    -  x                 -                -      x
+// _convert_to_output_block     -     -    -  -                 -                -      -
+//
+// For load:
+//                              [exist cols]  [non-exist cols]  [col from path]  input  output
+//                              A     B    C  D                 E
+// _init_src_block              x     x    x  x                 x                x      -
+// get_next_block               x     x    x  -                 -                x      -
+// _cast_to_input_block         x     x    x  -                 -                x      -
+// _fill_columns_from_path      -     -    -  -                 x                x      -
+// _fill_missing_columns        -     -    -  x                 -                x      -
+// _convert_to_output_block     -     -    -  -                 -                -      x
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
     do {
         if (_cur_reader == nullptr || _cur_reader_eof) {
@@ -93,14 +128,20 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
         // Init src block for load job based on the data file schema (e.g. parquet)
         // For query job, simply set _src_block_ptr to block.
         RETURN_IF_ERROR(_init_src_block(block));
-        // Read next block.
-        RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+        {
+            SCOPED_TIMER(_get_block_timer);
+            // Read next block.
+            // Some columns in the block may not be filled (they do not exist in the file)
+            RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+        }
 
         if (_src_block_ptr->rows() > 0) {
-            // Convert the src block columns type to string in place.
+            // Convert the src block columns type to string in-place.
             RETURN_IF_ERROR(_cast_to_input_block(block));
             // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
             RETURN_IF_ERROR(_fill_columns_from_path());
+            // Fill columns that do not exist in the file with null or default values
+            RETURN_IF_ERROR(_fill_missing_columns());
             // Apply _pre_conjunct_ctx_ptr to filter src block.
             RETURN_IF_ERROR(_pre_filter_src_block());
             // Convert src block to output block (dest block), string to dest data type and apply filters.
@@ -125,13 +166,29 @@ Status VFileScanner::_init_src_block(Block* block) {
         return Status::OK();
     }
 
-    _src_block.clear();
+    // if (_src_block_init) {
+    //     _src_block.clear_column_data();
+    //     _src_block_ptr = &_src_block;
+    //     return Status::OK();
+    // }
 
-    std::unordered_map<std::string, TypeDescriptor> name_to_type = _cur_reader->get_name_to_type();
+    _src_block.clear();
     size_t idx = 0;
+    // Slots in _input_tuple_desc contain all slots described in the load statement, e.g.:
+    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
+    // _input_tuple_desc will contain: k1, k2, tmp1.
+    // Some of them come from the file (such as k1 and k2), while others may not exist in the file (such as tmp1).
+    // _input_tuple_desc also contains columns from path.
     for (auto& slot : _input_tuple_desc->slots()) {
-        DataTypePtr data_type =
-                DataTypeFactory::instance().create_data_type(name_to_type[slot->col_name()], true);
+        DataTypePtr data_type;
+        auto it = _name_to_col_type.find(slot->col_name());
+        if (it == _name_to_col_type.end()) {
+            // The column does not exist in the file; use the type from _input_tuple_desc
+            data_type =
+                    DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable());
+        } else {
+            data_type = DataTypeFactory::instance().create_data_type(it->second, true);
+        }
         if (data_type == nullptr) {
             return Status::NotSupported(fmt::format("Not support arrow 
type:{}", slot->col_name()));
         }
@@ -141,18 +198,20 @@ Status VFileScanner::_init_src_block(Block* block) {
         _src_block_name_to_idx.emplace(slot->col_name(), idx++);
     }
     _src_block_ptr = &_src_block;
+    _src_block_init = true;
     return Status::OK();
 }
 
 Status VFileScanner::_cast_to_input_block(Block* block) {
-    if (_src_block_ptr == block) {
+    if (!_is_load) {
         return Status::OK();
     }
+    SCOPED_TIMER(_cast_to_input_block_timer);
     // cast primitive type(PT0) to primitive type(PT1)
     size_t idx = 0;
-    for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
-        SlotDescriptor* slot_desc = _file_slot_descs[i];
-        if (slot_desc == nullptr) {
+    for (auto& slot_desc : _input_tuple_desc->slots()) {
+        if (_name_to_col_type.find(slot_desc->col_name()) == _name_to_col_type.end()) {
+            // Skip columns that do not exist in the file
             continue;
         }
         auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
@@ -177,6 +236,7 @@ Status VFileScanner::_fill_columns_from_path() {
     size_t rows = _src_block_ptr->rows();
     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
+        SCOPED_TIMER(_fill_path_columns_timer);
         for (const auto& slot_desc : _partition_slot_descs) {
             if (slot_desc == nullptr) continue;
             auto it = _partition_slot_index_map.find(slot_desc->id());
@@ -200,11 +260,82 @@ Status VFileScanner::_fill_columns_from_path() {
     return Status::OK();
 }
 
+Status VFileScanner::_fill_missing_columns() {
+    if (_missing_cols.empty()) {
+        return Status::OK();
+    }
+
+    SCOPED_TIMER(_fill_missing_columns_timer);
+    int rows = _src_block_ptr->rows();
+    for (auto slot_desc : _real_tuple_desc->slots()) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
+            continue;
+        }
+
+        auto it = _col_default_value_ctx.find(slot_desc->col_name());
+        if (it == _col_default_value_ctx.end()) {
+            return Status::InternalError("failed to find default value expr 
for slot: {}",
+                                         slot_desc->col_name());
+        }
+        if (it->second == nullptr) {
+            // no default column, fill with null
+            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                    (*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column))
+                            .mutate()
+                            .get());
+            nullable_column->insert_many_defaults(rows);
+        } else {
+            // fill with default value
+            auto* ctx = it->second;
+            auto origin_column_num = _src_block_ptr->columns();
+            int result_column_id = -1;
+            // PT1 => dest primitive type
+            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
+            bool is_origin_column = result_column_id < origin_column_num;
+            if (!is_origin_column) {
+                auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
+                // result_column_ptr may be a ColumnConst; convert it to a normal column
+                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
+                auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type;
+                bool is_nullable = origin_column_type->is_nullable();
+                _src_block_ptr->replace_by_position(
+                        _src_block_ptr->get_position_by_name(slot_desc->col_name()),
+                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
+                _src_block_ptr->erase(result_column_id);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status VFileScanner::_pre_filter_src_block() {
+    if (!_is_load) {
+        return Status::OK();
+    }
+    if (_pre_conjunct_ctx_ptr) {
+        SCOPED_TIMER(_pre_filter_timer);
+        auto origin_column_num = _src_block_ptr->columns();
+        auto old_rows = _src_block_ptr->rows();
+        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
+                                                               _src_block_ptr, origin_column_num));
+        _counter.num_rows_unselected += old_rows - _src_block.rows();
+    }
+    return Status::OK();
+}
+
 Status VFileScanner::_convert_to_output_block(Block* block) {
-    if (_src_block_ptr == block) {
+    if (!_is_load) {
         return Status::OK();
     }
 
+    SCOPED_TIMER(_convert_to_output_block_timer);
+    // The block is passed from the scanner context's free blocks,
+    // which are initialized with src columns.
+    // But for a load job, the block should be filled with dest columns,
+    // so we need to clear it first.
     block->clear();
 
     int ctx_idx = 0;
@@ -217,7 +348,6 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
         if (!slot_desc->is_materialized()) {
             continue;
         }
-
         int dest_index = ctx_idx++;
 
         auto* ctx = _dest_vexpr_ctx[dest_index];
@@ -229,13 +359,15 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
                 is_origin_column && _src_block_mem_reuse
                         ? _src_block.get_by_position(result_column_id).column->clone_resized(rows)
                         : _src_block.get_by_position(result_column_id).column;
+        // column_ptr may be a ColumnConst; convert it to a normal column
+        column_ptr = column_ptr->convert_to_full_column_if_const();
 
         DCHECK(column_ptr != nullptr);
 
         // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
         // is likely to be nullable
         if (LIKELY(column_ptr->is_nullable())) {
-            auto nullable_column =
+            const ColumnNullable* nullable_column =
                     reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
             for (int i = 0; i < rows; ++i) {
                 if (filter_map[i] && nullable_column->is_null_at(i)) {
@@ -280,10 +412,10 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
                 }
             }
             if (!slot_desc->is_nullable()) {
-                column_ptr = nullable_column->get_nested_column_ptr();
+                column_ptr = remove_nullable(column_ptr);
             }
         } else if (slot_desc->is_nullable()) {
-            column_ptr = vectorized::make_nullable(column_ptr);
+            column_ptr = make_nullable(column_ptr);
         }
         block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
                                                                     slot_desc->get_data_type_ptr(),
@@ -308,52 +440,61 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
     return Status::OK();
 }
 
-Status VFileScanner::_pre_filter_src_block() {
-    if (_pre_conjunct_ctx_ptr) {
-        auto origin_column_num = _src_block_ptr->columns();
-        auto old_rows = _src_block_ptr->rows();
-        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
-                                                               _src_block_ptr, origin_column_num));
-        _counter.num_rows_unselected += old_rows - _src_block.rows();
-    }
-    return Status::OK();
-}
-
 Status VFileScanner::_get_next_reader() {
-    if (_cur_reader != nullptr) {
-        delete _cur_reader;
-        _cur_reader = nullptr;
-    }
     while (true) {
+        _cur_reader.reset(nullptr);
+        _src_block_init = false;
         if (_next_range >= _ranges.size()) {
             _scanner_eof = true;
             return Status::OK();
         }
         const TFileRangeDesc& range = _ranges[_next_range++];
-        std::vector<std::string> column_names;
+
+        // 1. create file reader
+        std::unique_ptr<FileReader> file_reader;
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+                                                        range, file_reader));
+        RETURN_IF_ERROR(file_reader->open());
+        if (file_reader->size() == 0) {
+            file_reader->close();
+            continue;
+        }
+
+        // 2. create reader for specific format
+        // TODO: add csv, json, avro
+        Status init_status;
         switch (_params.format_type) {
         case TFileFormatType::FORMAT_PARQUET: {
-            for (int i = 0; i < _file_slot_descs.size(); i++) {
-                column_names.push_back(_file_slot_descs[i]->col_name());
-            }
-            _cur_reader = new ParquetReader(_profile, _params, range, column_names,
-                                            _state->query_options().batch_size,
-                                            const_cast<cctz::time_zone*>(&_state->timezone_obj()));
-            Status status = ((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs);
-            if (status.ok()) {
-                _cur_reader_eof = false;
-                return status;
-            } else if (status.is_end_of_file()) {
-                continue;
-            } else {
-                return status;
-            }
+            _cur_reader.reset(
+                    new ParquetReader(_profile, file_reader.release(), _params, range,
+                                      _file_col_names, _state->query_options().batch_size,
+                                      const_cast<cctz::time_zone*>(&_state->timezone_obj())));
+            init_status = ((ParquetReader*)(_cur_reader.get()))->init_reader(_conjunct_ctxs);
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            _cur_reader.reset(new ORCReaderWrap(_state, _file_slot_descs, file_reader.release(),
+                                                _num_of_columns_from_file, range.start_offset,
+                                                range.size, false));
+            init_status =
+                    ((ORCReaderWrap*)(_cur_reader.get()))
+                            ->init_reader(_real_tuple_desc, _conjunct_ctxs, _state->timezone());
+            break;
         }
         default:
-            std::stringstream error_msg;
-            error_msg << "Not supported file format " << _params.format_type;
-            return Status::InternalError(error_msg.str());
+            return Status::InternalError("Not supported file format: {}", 
_params.format_type);
         }
+
+        if (init_status.is_end_of_file()) {
+            continue;
+        } else if (!init_status.ok()) {
+            return Status::InternalError("failed to init reader for file {}, 
err: {}", range.path,
+                                         init_status.get_error_msg());
+        }
+
+        _cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+        _cur_reader_eof = false;
+        break;
     }
     return Status::OK();
 }
@@ -382,6 +523,8 @@ Status VFileScanner::_init_expr_ctxes() {
             _file_slot_descs.emplace_back(it->second);
             auto iti = full_src_index_map.find(slot_id);
             _file_slot_index_map.emplace(slot_id, iti->second);
+            _file_slot_name_map.emplace(it->second->col_name(), iti->second);
+            _file_col_names.push_back(it->second->col_name());
         } else {
             _partition_slot_descs.emplace_back(it->second);
             auto iti = full_src_index_map.find(slot_id);
@@ -390,7 +533,9 @@ Status VFileScanner::_init_expr_ctxes() {
     }
 
     if (_is_load) {
+        // The following dest expr map and src default value expr map are only for load tasks.
+        bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
+        int idx = 0;
         for (auto slot_desc : _output_tuple_desc->slots()) {
             if (!slot_desc->is_materialized()) {
                 continue;
@@ -402,11 +547,15 @@ Status VFileScanner::_init_expr_ctxes() {
             }
 
             vectorized::VExprContext* ctx = nullptr;
-            RETURN_IF_ERROR(
-                    vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
-            RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
-            RETURN_IF_ERROR(ctx->open(_state));
+            if (!it->second.nodes.empty()) {
+                RETURN_IF_ERROR(
+                        vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
+                RETURN_IF_ERROR(ctx->open(_state));
+            }
             _dest_vexpr_ctx.emplace_back(ctx);
+            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
+
             if (has_slot_id_map) {
                 auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                 if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -423,8 +572,49 @@ Status VFileScanner::_init_expr_ctxes() {
                 }
             }
         }
+
+        for (auto slot_desc : _real_tuple_desc->slots()) {
+            if (!slot_desc->is_materialized()) {
+                continue;
+            }
+            vectorized::VExprContext* ctx = nullptr;
+            auto it = _params.default_value_of_src_slot.find(slot_desc->id());
+            // If the expr does not exist or is empty, the default value will be null
+            if (it != std::end(_params.default_value_of_src_slot) && !it->second.nodes.empty()) {
+                RETURN_IF_ERROR(
+                        vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
+                RETURN_IF_ERROR(ctx->open(_state));
+            }
+            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
+        }
     }
     return Status::OK();
 }
 
+Status VFileScanner::close(RuntimeState* state) {
+    if (_is_closed) {
+        return Status::OK();
+    }
+
+    for (auto ctx : _dest_vexpr_ctx) {
+        if (ctx != nullptr) {
+            ctx->close(state);
+        }
+    }
+
+    for (auto it : _col_default_value_ctx) {
+        if (it.second != nullptr) {
+            it.second->close(state);
+        }
+    }
+
+    if (_pre_conjunct_ctx_ptr) {
+        (*_pre_conjunct_ctx_ptr)->close(state);
+    }
+
+    RETURN_IF_ERROR(VScanner::close(state));
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
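
Condensing the load path of _get_block_impl() above into its stage order,
annotated with the k1/k2/tmp1/k3 example from the _init_src_block comment
(a summary sketch of the code in this patch, not additional code):

    RETURN_IF_ERROR(_init_src_block(block));          // src block: k1, k2, tmp1 (+ path cols)
    RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
    RETURN_IF_ERROR(_cast_to_input_block(block));     // cast file columns to input slot types
    RETURN_IF_ERROR(_fill_columns_from_path());       // e.g. Hive partition columns
    RETURN_IF_ERROR(_fill_missing_columns());         // tmp1: default value expr or null
    RETURN_IF_ERROR(_pre_filter_src_block());         // pre-filter exprs, load only
    RETURN_IF_ERROR(_convert_to_output_block(block)); // k3 = tmp1 + 1 -> dest block
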
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 4bad47795d..6608a8bfd0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -46,6 +46,8 @@ public:
 
     Status open(RuntimeState* state) override;
 
+    Status close(RuntimeState* state) override;
+
 public:
     Status prepare(VExprContext** vconjunct_ctx_ptr);
 
@@ -65,24 +67,52 @@ protected:
     const std::vector<TFileRangeDesc>& _ranges;
     int _next_range;
 
-    GenericReader* _cur_reader;
+    std::unique_ptr<GenericReader> _cur_reader;
     bool _cur_reader_eof;
 
     // File source slot descriptors
     std::vector<SlotDescriptor*> _file_slot_descs;
-    // File slot id to index map.
+    // File slot id to index in _file_slot_descs
     std::map<SlotId, int> _file_slot_index_map;
+    // file col name to index in _file_slot_descs
+    std::map<std::string, int> _file_slot_name_map;
+    // col names from _file_slot_descs
+    std::vector<std::string> _file_col_names;
     // Partition source slot descriptors
     std::vector<SlotDescriptor*> _partition_slot_descs;
-    // Partition slot id to index map
+    // Partition slot id to index in _partition_slot_descs
     std::map<SlotId, int> _partition_slot_index_map;
+    // created from param.expr_of_dest_slot
+    // For query, it saves the default value expr of all dest columns, or nullptr for NULL.
+    // For load, it saves the conversion expr/default value of all dest columns.
+    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+    // dest slot name to index in _dest_vexpr_ctx;
+    std::unordered_map<std::string, int> _dest_slot_name_to_idx;
+    // col name to default value expr
+    std::unordered_map<std::string, vectorized::VExprContext*> _col_default_value_ctx;
+    // the map values of dest slot id to src slot desc
+    // if there is no key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
+    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
+    // dest slot desc index to src slot desc index
+    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
+
+    std::unordered_map<std::string, size_t> _src_block_name_to_idx;
+
+    // Obtained from GenericReader; maps the columns existing in the file to their types.
+    std::unordered_map<std::string, TypeDescriptor> _name_to_col_type;
+    // Obtained from GenericReader; saves columns that are required by the scan but do not exist in the file.
+    // These columns will be filled with a default value or null.
+    std::unordered_set<std::string> _missing_cols;
+
+    // For load task
+    std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
+    std::unique_ptr<RowDescriptor> _src_row_desc;
+    // row desc for default exprs
+    std::unique_ptr<RowDescriptor> _default_val_row_desc;
 
     // Mem pool used to allocate _src_tuple and _src_tuple_row
     std::unique_ptr<MemPool> _mem_pool;
 
-    // Dest tuple descriptor and dest expr context
-    const TupleDescriptor* _dest_tuple_desc;
-
     // Profile
     RuntimeProfile* _profile;
     ScannerCounter _counter;
@@ -91,22 +121,20 @@ protected:
     int _rows = 0;
     int _num_of_columns_from_file;
 
-    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
-
-    // the map values of dest slot id to src slot desc
-    // if there is not key of dest slot id in 
dest_sid_to_src_sid_without_trans, it will be set to nullptr
-    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
-
     bool _src_block_mem_reuse = false;
     bool _strict_mode;
 
+    bool _src_block_init = false;
     Block* _src_block_ptr;
     Block _src_block;
 
-    // dest slot desc index to src slot desc index
-    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
-
-    std::unordered_map<std::string, size_t> _src_block_name_to_idx;
+private:
+    RuntimeProfile::Counter* _get_block_timer = nullptr;
+    RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
+    RuntimeProfile::Counter* _fill_path_columns_timer = nullptr;
+    RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
+    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
+    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
 
 private:
     Status _init_expr_ctxes();
@@ -114,6 +142,7 @@ private:
     Status _cast_to_input_block(Block* block);
     Status _pre_filter_src_block();
     Status _convert_to_output_block(Block* block);
+    Status _fill_missing_columns();
     void _reset_counter() {
         _counter.num_rows_unselected = 0;
         _counter.num_rows_filtered = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 6117d487bc..fbcf248a3c 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -37,6 +37,7 @@ public:
             : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
     friend class VScanner;
     friend class NewOlapScanner;
+    friend class VFileScanner;
     friend class ScannerContext;
 
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index aff7a3a4ed..07313428df 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -137,10 +137,6 @@ protected:
     // and will be destroyed at the end.
     std::vector<VExprContext*> _stale_vexpr_ctxs;
 
-    // For load scanner
-    std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
-    std::unique_ptr<RowDescriptor> _src_row_desc;
-
     // num of rows read from scanner
     int64_t _num_rows_read = 0;
 
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 019a7d802c..ccb1045cb1 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -32,7 +32,7 @@ VExprContext::VExprContext(VExpr* expr)
           _stale(false) {}
 
 VExprContext::~VExprContext() {
-    DCHECK(!_prepared || _closed);
+    DCHECK(!_prepared || _closed) << get_stack_trace();
 
     for (int i = 0; i < _fn_contexts.size(); ++i) {
         delete _fn_contexts[i];
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index d87278b347..d99cdbdd97 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -194,7 +194,8 @@ Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* r
 
 std::string VLiteral::debug_string() const {
     std::stringstream out;
-    out << "VLiteral (type = " << _data_type->get_name();
+    out << "VLiteral (name = " << _expr_name;
+    out << ", type = " << _data_type->get_name();
     out << ", value = ";
     if (_column_ptr.get()->size() > 0) {
         StringRef ref = _column_ptr.get()->get_data_at(0);
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index eacd1136b5..e71bf970a0 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -408,4 +408,60 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr
     return Status::NotSupported(
             fmt::format("Not support arrow type:{}", arrow_column->type()->name()));
 }
+
+Status arrow_type_to_doris_type(arrow::Type::type type, TypeDescriptor* return_type) {
+    switch (type) {
+    case arrow::Type::STRING:
+    case arrow::Type::BINARY:
+    case arrow::Type::FIXED_SIZE_BINARY:
+        return_type->type = TYPE_STRING;
+        break;
+    case arrow::Type::INT8:
+        return_type->type = TYPE_TINYINT;
+        break;
+    case arrow::Type::UINT8:
+    case arrow::Type::INT16:
+        return_type->type = TYPE_SMALLINT;
+        break;
+    case arrow::Type::UINT16:
+    case arrow::Type::INT32:
+        return_type->type = TYPE_INT;
+        break;
+    case arrow::Type::UINT32:
+    case arrow::Type::INT64:
+        return_type->type = TYPE_BIGINT;
+        break;
+    case arrow::Type::UINT64:
+        return_type->type = TYPE_LARGEINT;
+        break;
+    case arrow::Type::HALF_FLOAT:
+    case arrow::Type::FLOAT:
+        return_type->type = TYPE_FLOAT;
+        break;
+    case arrow::Type::DOUBLE:
+        return_type->type = TYPE_DOUBLE;
+        break;
+    case arrow::Type::BOOL:
+        return_type->type = TYPE_BOOLEAN;
+        break;
+    case arrow::Type::DATE32:
+        return_type->type = TYPE_DATEV2;
+        break;
+    case arrow::Type::DATE64:
+        return_type->type = TYPE_DATETIMEV2;
+        break;
+    case arrow::Type::TIMESTAMP:
+        return_type->type = TYPE_BIGINT;
+        break;
+    case arrow::Type::DECIMAL:
+        return_type->type = TYPE_DECIMALV2;
+        return_type->precision = 27;
+        return_type->scale = 9;
+        break;
+    default:
+        return Status::InternalError("unsupport type: {}", type);
+    }
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
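
A short usage sketch of the new helper (mirroring ORCReaderWrap::get_columns
earlier in this patch; `field` is an assumed arrow::Field pointer):

    // Map an arrow field to a Doris TypeDescriptor; unsupported arrow types
    // surface as a Status error instead of crashing the BE.
    TypeDescriptor type;
    RETURN_IF_ERROR(vectorized::arrow_type_to_doris_type(field->type()->id(), &type));

Note that the DECIMAL branch pins precision/scale to 27/9, matching the fixed
representation of DecimalV2.
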
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h
index 9d5f077672..13edffadae 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.h
+++ b/be/src/vec/utils/arrow_column_to_doris_column.h
@@ -24,7 +24,7 @@
 #include <memory>
 
 #include "common/status.h"
-#include "runtime/primitive_type.h"
+#include "runtime/types.h"
 #include "vec/core/column_with_type_and_name.h"
 
// This file contains some utilities to convert Doris internal
@@ -42,4 +42,6 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr
                                     ColumnPtr& doris_column, const DataTypePtr& type,
                                     size_t num_elements, const cctz::time_zone& ctz);
 
+Status arrow_type_to_doris_type(arrow::Type::type type, TypeDescriptor* return_type);
+
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 23bf6b353f..e8d3339b43 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -22,7 +22,6 @@
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type_factory.hpp"
-#include "vec/exec/file_hdfs_scanner.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 
 namespace doris {
@@ -92,6 +91,7 @@ TEST_F(ParquetReaderTest, normal) {
     auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
     LocalFileReader* reader =
             new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    reader->open();
 
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
@@ -106,8 +106,8 @@ TEST_F(ParquetReaderTest, normal) {
         scan_range.start_offset = 0;
         scan_range.size = 1000;
     }
-    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz);
-    p_reader->set_file_reader(reader);
+    auto p_reader =
+            new ParquetReader(nullptr, reader, scan_params, scan_range, column_names, 992, &ctz);
     RuntimeState runtime_state((TQueryGlobals()));
     runtime_state.set_desc_tbl(desc_tbl);
     runtime_state.init_instance_mem_tracker();
@@ -132,119 +132,5 @@ TEST_F(ParquetReaderTest, normal) {
     delete p_reader;
 }
 
-TEST_F(ParquetReaderTest, scanner) {
-    TDescriptorTable t_desc_table;
-    TTableDescriptor t_table_desc;
-
-    t_table_desc.id = 0;
-    t_table_desc.tableType = TTableType::OLAP_TABLE;
-    t_table_desc.numCols = 7;
-    t_table_desc.numClusteringCols = 0;
-    t_desc_table.tableDescriptors.push_back(t_table_desc);
-    t_desc_table.__isset.tableDescriptors = true;
-
-    // init boolean and numeric slot
-    std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col",
-                                              "int_col",     "bigint_col",  "float_col",
-                                              "double_col"};
-    for (int i = 0; i < numeric_types.size(); i++) {
-        TSlotDescriptor tslot_desc;
-        {
-            tslot_desc.id = i;
-            tslot_desc.parent = 0;
-            TTypeDesc type;
-            {
-                TTypeNode node;
-                node.__set_type(TTypeNodeType::SCALAR);
-                TScalarType scalar_type;
-                scalar_type.__set_type(TPrimitiveType::type(i + 2));
-                node.__set_scalar_type(scalar_type);
-                type.types.push_back(node);
-            }
-            tslot_desc.slotType = type;
-            tslot_desc.columnPos = 0;
-            tslot_desc.byteOffset = 0;
-            tslot_desc.nullIndicatorByte = 1;
-            tslot_desc.nullIndicatorBit = 1;
-            tslot_desc.colName = numeric_types[i];
-            tslot_desc.slotIdx = 0;
-            tslot_desc.isMaterialized = true;
-            t_desc_table.slotDescriptors.push_back(tslot_desc);
-        }
-    }
-
-    t_desc_table.__isset.slotDescriptors = true;
-    {
-        TTupleDescriptor t_tuple_desc;
-        t_tuple_desc.id = 0;
-        t_tuple_desc.byteSize = 16;
-        t_tuple_desc.numNullBytes = 0;
-        t_tuple_desc.tableId = 0;
-        t_tuple_desc.__isset.tableId = true;
-        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
-    }
-
-    // set scan range
-    //    std::vector<TScanRangeParams> scan_ranges;
-    TFileScanRange file_scan_range;
-    {
-        //        TScanRangeParams scan_range_params;
-        //        TFileScanRange file_scan_range;
-        TFileScanRangeParams params;
-        {
-            params.__set_src_tuple_id(0);
-            params.__set_num_of_columns_from_file(7);
-            params.file_type = TFileType::FILE_LOCAL;
-            params.format_type = TFileFormatType::FORMAT_PARQUET;
-            std::vector<TFileScanSlotInfo> file_slots;
-            for (int i = 0; i < numeric_types.size(); i++) {
-                TFileScanSlotInfo slot_info;
-                slot_info.slot_id = i;
-                slot_info.is_file_slot = true;
-                file_slots.emplace_back(slot_info);
-            }
-            params.__set_required_slots(file_slots);
-        }
-        file_scan_range.params = params;
-        TFileRangeDesc range;
-        {
-            range.start_offset = 0;
-            range.size = 1000;
-            range.path = "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
-            std::vector<std::string> columns_from_path {"value"};
-            range.__set_columns_from_path(columns_from_path);
-        }
-        file_scan_range.ranges.push_back(range);
-        //        scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range);
-        //        scan_ranges.push_back(scan_range_params);
-    }
-
-    std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>();
-    RuntimeState runtime_state((TQueryGlobals()));
-    runtime_state.init_instance_mem_tracker();
-
-    DescriptorTbl* desc_tbl;
-    ObjectPool obj_pool;
-    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
-    runtime_state.set_desc_tbl(desc_tbl);
-    ScannerCounter counter;
-    std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
-    auto scan = new ParquetFileHdfsScanner(&runtime_state, runtime_state.runtime_profile(),
-                                           file_scan_range.params, file_scan_range.ranges,
-                                           pre_filter_texprs, &counter);
-    scan->reg_conjunct_ctxs(0, conjunct_ctxs);
-    Status st = scan->open();
-    EXPECT_TRUE(st.ok());
-
-    bool eof = false;
-    Block* block = new Block();
-    scan->get_next(block, &eof);
-    for (auto& col : block->get_columns_with_type_and_name()) {
-        ASSERT_EQ(col.column->size(), 10);
-    }
-    delete block;
-    delete scan;
-}
-
 } // namespace vectorized
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index da9fddc342..7500c73899 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -45,6 +46,7 @@ import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TFileScanNode;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TPlanNode;
@@ -72,16 +74,23 @@ public class ExternalFileScanNode extends ExternalScanNode {
         public List<Expr> conjuncts;
 
         public TupleDescriptor destTupleDescriptor;
-
+        public Map<String, SlotDescriptor> destSlotDescByName;
         // === Set when init ===
         public TupleDescriptor srcTupleDescriptor;
+        public Map<String, SlotDescriptor> srcSlotDescByName;
         public Map<String, Expr> exprMap;
-        public Map<String, SlotDescriptor> slotDescByName;
         public String timezone;
         // === Set when init ===
 
         public TFileScanRangeParams params;
 
+        public void createDestSlotMap() {
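+            // Build a column-name -> slot map for the destination tuple; used when
+            // binding default value exprs to dest slots for queries.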
+            Preconditions.checkNotNull(destTupleDescriptor);
+            destSlotDescByName = Maps.newHashMap();
+            for (SlotDescriptor slot : destTupleDescriptor.getSlots()) {
+                destSlotDescByName.put(slot.getColumn().getName(), slot);
+            }
+        }
     }
 
     public enum Type {
@@ -169,7 +178,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 break;
             case LOAD:
                 for (FileGroupInfo fileGroupInfo : fileGroupInfos) {
-                    this.scanProviders.add(new LoadScanProvider(fileGroupInfo));
+                    this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc));
                 }
                 break;
             default:
@@ -186,6 +195,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
     private void initParamCreateContexts(Analyzer analyzer) throws UserException {
         for (FileScanProviderIf scanProvider : scanProviders) {
             ParamCreateContext context = scanProvider.createContext(analyzer);
+            context.createDestSlotMap();
             // set where and preceding filter.
             // FIXME(cmy): we should support setting different exprs for different file groups.
             initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer);
@@ -255,20 +265,72 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 contexts.size() + " vs. " + scanProviders.size());
         for (int i = 0; i < contexts.size(); ++i) {
             ParamCreateContext context = contexts.get(i);
-            finalizeParamsForLoad(context, analyzer);
             FileScanProviderIf scanProvider = scanProviders.get(i);
+            setDefaultValueExprs(scanProvider, context);
+            finalizeParamsForLoad(context, analyzer);
             createScanRangeLocations(context, scanProvider);
             this.inputSplitsNum += scanProvider.getInputSplitNum();
             this.totalFileSize += scanProvider.getInputFileSize();
         }
     }
 
+    protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context)
+            throws UserException {
+        TableIf tbl = scanProvider.getTargetTable();
+        Preconditions.checkNotNull(tbl);
+        TExpr tExpr = new TExpr();
+        tExpr.setNodes(Lists.newArrayList());
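+        // A TExpr with an empty node list marks a slot that has no usable default value.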
+
+        for (Column column : tbl.getBaseSchema()) {
+            Expr expr;
+            if (column.getDefaultValue() != null) {
+                if (column.getDefaultValueExprDef() != null) {
+                    expr = column.getDefaultValueExpr();
+                } else {
+                    expr = new StringLiteral(column.getDefaultValue());
+                }
+            } else {
+                if (column.isAllowNull()) {
+                    expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR);
+                } else {
+                    expr = null;
+                }
+            }
+            SlotDescriptor slotDesc = null;
+            switch (type) {
+                case LOAD: {
+                    slotDesc = context.srcSlotDescByName.get(column.getName());
+                    break;
+                }
+                case QUERY: {
+                    slotDesc = context.destSlotDescByName.get(column.getName());
+                    break;
+                }
+                default:
+                    Preconditions.checkState(false, type);
+            }
+            // If the slot desc is null, it is an unrelated slot; just skip it.
+            // eg:
+            // (a, b, c) set (x=a, y=b, z=c)
+            // c does not exist in the file, so z will be filled with null even if z has a default value.
+            // And if z is not nullable, the load will fail.
+            if (slotDesc != null) {
+                if (expr != null) {
+                    expr = castToSlot(slotDesc, expr);
+                    context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift());
+                } else {
+                    context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
+                }
+            }
+        }
+    }
+
     protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyzer) throws UserException {
         if (type != Type.LOAD) {
             context.params.setSrcTupleId(-1);
             return;
         }
-        Map<String, SlotDescriptor> slotDescByName = context.slotDescByName;
+        Map<String, SlotDescriptor> slotDescByName = context.srcSlotDescByName;
         Map<String, Expr> exprMap = context.exprMap;
         TupleDescriptor srcTupleDesc = context.srcTupleDescriptor;
         boolean negative = context.fileGroup.isNegative();
@@ -426,3 +488,5 @@ public class ExternalFileScanNode extends ExternalScanNode {
     }
 }
 
+
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index 700d8be098..8ae7952169 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
@@ -56,4 +57,6 @@ public interface FileScanProviderIf {
     int getInputSplitNum();
 
     long getInputFileSize();
+
+    TableIf getTargetTable();
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 7965a02711..1df3d639bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HiveBucketUtil;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -86,6 +87,11 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
         this.desc = desc;
     }
 
+    @Override
+    public TableIf getTargetTable() {
+        return hmsTable;
+    }
+
     @Override
     public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
         TFileFormatType type = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 33b0db2de7..d202ead466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
@@ -55,10 +57,12 @@ import java.util.Map;
 
 public class LoadScanProvider implements FileScanProviderIf {
 
-    FileGroupInfo fileGroupInfo;
+    private FileGroupInfo fileGroupInfo;
+    private TupleDescriptor destTupleDesc;
 
-    public LoadScanProvider(FileGroupInfo fileGroupInfo) {
+    public LoadScanProvider(FileGroupInfo fileGroupInfo, TupleDescriptor destTupleDesc) {
         this.fileGroupInfo = fileGroupInfo;
+        this.destTupleDesc = destTupleDesc;
     }
 
     @Override
@@ -89,6 +93,7 @@ public class LoadScanProvider implements FileScanProviderIf {
     @Override
     public ParamCreateContext createContext(Analyzer analyzer) throws UserException {
         ParamCreateContext ctx = new ParamCreateContext();
+        ctx.destTupleDescriptor = destTupleDesc;
         ctx.fileGroup = fileGroupInfo.getFileGroup();
         ctx.timezone = analyzer.getTimezone();
 
@@ -169,7 +174,7 @@ public class LoadScanProvider implements FileScanProviderIf {
      */
     private void initColumns(ParamCreateContext context, Analyzer analyzer) throws UserException {
         context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
-        context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        context.srcSlotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
         context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
         // for load job, column exprs are obtained from the file group
@@ -190,7 +195,7 @@ public class LoadScanProvider implements FileScanProviderIf {
         }
         List<Integer> srcSlotIds = Lists.newArrayList();
         Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
-                context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds,
+                context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
                 formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized());
 
         int columnCountFromPath = 0;
@@ -247,4 +252,9 @@ public class LoadScanProvider implements FileScanProviderIf {
             return TFileFormatType.FORMAT_CSV_PLAIN;
         }
     }
+
+    @Override
+    public TableIf getTargetTable() {
+        return fileGroupInfo.getTargetTable();
+    }
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4135099c8b..0a4529572b 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -261,17 +261,18 @@ struct TFileScanRangeParams {
     // The convert expr map for load job
     // desc slot id -> expr
     9: optional map<Types.TSlotId, Exprs.TExpr> expr_of_dest_slot
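+    // Default value expr of each src slot id; an expr with no nodes means the slot has no default.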
+    10: optional map<Types.TSlotId, Exprs.TExpr> default_value_of_src_slot
     // This is the mapping of dest slot id and src slot id in load expr
     // It excludes the slot id which has the transform expr
-    10: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
+    11: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
 
     // strictMode is a boolean
     // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
-    11: optional bool strict_mode
+    12: optional bool strict_mode
 
-    12: optional list<Types.TNetworkAddress> broker_addresses
-    13: optional TFileAttributes file_attributes
-    14: optional Exprs.TExpr pre_filter_exprs
+    13: optional list<Types.TNetworkAddress> broker_addresses
+    14: optional TFileAttributes file_attributes
+    15: optional Exprs.TExpr pre_filter_exprs
 }
 
 struct TFileRangeDesc {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
