This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new d86a9dc481  [feature](outfile) support parquet writer (#12492) (#12873)
d86a9dc481 is described below

commit d86a9dc481b9f9ac5903641b84d44670463d4f2d
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Sep 23 08:52:17 2022 +0800

    [feature](outfile) support parquet writer (#12492) (#12873)
---
 be/src/vec/CMakeLists.txt                  |   3 +-
 be/src/vec/runtime/vfile_result_writer.cpp |  43 +-
 be/src/vec/runtime/vfile_result_writer.h   |   8 +-
 be/src/vec/runtime/vparquet_writer.cpp     | 654 +++++++++++++++++++++++++++++
 be/src/vec/runtime/vparquet_writer.h       |  84 ++++
 5 files changed, 773 insertions(+), 19 deletions(-)

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 56cfdfcc14..a8355d8a0d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -182,7 +182,8 @@ set(VEC_FILES
     runtime/vdata_stream_mgr.cpp
     runtime/vfile_result_writer.cpp
     runtime/vpartition_info.cpp
-    runtime/vsorted_run_merger.cpp)
+    runtime/vsorted_run_merger.cpp
+    runtime/vparquet_writer.cpp)
 
 add_library(Vec STATIC
     ${VEC_FILES}
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 6d4ecb8db1..7bc56607b8 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -56,7 +56,8 @@ VFileResultWriter::VFileResultWriter(
           _parent_profile(parent_profile),
           _sinker(sinker),
           _output_block(output_block),
-          _output_row_descriptor(output_row_descriptor) {
+          _output_row_descriptor(output_row_descriptor),
+          _vparquet_writer(nullptr) {
     _output_object_data = output_object_data;
 }
 
@@ -129,7 +130,10 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
         // just use file writer is enough
         break;
    case TFileFormatType::FORMAT_PARQUET:
-        return Status::NotSupported("Parquet Writer is not supported yet!");
+        _vparquet_writer.reset(new VParquetWriterWrapper(
+                _file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->file_properties,
+                _file_opts->schema, _output_object_data));
+        RETURN_IF_ERROR(_vparquet_writer->init());
         break;
     default:
         return Status::InternalError(
@@ -195,18 +199,18 @@ Status VFileResultWriter::append_block(Block& block) {
         return Status::OK();
     }
     SCOPED_TIMER(_append_row_batch_timer);
-    if (_parquet_writer != nullptr) {
-        return Status::NotSupported("Parquet Writer is not supported yet!");
+    Status status = Status::OK();
+    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
+    // failed, just return the error status
+    auto output_block =
+            VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, block, status);
+    auto num_rows = output_block.rows();
+    if (UNLIKELY(num_rows == 0)) {
+        return status;
+    }
+    if (_vparquet_writer) {
+        _write_parquet_file(output_block);
     } else {
-        Status status = Status::OK();
-        // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
-        // failed, just return the error status
-        auto output_block = VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
-                                                                               block, status);
-        auto num_rows = output_block.rows();
-        if (UNLIKELY(num_rows == 0)) {
-            return status;
-        }
         RETURN_IF_ERROR(_write_csv_file(output_block));
     }
 
@@ -214,6 +218,12 @@
     return Status::OK();
 }
 
+Status VFileResultWriter::_write_parquet_file(const Block& block) {
+    RETURN_IF_ERROR(_vparquet_writer->write(block));
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+}
+
 Status VFileResultWriter::_write_csv_file(const Block& block) {
     for (size_t i = 0; i < block.rows(); i++) {
         for (size_t col_id = 0; col_id < block.columns(); col_id++) {
@@ -348,8 +358,11 @@ Status VFileResultWriter::_create_new_file_if_exceed_size() {
 }
 
 Status VFileResultWriter::_close_file_writer(bool done) {
-    if (_parquet_writer != nullptr) {
-        return Status::NotSupported("Parquet Writer is not supported yet!");
+    if (_vparquet_writer) {
+        _vparquet_writer->close();
+        _current_written_bytes = _vparquet_writer->written_len();
+        COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
+        _vparquet_writer.reset(nullptr);
     } else if (_file_writer_impl) {
         _file_writer_impl->close();
     }
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
index 5f0bb7971e..a3a4e59a17 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -20,6 +20,7 @@
 #include "exec/file_writer.h"
 #include "runtime/file_result_writer.h"
 #include "vec/sink/result_sink.h"
+#include "vec/runtime/vparquet_writer.h"
 
 namespace doris {
 
@@ -48,6 +49,7 @@ public:
     int64_t get_written_rows() const override { return 1; }
 
 private:
+    Status _write_parquet_file(const Block& block);
     Status _write_csv_file(const Block& block);
 
     // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
@@ -81,9 +83,7 @@ private:
     // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
     // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
-    std::unique_ptr<FileWriter> _file_writer_impl;
-    // parquet file writer
-    ParquetWriterWrapper* _parquet_writer = nullptr;
+    std::unique_ptr<doris::FileWriter> _file_writer_impl;
 
     // Used to buffer the export data of plain text
     // TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling
     // file writer's write() for every single row.
@@ -119,6 +119,8 @@ private:
     bool _is_result_sent = false;
     bool _header_sent = false;
     RowDescriptor _output_row_descriptor;
+    // parquet file writer
+    std::unique_ptr<VParquetWriterWrapper> _vparquet_writer;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
new file mode 100644
index 0000000000..8e376a8149
--- /dev/null
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "vec/runtime/vparquet_writer.h" + +#include <arrow/array.h> +#include <arrow/status.h> +#include <time.h> + +#include "io/file_writer.h" +#include "util/mysql_global.h" +#include "util/types.h" +#include "vec/columns/column_complex.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exprs/vexpr.h" +#include "vec/functions/function_helpers.h" + +namespace doris::vectorized { + +VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer, + const std::vector<VExprContext*>& output_vexpr_ctxs, + const std::map<std::string, std::string>& properties, + const std::vector<std::vector<std::string>>& schema, + bool output_object_data) + : _output_vexpr_ctxs(output_vexpr_ctxs), + _str_schema(schema), + _cur_written_rows(0), + _rg_writer(nullptr), + _output_object_data(output_object_data) { + _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer)); + parse_properties(properties); +} + +void VParquetWriterWrapper::parse_properties( + const std::map<std::string, std::string>& propertie_map) { + parquet::WriterProperties::Builder builder; + for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) { + std::string property_name = it->first; + std::string property_value = it->second; + if (property_name == "compression") { + // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 + if (property_value == "snappy") { + builder.compression(parquet::Compression::SNAPPY); + } else if (property_value == "gzip") { + builder.compression(parquet::Compression::GZIP); + } else if (property_value == "brotli") { + builder.compression(parquet::Compression::BROTLI); + } else if (property_value == "zstd") { + builder.compression(parquet::Compression::ZSTD); + } else if (property_value == "lz4") { + builder.compression(parquet::Compression::LZ4); + } else if (property_value == "lzo") { + builder.compression(parquet::Compression::LZO); + } else if (property_value == "bz2") { + builder.compression(parquet::Compression::BZ2); + } else { + builder.compression(parquet::Compression::UNCOMPRESSED); + } + } else if (property_name == "disable_dictionary") { + if (property_value == "true") { + builder.enable_dictionary(); + } else { + builder.disable_dictionary(); + } + } else if (property_name == "version") { + if (property_value == "v1") { + builder.version(parquet::ParquetVersion::PARQUET_1_0); + } else { + builder.version(parquet::ParquetVersion::PARQUET_2_LATEST); + } + } + } + _properties = builder.build(); +} + +Status VParquetWriterWrapper::parse_schema(const std::vector<std::vector<std::string>>& schema) { + parquet::schema::NodeVector fields; + for (auto column = schema.begin(); column != schema.end(); column++) { + std::string repetition_type = (*column)[0]; + parquet::Repetition::type parquet_repetition_type = parquet::Repetition::REQUIRED; + if (repetition_type.find("required") != std::string::npos) { + parquet_repetition_type = parquet::Repetition::REQUIRED; + } else if (repetition_type.find("repeated") != std::string::npos) { + parquet_repetition_type = parquet::Repetition::REPEATED; + } else if (repetition_type.find("optional") != std::string::npos) { + parquet_repetition_type = parquet::Repetition::OPTIONAL; + } else { + parquet_repetition_type = parquet::Repetition::UNDEFINED; + } + + std::string data_type = (*column)[1]; + parquet::Type::type 
+        if (data_type == "boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, parquet_repetition_type,
+                                                              parquet::LogicalType::None(),
+                                                              parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
+}
+
+Status VParquetWriterWrapper::init() {
+    RETURN_IF_ERROR(parse_schema(_str_schema));
+    RETURN_IF_ERROR(init_parquet_writer());
+    RETURN_IF_ERROR(validate_schema());
+    return Status::OK();
+}
+
+Status VParquetWriterWrapper::validate_schema() {
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        switch (_output_vexpr_ctxs[i]->root()->type().type) {
+        case TYPE_BOOLEAN: {
+            if (_str_schema[i][1] != "boolean") {
+                return Status::InvalidArgument(
+                        "project field type is boolean, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_TINYINT:
+        case TYPE_SMALLINT:
+        case TYPE_INT: {
+            if (_str_schema[i][1] != "int32") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use int32,"
+                        " but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_LARGEINT: {
+            return Status::InvalidArgument("do not support large int type.");
+        }
+        case TYPE_FLOAT: {
+            if (_str_schema[i][1] != "float") {
+                return Status::InvalidArgument(
+                        "project field type is float, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_DOUBLE: {
+            if (_str_schema[i][1] != "double") {
+                return Status::InvalidArgument(
+                        "project field type is double, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_BIGINT:
+        case TYPE_DATETIME:
+        case TYPE_DATE:
+        case TYPE_DATEV2:
+        case TYPE_DATETIMEV2: {
+            if (_str_schema[i][1] != "int64") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use int64, "
+                        "but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_HLL:
+        case TYPE_OBJECT: {
+            if (!_output_object_data) {
+                return Status::InvalidArgument(
+                        "Invalid expression type: {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string());
+            }
+            [[fallthrough]];
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128: {
+            if (_str_schema[i][1] != "byte_array") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use byte_array, "
+                        "but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        default: {
+            return Status::InvalidArgument("Invalid expression type: {}",
+                                           _output_vexpr_ctxs[i]->root()->type().debug_string());
+        }
+        }
+    }
+    return Status::OK();
+}
+
+#define RETURN_WRONG_TYPE \
+    return Status::InvalidArgument("Invalid column type: {}", raw_column->get_name());
+
+#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE) \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
+    parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i)); \
+    __int128 default_value = 0; \
+    if (null_map != nullptr) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            col_writer->WriteBatch(1, nullptr, nullptr, \
+                                   (*null_map)[row_id] != 0 \
+                                           ? reinterpret_cast<const NATIVE_TYPE*>(&default_value) \
+                                           : reinterpret_cast<const NATIVE_TYPE*>( \
+                                                     assert_cast<const COLUMN_TYPE&>(*col) \
+                                                             .get_data_at(row_id) \
+                                                             .data)); \
+        } \
+    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
+        col_writer->WriteBatch( \
+                sz, nullptr, nullptr, \
+                reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data())); \
+    } else { \
+        RETURN_WRONG_TYPE \
+    }
+
+#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE) \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
+    parquet::ByteArrayWriter* col_writer = \
+            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \
+    parquet::ByteArray value; \
+    auto decimal_type = \
+            check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \
+    DCHECK(decimal_type); \
+    if (null_map != nullptr) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            if ((*null_map)[row_id] != 0) { \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value); \
+            } else { \
+                auto s = decimal_type->to_string(*col, row_id); \
+                value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
+                value.len = s.size(); \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value); \
+            } \
+        } \
+    } else { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            auto s = decimal_type->to_string(*col, row_id); \
+            value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
+            value.len = s.size(); \
+            col_writer->WriteBatch(1, nullptr, nullptr, &value); \
+        } \
+    }
+
+#define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE) \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
+    parquet::ByteArrayWriter* col_writer = \
+            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \
+    if (null_map != nullptr) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            if ((*null_map)[row_id] != 0) { \
+                parquet::ByteArray value; \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value); \
+            } else { \
+                const auto& tmp = col->get_data_at(row_id); \
+                parquet::ByteArray value; \
+                value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \
+                value.len = tmp.size; \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value); \
+            } \
+        } \
+    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            const auto& tmp = not_nullable_column->get_data_at(row_id); \
+            parquet::ByteArray value; \
+            value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \
+            value.len = tmp.size; \
+            col_writer->WriteBatch(1, nullptr, nullptr, &value); \
+        } \
+    } else { \
+        RETURN_WRONG_TYPE \
+    }
+
+Status VParquetWriterWrapper::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    size_t sz = block.rows();
+    try {
+        for (size_t i = 0; i < block.columns(); i++) {
+            auto& raw_column = block.get_by_position(i).column;
+            const auto col = raw_column->is_nullable()
+                                     ? reinterpret_cast<const ColumnNullable*>(
+                                               block.get_by_position(i).column.get())
+                                               ->get_nested_column_ptr()
+                                               .get()
+                                     : block.get_by_position(i).column.get();
+            auto null_map =
+                    raw_column->is_nullable() && reinterpret_cast<const ColumnNullable*>(
+                                                         block.get_by_position(i).column.get())
+                                                         ->get_null_map_column_ptr()
+                                                         ->has_null()
+                            ? reinterpret_cast<const ColumnNullable*>(
+                                      block.get_by_position(i).column.get())
+                                      ->get_null_map_column_ptr()
+                            : nullptr;
+            auto& type = block.get_by_position(i).type;
+            switch (_output_vexpr_ctxs[i]->root()->type().type) {
+            case TYPE_BOOLEAN: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool)
+                break;
+            }
+            case TYPE_BIGINT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(Int64Writer, ColumnVector<Int64>, int64_t)
+                break;
+            }
+            case TYPE_LARGEINT: {
+                return Status::InvalidArgument("do not support large int type.");
+            }
+            case TYPE_FLOAT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(FloatWriter, ColumnVector<Float32>, float_t)
+                break;
+            }
+            case TYPE_DOUBLE: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(DoubleWriter, ColumnVector<Float64>, double_t)
+                break;
+            }
+            case TYPE_TINYINT:
+            case TYPE_SMALLINT:
+            case TYPE_INT: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int32Writer* col_writer =
+                        static_cast<parquet::Int32Writer*>(rgWriter->column(i));
+                int32_t default_int32 = 0;
+                if (null_map != nullptr) {
+                    if (const auto* nested_column =
+                                check_and_get_column<const ColumnVector<Int32>>(col)) {
+                        for (size_t row_id = 0; row_id < sz; row_id++) {
+                            col_writer->WriteBatch(
+                                    1, nullptr, nullptr,
+                                    (*null_map)[row_id] != 0
+                                            ? &default_int32
+                                            : reinterpret_cast<const int32_t*>(
+                                                      nested_column->get_data_at(row_id).data));
+                        }
+                    } else if (const auto* int16_column =
+                                       check_and_get_column<const ColumnVector<Int16>>(col)) {
+                        for (size_t row_id = 0; row_id < sz; row_id++) {
+                            const int32_t tmp = int16_column->get_data()[row_id];
+                            col_writer->WriteBatch(
+                                    1, nullptr, nullptr,
+                                    (*null_map)[row_id] != 0
+                                            ? &default_int32
+                                            : reinterpret_cast<const int32_t*>(&tmp));
+                        }
+                    } else if (const auto* int8_column =
+                                       check_and_get_column<const ColumnVector<Int8>>(col)) {
+                        for (size_t row_id = 0; row_id < sz; row_id++) {
+                            const int32_t tmp = int8_column->get_data()[row_id];
+                            col_writer->WriteBatch(
+                                    1, nullptr, nullptr,
+                                    (*null_map)[row_id] != 0
+                                            ? &default_int32
+                                            : reinterpret_cast<const int32_t*>(&tmp));
+                        }
+                    } else {
+                        RETURN_WRONG_TYPE
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<Int32>>(col)) {
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int32_t*>(
+                                                   not_nullable_column->get_data().data()));
+                } else if (const auto& int16_column =
+                                   check_and_get_column<const ColumnVector<Int16>>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        const int32_t tmp = int16_column->get_data()[row_id];
+                        col_writer->WriteBatch(1, nullptr, nullptr,
+                                               reinterpret_cast<const int32_t*>(&tmp));
+                    }
+                } else if (const auto& int8_column =
+                                   check_and_get_column<const ColumnVector<Int8>>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        const int32_t tmp = int8_column->get_data()[row_id];
+                        col_writer->WriteBatch(1, nullptr, nullptr,
+                                               reinterpret_cast<const int32_t*>(&tmp));
+                    }
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATETIME:
+            case TYPE_DATE: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int64Writer* col_writer =
+                        static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                int64_t default_int64 = 0;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                        } else {
+                            const auto tmp = binary_cast<Int64, VecDateTimeValue>(
+                                                     assert_cast<const ColumnVector<Int64>&>(*col)
+                                                             .get_data()[row_id])
+                                                     .to_olap_datetime();
+                            col_writer->WriteBatch(1, nullptr, nullptr,
+                                                   reinterpret_cast<const int64_t*>(&tmp));
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<Int64>>(col)) {
+                    std::vector<uint64_t> res(sz);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        res[row_id] = binary_cast<Int64, VecDateTimeValue>(
+                                              not_nullable_column->get_data()[row_id])
+                                              .to_olap_datetime();
+                    }
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int64_t*>(res.data()));
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATEV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int64Writer* col_writer =
+                        static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                int64_t default_int64 = 0;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                        } else {
+                            uint64_t tmp = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
+                                                   assert_cast<const ColumnVector<UInt32>&>(*col)
+                                                           .get_data()[row_id])
+                                                   .to_olap_datetime();
+                            col_writer->WriteBatch(1, nullptr, nullptr,
+                                                   reinterpret_cast<const int64_t*>(&tmp));
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<UInt32>>(col)) {
+                    std::vector<uint64_t> res(sz);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        res[row_id] = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
+                                              not_nullable_column->get_data()[row_id])
+                                              .to_olap_datetime();
+                    }
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int64_t*>(res.data()));
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATETIMEV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int64Writer* col_writer =
+                        static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                int64_t default_int64 = 0;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                        } else {
+                            uint64_t tmp = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
+                                                   assert_cast<const ColumnVector<UInt64>&>(*col)
+                                                           .get_data()[row_id])
+                                                   .to_olap_datetime();
+                            col_writer->WriteBatch(1, nullptr, nullptr,
+                                                   reinterpret_cast<const int64_t*>(&tmp));
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<UInt64>>(col)) {
+                    std::vector<uint64_t> res(sz);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        res[row_id] = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
+                                              not_nullable_column->get_data()[row_id])
+                                              .to_olap_datetime();
+                    }
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int64_t*>(res.data()));
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_OBJECT: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnBitmap)
+                break;
+            }
+            case TYPE_HLL: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnHLL)
+                break;
+            }
+            case TYPE_CHAR:
+            case TYPE_VARCHAR:
+            case TYPE_STRING: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnString)
+                break;
+            }
+            case TYPE_DECIMALV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::ByteArrayWriter* col_writer =
+                        static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+                parquet::ByteArray value;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                        } else {
+                            const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(
+                                                                     col->get_data_at(row_id).data)
+                                                                     ->value);
+                            char decimal_buffer[MAX_DECIMAL_WIDTH];
+                            int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
+                            value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
+                            value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
+                            col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnDecimal128>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        const DecimalV2Value decimal_val(
+                                reinterpret_cast<const PackedInt128*>(
+                                        not_nullable_column->get_data_at(row_id).data)
+                                        ->value);
+                        char decimal_buffer[MAX_DECIMAL_WIDTH];
+                        int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
+                        value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
+                        value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DECIMAL32: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
+                break;
+            }
+            case TYPE_DECIMAL64: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
+                break;
+            }
+            case TYPE_DECIMAL128: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128)
+                break;
+            }
+            default: {
+                return Status::InvalidArgument(
+                        "Invalid expression type: {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string());
+            }
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Parquet write error: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    _cur_written_rows += sz;
+    return Status::OK();
+}
+
+Status VParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_written_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_written_rows = 0;
+    }
+    return _rg_writer;
+}
+
+int64_t VParquetWriterWrapper::written_len() {
+    return _outstream->get_written_len();
+}
+
+void VParquetWriterWrapper::close() {
+    try {
+        if (_rg_writer != nullptr) {
+            _rg_writer->Close();
+            _rg_writer = nullptr;
+        }
+        _writer->Close();
+        arrow::Status st = _outstream->Close();
+        if (!st.ok()) {
+            LOG(WARNING) << "close parquet file error: " << st.ToString();
+        }
+    } catch (const std::exception& e) {
+        _rg_writer = nullptr;
+        LOG(WARNING) << "Parquet writer close error: " << e.what();
+    }
+}
+
+VParquetWriterWrapper::~VParquetWriterWrapper() {}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h
new file mode 100644
index 0000000000..111ad6d689
--- /dev/null
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <arrow/io/api.h>
+#include <arrow/io/file.h>
+#include <arrow/io/interfaces.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/exception.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "exec/parquet_writer.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+class FileWriter;
+
+// a wrapper of parquet output stream
+class VParquetWriterWrapper {
+public:
+    VParquetWriterWrapper(doris::FileWriter* file_writer,
+                          const std::vector<VExprContext*>& output_vexpr_ctxs,
+                          const std::map<std::string, std::string>& properties,
+                          const std::vector<std::vector<std::string>>& schema,
+                          bool output_object_data);
+    virtual ~VParquetWriterWrapper();
+
+    Status init();
+
+    Status validate_schema();
+
+    Status write(const Block& block);
+
+    Status init_parquet_writer();
+
+    void close();
+
+    void parse_properties(const std::map<std::string, std::string>& propertie_map);
+
+    Status parse_schema(const std::vector<std::vector<std::string>>& schema);
+
+    parquet::RowGroupWriter* get_rg_writer();
+
+    int64_t written_len();
+
+private:
+    std::shared_ptr<ParquetOutputStream> _outstream;
+    std::shared_ptr<parquet::WriterProperties> _properties;
+    std::shared_ptr<parquet::schema::GroupNode> _schema;
+    std::unique_ptr<parquet::ParquetFileWriter> _writer;
+    const std::vector<VExprContext*>& _output_vexpr_ctxs;
+    std::vector<std::vector<std::string>> _str_schema;
+    int64_t _cur_written_rows = 0;
+    parquet::RowGroupWriter* _rg_writer;
+    const int64_t _max_row_per_group = 10;
+    bool _output_object_data;
+};
+
+} // namespace doris::vectorized
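
For readers skimming the patch, the new writer's lifecycle is: construct VParquetWriterWrapper with the target FileWriter, the output expression contexts, the user-supplied properties, and the schema triples ({repetition, parquet type, column name}); then init() (parse_schema + ParquetFileWriter::Open + validate_schema), write() per block, and close(). The sketch below is a minimal caller-side illustration, not part of the commit: the helper name write_block_as_parquet and the example schema/property values are hypothetical; the real driver is VFileResultWriter above. Note also the hard-coded _max_row_per_group = 10 in the header, which get_rg_writer() uses to roll to a new buffered row group.

// Hypothetical helper, for illustration only. Assumes the caller already has a
// doris::FileWriter and the output VExprContexts, as VFileResultWriter does.
#include "vec/runtime/vparquet_writer.h"

doris::Status write_block_as_parquet(doris::FileWriter* file_writer,
                                     const std::vector<doris::vectorized::VExprContext*>& exprs,
                                     const doris::vectorized::Block& block) {
    using doris::vectorized::VParquetWriterWrapper;
    // Schema rows are {repetition, parquet type, column name}, as consumed by
    // parse_schema(); "compression" is read by parse_properties(). Example values.
    std::vector<std::vector<std::string>> schema = {{"required", "int32", "k1"},
                                                    {"optional", "byte_array", "k2"}};
    std::map<std::string, std::string> properties = {{"compression", "snappy"}};

    VParquetWriterWrapper writer(file_writer, exprs, properties, schema,
                                 /*output_object_data=*/false);
    RETURN_IF_ERROR(writer.init());        // parse_schema + open ParquetFileWriter + validate_schema
    RETURN_IF_ERROR(writer.write(block));  // row groups roll once _max_row_per_group is exceeded
    writer.close();                        // closes the row group, the file writer, and the stream
    // writer.written_len() now reports the bytes pushed to the output stream.
    return doris::Status::OK();
}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org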