This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ed527a7d9c6 branch-3.1: [fix](tvf) support compressed json file for tvf and refactor code #51983 (#52581)
ed527a7d9c6 is described below
commit ed527a7d9c6bd161619dbe74bab0731ecb3d3b92
Author: Socrates <[email protected]>
AuthorDate: Sat Jul 5 20:26:34 2025 +0800
branch-3.1: [fix](tvf) support compressed json file for tvf and refactor code #51983 (#52581)
bp: #51983
---
be/src/olap/push_handler.cpp | 1 -
be/src/service/internal_service.cpp | 7 +-
be/src/vec/exec/format/avro/avro_jni_reader.cpp | 5 +-
be/src/vec/exec/format/avro/avro_jni_reader.h | 4 +-
be/src/vec/exec/format/csv/csv_reader.cpp | 77 ++++++++++-----------
be/src/vec/exec/format/csv/csv_reader.h | 7 +-
be/src/vec/exec/format/generic_reader.h | 6 ++
be/src/vec/exec/format/json/new_json_reader.cpp | 13 ++--
be/src/vec/exec/format/json/new_json_reader.h | 1 +
be/src/vec/exec/format/orc/vorc_reader.cpp | 9 ++-
be/src/vec/exec/format/orc/vorc_reader.h | 2 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 22 +++---
be/src/vec/exec/format/parquet/vparquet_reader.h | 4 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 6 +-
be/src/vec/exec/format/wal/wal_reader.h | 2 +
be/src/vec/exec/scan/vfile_scanner.cpp | 12 +---
be/src/vec/exec/scan/vfile_scanner.h | 1 -
.../exec/format/parquet/parquet_reader_test.cpp | 4 --
.../json_format_test/simple_object_json.json.gz | Bin 0 -> 211 bytes
.../data/external_table_p0/tvf/test_hdfs_tvf.out | Bin 40945 -> 41176 bytes
.../external_table_p0/tvf/test_hdfs_tvf.groovy | 10 +++
21 files changed, 105 insertions(+), 88 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 8cfe6a1a7f9..4233b79e865 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -643,7 +643,6 @@ Status PushBrokerReader::_get_next_reader() {
const_cast<cctz::time_zone*>(&_runtime_state->timezone_obj()),
_io_ctx.get(), _runtime_state.get());
- RETURN_IF_ERROR(parquet_reader->open());
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_all_col_names, place_holder, _colname_to_value_range,
_push_down_exprs,
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index ec5e0b7adf0..3cb999fea1b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -856,7 +856,6 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
case TFileFormatType::FORMAT_AVRO: {
reader = vectorized::AvroJNIReader::create_unique(profile.get(),
params, range,
file_slots);
- st =
((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader();
break;
}
default:
@@ -865,6 +864,12 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
st.to_protobuf(result->mutable_status());
return;
}
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to create reader, errmsg=" << st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ st = reader->init_schema_reader();
if (!st.ok()) {
LOG(WARNING) << "failed to init reader, errmsg=" << st;
st.to_protobuf(result->mutable_status());
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index 6591abab58d..d6c38730f9f 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -54,7 +54,7 @@ Status
AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
-Status AvroJNIReader::init_fetch_table_reader(
+Status AvroJNIReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
std::ostringstream required_fields;
@@ -107,7 +107,8 @@ TFileType::type AvroJNIReader::get_file_type() {
return type;
}
-Status AvroJNIReader::init_fetch_table_schema_reader() {
+// open the jni connector for parsing schema
+Status AvroJNIReader::init_schema_reader() {
std::map<String, String> required_param = {{"uri", _range.path},
{"file_type",
std::to_string(get_file_type())},
{"is_get_table_schema",
"true"}};
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h
b/be/src/vec/exec/format/avro/avro_jni_reader.h
index c8d55cf58cf..7daaa232f64 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.h
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -70,12 +70,12 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
- Status init_fetch_table_reader(
+ Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
TFileType::type get_file_type();
- Status init_fetch_table_schema_reader();
+ Status init_schema_reader() override;
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 34a789cf854..6ce31b59561 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -396,14 +396,45 @@ Status
CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n
return Status::OK();
}
+// init decompressor, file reader and line reader for parsing schema
+Status CsvReader::init_schema_reader() {
+ _start_offset = _range.start_offset;
+ if (_start_offset != 0) {
+ return Status::InvalidArgument(
+ "start offset of TFileRangeDesc must be zero in get parsered
schema");
+ }
+ if (_params.file_type == TFileType::FILE_BROKER) {
+ return Status::InternalError<false>(
+ "Getting parsered schema from csv file do not support stream
load and broker "
+ "load.");
+ }
+
+ // csv file without names line and types line.
+ _read_line = 1;
+ _is_parse_name = false;
+
+ if (_params.__isset.file_attributes &&
_params.file_attributes.__isset.header_type &&
+ !_params.file_attributes.header_type.empty()) {
+ std::string header_type =
to_lower(_params.file_attributes.header_type);
+ if (header_type == BeConsts::CSV_WITH_NAMES) {
+ _is_parse_name = true;
+ } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+ _read_line = 2;
+ _is_parse_name = true;
+ }
+ }
+
+ RETURN_IF_ERROR(_init_options());
+ RETURN_IF_ERROR(_create_file_reader(true));
+ RETURN_IF_ERROR(_create_decompressor());
+ RETURN_IF_ERROR(_create_line_reader());
+ return Status::OK();
+}
+
Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
- size_t read_line = 0;
- bool is_parse_name = false;
- RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name));
-
- if (read_line == 1) {
- if (!is_parse_name) { //parse csv file without names and types
+ if (_read_line == 1) {
+ if (!_is_parse_name) { //parse csv file without names and types
size_t col_nums = 0;
RETURN_IF_ERROR(_parse_col_nums(&col_nums));
for (size_t i = 0; i < col_nums; ++i) {
@@ -708,40 +739,6 @@ void CsvReader::_split_line(const Slice& line) {
_fields_splitter->split_line(line, &_split_values);
}
-Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
- _start_offset = _range.start_offset;
- if (_start_offset != 0) {
- return Status::InvalidArgument(
- "start offset of TFileRangeDesc must be zero in get parsered
schema");
- }
- if (_params.file_type == TFileType::FILE_BROKER) {
- return Status::InternalError<false>(
- "Getting parsered schema from csv file do not support stream
load and broker "
- "load.");
- }
-
- // csv file without names line and types line.
- *read_line = 1;
- *is_parse_name = false;
-
- if (_params.__isset.file_attributes &&
_params.file_attributes.__isset.header_type &&
- !_params.file_attributes.header_type.empty()) {
- std::string header_type =
to_lower(_params.file_attributes.header_type);
- if (header_type == BeConsts::CSV_WITH_NAMES) {
- *is_parse_name = true;
- } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
- *read_line = 2;
- *is_parse_name = true;
- }
- }
-
- RETURN_IF_ERROR(_init_options());
- RETURN_IF_ERROR(_create_file_reader(true));
- RETURN_IF_ERROR(_create_decompressor());
- RETURN_IF_ERROR(_create_line_reader());
- return Status::OK();
-}
-
Status CsvReader::_parse_col_nums(size_t* col_nums) {
const uint8_t* ptr = nullptr;
size_t size = 0;
diff --git a/be/src/vec/exec/format/csv/csv_reader.h
b/be/src/vec/exec/format/csv/csv_reader.h
index 1f060d18ac3..117b5058c52 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -182,6 +182,7 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status init_schema_reader() override;
// get schema of csv file from first one line or first two lines.
// if file format is FORMAT_CSV_DEFLATE and if
// 1. header_type is empty, get schema from first line.
@@ -231,9 +232,6 @@ private:
void _init_system_properties();
void _init_file_description();
- // used for parse table schema of csv file.
- // Currently, this feature is for table valued function.
- Status _prepare_parse(size_t* read_line, bool* is_parse_name);
Status _parse_col_nums(size_t* col_nums);
Status _parse_col_names(std::vector<std::string>* col_names);
// TODO(ftw): parse type
@@ -263,6 +261,9 @@ private:
// True if this is a load task
bool _is_load = false;
bool _line_reader_eof;
+ // For schema reader
+ size_t _read_line = 0;
+ bool _is_parse_name = false;
TFileFormatType::type _file_format_type;
bool _is_proto_format;
TFileCompressType::type _file_compress_type;
diff --git a/be/src/vec/exec/format/generic_reader.h
b/be/src/vec/exec/format/generic_reader.h
index e32928e4b95..c853cae15a6 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -45,6 +45,12 @@ public:
return Status::NotSupported("get_columns is not implemented");
}
+ // This method is responsible for initializing the resource for parsing
schema.
+ // It will be called before `get_parsed_schema`.
+ virtual Status init_schema_reader() {
+ return Status::NotSupported("init_schema_reader is not implemented for
this reader.");
+ }
+ // `col_types` is always nullable to process illegal values.
virtual Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
return Status::NotSupported("get_parsed_schema is not implemented for
this reader.");
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index c8969c6d4c3..173bb2b4dc4 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -255,18 +255,23 @@ Status
NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
-Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>*
col_types) {
+// init decompressor, file reader and line reader for parsing schema
+Status NewJsonReader::init_schema_reader() {
RETURN_IF_ERROR(_get_range_params());
-
+ // create decompressor.
+ // _decompressor may be nullptr if this is not a compressed file
+ RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type,
&_decompressor));
RETURN_IF_ERROR(_open_file_reader(true));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
-
// generate _parsed_jsonpaths and _parsed_json_root
RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+ return Status::OK();
+}
+Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>*
col_types) {
bool eof = false;
const uint8_t* json_str = nullptr;
std::unique_ptr<uint8_t[]> json_str_ptr;
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 31ddc0fa9c9..967a5300529 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -93,6 +93,7 @@ public:
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status init_schema_reader() override;
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 35ff0c7561c..2c10c9ff29c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -359,10 +359,14 @@ Status OrcReader::init_reader(
return Status::OK();
}
+// init file reader for parsing schema
+Status OrcReader::init_schema_reader() {
+ return _create_file_reader();
+}
+
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
- RETURN_IF_ERROR(_create_file_reader());
- auto& root_type = _is_acid ? _remove_acid(_reader->getType()) :
_reader->getType();
+ const auto& root_type = _is_acid ? _remove_acid(_reader->getType()) :
_reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
col_names->emplace_back(get_field_name_lower_case(&root_type, i));
col_types->emplace_back(convert_to_doris_type(root_type.getSubtype(i)));
@@ -374,7 +378,6 @@ Status
OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_na
std::vector<int32_t>*
col_attributes,
const std::string& attribute,
bool* exist_attribute) {
- RETURN_IF_ERROR(_create_file_reader());
*exist_attribute = true;
auto& root_type = _is_acid ? _remove_acid(_reader->getType()) :
_reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 3e2b785cf03..fc4fba4789c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -177,6 +177,8 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status init_schema_reader() override;
+
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a38031a668f..1b48f04066e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -259,12 +259,6 @@ const FieldDescriptor
ParquetReader::get_file_metadata_schema() {
return _file_metadata->schema();
}
-Status ParquetReader::open() {
- RETURN_IF_ERROR(_open_file());
- _t_metadata = &(_file_metadata->to_thrift());
- return Status::OK();
-}
-
void ParquetReader::_init_system_properties() {
if (_scan_range.__isset.file_type) {
// for compatibility
@@ -311,10 +305,8 @@ Status ParquetReader::init_reader(
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_colname_to_value_range = colname_to_value_range;
_hive_use_column_names = hive_use_column_names;
- if (_file_metadata == nullptr) {
- return Status::InternalError("failed to init parquet reader, please
open reader first");
- }
-
+ RETURN_IF_ERROR(_open_file());
+ _t_metadata = &(_file_metadata->to_thrift());
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
@@ -491,11 +483,15 @@ Status ParquetReader::set_fill_columns(
return Status::OK();
}
-Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>*
col_types) {
+// init file reader and file metadata for parsing schema
+Status ParquetReader::init_schema_reader() {
RETURN_IF_ERROR(_open_file());
- _t_metadata = &_file_metadata->to_thrift();
+ _t_metadata = &(_file_metadata->to_thrift());
+ return Status::OK();
+}
+Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>*
col_types) {
_total_groups = _t_metadata->row_groups.size();
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < schema_desc.size(); ++i) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index e24071093b6..d189343e82e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -107,8 +107,6 @@ public:
// for test
void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader =
file_reader; }
- Status open();
-
Status init_reader(
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
@@ -134,6 +132,8 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status init_schema_reader() override;
+
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index c297904ca41..cd1bded9eac 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -160,6 +160,7 @@ Status IcebergTableReader::_equality_delete_base(
delete_desc.file_size = -1;
std::unique_ptr<GenericReader> delete_reader =
_create_equality_reader(delete_desc);
if (!init_schema) {
+ RETURN_IF_ERROR(delete_reader->init_schema_reader());
RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
&equality_delete_col_types));
_generate_equality_delete_block(&_equality_delete_block,
equality_delete_col_names,
@@ -167,7 +168,6 @@ Status IcebergTableReader::_equality_delete_base(
init_schema = true;
}
if (auto* parquet_reader =
typeid_cast<ParquetReader*>(delete_reader.get())) {
- RETURN_IF_ERROR(parquet_reader->open());
RETURN_IF_ERROR(parquet_reader->init_reader(equality_delete_col_names,
not_in_file_col_names,
nullptr, {}, nullptr,
nullptr, nullptr,
nullptr, nullptr, false));
@@ -446,8 +446,6 @@ Status IcebergParquetReader
::_read_position_delete_file(const TFileRangeDesc* d
ParquetReader parquet_delete_reader(
_profile, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx,
_state);
-
- RETURN_IF_ERROR(parquet_delete_reader.open());
RETURN_IF_ERROR(parquet_delete_reader.init_reader(delete_file_col_names,
{}, nullptr, {},
nullptr, nullptr,
nullptr, nullptr, nullptr,
false));
@@ -542,6 +540,7 @@ Status IcebergOrcReader::_read_position_delete_file(const
TFileRangeDesc* delete
Status IcebergParquetReader::get_file_col_id_to_name(
bool& exist_schema, std::map<int32_t, std::string>&
file_col_id_to_name) {
auto* parquet_reader =
static_cast<ParquetReader*>(_file_format_reader.get());
+ RETURN_IF_ERROR(parquet_reader->init_schema_reader());
FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
if (field_desc.has_parquet_field_id()) {
@@ -561,6 +560,7 @@ Status IcebergOrcReader::get_file_col_id_to_name(
std::vector<std::string> col_names;
std::vector<int32_t> col_ids;
+ RETURN_IF_ERROR(orc_reader->init_schema_reader());
RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
if (!exist_schema) {
diff --git a/be/src/vec/exec/format/wal/wal_reader.h
b/be/src/vec/exec/format/wal/wal_reader.h
index 5834d74efea..8da5e74aa1d 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -24,6 +24,8 @@ namespace doris {
namespace vectorized {
struct ScannerCounter;
class WalReader : public GenericReader {
+ ENABLE_FACTORY_CREATOR(WalReader);
+
public:
WalReader(RuntimeState* state);
~WalReader() override = default;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 52850d6e8e2..fb1910bded6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -129,8 +129,6 @@ Status VFileScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& conju
RETURN_IF_ERROR(VScanner::prepare(state, conjuncts));
_get_block_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerGetBlockTime", 1);
- _open_reader_timer =
- ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerOpenReaderTime", 1);
_cast_to_input_block_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerCastInputBlockTime", 1);
_fill_missing_columns_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
@@ -942,10 +940,6 @@ Status VFileScanner::_get_next_reader() {
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
- {
- SCOPED_TIMER(_open_reader_timer);
- RETURN_IF_ERROR(parquet_reader->open());
- }
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
@@ -1110,12 +1104,12 @@ Status VFileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_AVRO: {
_cur_reader = AvroJNIReader::create_unique(_state, _profile,
*_params, _file_slot_descs,
range);
- init_status = ((AvroJNIReader*)(_cur_reader.get()))
-
->init_fetch_table_reader(_colname_to_value_range);
+ init_status =
+
((AvroJNIReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
break;
}
case TFileFormatType::FORMAT_WAL: {
- _cur_reader.reset(new WalReader(_state));
+ _cur_reader = WalReader::create_unique(_state);
init_status =
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
break;
}
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index 5b1604209d4..6a605da3dca 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -173,7 +173,6 @@ protected:
private:
RuntimeProfile::Counter* _get_block_timer = nullptr;
- RuntimeProfile::Counter* _open_reader_timer = nullptr;
RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
index 423adfd41ce..afa4d7f9d5f 100644
--- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
@@ -150,7 +150,6 @@ TEST_F(ParquetReaderTest, normal) {
runtime_state.set_desc_tbl(desc_tbl);
std::unordered_map<std::string, ColumnValueRangeType>
colname_to_value_range;
- static_cast<void>(p_reader->open());
static_cast<void>(p_reader->init_reader(column_names,
missing_column_names, nullptr, {},
nullptr, nullptr, nullptr,
nullptr, nullptr));
std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
@@ -231,7 +230,6 @@ TEST_F(ParquetReaderTest, use_column_name) {
colname_to_value_range.emplace("smallint_col",
ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
colname_to_value_range.emplace("int_col",
ColumnValueRange<TYPE_INT>("int_col"));
- static_cast<void>(p_reader->open());
static_cast<void>(p_reader->init_reader(table_column_names, {},
&colname_to_value_range, {},
nullptr, nullptr, nullptr,
nullptr, nullptr, false,
use_column_name));
@@ -271,7 +269,6 @@ TEST_F(ParquetReaderTest, use_column_name2) {
colname_to_value_range.emplace("smallint_col",
ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
colname_to_value_range.emplace("int_col",
ColumnValueRange<TYPE_INT>("int_col"));
- static_cast<void>(p_reader->open());
static_cast<void>(p_reader->init_reader(table_column_names,
{"boolean_col"},
&colname_to_value_range, {},
nullptr, nullptr, nullptr,
nullptr, nullptr, false,
use_column_name));
@@ -314,7 +311,6 @@ TEST_F(ParquetReaderTest, use_column_idx) {
colname_to_value_range.emplace("col3",
ColumnValueRange<TYPE_SMALLINT>("col3"));
colname_to_value_range.emplace("col102",
ColumnValueRange<TYPE_SMALLINT>("col102"));
- static_cast<void>(p_reader->open());
static_cast<void>(p_reader->init_reader(table_column_names, {},
&colname_to_value_range, {},
nullptr, nullptr, nullptr,
nullptr, nullptr, false,
use_column_name));
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz
new file mode 100644
index 00000000000..8a6db90241f
Binary files /dev/null and
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz
differ
diff --git a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
index a8f5dcf5396..04ec58cdbae 100644
Binary files a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out and
b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out differ
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index 74cb1e320aa..8bc8194843d 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -143,6 +143,16 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker")
{
"strip_outer_array" = "false",
"read_json_by_line" = "true") order by id; """
+ uri = "${defaultFS}" +
"/user/doris/preinstalled_data/json_format_test/simple_object_json.json.gz"
+ format = "json"
+ qt_json_compressed """ select * from HDFS(
+ "uri" = "${uri}",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}",
+ "compress_type" = "GZ",
+ "strip_outer_array" = "false",
+ "read_json_by_line" = "true") order by id; """
+
uri = "${defaultFS}" +
"/user/doris/preinstalled_data/json_format_test/simple_object_json.json"
format = "json"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]