kangpinghuang commented on a change in pull request #3230: URL: https://github.com/apache/incubator-doris/pull/3230#discussion_r419932135
########## File path: be/src/exec/broker_reader.h ########## @@ -52,6 +52,7 @@ class BrokerReader : public FileReader { // Read virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; + virtual Status read_all(uint8_t** buf, size_t *length) override; Review comment: ```suggestion virtual Status read_all(uint8_t** buf, size_t* length) override; ``` ########## File path: be/src/exec/file_reader.h ########## @@ -34,6 +34,16 @@ class FileReader { // is set to zero. virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) = 0; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) = 0; + + /** + * if read eof then return Status::OK and length is set 0 and buf is set NULL, + * other return readed bytes. + * + * !! Important !! + * the buf must be deleted by user, otherwise leak memory + * !! Important !! + */ + virtual Status read_all(uint8_t** buf, size_t *length) = 0; Review comment: Please add a comment explaining why this API is being added. ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ Review comment: use #pragma once ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + 
~JsonScanner(); + + // Open this scanner, will initialize information need to + Status open() override; + + // Get next tuple + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) override; + + // Close this scanner + void close() override; +private: + Status open_next_reader(); +private: + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + + std::string _jsonpath; + std::string _jsonpath_file; + // std::shared_ptr<rapidjson::Document> _jsonPathDoc; Review comment: Please remove this commented-out `_jsonPathDoc` member if it is not used. ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + ~JsonScanner(); + + // Open this scanner, will initialize information need to Review comment: ```suggestion // Open this scanner, will initialize information needed ``` ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + ~JsonScanner(); + + // Open this scanner, will initialize information need to + Status open() override; + + // Get next tuple + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) override; + + // Close this scanner + void close() override; +private: + Status open_next_reader(); +private: + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + + std::string _jsonpath; + std::string _jsonpath_file; + // std::shared_ptr<rapidjson::Document> _jsonPathDoc; + + // used to hold current StreamLoadPipe + std::shared_ptr<StreamLoadPipe> _stream_load_pipe; + // Reader + JsonReader* _cur_file_reader; + int _next_range; + bool _cur_file_eof; // is read over? 
+ bool _scanner_eof; +}; + + +class JsonDataInternal { +public: + JsonDataInternal(rapidjson::Value* v); + ~JsonDataInternal(); + rapidjson::Value::ConstValueIterator get_next(); + +private: + bool is_end(); Review comment: ```suggestion bool _is_end(); ``` ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + ~JsonScanner(); + + // Open this scanner, will initialize information need to + Status open() override; + + // Get next tuple + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) override; + + // Close this scanner + void close() override; +private: + Status open_next_reader(); +private: + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + + std::string _jsonpath; + std::string _jsonpath_file; + // std::shared_ptr<rapidjson::Document> _jsonPathDoc; + + // used to hold current StreamLoadPipe + std::shared_ptr<StreamLoadPipe> _stream_load_pipe; + // Reader + JsonReader* _cur_file_reader; + int _next_range; + bool _cur_file_eof; // is read over? 
+ bool _scanner_eof; +}; + + +class JsonDataInternal { +public: + JsonDataInternal(rapidjson::Value* v); + ~JsonDataInternal(); + rapidjson::Value::ConstValueIterator get_next(); + +private: + bool is_end(); + +private: + rapidjson::Value* _json_values; + rapidjson::Value::ConstValueIterator _iterator; +}; + +class JsonReader { +public: + JsonReader(RuntimeState* state, ScannerCounter* counter, SmallFileMgr *fileMgr, RuntimeProfile* profile, FileReader* file_reader, + std::string& jsonpath, std::string& jsonpath_file); + ~JsonReader(); + Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + +private: + void init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file); + void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); + size_t get_data_by_jsonpath(); + int parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo); + Status parse_json_doc(bool *eof); + Status set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid); + Status set_tuple_value_from_map(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid); + Status handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + Status handle_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + Status write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool); + void close(); + +private: + static constexpr char const *JSON_PATH = "jsonpath"; Review comment: ```suggestion static constexpr char const* JSON_PATH = "jsonpath"; ``` ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + ~JsonScanner(); + + // Open this scanner, will initialize information need to + Status open() override; + + // Get next tuple + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) override; + + // Close this scanner + void close() override; 
+private: + Status open_next_reader(); +private: + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + + std::string _jsonpath; + std::string _jsonpath_file; + // std::shared_ptr<rapidjson::Document> _jsonPathDoc; + + // used to hold current StreamLoadPipe + std::shared_ptr<StreamLoadPipe> _stream_load_pipe; + // Reader + JsonReader* _cur_file_reader; + int _next_range; + bool _cur_file_eof; // is read over? + bool _scanner_eof; +}; + + +class JsonDataInternal { +public: + JsonDataInternal(rapidjson::Value* v); + ~JsonDataInternal(); + rapidjson::Value::ConstValueIterator get_next(); + +private: + bool is_end(); + +private: + rapidjson::Value* _json_values; + rapidjson::Value::ConstValueIterator _iterator; +}; + +class JsonReader { +public: + JsonReader(RuntimeState* state, ScannerCounter* counter, SmallFileMgr *fileMgr, RuntimeProfile* profile, FileReader* file_reader, + std::string& jsonpath, std::string& jsonpath_file); + ~JsonReader(); + Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + +private: + void init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file); + void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); + size_t get_data_by_jsonpath(); + int parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo); + Status parse_json_doc(bool *eof); + Status set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid); + Status set_tuple_value_from_map(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid); + Status handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + Status handle_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* 
tuple_pool, bool* eof); + Status write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool); + void close(); + +private: + static constexpr char const *JSON_PATH = "jsonpath"; + static constexpr char const *DORIS_RECORDS = "RECORDS"; Review comment: ```suggestion static constexpr char const* DORIS_RECORDS = "RECORDS"; ``` ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + ~JsonScanner(); + + // Open this scanner, will initialize information need to + Status open() override; + + // Get next tuple + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) override; + + // Close this scanner + void close() override; +private: + Status open_next_reader(); +private: + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + + std::string _jsonpath; + std::string _jsonpath_file; + // std::shared_ptr<rapidjson::Document> _jsonPathDoc; + + // used to hold current StreamLoadPipe + std::shared_ptr<StreamLoadPipe> _stream_load_pipe; + // Reader + JsonReader* _cur_file_reader; + int _next_range; + bool _cur_file_eof; // is read over? 
+ bool _scanner_eof; +}; + + +class JsonDataInternal { +public: + JsonDataInternal(rapidjson::Value* v); + ~JsonDataInternal(); + rapidjson::Value::ConstValueIterator get_next(); + +private: + bool is_end(); + +private: + rapidjson::Value* _json_values; + rapidjson::Value::ConstValueIterator _iterator; +}; + +class JsonReader { +public: + JsonReader(RuntimeState* state, ScannerCounter* counter, SmallFileMgr *fileMgr, RuntimeProfile* profile, FileReader* file_reader, + std::string& jsonpath, std::string& jsonpath_file); + ~JsonReader(); + Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + +private: + void init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file); + void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); + size_t get_data_by_jsonpath(); + int parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo); + Status parse_json_doc(bool *eof); + Status set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid); + Status set_tuple_value_from_map(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid); + Status handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + Status handle_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof); + Status write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool); + void close(); + +private: + static constexpr char const *JSON_PATH = "jsonpath"; + static constexpr char const *DORIS_RECORDS = "RECORDS"; + int _next_line; + int _total_lines; + RuntimeState* _state; + ScannerCounter* _counter; + RuntimeProfile* _profile; + FileReader*_file_reader; + bool _closed; + /** + * 
_parse_jsonpath_flag == 1, jsonpath is valid + * _parse_jsonpath_flag == 0, jsonpath is empty, default + * _parse_jsonpath_flag == -1, jsonpath parse is error, it will return ERROR + */ + int _parse_jsonpath_flag; Review comment: ```suggestion short _parse_jsonpath_flag; ``` ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; Review comment: Should this be `break` instead of `continue`, to match the comment above ("break this")? ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if 
(fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void 
JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if 
(typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 3) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[1]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[2], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonpath_file; + st = Env::Default()->new_random_access_file(file_path, &jsonpath_file); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonpath_file->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonpath_file->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read_all(&json_str, &length)); Review comment: I think using OwnedSlice would be better here. ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, 
&_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; Review comment: why minus 1 here? ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + 
_scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == 
_iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 3) { + LOG(WARNING)<< "parse_jsonpath_from_file 
Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[1]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[2], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonpath_file; + st = Env::Default()->new_random_access_file(file_path, &jsonpath_file); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonpath_file->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonpath_file->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { Review comment: ```suggestion Status JsonReader::parse_json_doc(bool* eof) { ``` ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + 
_scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == 
_iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { Review comment: ```suggestion int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, const std::string& fileinfo ) { ``` 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org