github-actions[bot] commented on code in PR #15239: URL: https://github.com/apache/doris/pull/15239#discussion_r1054105791
########## be/src/vec/exec/vjson_scanner.cpp: ########## @@ -118,18 +157,144 @@ Status VJsonScanner<JsonReader>::open_vjson_reader() { return Status::OK(); } +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_based_reader() { + RETURN_IF_ERROR(_open_file_reader()); + if (_read_json_by_line) { + RETURN_IF_ERROR(_open_line_reader()); + } + return Status::OK(); +} + +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_file_reader() { + const TBrokerRangeDesc& range = _ranges[_next_range]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + if (range.__isset.read_json_by_line) { + _read_json_by_line = range.read_json_by_line; + } + + if (range.file_type == TFileType::FILE_STREAM) { + RETURN_IF_ERROR(FileFactory::create_pipe_reader(range.load_id, _cur_file_reader_s)); + _real_reader = _cur_file_reader_s.get(); + } else { + RETURN_IF_ERROR(FileFactory::create_file_reader( + range.file_type, _state->exec_env(), _profile, _broker_addresses, + _params.properties, range, start_offset, _cur_file_reader)); + _real_reader = _cur_file_reader.get(); + } + _cur_reader_eof = false; + return _real_reader->open(); +} + +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_line_reader() { + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } + + const TBrokerRangeDesc& range = _ranges[_next_range]; + int64_t size = range.size; + if (range.start_offset != 0) { + size += 1; + _skip_next_line = true; + } else { + _skip_next_line = false; + } + _cur_line_reader = new PlainTextLineReader(_profile, _real_reader, nullptr, size, + _line_delimiter, _line_delimiter_length); + _cur_reader_eof = false; + return Status::OK(); +} + +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_json_reader() { + if (_cur_json_reader != nullptr) { + delete _cur_json_reader; + _cur_json_reader = nullptr; + } + + std::string json_root = ""; Review Comment: warning: redundant string initialization [readability-redundant-string-init] ```suggestion std::string json_root; ``` ########## be/src/exec/exec_node.cpp: ########## @@ -27,28 +27,6 @@ #include "common/object_pool.h" #include "common/status.h" -#include "exec/analytic_eval_node.h" -#include "exec/assert_num_rows_node.h" -#include "exec/broker_scan_node.h" -#include "exec/cross_join_node.h" -#include "exec/empty_set_node.h" -#include "exec/es_http_scan_node.h" -#include "exec/except_node.h" -#include "exec/exchange_node.h" -#include "exec/hash_join_node.h" -#include "exec/intersect_node.h" -#include "exec/merge_node.h" -#include "exec/mysql_scan_node.h" -#include "exec/odbc_scan_node.h" -#include "exec/olap_scan_node.h" -#include "exec/partitioned_aggregation_node.h" -#include "exec/repeat_node.h" -#include "exec/schema_scan_node.h" -#include "exec/select_node.h" -#include "exec/spill_sort_node.h" -#include "exec/table_function_node.h" -#include "exec/topn_node.h" -#include "exec/union_node.h" #include "exprs/expr_context.h" #include "odbc_scan_node.h" Review Comment: warning: 'odbc_scan_node.h' file not found [clang-diagnostic-error] ```cpp #include "odbc_scan_node.h" ^ ``` ########## be/src/vec/exec/vjson_scanner.cpp: ########## @@ -118,18 +157,144 @@ return Status::OK(); } +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_based_reader() { + RETURN_IF_ERROR(_open_file_reader()); + if (_read_json_by_line) { + RETURN_IF_ERROR(_open_line_reader()); + } + return Status::OK(); +} + +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_file_reader() { + const TBrokerRangeDesc& range = _ranges[_next_range]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + if (range.__isset.read_json_by_line) { + _read_json_by_line = range.read_json_by_line; + } + + if (range.file_type == TFileType::FILE_STREAM) { + RETURN_IF_ERROR(FileFactory::create_pipe_reader(range.load_id, _cur_file_reader_s)); + _real_reader = _cur_file_reader_s.get(); + } else { + RETURN_IF_ERROR(FileFactory::create_file_reader( + range.file_type, _state->exec_env(), _profile, _broker_addresses, + _params.properties, range, start_offset, _cur_file_reader)); + _real_reader = _cur_file_reader.get(); + } + _cur_reader_eof = false; + return _real_reader->open(); +} + +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_line_reader() { + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } + + const TBrokerRangeDesc& range = _ranges[_next_range]; + int64_t size = range.size; + if (range.start_offset != 0) { + size += 1; + _skip_next_line = true; + } else { + _skip_next_line = false; + } + _cur_line_reader = new PlainTextLineReader(_profile, _real_reader, nullptr, size, + _line_delimiter, _line_delimiter_length); + _cur_reader_eof = false; + return Status::OK(); +} + +template <typename JsonReader> +Status VJsonScanner<JsonReader>::_open_json_reader() { + if (_cur_json_reader != nullptr) { + delete _cur_json_reader; + _cur_json_reader = nullptr; + } + + std::string json_root = ""; + std::string jsonpath = ""; Review Comment: warning: redundant string initialization [readability-redundant-string-init] ```suggestion std::string jsonpath; ``` ########## be/src/vec/exec/vjson_scanner.h: ########## @@ -112,6 +148,54 @@ class VJsonReader : public JsonReader { Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, std::string col_name, bool* valid); + + void _fill_slot(doris::Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, + const uint8_t* value, int32_t len); + Status _parse_json_doc(size_t* size, bool* eof); + Status _set_tuple_value(rapidjson::Value& objectValue, doris::Tuple* tuple, + const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, + bool* valid); + Status _write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, + doris::Tuple* tuple, MemPool* tuple_pool, bool* valid); + std::string _print_json_value(const rapidjson::Value& value); + + void _close(); + Status _generate_json_paths(const std::string& jsonpath, + std::vector<std::vector<JsonPath>>* vect); + Status _parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root); + + int _next_line; + int _total_lines; + RuntimeState* _state; + ScannerCounter* _counter; + RuntimeProfile* _profile; + FileReader* _file_reader; + LineReader* _line_reader; + bool _closed; + bool _strip_outer_array; + bool _num_as_string; + bool _fuzzy_parse; + RuntimeProfile::Counter* _bytes_read_counter; + RuntimeProfile::Counter* _read_timer; + RuntimeProfile::Counter* _file_read_timer; + + std::vector<std::vector<JsonPath>> _parsed_jsonpaths; + std::vector<JsonPath> _parsed_json_root; + + char _value_buffer[4 * 1024 * 1024]; + char _parse_buffer[512 * 1024]; + + typedef rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>, + rapidjson::MemoryPoolAllocator<>> + Document; Review Comment: warning: use 'using' instead of 'typedef' [modernize-use-using] ```suggestion using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>, rapidjson::MemoryPoolAllocator<>>; ``` ########## be/src/runtime/plan_fragment_executor.cpp: ########## @@ -25,7 +25,6 @@ #include <unordered_map> #include "exec/data_sink.h" -#include "exec/exchange_node.h" #include "exec/exec_node.h" #include "exec/scan_node.h" #include "runtime/data_stream_mgr.h" Review Comment: warning: 'runtime/data_stream_mgr.h' file not found [clang-diagnostic-error] ```cpp #include "runtime/data_stream_mgr.h" ^ ``` ########## be/src/vec/runtime/vparquet_writer.cpp: ########## @@ -36,6 +35,175 @@ namespace doris::vectorized { +ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer) + : _file_writer(file_writer), _cur_pos(0), _written_len(0) { + set_mode(arrow::io::FileMode::WRITE); +} + +ParquetOutputStream::~ParquetOutputStream() { + arrow::Status st = Close(); + if (!st.ok()) { + LOG(WARNING) << "close parquet file error: " << st.ToString(); + } +} + +arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) { + if (_is_closed) { + return arrow::Status::OK(); + } + size_t written_len = 0; + Status st = _file_writer->write(static_cast<const uint8_t*>(data), nbytes, &written_len); + if (!st.ok()) { + return arrow::Status::IOError(st.to_string()); + } + _cur_pos += written_len; + _written_len += written_len; + return arrow::Status::OK(); +} + +arrow::Result<int64_t> ParquetOutputStream::Tell() const { + return _cur_pos; +} + +arrow::Status ParquetOutputStream::Close() { + if (_is_closed) { + return arrow::Status::OK(); + } + Status st = _file_writer->close(); + if (!st.ok()) { + LOG(WARNING) << "close parquet output stream failed: " << st; + return arrow::Status::IOError(st.to_string()); + } + _is_closed = true; + return arrow::Status::OK(); +} + +int64_t ParquetOutputStream::get_written_len() { Review Comment: warning: method 'get_written_len' can be made const [readability-make-member-function-const] ```suggestion int64_t ParquetOutputStream::get_written_len() const { ``` be/src/vec/runtime/vparquet_writer.h:55: ```diff - int64_t get_written_len(); + int64_t get_written_len() const; ``` ########## be/src/vec/runtime/vparquet_writer.h: ########## @@ -34,13 +34,52 @@ #include <string> #include "common/status.h" -#include "exec/parquet_writer.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" #include "vec/runtime/vfile_result_writer.h" namespace doris::vectorized { +class ParquetOutputStream : public arrow::io::OutputStream { +public: + ParquetOutputStream(FileWriter* file_writer); + ParquetOutputStream(FileWriter* file_writer, const int64_t& written_len); + virtual ~ParquetOutputStream(); Review Comment: warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override] ```suggestion ~ParquetOutputStream() override; ``` ########## be/src/vec/sink/vresult_sink.h: ########## @@ -35,6 +35,75 @@ class ResultSinkOperator; namespace vectorized { class VExprContext; +struct ResultFileOptions { + // [[deprecated]] + bool is_local_file; + std::string file_path; + TFileFormatType::type file_format; + std::string column_separator; + std::string line_delimiter; + size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB + std::vector<TNetworkAddress> broker_addresses; + std::map<std::string, std::string> broker_properties; + std::string success_file_name = ""; Review Comment: warning: redundant string initialization [readability-redundant-string-init] ```suggestion std::string success_file_name; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org