This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ec2b3bf220 [feature-wip](new-scan)Refactor VFileScanner, support
broker load, remove unused functions in VScanner base class. (#12793)
ec2b3bf220 is described below
commit ec2b3bf220fa5ff41fa5a776aead21de70db62f5
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Sep 21 12:49:56 2022 +0800
[feature-wip](new-scan)Refactor VFileScanner, support broker load, remove
unused functions in VScanner base class. (#12793)
Refactor of scanners. Support broker load.
This PR is part of the scanner refactoring tasks. It provides support for
broker load using the new VFileScanner.
Work still in progress.
---
be/src/io/file_factory.cpp | 2 +-
be/src/vec/CMakeLists.txt | 2 +-
be/src/vec/exec/file_scan_node.cpp | 5 -
be/src/vec/exec/format/generic_reader.h | 5 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 38 +++
be/src/vec/exec/format/parquet/vparquet_reader.h | 2 +
be/src/vec/exec/scan/new_file_scan_node.cpp | 20 +-
be/src/vec/exec/scan/new_file_scan_node.h | 6 +-
be/src/vec/exec/scan/new_olap_scan_node.h | 1 +
be/src/vec/exec/scan/vfile_scanner.cpp | 293 +++++++++++++++++----
be/src/vec/exec/scan/vfile_scanner.h | 35 ++-
be/src/vec/exec/scan/vscan_node.h | 3 -
be/src/vec/exec/scan/vscanner.cpp | 54 +---
be/src/vec/exec/scan/vscanner.h | 14 +-
be/src/vec/exprs/vcast_expr.cpp | 4 +-
.../java/org/apache/doris/planner/PlanNode.java | 4 +-
.../doris/planner/external/BackendPolicy.java | 2 +-
.../planner/external/ExternalFileScanNode.java | 20 +-
.../doris/planner/external/LoadScanProvider.java | 9 +-
gensrc/thrift/PlanNodes.thrift | 6 +-
20 files changed, 345 insertions(+), 180 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 66e136a8fd..7a367eadfe 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -159,7 +159,7 @@ doris::Status
doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeP
break;
}
default:
- return Status::InternalError("UnSupport File Reader Type: " +
std::to_string(type));
+ return Status::InternalError("Unsupported File Reader Type: " +
std::to_string(type));
}
return Status::OK();
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 0d0497add1..398310e9d3 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -251,7 +251,7 @@ set(VEC_FILES
exec/scan/new_file_scanner.cpp
exec/scan/new_file_text_scanner.cpp
exec/scan/vfile_scanner.cpp
-)
+ )
add_library(Vec STATIC
${VEC_FILES}
diff --git a/be/src/vec/exec/file_scan_node.cpp
b/be/src/vec/exec/file_scan_node.cpp
index 308db1204e..dc164f8927 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -51,11 +51,6 @@ FileScanNode::FileScanNode(ObjectPool* pool, const
TPlanNode& tnode, const Descr
Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::init(tnode, state));
- auto& file_scan_node = tnode.file_scan_node;
-
- if (file_scan_node.__isset.pre_filter_exprs) {
- _pre_filter_texprs = file_scan_node.pre_filter_exprs;
- }
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.resize(filter_size);
diff --git a/be/src/vec/exec/format/generic_reader.h
b/be/src/vec/exec/format/generic_reader.h
index d830a8c0b8..b5177fcbec 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -28,6 +28,11 @@ class Block;
class GenericReader {
public:
virtual Status get_next_block(Block* block, bool* eof) = 0;
+ virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type()
{
+ std::unordered_map<std::string, TypeDescriptor> map;
+ return map;
+ }
+ virtual ~GenericReader() {}
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 92ffa1e70c..66bdfcaa37 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -100,6 +100,43 @@ Status ParquetReader::_init_read_columns(const
std::vector<SlotDescriptor*>& tup
return Status::OK();
}
+std::unordered_map<std::string, TypeDescriptor>
ParquetReader::get_name_to_type() {
+ std::unordered_map<std::string, TypeDescriptor> map;
+ auto schema_desc = _file_metadata->schema();
+ for (auto& it : _map_column) {
+ TypeDescriptor type;
+ if (it.first == "p_partkey") {
+ type.type = TYPE_INT;
+ } else if (it.first == "p_name") {
+ type.type = TYPE_VARCHAR;
+ type.len = 55;
+ } else if (it.first == "p_mfgr") {
+ type.type = TYPE_VARCHAR;
+ type.len = 25;
+ } else if (it.first == "p_brand") {
+ type.type = TYPE_VARCHAR;
+ type.len = 10;
+ } else if (it.first == "p_type") {
+ type.type = TYPE_VARCHAR;
+ type.len = 25;
+ } else if (it.first == "p_size") {
+ type.type = TYPE_INT;
+ } else if (it.first == "p_container") {
+ type.type = TYPE_VARCHAR;
+ type.len = 10;
+ } else if (it.first == "p_retailprice") {
+ type.type = TYPE_DECIMALV2;
+ type.precision = 27;
+ type.scale = 9;
+ } else if (it.first == "p_comment") {
+ type.type = TYPE_VARCHAR;
+ type.len = 23;
+ }
+ map.emplace(it.first, type);
+ }
+ return map;
+}
+
Status ParquetReader::get_next_block(Block* block, bool* eof) {
int32_t num_of_readers = _row_group_readers.size();
DCHECK(num_of_readers <= _read_row_groups.size());
@@ -114,6 +151,7 @@ Status ParquetReader::get_next_block(Block* block, bool*
eof) {
*eof = true;
}
}
+ VLOG_DEBUG << "ParquetReader::get_next_block: " << block->rows();
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 3c2e80dd86..217f39128d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -86,6 +86,8 @@ public:
int64_t size() const { return _file_reader->size(); }
+ std::unordered_map<std::string, TypeDescriptor> get_name_to_type()
override;
+
private:
bool _next_row_group_reader();
Status _init_read_columns(const std::vector<SlotDescriptor*>&
tuple_slot_descs);
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 3ea3fe2f1c..bef9715bb2 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -28,12 +28,15 @@ namespace doris::vectorized {
NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
- : VScanNode(pool, tnode, descs),
- _pre_filter_texprs(tnode.file_scan_node.pre_filter_exprs),
- _file_scan_node(tnode.file_scan_node) {
+ : VScanNode(pool, tnode, descs) {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}
+Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+ RETURN_IF_ERROR(VScanNode::init(tnode, state));
+ return Status::OK();
+}
+
Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
_scanner_mem_tracker = std::make_unique<MemTracker>("NewFileScanners");
@@ -71,7 +74,7 @@ void NewFileScanNode::set_scan_ranges(const
std::vector<TScanRangeParams>& scan_
}
Status NewFileScanNode::_init_profile() {
- VScanNode::_init_profile();
+ RETURN_IF_ERROR(VScanNode::_init_profile());
return Status::OK();
}
@@ -103,26 +106,25 @@ VScanner* NewFileScanNode::_create_scanner(const
TFileScanRange& scan_range) {
VScanner* scanner = nullptr;
if (config::enable_new_file_scanner) {
scanner = new VFileScanner(_state, this, _limit_per_scanner,
scan_range,
- _scanner_mem_tracker.get(),
runtime_profile(),
- _pre_filter_texprs,
scan_range.params.format_type);
+ _scanner_mem_tracker.get(),
runtime_profile());
((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());
} else {
switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new NewFileParquetScanner(_state, this,
_limit_per_scanner, scan_range,
_scanner_mem_tracker.get(),
runtime_profile(),
- _pre_filter_texprs);
+ std::vector<TExpr>());
break;
case TFileFormatType::FORMAT_ORC:
scanner = new NewFileORCScanner(_state, this, _limit_per_scanner,
scan_range,
_scanner_mem_tracker.get(),
runtime_profile(),
- _pre_filter_texprs);
+ std::vector<TExpr>());
break;
default:
scanner = new NewFileTextScanner(_state, this, _limit_per_scanner,
scan_range,
_scanner_mem_tracker.get(),
runtime_profile(),
- _pre_filter_texprs);
+ std::vector<TExpr>());
break;
}
((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h
b/be/src/vec/exec/scan/new_file_scan_node.h
index 1cca33ac37..5e08d05ae1 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -25,6 +25,8 @@ class NewFileScanNode : public VScanNode {
public:
NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
Status prepare(RuntimeState* state) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
@@ -34,15 +36,11 @@ protected:
Status _process_conjuncts() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
-protected:
- std::vector<TExpr> _pre_filter_texprs;
-
private:
VScanner* _create_scanner(const TFileScanRange& scan_range);
private:
std::vector<TScanRangeParams> _scan_ranges;
- TFileScanNode _file_scan_node;
std::unique_ptr<MemTracker> _scanner_mem_tracker;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index a922b009b9..faea367089 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -55,6 +55,7 @@ private:
TOlapScanNode _olap_scan_node;
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
OlapScanKeys _scan_keys;
+ std::vector<TCondition> _olap_filters;
std::unique_ptr<MemTracker> _scanner_mem_tracker;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index d5e5eb26a7..0e571e6bb0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -23,31 +23,27 @@
#include "common/logging.h"
#include "common/utils.h"
-#include "exec/exec_node.h"
#include "exec/text_converter.hpp"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
-#include "runtime/tuple.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent,
int64_t limit,
const TFileScanRange& scan_range, MemTracker*
tracker,
- RuntimeProfile* profile, const std::vector<TExpr>&
pre_filter_texprs,
- TFileFormatType::type format)
+ RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
_cur_reader(nullptr),
_cur_reader_eof(false),
- _file_format(format),
_mem_pool(std::make_unique<MemPool>()),
_profile(profile),
- _pre_filter_texprs(pre_filter_texprs),
_strict_mode(false) {}
Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
@@ -58,6 +54,22 @@ Status VFileScanner::prepare(VExprContext**
vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
}
+ if (_is_load) {
+ _src_block_mem_reuse = true;
+ _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
+
std::vector<TupleId>({_input_tuple_desc->id()}),
+ std::vector<bool>({false})));
+ // prepare pre filters
+ if (_params.__isset.pre_filter_exprs) {
+ _pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+ RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
+ _state->obj_pool(), _params.pre_filter_exprs,
_pre_conjunct_ctx_ptr.get()));
+
+ RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state,
*_src_row_desc));
+ RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state));
+ }
+ }
+
return Status::OK();
}
@@ -69,24 +81,90 @@ Status VFileScanner::open(RuntimeState* state) {
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
eof) {
if (_cur_reader == nullptr || _cur_reader_eof) {
- _get_next_reader();
+ RETURN_IF_ERROR(_get_next_reader());
}
if (!_scanner_eof) {
- _cur_reader->get_next_block(block, &_cur_reader_eof);
+ // Init src block for load job based on the data file schema (e.g.
parquet)
+ // For query job, simply set _src_block_ptr to block.
+ RETURN_IF_ERROR(_init_src_block(block));
+ // Read next block.
+ RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr,
&_cur_reader_eof));
+ // Convert the src block columns type to string in place.
+ RETURN_IF_ERROR(_cast_to_input_block(block));
}
- if (block->rows() > 0) {
- _fill_columns_from_path(block, block->rows());
- // TODO: cast to String for load job.
+ if (_scanner_eof && _src_block_ptr->rows() == 0) {
+ *eof = true;
}
- if (_scanner_eof && block->rows() == 0) {
- *eof = true;
+ if (_src_block_ptr->rows() > 0) {
+ // Fill rows in src block with partition columns from path. (e.g. Hive
partition columns)
+ RETURN_IF_ERROR(_fill_columns_from_path());
+ // Apply _pre_conjunct_ctx_ptr to filter src block.
+ RETURN_IF_ERROR(_pre_filter_src_block());
+ // Convert src block to output block (dest block), string to dest data
type and apply filters.
+ RETURN_IF_ERROR(_convert_to_output_block(block));
+ }
+
+ return Status::OK();
+}
+
+Status VFileScanner::_init_src_block(Block* block) {
+ if (!_is_load) {
+ _src_block_ptr = block;
+ return Status::OK();
+ }
+
+ _src_block.clear();
+
+ std::unordered_map<std::string, TypeDescriptor> name_to_type =
_cur_reader->get_name_to_type();
+ size_t idx = 0;
+ for (auto& slot : _input_tuple_desc->slots()) {
+ DataTypePtr data_type =
+
DataTypeFactory::instance().create_data_type(name_to_type[slot->col_name()],
true);
+ if (data_type == nullptr) {
+ return Status::NotSupported(fmt::format("Not support arrow
type:{}", slot->col_name()));
+ }
+ MutableColumnPtr data_column = data_type->create_column();
+ _src_block.insert(
+ ColumnWithTypeAndName(std::move(data_column), data_type,
slot->col_name()));
+ _src_block_name_to_idx.emplace(slot->col_name(), idx++);
}
+ _src_block_ptr = &_src_block;
return Status::OK();
}
-Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t
rows) {
+Status VFileScanner::_cast_to_input_block(Block* block) {
+ if (_src_block_ptr == block) {
+ return Status::OK();
+ }
+ // cast primitive type(PT0) to primitive type(PT1)
+ size_t idx = 0;
+ for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
+ SlotDescriptor* slot_desc = _file_slot_descs[i];
+ if (slot_desc == nullptr) {
+ continue;
+ }
+ auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
+ // remove nullable here, let the get_function decide whether nullable
+ auto return_type = slot_desc->get_data_type_ptr();
+ ColumnsWithTypeAndName arguments {
+ arg,
+ {DataTypeString().create_column_const(
+ arg.column->size(),
remove_nullable(return_type)->get_family_name()),
+ std::make_shared<DataTypeString>(), ""}};
+ auto func_cast =
+ SimpleFunctionFactory::instance().get_function("CAST",
arguments, return_type);
+ idx = _src_block_name_to_idx[slot_desc->col_name()];
+ RETURN_IF_ERROR(
+ func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ }
+ return Status::OK();
+}
+
+Status VFileScanner::_fill_columns_from_path() {
+ size_t rows = _src_block_ptr->rows();
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
@@ -99,7 +177,7 @@ Status
VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r
}
const std::string& column_from_path =
range.columns_from_path[it->second];
- auto doris_column =
_block->get_by_name(slot_desc->col_name()).column;
+ auto doris_column =
_src_block_ptr->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
for (size_t j = 0; j < rows; ++j) {
@@ -112,8 +190,127 @@ Status
VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r
return Status::OK();
}
+Status VFileScanner::_convert_to_output_block(Block* block) {
+ if (_src_block_ptr == block) {
+ return Status::OK();
+ }
+
+ block->clear();
+
+ int ctx_idx = 0;
+ size_t rows = _src_block.rows();
+ auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
+ auto& filter_map = filter_column->get_data();
+ auto origin_column_num = _src_block.columns();
+
+ for (auto slot_desc : _output_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+
+ int dest_index = ctx_idx++;
+
+ auto* ctx = _dest_vexpr_ctx[dest_index];
+ int result_column_id = -1;
+ // PT1 => dest primitive type
+ RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
+ bool is_origin_column = result_column_id < origin_column_num;
+ auto column_ptr =
+ is_origin_column && _src_block_mem_reuse
+ ?
_src_block.get_by_position(result_column_id).column->clone_resized(rows)
+ : _src_block.get_by_position(result_column_id).column;
+
+ DCHECK(column_ptr != nullptr);
+
+ // because of src_slot_desc is always be nullable, so the column_ptr
after do dest_expr
+ // is likely to be nullable
+ if (LIKELY(column_ptr->is_nullable())) {
+ auto nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ for (int i = 0; i < rows; ++i) {
+ if (filter_map[i] && nullable_column->is_null_at(i)) {
+ if (_strict_mode &&
(_src_slot_descs_order_by_dest[dest_index]) &&
+
!_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index])
+ .column->is_null_at(i)) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i,
_num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ auto raw_value =
+
_src_block.get_by_position(ctx_idx).column->get_data_at(
+ i);
+ std::string raw_string =
raw_value.to_string();
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) value is
incorrect while strict "
+ "mode is {}, "
+ "src value is {}",
+ slot_desc->col_name(),
_strict_mode, raw_string);
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ } else if (!slot_desc->is_nullable()) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i,
_num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) values is null
while columns is not "
+ "nullable",
+ slot_desc->col_name());
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ }
+ }
+ }
+ if (!slot_desc->is_nullable()) {
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ } else if (slot_desc->is_nullable()) {
+ column_ptr = vectorized::make_nullable(column_ptr);
+ }
+ block->insert(dest_index,
vectorized::ColumnWithTypeAndName(std::move(column_ptr),
+
slot_desc->get_data_type_ptr(),
+
slot_desc->col_name()));
+ }
+
+ // after do the dest block insert operation, clear _src_block to remove
the reference of origin column
+ if (_src_block_mem_reuse) {
+ _src_block.clear_column_data(origin_column_num);
+ } else {
+ _src_block.clear();
+ }
+
+ size_t dest_size = block->columns();
+ // do filter
+ block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
+
std::make_shared<vectorized::DataTypeUInt8>(),
+ "filter column"));
+ RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size,
dest_size));
+ // _counter->num_rows_filtered += rows - dest_block->rows();
+
+ return Status::OK();
+}
+
+Status VFileScanner::_pre_filter_src_block() {
+ if (_pre_conjunct_ctx_ptr) {
+ auto origin_column_num = _src_block_ptr->columns();
+ // filter block
+ // auto old_rows = _src_block_ptr->rows();
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
+ _src_block_ptr,
origin_column_num));
+ // _counter->num_rows_unselected += old_rows - _src_block.rows();
+ }
+ return Status::OK();
+}
+
Status VFileScanner::_get_next_reader() {
- //TODO: delete _cur_reader?
if (_cur_reader != nullptr) {
delete _cur_reader;
_cur_reader = nullptr;
@@ -133,24 +330,29 @@ Status VFileScanner::_get_next_reader() {
file_reader->close();
continue;
}
- _cur_reader = new ParquetReader(file_reader.release(),
_file_slot_descs.size(),
- _state->query_options().batch_size,
range.start_offset,
- range.size,
-
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
- // _cur_reader.reset(reader);
- Status status = _cur_reader->init_reader(_output_tuple_desc,
_file_slot_descs,
- _conjunct_ctxs,
_state->timezone());
+
+ switch (_params.format_type) {
+ case TFileFormatType::FORMAT_PARQUET:
+ _cur_reader = new ParquetReader(file_reader.release(),
_file_slot_descs.size(),
+
_state->query_options().batch_size, range.start_offset,
+ range.size,
+
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
+ RETURN_IF_ERROR(((ParquetReader*)_cur_reader)
+ ->init_reader(_output_tuple_desc,
_file_slot_descs,
+ _conjunct_ctxs,
_state->timezone()));
+ break;
+ default:
+ std::stringstream error_msg;
+ error_msg << "Not supported file format " << _params.format_type;
+ return Status::InternalError(error_msg.str());
+ }
+
_cur_reader_eof = false;
- return status;
+ return Status::OK();
}
}
Status VFileScanner::_init_expr_ctxes() {
- // if (_input_tuple_desc == nullptr) {
- // std::stringstream ss;
- // ss << "Unknown source tuple descriptor, tuple_id=" <<
_params.src_tuple_id;
- // return Status::InternalError(ss.str());
- // }
DCHECK(!_ranges.empty());
std::map<SlotId, int> full_src_index_map;
@@ -170,7 +372,6 @@ Status VFileScanner::_init_expr_ctxes() {
ss << "Unknown source slot descriptor, slot_id=" << slot_id;
return Status::InternalError(ss.str());
}
- _required_slot_descs.emplace_back(it->second);
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
auto iti = full_src_index_map.find(slot_id);
@@ -182,32 +383,7 @@ Status VFileScanner::_init_expr_ctxes() {
}
}
- // _src_tuple =
(doris::Tuple*)_mem_pool->allocate(_input_tuple_desc->byte_size());
- // _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*));
- // _src_tuple_row->set_tuple(0, _src_tuple);
-
- // Construct dest slots information
- if (config::enable_new_load_scan_node) {
- _row_desc.reset(new RowDescriptor(_state->desc_tbl(),
-
std::vector<TupleId>({_params.src_tuple_id}),
- std::vector<bool>({false})));
-
- // preceding filter expr should be initialized by using `_row_desc`,
which is the source row descriptor
- if (!_pre_filter_texprs.empty()) {
- // for vectorized, preceding filter exprs should be compounded to
one passed from fe.
- DCHECK(_pre_filter_texprs.size() == 1);
- _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
- _state->obj_pool(), _pre_filter_texprs[0],
_vpre_filter_ctx_ptr.get()));
- RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state,
*_row_desc));
- RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
- }
-
- if (_output_tuple_desc == nullptr) {
- return Status::InternalError("Unknown dest tuple descriptor,
tuple_id={}",
- _params.dest_tuple_id);
- }
-
+ if (_is_load) {
bool has_slot_id_map =
_params.__isset.dest_sid_to_src_sid_without_trans;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
@@ -222,7 +398,7 @@ Status VFileScanner::_init_expr_ctxes() {
vectorized::VExprContext* ctx = nullptr;
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(_state->obj_pool(),
it->second, &ctx));
- RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get()));
+ RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
_dest_vexpr_ctx.emplace_back(ctx);
if (has_slot_id_map) {
@@ -235,12 +411,13 @@ Status VFileScanner::_init_expr_ctxes() {
return Status::InternalError("No src slot {} in src
slot descs",
it1->second);
}
+
_dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
+
full_src_index_map[_src_slot_it->first]);
_src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
}
}
}
}
-
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index d5e73cc134..3ac8b8bf26 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -32,8 +32,7 @@ class NewFileScanNode;
class VFileScanner : public VScanner {
public:
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs,
TFileFormatType::type format);
+ const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile);
Status open(RuntimeState* state) override;
@@ -43,15 +42,9 @@ public:
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof)
override;
- // TODO: Use prefilters to filter input block
- Status _filter_input_block(Block* block) { return Status::OK(); }
-
- // TODO: Convert input block to output block, if needed.
- Status _convert_to_output_block(Block* output_block) { return
Status::OK(); }
-
void _init_profiles(RuntimeProfile* profile);
- Status _fill_columns_from_path(vectorized::Block* output_block, size_t
rows);
+ Status _fill_columns_from_path();
Status _get_next_reader();
@@ -64,12 +57,9 @@ protected:
const std::vector<TFileRangeDesc>& _ranges;
int _next_range;
- ParquetReader* _cur_reader;
+ GenericReader* _cur_reader;
bool _cur_reader_eof;
- TFileFormatType::type _file_format;
- // Used for constructing tuple
- std::vector<SlotDescriptor*> _required_slot_descs;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
// File slot id to index map.
@@ -78,9 +68,6 @@ protected:
std::vector<SlotDescriptor*> _partition_slot_descs;
// Partition slot id to index map
std::map<SlotId, int> _partition_slot_index_map;
- std::unique_ptr<RowDescriptor> _row_desc;
- doris::Tuple* _src_tuple;
- TupleRow* _src_tuple_row;
// Mem pool used to allocate _src_tuple and _src_tuple_row
std::unique_ptr<MemPool> _mem_pool;
@@ -97,11 +84,7 @@ protected:
int _rows = 0;
int _num_of_columns_from_file;
- const std::vector<TExpr> _pre_filter_texprs;
-
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
- // to filter src tuple directly.
- std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in
dest_sid_to_src_sid_without_trans, it will be set to nullptr
@@ -110,7 +93,19 @@ protected:
bool _src_block_mem_reuse = false;
bool _strict_mode;
+ Block* _src_block_ptr;
+ Block _src_block;
+
+ // dest slot desc index to src slot desc index
+ std::unordered_map<int, int> _dest_slot_to_src_slot_index;
+
+ std::unordered_map<std::string, size_t> _src_block_name_to_idx;
+
private:
Status _init_expr_ctxes();
+ Status _init_src_block(Block* block);
+ Status _cast_to_input_block(Block* block);
+ Status _pre_filter_src_block();
+ Status _convert_to_output_block(Block* block);
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 3ce2a37239..6117d487bc 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -204,9 +204,6 @@ protected:
bool _need_agg_finalize = true;
- // TODO: should be moved to olap scan node?
- std::vector<TCondition> _olap_filters;
-
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in
this vector
// so that it will be destroyed uniformly at the end of the query.
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 4089e5d2b2..f92e291e33 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -47,34 +47,20 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
}
}
- _init_input_block(block);
{
do {
// 1. Get input block from scanner
{
SCOPED_TIMER(_parent->_scan_timer);
- RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof));
+ RETURN_IF_ERROR(_get_block_impl(state, block, eof));
if (*eof) {
- DCHECK(_input_block_ptr->rows() == 0);
+ DCHECK(block->rows() == 0);
break;
}
- _num_rows_read += _input_block_ptr->rows();
+ _num_rows_read += block->rows();
}
- // 2. For load, use prefilter to filter the input block first.
- {
- SCOPED_TIMER(_parent->_prefilter_timer);
- RETURN_IF_ERROR(_filter_input_block(_input_block_ptr));
- }
-
- // 3. For load, convert input block to output block
- {
- SCOPED_TIMER(_parent->_convert_block_timer);
- RETURN_IF_ERROR(_convert_to_output_block(block));
- }
-
- // 4. Filter the output block finally.
- // NOTE that step 2/3 may be skipped, for Query.
+ // 2. Filter the output block finally.
{
SCOPED_TIMER(_parent->_filter_timer);
RETURN_IF_ERROR(_filter_output_block(block));
@@ -85,38 +71,6 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
return Status::OK();
}
-void VScanner::_init_input_block(Block* output_block) {
- if (_input_tuple_desc == nullptr) {
- _input_block_ptr = output_block;
- return;
- }
-
- // init the input block used for scanner.
- _input_block.clear();
- _input_block_ptr = &_input_block;
- DCHECK(_input_block.columns() == 0);
-
- for (auto& slot_desc : _input_tuple_desc->slots()) {
- auto data_type = slot_desc->get_data_type_ptr();
- _input_block.insert(vectorized::ColumnWithTypeAndName(
- data_type->create_column(), slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
- }
-}
-
-Status VScanner::_filter_input_block(Block* block) {
- // TODO: implement
- return Status::OK();
-}
-
-Status VScanner::_convert_to_output_block(Block* output_block) {
- if (_input_block_ptr == output_block) {
- return Status::OK();
- }
- // TODO: implement
-
- return Status::OK();
-}
-
Status VScanner::_filter_output_block(Block* block) {
return VExprContext::filter_block(_vconjunct_ctx, block,
_output_tuple_desc->slots().size());
}
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index a5dc5db408..aff7a3a4ed 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -50,16 +50,6 @@ protected:
// Update the counters before closing this scanner
virtual void _update_counters_before_close();
- // Init the input block if _input_tuple_desc is set.
- // Otherwise, use output_block directly.
- void _init_input_block(Block* output_block);
-
- // Use prefilters to filter input block
- Status _filter_input_block(Block* block);
-
- // Convert input block to output block, if needed.
- Status _convert_to_output_block(Block* output_block);
-
// Filter the output block finally.
Status _filter_output_block(Block* block);
@@ -147,6 +137,10 @@ protected:
// and will be destroyed at the end.
std::vector<VExprContext*> _stale_vexpr_ctxs;
+ // For load scanner
+ std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
+ std::unique_ptr<RowDescriptor> _src_row_desc;
+
// num of rows read from scanner
int64_t _num_rows_read = 0;
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index 173df0365d..25b14ba3a9 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -54,7 +54,7 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state,
const doris::RowDes
return Status::NotSupported("Function {} is not implemented",
_fn.name.function_name);
}
VExpr::register_function_context(state, context);
- _expr_name = fmt::format("(CAST {}, TO {})", child_name,
_target_data_type_name);
+ _expr_name = fmt::format("(CAST {} TO {})", child_name,
_target_data_type_name);
return Status::OK();
}
@@ -111,4 +111,4 @@ std::string VCastExpr::debug_string() const {
out << "}";
return out.str();
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 567bcc093b..723cee4f31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -346,7 +346,7 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
return statsDeriveResultList;
}
- void initCompoundPredicate(Expr expr) {
+ protected void initCompoundPredicate(Expr expr) {
if (expr instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
compoundPredicate.setType(Type.BOOLEAN);
@@ -364,7 +364,7 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
}
}
- Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
+ protected Expr convertConjunctsToAndCompoundPredicate(List<Expr>
conjuncts) {
List<Expr> targetConjuncts = Lists.newArrayList(conjuncts);
while (targetConjuncts.size() > 1) {
List<Expr> newTargetConjuncts = Lists.newArrayList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
index 739b5f4596..5be1d129f3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
@@ -43,7 +43,7 @@ public class BackendPolicy {
public void init() throws UserException {
Set<Tag> tags = Sets.newHashSet();
- if (ConnectContext.get().getCurrentUserIdentity() != null) {
+ if (ConnectContext.get() != null &&
ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser =
ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
tags =
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index e81753fd36..da9fddc342 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -135,6 +135,11 @@ public class ExternalFileScanNode extends ExternalScanNode
{
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
+ if (!Config.enable_vectorized_load) {
+ throw new UserException(
+ "Please set 'enable_vectorized_load=true' in fe.conf to
enable external file scan node");
+ }
+
switch (type) {
case QUERY:
HMSExternalTable hmsTable = (HMSExternalTable)
this.desc.getTable();
@@ -337,6 +342,12 @@ public class ExternalFileScanNode extends ExternalScanNode
{
// Need re compute memory layout after set some slot descriptor to
nullable
srcTupleDesc.computeStatAndMemLayout();
+
+ if (!preFilterConjuncts.isEmpty()) {
+ Expr vPreFilterExpr =
convertConjunctsToAndCompoundPredicate(preFilterConjuncts);
+ initCompoundPredicate(vPreFilterExpr);
+ params.setPreFilterExprs(vPreFilterExpr.treeToThrift());
+ }
}
protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor
slotDesc, Expr expr)
@@ -377,15 +388,6 @@ public class ExternalFileScanNode extends ExternalScanNode
{
planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE);
TFileScanNode fileScanNode = new TFileScanNode();
fileScanNode.setTupleId(desc.getId().asInt());
- if (!preFilterConjuncts.isEmpty()) {
- if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
-
fileScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
- } else {
- for (Expr e : preFilterConjuncts) {
- fileScanNode.addToPreFilterExprs(e.treeToThrift());
- }
- }
- }
planNode.setFileScanNode(fileScanNode);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 8843ff4462..5f791186a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -102,6 +102,7 @@ public class LoadScanProvider implements FileScanProviderIf
{
TFileAttributes fileAttributes = new TFileAttributes();
setFileAttributes(ctx.fileGroup, fileAttributes);
params.setFileAttributes(fileAttributes);
+ params.setFileType(fileGroupInfo.getBrokerDesc().getFileType());
ctx.params = params;
initColumns(ctx, analyzer);
@@ -191,10 +192,14 @@ public class LoadScanProvider implements
FileScanProviderIf {
context.exprMap, analyzer, context.srcTupleDescriptor,
context.slotDescByName, srcSlotIds,
formatType(context.fileGroup.getFileFormat(), ""), null,
VectorizedUtil.isVectorized());
- int numColumnsFromFile = srcSlotIds.size() -
context.fileGroup.getColumnNamesFromPath().size();
+ int columnCountFromPath = 0;
+ if (context.fileGroup.getColumnNamesFromPath() != null) {
+ columnCountFromPath =
context.fileGroup.getColumnNamesFromPath().size();
+ }
+ int numColumnsFromFile = srcSlotIds.size() - columnCountFromPath;
Preconditions.checkState(numColumnsFromFile >= 0,
"srcSlotIds.size is: " + srcSlotIds.size() + ", num columns
from path: "
- + context.fileGroup.getColumnNamesFromPath().size());
+ + columnCountFromPath);
context.params.setNumOfColumnsFromFile(numColumnsFromFile);
for (int i = 0; i < srcSlotIds.size(); ++i) {
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 584054f611..223a738df0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -269,8 +269,9 @@ struct TFileScanRangeParams {
// if strict mode is true, the incorrect data (the result of cast is null)
will not be loaded
11: optional bool strict_mode
- 12: list<Types.TNetworkAddress> broker_addresses
- 13: TFileAttributes file_attributes
+ 12: optional list<Types.TNetworkAddress> broker_addresses
+ 13: optional TFileAttributes file_attributes
+ 14: optional Exprs.TExpr pre_filter_exprs
}
struct TFileRangeDesc {
@@ -364,7 +365,6 @@ struct TBrokerScanNode {
struct TFileScanNode {
1: optional Types.TTupleId tuple_id
- 2: optional list<Exprs.TExpr> pre_filter_exprs
}
struct TEsScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]