This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e8d48413c124b71d30d85192013e5f26d3dc90ee
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Jun 28 21:15:35 2023 +0800

    [Feature](avro) Support Apache Avro file format (#19990)

    Support reading Avro files via hdfs() or s3().

    ```sql
    select * from s3(
            "uri" = "http://127.0.0.1:9312/test2/person.avro",
            "ACCESS_KEY" = "ak",
            "SECRET_KEY" = "sk",
            "FORMAT" = "avro");
    +--------+--------------+-------------+-----------------+
    | name   | boolean_type | double_type | long_type       |
    +--------+--------------+-------------+-----------------+
    | Alyssa |            1 |     10.0012 | 100000000221133 |
    | Ben    |            0 |    5555.999 |      4009990000 |
    | lisi   |            0 | 5992225.999 |      9099933330 |
    +--------+--------------+-------------+-----------------+

    select * from hdfs(
            "uri" = "hdfs://127.0.0.1:9000/input/person2.avro",
            "fs.defaultFS" = "hdfs://127.0.0.1:9000",
            "hadoop.username" = "doris",
            "format" = "avro");
    +--------+--------------+-------------+-----------+
    | name   | boolean_type | double_type | long_type |
    +--------+--------------+-------------+-----------+
    | Alyssa |            1 |  8888.99999 |  89898989 |
    +--------+--------------+-------------+-----------+
    ```

    The current Avro reader only supports common data types; complex data types will be supported later.
---
 be/src/service/internal_service.cpp                |   9 +
 be/src/vec/exec/jni_connector.cpp                  |  21 +-
 be/src/vec/exec/jni_connector.h                    |  24 ++
 be/src/vec/exec/scan/avro_jni_reader.cpp           | 165 ++++++++++++++
 be/src/vec/exec/scan/avro_jni_reader.h             |  96 ++++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |   7 +
 build.sh                                           |   2 +
 .../sql-manual/sql-functions/table-functions/s3.md |  14 +-
 .../sql-manual/sql-functions/table-functions/s3.md |  10 +-
 .../{hudi-scanner => avro-scanner}/pom.xml         |  87 +++-----
 .../org/apache/doris/avro/AvroColumnValue.java     | 162 ++++++++++++++
 .../java/org/apache/doris/avro/AvroJNIScanner.java | 247 +++++++++++++++++++++
 .../java/org/apache/doris/avro/AvroProperties.java |  40 ++++
 .../java/org/apache/doris/avro/AvroReader.java     |  37 +++
 .../java/org/apache/doris/avro/HDFSFileReader.java |  73 ++++++
 .../java/org/apache/doris/avro/S3FileReader.java   |  91 ++++++++
 .../avro-scanner/src/main/resources/package.xml    |  41 ++++
 fe/be-java-extensions/hudi-scanner/pom.xml         |  19 +-
 .../java/org/apache/doris/hudi/HudiJniScanner.java |   6 +
 fe/be-java-extensions/java-common/pom.xml          |   4 +
 .../org/apache/doris/common/jni/JniScanner.java    |  13 +-
 .../apache/doris/common/jni/MockJniScanner.java    |   8 +-
 .../apache/doris/common/jni/vec/TableSchema.java   |  83 +++++++
 .../doris/maxcompute/MaxComputeJniScanner.java     |   7 +
 .../org/apache/doris/paimon/PaimonJniScanner.java  |   7 +
 fe/be-java-extensions/pom.xml                      |   1 +
 .../property/constants/S3Properties.java           |   2 +
 .../ExternalFileTableValuedFunction.java           |   3 +
 .../doris/tablefunction/S3TableValuedFunction.java |  14 +-
 .../datasource/property/PropertyConverterTest.java |   4 +-
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 31 files changed, 1210 insertions(+), 88 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 275fa6a4d3..7a936487c4 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -114,6 +114,7 @@ #include "vec/exec/format/json/new_json_reader.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" +#include "vec/exec/scan/avro_jni_reader.h" #include "vec/jsonb/serialize.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -603,6 +604,14 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
file_slots, &io_ctx); break; } + case TFileFormatType::FORMAT_AVRO: { + // file_slots is unused + std::vector<SlotDescriptor*> file_slots; + reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range, + file_slots); + ((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader(); + break; + } default: st = Status::InternalError("Not supported file format in fetch table schema: {}", params.format_type); diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index d3fafe5974..edb195479a 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -72,12 +72,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { // cannot put the env into fields, because frames in an env object is limited // to avoid limited frames in a thread, we should get local env in a method instead of in whole object. JNIEnv* env = nullptr; + int batch_size = 0; + if (!_is_table_schema) { + batch_size = _state->batch_size(); + } RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); if (env == nullptr) { return Status::InternalError("Failed to get/create JVM"); } SCOPED_TIMER(_open_scanner_time); - RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size())); + RETURN_IF_ERROR(_init_jni_scanner(env, batch_size)); // Call org.apache.doris.common.jni.JniScanner#open env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); RETURN_ERROR_IF_EXC(env); @@ -129,6 +133,18 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } +Status JniConnector::get_table_schema(std::string& table_schema_str) { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + // Call org.apache.doris.common.jni.JniScanner#getTableSchema + // return the TableSchema information + jstring jstr = (jstring)env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_table_schema); + RETURN_ERROR_IF_EXC(env); + table_schema_str = env->GetStringUTFChars(jstr, nullptr); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + std::map<std::string, std::string> JniConnector::get_statistics(JNIEnv* env) { jobject metrics = env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_statistics); std::map<std::string, std::string> result = JniUtil::convert_to_cpp_map(env, metrics); @@ -197,6 +213,9 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) { _jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V"); _jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, "getNextBatchMeta", "()J"); + _jni_scanner_get_table_schema = + env->GetMethodID(_jni_scanner_cls, "getTableSchema", "()Ljava/lang/String;"); + RETURN_ERROR_IF_EXC(env); _jni_scanner_close = env->GetMethodID(_jni_scanner_cls, "close", "()V"); _jni_scanner_release_column = env->GetMethodID(_jni_scanner_cls, "releaseColumn", "(I)V"); _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V"); diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index dccf90d103..0f08fbe0f8 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -169,6 +169,17 @@ public: _connector_name = split(_connector_class, "/").back(); } + /** + * Used only to get the table schema.
+ * @param connector_class Java scanner class + * @param scanner_params Provided configuration map + */ + JniConnector(std::string connector_class, std::map<std::string, std::string> scanner_params) + : _connector_class(std::move(connector_class)), + _scanner_params(std::move(scanner_params)) { + _is_table_schema = true; + } + /// Should release jni resources if other functions are failed. ~JniConnector(); @@ -206,6 +217,17 @@ public: */ std::map<std::string, std::string> get_statistics(JNIEnv* env); + /** + * Call java side function JniScanner.getTableSchema. + * + * The schema information is returned as a JSON string: + * an array of objects carrying "name" and "type" (a TPrimitiveType value), + * + * plus a nested "childColumn" object for ARRAY/MAP columns. + * + */ + Status get_table_schema(std::string& table_schema_str); + /** * Close scanner and release jni resources. */ @@ -223,6 +245,7 @@ private: std::string _connector_class; std::map<std::string, std::string> _scanner_params; std::vector<std::string> _column_names; + bool _is_table_schema = false; RuntimeState* _state; RuntimeProfile* _profile; @@ -238,6 +261,7 @@ jobject _jni_scanner_obj; jmethodID _jni_scanner_open; jmethodID _jni_scanner_get_next_batch; + jmethodID _jni_scanner_get_table_schema; jmethodID _jni_scanner_close; jmethodID _jni_scanner_release_column; jmethodID _jni_scanner_release_table; diff --git a/be/src/vec/exec/scan/avro_jni_reader.cpp b/be/src/vec/exec/scan/avro_jni_reader.cpp new file mode 100644 index 0000000000..5d1ef40cbc --- /dev/null +++ b/be/src/vec/exec/scan/avro_jni_reader.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "avro_jni_reader.h" + +#include <map> +#include <ostream> + +#include "runtime/descriptors.h" +#include "runtime/types.h" + +namespace doris::vectorized { + +AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector<SlotDescriptor*>& file_slot_descs) + : _file_slot_descs(file_slot_descs), _state(state), _profile(profile), _params(params) {} + +AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + const std::vector<SlotDescriptor*>& file_slot_descs) + : _file_slot_descs(file_slot_descs), _profile(profile), _params(params), _range(range) {} + +AvroJNIReader::~AvroJNIReader() = default; + +Status AvroJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof)); + if (*eof) { + RETURN_IF_ERROR(_jni_connector->close()); + } + return Status::OK(); +} + +Status AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + for (auto& desc : _file_slot_descs) { + name_to_type->emplace(desc->col_name(), desc->type()); + } + return Status::OK(); +} + +Status AvroJNIReader::init_fetch_table_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + _colname_to_value_range = colname_to_value_range; + std::ostringstream required_fields; + std::ostringstream columns_types; + std::vector<std::string> column_names; + int index = 0; + for (auto& desc : _file_slot_descs) { + std::string field = desc->col_name(); + column_names.emplace_back(field); + std::string type = JniConnector::get_hive_type(desc->type()); + if (index == 0) { + required_fields << field; + columns_types << type; + } else { + required_fields << "," << field; + columns_types << "#" << type; + } + index++; + } + + TFileType::type type = _params.file_type; + std::map<String, String> required_param = { + {"required_fields", required_fields.str()}, + {"columns_types", columns_types.str()}, + {"file_type", std::to_string(type)}, + {"is_get_table_schema", "false"}, + {"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}}; + switch (type) { + case TFileType::FILE_HDFS: + required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value)); + break; + case TFileType::FILE_S3: + required_param.insert(_params.properties.begin(), _params.properties.end()); + break; + default: + Status::InternalError("unsupported file reader type: {}", std::to_string(type)); + } + required_param.insert(_params.properties.begin(), _params.properties.end()); + _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner", + required_param, column_names); + RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range)); + return _jni_connector->open(_state, _profile); +} + +Status AvroJNIReader::init_fetch_table_schema_reader() { + std::map<String, String> required_param = {{"uri", _range.path}, + {"file_type", std::to_string(_params.file_type)}, + {"is_get_table_schema", "true"}}; + + required_param.insert(_params.properties.begin(), _params.properties.end()); + _jni_connector = + std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner", required_param); + return _jni_connector->open(nullptr, _profile); +} + +Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { + std::string 
table_schema_str; + RETURN_IF_ERROR(_jni_connector->get_table_schema(table_schema_str)); + + rapidjson::Document document; + document.Parse(table_schema_str.c_str()); + if (document.IsArray()) { + for (int i = 0; i < document.Size(); ++i) { + rapidjson::Value& column_schema = document[i]; + col_names->push_back(column_schema["name"].GetString()); + col_types->push_back(convert_to_doris_type(column_schema)); + } + } + return _jni_connector->close(); +} + +TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) { + ::doris::TPrimitiveType::type schema_type = + static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt()); + switch (schema_type) { + case TPrimitiveType::INT: + case TPrimitiveType::STRING: + case TPrimitiveType::BIGINT: + case TPrimitiveType::BOOLEAN: + case TPrimitiveType::DOUBLE: + case TPrimitiveType::FLOAT: + return TypeDescriptor(thrift_to_type(schema_type)); + case TPrimitiveType::ARRAY: { + TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY); + list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); + return list_type; + } + case TPrimitiveType::MAP: { + TypeDescriptor map_type(PrimitiveType::TYPE_MAP); + + // The default type of AVRO MAP structure key is STRING + map_type.add_sub_type(PrimitiveType::TYPE_STRING); + map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); + return map_type; + } + default: + return TypeDescriptor(PrimitiveType::INVALID_TYPE); + } +} + +TypeDescriptor AvroJNIReader::convert_complex_type( + const rapidjson::Document::ConstObject child_schema) { + ::doris::TPrimitiveType::type child_schema_type = + static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt()); + return TypeDescriptor(thrift_to_type(child_schema_type)); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/avro_jni_reader.h b/be/src/vec/exec/scan/avro_jni_reader.h new file mode 100644 index 0000000000..70ef859fba --- /dev/null +++ b/be/src/vec/exec/scan/avro_jni_reader.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <rapidjson/document.h> +#include <stddef.h> + +#include <memory> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "common/status.h" +#include "exec/olap_common.h" +#include "vec/exec/format/generic_reader.h" +#include "vec/exec/jni_connector.h" + +namespace doris { +class RuntimeProfile; + +class RuntimeState; + +class SlotDescriptor; +namespace vectorized { +class Block; +} // namespace vectorized +struct TypeDescriptor; +} // namespace doris + +namespace doris::vectorized { + +/** + * Read avro-format file + */ +class AvroJNIReader : public GenericReader { + ENABLE_FACTORY_CREATOR(AvroJNIReader); + +public: + /** + * Call java side by jni to get table data. + */ + AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, + const std::vector<SlotDescriptor*>& file_slot_descs); + + /** + * Call java side by jni to get table schema. + */ + AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs); + + ~AvroJNIReader() override; + + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; + + Status init_fetch_table_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + + Status init_fetch_table_schema_reader(); + + Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) override; + + TypeDescriptor convert_to_doris_type(const rapidjson::Value& column_schema); + + TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject child_schema); + +private: + const std::vector<SlotDescriptor*>& _file_slot_descs; + RuntimeState* _state; + RuntimeProfile* _profile; + const TFileScanRangeParams _params; + const TFileRangeDesc _range; + std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + std::unique_ptr<JniConnector> _jni_connector; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 43c6147dad..359e05624e 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -61,6 +61,7 @@ #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/format/table/transactional_hive_reader.h" +#include "vec/exec/scan/avro_jni_reader.h" #include "vec/exec/scan/hudi_jni_reader.h" #include "vec/exec/scan/max_compute_jni_reader.h" #include "vec/exec/scan/new_file_scan_node.h" @@ -715,6 +716,12 @@ Status VFileScanner::_get_next_reader() { ((NewJsonReader*)(_cur_reader.get()))->init_reader(_col_default_value_ctx); break; } + case TFileFormatType::FORMAT_AVRO: { + _cur_reader = AvroJNIReader::create_unique(_state, _profile, _params, _file_slot_descs); + init_status = ((AvroJNIReader*)(_cur_reader.get())) + ->init_fetch_table_reader(_colname_to_value_range); + break; + } default: return Status::InternalError("Not supported file format: {}", _params.format_type); } diff --git a/build.sh b/build.sh index dfc16b7936..260bc1dd11 100755 --- a/build.sh +++ b/build.sh @@ -436,6 +436,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then modules+=("be-java-extensions/jdbc-scanner") modules+=("be-java-extensions/paimon-scanner") 
modules+=("be-java-extensions/max-compute-scanner") + modules+=("be-java-extensions/avro-scanner") fi FE_MODULES="$( IFS=',' @@ -647,6 +648,7 @@ EOF extensions_modules+=("hudi-scanner") extensions_modules+=("paimon-scanner") extensions_modules+=("max-compute-scanner") + extensions_modules+=("avro-scanner") BE_JAVA_EXTENSIONS_DIR="${DORIS_OUTPUT}/be/lib/java_extensions/" rm -rf "${BE_JAVA_EXTENSIONS_DIR}" diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md b/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md index bb937a9624..5743814028 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md @@ -168,7 +168,7 @@ select * from s3( ``` -**csv foramt** +**csv format** `csv` format: Read the file on S3 and process it as a csv file, read the first line in the file to parse out the table schema. The number of columns in the first line of the file `n` will be used as the number of columns in the table schema, and the column names of the table schema will be automatically named `c1, c2, ..., cn`, and the column type is set to `String` , for example: @@ -214,7 +214,7 @@ MySQL [(none)]> Desc function s3("uri" = "http://127.0.0.1:9312/test2/student1.c +-------+------+------+-------+---------+-------+ ``` -**csv_with_names foramt** +**csv_with_names format** `csv_with_names` format: The first line of the file is used as the number and name of the columns of the table schema, and the column type is set to `String`, for example: The file content of student_with_names.csv: @@ -258,9 +258,9 @@ MySQL [(none)]> Desc function s3("uri" = "http://127.0.0.1:9312/test2/student_wi +-------+------+------+-------+---------+-------+ ``` -**csv_with_names_and_types foramt** +**csv_with_names_and_types format** -`csv_with_names_and_types` foramt: Currently, it does not support parsing the column type from a csv file. When using this format, S3 tvf will parse the first line of the file as the number and name of the columns of the table schema, and set the column type to String. Meanwhile, the second line of the file is ignored. +`csv_with_names_and_types` format: Currently, it does not support parsing the column type from a csv file. When using this format, S3 tvf will parse the first line of the file as the number and name of the columns of the table schema, and set the column type to String. Meanwhile, the second line of the file is ignored. The file content of student_with_names_and_types.csv: @@ -304,7 +304,7 @@ MySQL [(none)]> Desc function s3("uri" = "http://127.0.0.1:9312/test2/student_wi +-------+------+------+-------+---------+-------+ ``` -**json foramt** +**json format** `json` format: The json format involves many optional parameters, and the meaning of each parameter can be referred to: [Json Load](../../../data-operate/import/import-way/load-json-format.md). When S3 tvf queries the json format file, it locates a json object according to the `json_root` and `jsonpaths` parameters, and uses the `key` in the object as the column name of the table schema, and sets the column type to String. For example: @@ -352,7 +352,7 @@ MySQL [(none)]> select * from s3( +------+------+ ``` -**parquet foramt** +**parquet format** `parquet` format: S3 tvf supports parsing the column names and column types of the table schema from the parquet file. 
Example: @@ -396,7 +396,7 @@ MySQL [(none)]> desc function s3( +---------------+--------------+------+-------+---------+-------+ ``` -**orc foramt** +**orc format** `orc` format: Same as `parquet` format, set `format` parameter to orc. diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md index 4ac6c8fa55..66708b0d98 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md @@ -169,7 +169,7 @@ select * from s3( ``` -**csv foramt** +**csv format** 由于S3 table-valued-function事先并不知道table schema,所以会先读一遍文件来解析出table schema。 `csv` 格式: S3 table-valued-function 读取S3上的文件并当作csv文件来处理,读取文件中的第一行用于解析table schema。文件第一行的列个数`n`将作为table schema的列个数,table schema的列名则自动取名为`c1, c2, ..., cn` ,列类型都设置为 `String`, 举例: @@ -216,7 +216,7 @@ MySQL [(none)]> Desc function s3("uri" = "http://127.0.0.1:9312/test2/student1.c +-------+------+------+-------+---------+-------+ ``` -**csv_with_names foramt** +**csv_with_names format** `csv_with_names`格式:解析文件的第一行作为table schema的列个数和列名,列类型则都设置为 `String`, 举例: student_with_names.csv文件内容为 @@ -310,7 +310,7 @@ MySQL [(none)]> Desc function s3("uri" = "http://127.0.0.1:9312/test2/student_wi +-------+------+------+-------+---------+-------+ ``` -**json foramt** +**json format** `json` 格式:json格式涉及到较多的可选参数,各个参数的意义可以参考:[Json Load](../../../data-operate/import/import-way/load-json-format.md)。 S3 tvf查询json格式文件时根据 `json_root` 和 `jsonpaths` 参数定位到一个json对象,将该对象的中的`key` 作为table schema的列名,列类型都设置为String。举例: @@ -358,7 +358,7 @@ MySQL [(none)]> select * from s3( +------+------+ ``` -**parquet foramt** +**parquet format** `parquet` 格式:S3 tvf支持从parquet文件中解析出table schema的列名、列类型。举例: @@ -402,7 +402,7 @@ MySQL [(none)]> desc function s3( +---------------+--------------+------+-------+---------+-------+ ``` -**orc foramt** +**orc format** `orc` 格式:和`parquet` format使用方法一致, 将`format`参数设置为orc。 diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/avro-scanner/pom.xml similarity index 51% copy from fe/be-java-extensions/hudi-scanner/pom.xml copy to fe/be-java-extensions/avro-scanner/pom.xml index 71564e4be3..f4137696c4 100644 --- a/fe/be-java-extensions/hudi-scanner/pom.xml +++ b/fe/be-java-extensions/avro-scanner/pom.xml @@ -17,19 +17,22 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>be-java-extensions</artifactId> <groupId>org.apache.doris</groupId> <version>${revision}</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>hudi-scanner</artifactId> + + <artifactId>avro-scanner</artifactId> <properties> - <doris.home>${basedir}/../../</doris.home> - <fe_ut_parallel>1</fe_ut_parallel> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> @@ -37,74 +40,42 @@ under the License. 
<groupId>org.apache.doris</groupId> <artifactId>java-common</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <artifactId>fe-common</artifactId> - <groupId>org.apache.doris</groupId> - </exclusion> - </exclusions> </dependency> - <dependency> - <groupId>org.apache.hudi</groupId> - <artifactId>hudi-hadoop-mr-bundle</artifactId> - <version>${hudi.version}</version> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - <exclusion> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hudi</groupId> - <artifactId>hudi-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.parquet</groupId> - <artifactId>parquet-avro</artifactId> - </exclusion> <exclusion> <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - <exclusion> - <artifactId>hudi-hadoop-mr</artifactId> - <groupId>org.apache.hudi</groupId> + <artifactId>avro-tools</artifactId> </exclusion> </exclusions> </dependency> <dependency> - <groupId>com.facebook.presto.hive</groupId> - <artifactId>hive-apache</artifactId> - <version>${presto.hive.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> </dependency> <dependency> - <groupId>com.facebook.presto.hadoop</groupId> - <artifactId>hadoop-apache2</artifactId> - <version>${presto.hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> </dependency> <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.doris</groupId> + <artifactId>hive-catalog-shade</artifactId> </dependency> </dependencies> + + <build> - <finalName>hudi-scanner</finalName> + <finalName>avro-scanner</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -131,4 +102,4 @@ under the License. </plugin> </plugins> </build> -</project> +</project> \ No newline at end of file diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java new file mode 100644 index 0000000000..3c796f1fc7 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.doris.common.jni.vec.ColumnValue; + +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map.Entry; + +public class AvroColumnValue implements ColumnValue { + + private final Object fieldData; + private final ObjectInspector fieldInspector; + + public AvroColumnValue(ObjectInspector fieldInspector, Object fieldData) { + this.fieldInspector = fieldInspector; + this.fieldData = fieldData; + } + + private Object inspectObject() { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); + } + + @Override + public boolean canGetStringAsBytes() { + return false; + } + + @Override + public boolean isNull() { + return false; + } + + @Override + public boolean getBoolean() { + return (boolean) inspectObject(); + } + + @Override + public byte getByte() { + return (byte) inspectObject(); + } + + @Override + public short getShort() { + return (short) inspectObject(); + } + + @Override + public int getInt() { + return (int) inspectObject(); + } + + @Override + public float getFloat() { + return (float) inspectObject(); + } + + @Override + public long getLong() { + return (long) inspectObject(); + } + + @Override + public double getDouble() { + return (double) inspectObject(); + } + + @Override + public BigInteger getBigInteger() { + return null; + } + + @Override + public BigDecimal getDecimal() { + return (BigDecimal) inspectObject(); + } + + @Override + public String getString() { + return inspectObject().toString(); + } + + @Override + public LocalDate getDate() { + // avro has no date type + return null; + } + + @Override + public LocalDateTime getDateTime() { + // avro has no dateTime type + return null; + } + + @Override + public byte[] getBytes() { + return (byte[]) inspectObject(); + } + + @Override + public void unpackArray(List<ColumnValue> values) { + ListObjectInspector inspector = (ListObjectInspector) fieldInspector; + List<?> items = inspector.getList(fieldData); + ObjectInspector itemInspector = inspector.getListElementObjectInspector(); + for (Object item : items) { + AvroColumnValue avroColumnValue = null; + if (item != null) { + avroColumnValue = new AvroColumnValue(itemInspector, item); + } + values.add(avroColumnValue); + } + } + + @Override + public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) { + MapObjectInspector inspector = (MapObjectInspector) fieldInspector; + ObjectInspector keyObjectInspector = inspector.getMapKeyObjectInspector(); + ObjectInspector valueObjectInspector = inspector.getMapValueObjectInspector(); + for (Entry<?, ?> kv : inspector.getMap(fieldData).entrySet()) { + AvroColumnValue avroKey = null; + AvroColumnValue avroValue = null; + 
if (kv.getKey() != null) { + avroKey = new AvroColumnValue(keyObjectInspector, kv.getKey()); + } + if (kv.getValue() != null) { + avroValue = new AvroColumnValue(valueObjectInspector, kv.getValue()); + } + keys.add(avroKey); + values.add(avroValue); + } + } + + @Override + public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) { + + } +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java new file mode 100644 index 0000000000..72b9e41416 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.doris.common.jni.JniScanner; +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ScanPredicate; +import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPrimitiveType; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; + +public class AvroJNIScanner extends JniScanner { + + private static final Logger LOG = LogManager.getLogger(AvroJNIScanner.class); + private final TFileType fileType; + private final String uri; + private final Map<String, String> requiredParams; + private final Integer fetchSize; + private int[] requiredColumnIds; + private String[] columnTypes; + private String[] requiredFields; + private ColumnType[] requiredTypes; + private AvroReader avroReader; + private final boolean isGetTableSchema; + private StructObjectInspector rowInspector; + private Deserializer deserializer; + private StructField[] structFields; + private ObjectInspector[] fieldInspectors; + private 
String serde; + + /** + * Call by JNI for get table data or get table schema + * + * @param fetchSize The size of data fetched each time + * @param requiredParams required params + */ + public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) { + this.requiredParams = requiredParams; + this.fetchSize = fetchSize; + this.isGetTableSchema = Boolean.parseBoolean(requiredParams.get(AvroProperties.IS_GET_TABLE_SCHEMA)); + this.fileType = TFileType.findByValue(Integer.parseInt(requiredParams.get(AvroProperties.FILE_TYPE))); + this.uri = requiredParams.get(AvroProperties.URI); + if (!isGetTableSchema) { + this.columnTypes = requiredParams.get(AvroProperties.COLUMNS_TYPES) + .split(AvroProperties.COLUMNS_TYPE_DELIMITER); + this.requiredFields = requiredParams.get(AvroProperties.REQUIRED_FIELDS) + .split(AvroProperties.FIELDS_DELIMITER); + this.requiredTypes = new ColumnType[requiredFields.length]; + this.serde = requiredParams.get(AvroProperties.HIVE_SERDE); + this.structFields = new StructField[requiredFields.length]; + this.fieldInspectors = new ObjectInspector[requiredFields.length]; + } + } + + private void init() throws Exception { + requiredColumnIds = new int[requiredFields.length]; + for (int i = 0; i < requiredFields.length; i++) { + ColumnType columnType = ColumnType.parseType(requiredFields[i], columnTypes[i]); + requiredTypes[i] = columnType; + requiredColumnIds[i] = i; + } + + Properties properties = createProperties(); + deserializer = getDeserializer(new Configuration(), properties, this.serde); + rowInspector = (StructObjectInspector) deserializer.getObjectInspector(); + + for (int i = 0; i < requiredFields.length; i++) { + StructField field = rowInspector.getStructFieldRef(requiredFields[i]); + structFields[i] = field; + fieldInspectors[i] = field.getFieldObjectInspector(); + } + } + + public Properties createProperties() { + Properties properties = new Properties(); + properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, + Arrays.stream(this.requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(","))); + properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", requiredFields)); + properties.setProperty(AvroProperties.COLUMNS, String.join(",", requiredFields)); + properties.setProperty(AvroProperties.COLUMNS2TYPES, String.join(",", columnTypes)); + properties.setProperty(serdeConstants.SERIALIZATION_LIB, this.serde); + return properties; + } + + private Deserializer getDeserializer(Configuration configuration, Properties properties, String name) + throws Exception { + Class<? extends Deserializer> deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader()) + .asSubclass(Deserializer.class); + Deserializer deserializer = deserializerClass.getConstructor().newInstance(); + deserializer.initialize(configuration, properties); + return deserializer; + } + + @Override + public void open() throws IOException { + try { + if (!isGetTableSchema) { + init(); + } + } catch (Exception e) { + LOG.warn("Failed to init avro scanner. 
", e); + throw new IOException(e); + } + switch (fileType) { + case FILE_HDFS: + this.avroReader = new HDFSFileReader(uri); + break; + case FILE_S3: + String bucketName = requiredParams.get(AvroProperties.S3_BUCKET); + String key = requiredParams.get(AvroProperties.S3_KEY); + String accessKey = requiredParams.get(AvroProperties.S3_ACCESS_KEY); + String secretKey = requiredParams.get(AvroProperties.S3_SECRET_KEY); + String endpoint = requiredParams.get(AvroProperties.S3_ENDPOINT); + String region = requiredParams.get(AvroProperties.S3_REGION); + this.avroReader = new S3FileReader(accessKey, secretKey, endpoint, region, bucketName, key); + break; + default: + LOG.warn("Unsupported " + fileType.name() + " file type."); + throw new IOException("Unsupported " + fileType.name() + " file type."); + } + this.avroReader.open(new Configuration()); + if (!isGetTableSchema) { + initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); + } + } + + @Override + public void close() throws IOException { + if (Objects.nonNull(avroReader)) { + avroReader.close(); + } + } + + @Override + protected int getNext() throws IOException { + int numRows = 0; + for (; numRows < getBatchSize(); numRows++) { + if (!avroReader.hasNext()) { + break; + } + GenericRecord rowRecord = (GenericRecord) avroReader.getNext(); + for (int i = 0; i < requiredFields.length; i++) { + Object fieldData = rowRecord.get(requiredFields[i]); + if (fieldData == null) { + appendData(i, null); + } else { + AvroColumnValue fieldValue = new AvroColumnValue(fieldInspectors[i], fieldData); + appendData(i, fieldValue); + } + } + } + return numRows; + } + + protected TableSchema parseTableSchema() throws UnsupportedOperationException { + Schema schema = avroReader.getSchema(); + List<Field> schemaFields = schema.getFields(); + List<SchemaColumn> schemaColumns = new ArrayList<>(); + for (Field schemaField : schemaFields) { + Schema avroSchema = schemaField.schema(); + String columnName = schemaField.name().toLowerCase(Locale.ROOT); + + SchemaColumn schemaColumn = new SchemaColumn(); + TPrimitiveType tPrimitiveType = serializeSchemaType(avroSchema, schemaColumn); + schemaColumn.setName(columnName); + schemaColumn.setType(tPrimitiveType); + schemaColumns.add(schemaColumn); + } + return new TableSchema(schemaColumns); + } + + private TPrimitiveType serializeSchemaType(Schema avroSchema, SchemaColumn schemaColumn) + throws UnsupportedOperationException { + Schema.Type type = avroSchema.getType(); + switch (type) { + case NULL: + return TPrimitiveType.NULL_TYPE; + case STRING: + return TPrimitiveType.STRING; + case INT: + return TPrimitiveType.INT; + case BOOLEAN: + return TPrimitiveType.BOOLEAN; + case LONG: + return TPrimitiveType.BIGINT; + case FLOAT: + return TPrimitiveType.FLOAT; + case BYTES: + return TPrimitiveType.BINARY; + case DOUBLE: + return TPrimitiveType.DOUBLE; + case ARRAY: + SchemaColumn arrayChildColumn = new SchemaColumn(); + schemaColumn.addChildColumn(arrayChildColumn); + arrayChildColumn.setType(serializeSchemaType(avroSchema.getElementType(), arrayChildColumn)); + return TPrimitiveType.ARRAY; + case MAP: + SchemaColumn mapChildColumn = new SchemaColumn(); + schemaColumn.addChildColumn(mapChildColumn); + mapChildColumn.setType(serializeSchemaType(avroSchema.getValueType(), mapChildColumn)); + return TPrimitiveType.MAP; + default: + throw new UnsupportedOperationException("avro format: " + type.getName() + " is not supported."); + } + } + +} diff --git 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java new file mode 100644 index 0000000000..11066b5e08 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +public class AvroProperties { + + protected static final String COLUMNS_TYPE_DELIMITER = "#"; + protected static final String FIELDS_DELIMITER = ","; + + protected static final String IS_GET_TABLE_SCHEMA = "is_get_table_schema"; + protected static final String COLUMNS_TYPES = "columns_types"; + protected static final String REQUIRED_FIELDS = "required_fields"; + protected static final String FILE_TYPE = "file_type"; + protected static final String URI = "uri"; + protected static final String S3_BUCKET = "s3.virtual.bucket"; + protected static final String S3_KEY = "s3.virtual.key"; + protected static final String S3_ACCESS_KEY = "s3.access_key"; + protected static final String S3_SECRET_KEY = "s3.secret_key"; + protected static final String S3_ENDPOINT = "s3.endpoint"; + protected static final String S3_REGION = "s3.region"; + protected static final String HIVE_SERDE = "hive.serde"; + protected static final String COLUMNS = "columns"; + protected static final String COLUMNS2TYPES = "columns.types"; + +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java new file mode 100644 index 0000000000..eb012e402b --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.avro; + +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +public interface AvroReader { + + void open(Configuration conf) throws IOException; + + Schema getSchema(); + + boolean hasNext(); + + Object getNext() throws IOException; + + void close() throws IOException; + +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java new file mode 100644 index 0000000000..7200b4dde6 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.net.URI; + +public class HDFSFileReader implements AvroReader { + private static final Logger LOG = LogManager.getLogger(HDFSFileReader.class); + private final Path filePath; + private final String url; + private DataFileStream<GenericRecord> reader; + private BufferedInputStream inputStream; + + public HDFSFileReader(String url) { + this.url = url; + this.filePath = new Path(url); + } + + @Override + public void open(Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(URI.create(url), conf); + inputStream = new BufferedInputStream(fs.open(filePath)); + reader = new DataFileStream<>(inputStream, new GenericDatumReader<>()); + } + + @Override + public Schema getSchema() { + return reader.getSchema(); + } + + @Override + public boolean hasNext() { + return reader.hasNext(); + } + + @Override + public Object getNext() throws IOException { + return reader.next(); + } + + @Override + public void close() throws IOException { + inputStream.close(); + reader.close(); + } +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java new file mode 100644 index 0000000000..bd966eb31c --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; + +public class S3FileReader implements AvroReader { + + private static final Logger LOG = LogManager.getLogger(S3FileReader.class); + private final String bucketName; + private final String key; + private AmazonS3 s3Client; + private DataFileStream<GenericRecord> reader; + private InputStream s3ObjectInputStream; + private final AWSCredentials credentials; + private final String endpoint; + private final String region; + + public S3FileReader(String accessKey, String secretKey, String endpoint, String region, + String bucketName, String key) { + this.bucketName = bucketName; + this.key = key; + this.endpoint = endpoint; + this.region = region; + credentials = new BasicAWSCredentials(accessKey, secretKey); + } + + @Override + public void open(Configuration conf) throws IOException { + s3Client = AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) + .build(); + S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, key)); + s3ObjectInputStream = object.getObjectContent(); + reader = new DataFileStream<>(s3ObjectInputStream, new GenericDatumReader<>()); + } + + @Override + public Schema getSchema() { + return reader.getSchema(); + } + + @Override + public boolean hasNext() { + return reader.hasNext(); + } + + @Override + public Object getNext() throws IOException { + return reader.next(); + } + + @Override + public void close() throws IOException { + s3ObjectInputStream.close(); + reader.close(); + } +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/resources/package.xml b/fe/be-java-extensions/avro-scanner/src/main/resources/package.xml new file mode 100644 index 0000000000..4bbb261060 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/resources/package.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <id>jar-with-dependencies</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + <unpackOptions> + <excludes> + <exclude>**/Log4j2Plugins.dat</exclude> + </excludes> + </unpackOptions> + </dependencySet> + </dependencySets> +</assembly> diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml index 71564e4be3..098073504e 100644 --- a/fe/be-java-extensions/hudi-scanner/pom.xml +++ b/fe/be-java-extensions/hudi-scanner/pom.xml @@ -88,15 +88,16 @@ under the License. </exclusions> </dependency> <dependency> - <groupId>com.facebook.presto.hadoop</groupId> - <artifactId>hadoop-apache2</artifactId> - <version>${presto.hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 556c6b2e7b..bdc4960ec1 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -20,6 +20,7 @@ package org.apache.doris.hudi; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnValue; +import org.apache.doris.common.jni.vec.TableSchema; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.Deserializer; @@ -135,6 +136,11 @@ public class HudiJniScanner extends JniScanner { } } + @Override + protected TableSchema parseTableSchema() throws UnsupportedOperationException { + // do nothing + return null; + } private void init(JobConf jobConf, Properties properties) throws Exception { String basePath = hudiScanParam.getBasePath(); diff --git a/fe/be-java-extensions/java-common/pom.xml b/fe/be-java-extensions/java-common/pom.xml index eedd7c00ad..20ed0104fa 100644 --- a/fe/be-java-extensions/java-common/pom.xml +++ 
diff --git a/fe/be-java-extensions/java-common/pom.xml b/fe/be-java-extensions/java-common/pom.xml
index eedd7c00ad..20ed0104fa 100644
--- a/fe/be-java-extensions/java-common/pom.xml
+++ b/fe/be-java-extensions/java-common/pom.xml
@@ -55,6 +55,10 @@ under the License.
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index 89c960bddc..c45b2ac8e5 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -21,6 +21,7 @@ package org.apache.doris.common.jni;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValue;
 import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.common.jni.vec.TableSchema;
 import org.apache.doris.common.jni.vec.VectorTable;

 import java.io.IOException;
@@ -43,6 +44,9 @@ public abstract class JniScanner {
     // Scan data and save as vector table
     protected abstract int getNext() throws IOException;

+    // parse table schema
+    protected abstract TableSchema parseTableSchema() throws UnsupportedOperationException;
+
     protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields,
             ScanPredicate[] predicates, int batchSize) {
         this.types = requiredTypes;
@@ -63,6 +67,11 @@ public abstract class JniScanner {
         return vectorTable;
     }

+    public String getTableSchema() throws IOException {
+        TableSchema tableSchema = parseTableSchema();
+        return tableSchema.getTableSchema();
+    }
+
     public long getNextBatchMeta() throws IOException {
         if (vectorTable == null) {
             vectorTable = new VectorTable(types, fields, predicates, batchSize);
@@ -95,7 +104,7 @@ public abstract class JniScanner {
         return vectorTable.getMetaAddress();
     }

-    protected void resetTable() {
+    public void resetTable() {
         if (vectorTable != null) {
             vectorTable.reset();
         }
@@ -105,7 +114,7 @@ public abstract class JniScanner {
         vectorTable.releaseColumn(fieldId);
     }

-    protected void releaseTable() {
+    public void releaseTable() {
         if (vectorTable != null) {
             vectorTable.close();
         }
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
index 14a412dccb..fc2928f8ed 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
@@ -21,6 +21,7 @@ package org.apache.doris.common.jni;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValue;
 import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.common.jni.vec.TableSchema;

 import org.apache.log4j.Logger;

@@ -143,7 +144,7 @@ public class MockJniScanner extends JniScanner {

     private static final Logger LOG = Logger.getLogger(MockJniScanner.class);

-    private final int mockRows;
+    private int mockRows;
     private int readRows = 0;
     private final MockColumnValue columnValue = new MockColumnValue();

@@ -195,4 +196,9 @@ public class MockJniScanner extends JniScanner {
         readRows += rows;
         return rows;
     }
+
+    @Override
+    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
+        return null;
+    }
 }
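The new getTableSchema() entry point delegates to parseTableSchema() and returns the schema as a JSON string, so the native side can probe a file's structure before planning the scan. Below is a caller-side sketch of the widened public surface; it assumes a concrete scanner constructed elsewhere and is not code from this commit.

```java
import org.apache.doris.common.jni.JniScanner;

import java.io.IOException;

public class ScannerSchemaProbe {
    // Sketch of the call sequence this change enables; `scanner` is any
    // concrete JniScanner whose construction is omitted here.
    public static void probeThenScan(JniScanner scanner) throws IOException {
        scanner.open();
        String schemaJson = scanner.getTableSchema(); // JSON-serialized column list
        System.out.println(schemaJson);
        long meta = scanner.getNextBatchMeta(); // batch reading continues as before
        System.out.println("meta address: " + meta);
        scanner.resetTable();   // now public, reachable from the JNI glue
        scanner.releaseTable(); // likewise public for cleanup
        scanner.close();
    }
}
```

Note that scanners which do not support schema probing (Hudi, MaxCompute, Paimon, and the mock above) simply return null from parseTableSchema(), so getTableSchema() is only meaningful for formats like Avro.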
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java
new file mode 100644
index 0000000000..421feb55a3
--- /dev/null
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jni.vec;
+
+import org.apache.doris.thrift.TPrimitiveType;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Describes the file schema read through a table-valued function,
+ * e.g. an Avro file.
+ */
+public class TableSchema {
+    private final List<SchemaColumn> schemaColumns;
+    private final ObjectMapper objectMapper;
+
+    public TableSchema(List<SchemaColumn> schemaColumns) {
+        this.schemaColumns = schemaColumns;
+        this.objectMapper = new ObjectMapper();
+    }
+
+    public String getTableSchema() throws IOException {
+        try {
+            return objectMapper.writeValueAsString(schemaColumns);
+        } catch (JsonProcessingException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public static class SchemaColumn {
+        private String name;
+        private int type;
+        private SchemaColumn childColumn;
+
+        public SchemaColumn() {
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public SchemaColumn getChildColumn() {
+            return childColumn;
+        }
+
+        public int getType() {
+            return type;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public void setType(TPrimitiveType type) {
+            this.type = type.getValue();
+        }
+
+        public void addChildColumn(SchemaColumn childColumn) {
+            this.childColumn = childColumn;
+        }
+    }
+
+}
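TableSchema is a thin Jackson wrapper: getTableSchema() serializes the SchemaColumn list into the JSON string that crosses the JNI boundary. A rough illustration with invented columns follows; the exact JSON property order and the numeric type values depend on Jackson's defaults and on the generated TPrimitiveType enum.

```java
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TPrimitiveType;

import java.util.ArrayList;
import java.util.List;

public class TableSchemaDemo {
    public static void main(String[] args) throws Exception {
        SchemaColumn name = new SchemaColumn();
        name.setName("name");
        name.setType(TPrimitiveType.STRING);

        SchemaColumn doubleType = new SchemaColumn();
        doubleType.setName("double_type");
        doubleType.setType(TPrimitiveType.DOUBLE);

        List<SchemaColumn> columns = new ArrayList<>();
        columns.add(name);
        columns.add(doubleType);

        // Prints JSON along the lines of (type numbers depend on the enum):
        // [{"name":"name","childColumn":null,"type":...},
        //  {"name":"double_type","childColumn":null,"type":...}]
        System.out.println(new TableSchema(columns).getTableSchema());
    }
}
```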
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 8f9b903afd..6a3d519670 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.maxcompute;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.common.jni.vec.TableSchema;

 import com.aliyun.odps.Column;
 import com.aliyun.odps.OdpsType;
@@ -238,6 +239,12 @@ public class MaxComputeJniScanner extends JniScanner {
         return realRows;
     }

+    @Override
+    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
+        // do nothing
+        return null;
+    }
+
     private int readVectors(int expectedRows) throws IOException {
         VectorSchemaRoot batch;
         int curReadRows = 0;
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 71965344a4..f6cdc436b7 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.utils.OffHeap;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.common.jni.vec.TableSchema;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.Logger;
@@ -140,6 +141,12 @@ public class PaimonJniScanner extends JniScanner {
         return rows;
     }

+    @Override
+    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
+        // do nothing
+        return null;
+    }
+
     private Catalog create(CatalogContext context) throws IOException {
         Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
         FileIO fileIO;
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index aee6321093..f1ffe1527f 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -27,6 +27,7 @@ under the License.
         <module>jdbc-scanner</module>
         <module>paimon-scanner</module>
         <module>max-compute-scanner</module>
+        <module>avro-scanner</module>
     </modules>

     <parent>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index f9fe52aeb6..b5a96ddcec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -54,6 +54,8 @@ public class S3Properties extends BaseProperties {
     // required by storage policy
     public static final String ROOT_PATH = "s3.root.path";
     public static final String BUCKET = "s3.bucket";
+    public static final String VIRTUAL_BUCKET = "s3.virtual.bucket";
+    public static final String VIRTUAL_KEY = "s3.virtual.key";
     public static final String VALIDITY_CHECK = "s3_validity_check";
     public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
     public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList(ACCESS_KEY, SECRET_KEY);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 1bd07d0cff..8a28c82ec9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -206,6 +206,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
             case "json":
                 this.fileFormatType = TFileFormatType.FORMAT_JSON;
                 break;
+            case "avro":
+                this.fileFormatType = TFileFormatType.FORMAT_AVRO;
+                break;
             default:
                 throw new AnalysisException("format:" + formatString + " is not supported.");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 9b820fa185..aa3777652d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -67,6 +67,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
     private final S3URI s3uri;
     private final boolean forceVirtualHosted;
     private String virtualBucket;
+    private String virtualKey;

     public S3TableValuedFunction(Map<String, String> params) throws AnalysisException {
         Map<String, String> tvfParams = getValidParams(params);
@@ -83,12 +84,15 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
             credential.setSessionToken(tvfParams.get(S3Properties.SESSION_TOKEN));
         }

-        parseProperties(tvfParams);
         // set S3 location properties
         // these five properties are necessary; none of them can be omitted.
         locationProperties = S3Properties.credentialToMap(credential);
         String usePathStyle = tvfParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
         locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
+        locationProperties.put(S3Properties.VIRTUAL_BUCKET, virtualBucket);
+        locationProperties.put(S3Properties.VIRTUAL_KEY, getVirtualKey());
+
+        parseProperties(tvfParams);

         if (FeConstants.runningUnitTest) {
             // Just check
             FileSystemFactory.getS3FileSystem(locationProperties);
@@ -116,6 +120,11 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
         return S3Properties.requiredS3TVFProperties(validParams);
     }

+    private String getVirtualKey() {
+        virtualKey = s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
+        return virtualKey;
+    }
+
     private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException {
         Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted.");
         String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
@@ -162,8 +171,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
     public String getFilePath() {
         // must be "s3://..."
         if (forceVirtualHosted) {
-            return NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM
-                    + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
+            return NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM + virtualKey;
         }
         return NAME + S3URI.SCHEME_DELIM + s3uri.getKey();
     }
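In the virtual-hosted-style branch above, the host's leading label becomes s3.virtual.bucket and getVirtualKey() folds the residual bucket and key into s3.virtual.key, which getFilePath() now reuses. The sketch below is a rough illustration with made-up values, mirroring the split("\\.", 2) and PATH_DELIM concatenation in the code; it is not part of the commit.

```java
public class VirtualHostedSplitDemo {
    public static void main(String[] args) {
        // Made-up virtual-hosted prefix, e.g. from a URI like
        // https://my-bucket.s3.us-east-1.amazonaws.com/inner-bucket/dir/data.avro
        String virtualHostedPrefix = "my-bucket.s3.us-east-1.amazonaws.com";

        // Mirrors getEndpointAndSetVirtualBucket(): the first label is the
        // virtual bucket, the remainder is the real endpoint.
        String[] fields = virtualHostedPrefix.split("\\.", 2);
        String virtualBucket = fields[0]; // "my-bucket" -> s3.virtual.bucket
        String endpoint = fields[1];      // "s3.us-east-1.amazonaws.com"

        // Mirrors getVirtualKey(): residual bucket + "/" + key -> s3.virtual.key
        String virtualKey = "inner-bucket" + "/" + "dir/data.avro";

        // Mirrors getFilePath() in the virtual-hosted case.
        System.out.println("s3://" + virtualBucket + "/" + virtualKey);
        System.out.println("endpoint: " + endpoint);
    }
}
```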
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
index ed58d8c4b7..e945eff9b7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
@@ -230,7 +230,7 @@ public class PropertyConverterTest extends TestWithFeService {
         Assertions.assertEquals(analyzedStmt.getTableRefs().size(), 1);
         TableValuedFunctionRef oldFuncTable = (TableValuedFunctionRef) analyzedStmt.getTableRefs().get(0);
         S3TableValuedFunction s3Tvf = (S3TableValuedFunction) oldFuncTable.getTableFunction();
-        Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 9);
+        Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 11);

         String queryNew = "select * from s3(\n"
                 + "  'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
@@ -243,7 +243,7 @@ public class PropertyConverterTest extends TestWithFeService {
         Assertions.assertEquals(analyzedStmtNew.getTableRefs().size(), 1);
         TableValuedFunctionRef newFuncTable = (TableValuedFunctionRef) analyzedStmt.getTableRefs().get(0);
         S3TableValuedFunction newS3Tvf = (S3TableValuedFunction) newFuncTable.getTableFunction();
-        Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 9);
+        Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 11);
     }

     @Test
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 6ad67077df..978efee422 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -115,6 +115,7 @@ enum TFileFormatType {
     FORMAT_JSON,
     FORMAT_PROTO,
     FORMAT_JNI,
+    FORMAT_AVRO,
 }

 // In previous versions, the data compression format and file format were stored together, as TFileFormatType,