This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ec055e1acb [feature](new file reader) Integrate new file reader (#15175)
ec055e1acb is described below

commit ec055e1acbcc040d97ed3d15b88af2a695a4078b
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Mon Dec 26 08:55:52 2022 +0800

    [feature](new file reader) Integrate new file reader (#15175)
---
 be/src/http/action/stream_load.cpp                 |  13 +-
 be/src/io/CMakeLists.txt                           |   3 +-
 be/src/io/buffered_reader.cpp                      |  12 +-
 be/src/io/buffered_reader.h                        |   5 +-
 be/src/io/file_factory.cpp                         | 142 ++++++++++---
 be/src/io/file_factory.h                           |  41 ++++
 be/src/io/fs/hdfs_file_reader.cpp                  |  11 +-
 be/src/io/fs/hdfs_file_system.cpp                  |  42 ++--
 be/src/io/fs/hdfs_file_system.h                    |   2 +-
 .../fs/kafka_consumer_pipe.h}                      |  11 +-
 ...m_load_pipe_reader.cpp => stream_load_pipe.cpp} |  53 +++--
 ...tream_load_pipe_reader.h => stream_load_pipe.h} |  16 +-
 be/src/io/hdfs_reader_writer.cpp                   |  44 ----
 be/src/runtime/exec_env.h                          |   9 +
 be/src/runtime/fragment_mgr.cpp                    |  23 +--
 be/src/runtime/fragment_mgr.h                      |   9 +-
 be/src/runtime/routine_load/data_consumer.h        |   2 -
 .../runtime/routine_load/data_consumer_group.cpp   |  12 +-
 .../routine_load/routine_load_task_executor.cpp    |  28 +--
 be/src/runtime/stream_load/new_load_stream_mgr.cpp |   4 +-
 be/src/runtime/stream_load/new_load_stream_mgr.h   |   8 +-
 be/src/runtime/stream_load/stream_load_context.h   |   3 +-
 be/src/util/hdfs_storage_backend.cpp               |   5 +-
 be/src/vec/CMakeLists.txt                          |   2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp          |  65 ++++--
 be/src/vec/exec/format/csv/csv_reader.h            |  12 +-
 .../exec/format/file_reader/new_file_factory.cpp   | 201 ------------------
 .../vec/exec/format/file_reader/new_file_factory.h | 115 -----------
 .../file_reader/new_plain_binary_line_reader.cpp   |  71 +++++++
 .../file_reader/new_plain_binary_line_reader.h}    |  27 ++-
 .../file_reader/new_plain_text_line_reader.cpp     |  18 +-
 .../file_reader/new_plain_text_line_reader.h       |   8 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    |  79 ++++---
 be/src/vec/exec/format/json/new_json_reader.h      |  19 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  57 +++--
 be/src/vec/exec/format/orc/vorc_reader.h           |  17 +-
 .../vec/exec/format/parquet/parquet_thrift_util.h  |  18 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  10 +-
 .../exec/format/parquet/vparquet_column_reader.h   |   8 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  |   2 +-
 .../exec/format/parquet/vparquet_group_reader.h    |   5 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  36 ++--
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   8 +-
 .../runtime/routine_load_task_executor_test.cpp    |  22 +-
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |   7 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  78 ++++---
 .../data/load_p0/stream_load/test_json_load.out    | 165 ---------------
 .../load_p0/stream_load/test_json_load.groovy      | 230 ++-------------------
 48 files changed, 708 insertions(+), 1070 deletions(-)
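
For reference, the core API change in this patch: alongside the old FileReader-based factory
methods, FileFactory gains a create_file_reader() overload driven by the FileSystemProperties and
FileDescription structs added in be/src/io/file_factory.h, returning an io::FileReaderSPtr plus the
owning io::FileSystem. A minimal usage sketch follows; the local path, the null profile pointer and
the surrounding error handling are illustrative assumptions, not part of this patch (csv_reader.cpp
below shows the real call site):

    // Sketch only: build the descriptors the new factory overload expects.
    FileSystemProperties system_properties;
    system_properties.system_type = TFileType::FILE_LOCAL;  // or FILE_S3 / FILE_HDFS / FILE_BROKER

    FileDescription file_description;
    file_description.path = "/tmp/example.csv";             // placeholder path
    file_description.start_offset = 0;
    file_description.file_size = 0;                          // 0 when the size is unknown

    std::unique_ptr<io::FileSystem> file_system;
    io::FileReaderSPtr file_reader;
    RETURN_IF_ERROR(FileFactory::create_file_reader(nullptr /* profile */, system_properties,
                                                    file_description, &file_system, &file_reader));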

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 5097de5c0d..6595bb6d90 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -40,16 +40,16 @@
 #include "http/http_request.h"
 #include "http/http_response.h"
 #include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
 #include "olap/storage_engine.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/load_path_mgr.h"
 #include "runtime/plan_fragment_executor.h"
-#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "runtime/stream_load/stream_load_executor.h"
-#include "runtime/stream_load/stream_load_pipe.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/byte_buffer.h"
 #include "util/debug_util.h"
@@ -396,10 +396,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
     request.__set_header_type(ctx->header_type);
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
-                                                     64 * 1024 /* min_chunk_size */,
-                                                     ctx->body_bytes /* total_length */);
-        RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
+        auto pipe = std::make_shared<io::StreamLoadPipe>(
+                kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
+                ctx->body_bytes /* total_length */);
+        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
+
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
     } else {
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index c19188deb4..a8ea5ff232 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -28,7 +28,6 @@ set(IO_FILES
     file_factory.cpp
     hdfs_builder.cpp
     hdfs_file_reader.cpp
-    hdfs_reader_writer.cpp
     hdfs_writer.cpp
     local_file_reader.cpp
     local_file_writer.cpp
@@ -45,7 +44,7 @@ set(IO_FILES
     fs/hdfs_file_reader.cpp
     fs/broker_file_system.cpp
     fs/broker_file_reader.cpp
-    fs/stream_load_pipe_reader.cpp
+    fs/stream_load_pipe.cpp
     cache/dummy_file_cache.cpp
     cache/file_cache.cpp
     cache/file_cache_manager.cpp
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index 021f0b9d23..d3944532cf 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -21,6 +21,7 @@
 #include <sstream>
 
 #include "common/config.h"
+#include "olap/iterators.h"
 #include "olap/olap_define.h"
 #include "util/bit_util.h"
 
@@ -185,7 +186,7 @@ bool BufferedReader::closed() {
     return _reader->closed();
 }
 
-BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset,
+BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset,
                                                    uint64_t length, size_t max_buf_size)
         : _file(file),
           _file_start_offset(offset),
@@ -223,11 +224,12 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
     int64_t has_read = 0;
     SCOPED_RAW_TIMER(&_statistics.read_time);
     while (has_read < to_read) {
-        int64_t loop_read = 0;
-        RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read,
-                                      _buf.get() + buf_remaining + has_read));
+        size_t loop_read = 0;
+        Slice resutl(_buf.get() + buf_remaining + has_read, to_read - has_read);
+        IOContext io_context;
+        RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, resutl, io_context, &loop_read));
         _statistics.read_calls++;
-        if (loop_read <= 0) {
+        if (loop_read == 0) {
             break;
         }
         has_read += loop_read;
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index abcf24916e..503f5af5ae 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -23,6 +23,7 @@
 
 #include "common/status.h"
 #include "io/file_reader.h"
+#include "io/fs/file_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
 
@@ -113,7 +114,7 @@ protected:
 
 class BufferedFileStreamReader : public BufferedStreamReader {
 public:
-    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length,
+    BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length,
                              size_t max_buf_size);
     ~BufferedFileStreamReader() override = default;
 
@@ -122,7 +123,7 @@ public:
 
 private:
     std::unique_ptr<uint8_t[]> _buf;
-    FileReader* _file;
+    io::FileReaderSPtr _file;
     uint64_t _file_start_offset;
     uint64_t _file_end_offset;
 
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 61dbfbfbf4..2c8b56ea50 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -17,22 +17,32 @@
 
 #include "io/file_factory.h"
 
+#include "common/status.h"
 #include "io/broker_reader.h"
 #include "io/broker_writer.h"
 #include "io/buffered_reader.h"
-#include "io/hdfs_reader_writer.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "io/hdfs_file_reader.h"
+#include "io/hdfs_writer.h"
 #include "io/local_file_reader.h"
 #include "io/local_file_writer.h"
 #include "io/s3_reader.h"
 #include "io/s3_writer.h"
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
 
-doris::Status doris::FileFactory::create_file_writer(
-        TFileType::type type, doris::ExecEnv* env,
-        const std::vector<TNetworkAddress>& broker_addresses,
-        const std::map<std::string, std::string>& properties, const std::string& path,
-        int64_t start_offset, std::unique_ptr<FileWriter>& file_writer) {
+namespace doris {
+
+Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
+                                       const std::vector<TNetworkAddress>& broker_addresses,
+                                       const std::map<std::string, std::string>& properties,
+                                       const std::string& path, int64_t start_offset,
+                                       std::unique_ptr<FileWriter>& file_writer) {
     switch (type) {
     case TFileType::FILE_LOCAL: {
         file_writer.reset(new LocalFileWriter(path, start_offset));
@@ -47,7 +57,7 @@ doris::Status doris::FileFactory::create_file_writer(
         break;
     }
     case TFileType::FILE_HDFS: {
-        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+        RETURN_IF_ERROR(create_hdfs_writer(
                 const_cast<std::map<std::string, std::string>&>(properties), path, file_writer));
         break;
     }
@@ -60,11 +70,11 @@ doris::Status doris::FileFactory::create_file_writer(
 
 // ============================
 // broker scan node/unique ptr
-doris::Status doris::FileFactory::create_file_reader(
-        doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile,
-        const std::vector<TNetworkAddress>& broker_addresses,
-        const std::map<std::string, std::string>& properties, const doris::TBrokerRangeDesc& range,
-        int64_t start_offset, std::unique_ptr<FileReader>& file_reader) {
+Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                       const std::vector<TNetworkAddress>& broker_addresses,
+                                       const std::map<std::string, std::string>& properties,
+                                       const TBrokerRangeDesc& range, int64_t start_offset,
+                                       std::unique_ptr<FileReader>& file_reader) {
     FileReader* file_reader_ptr;
     switch (type) {
     case TFileType::FILE_LOCAL: {
@@ -85,8 +95,7 @@ doris::Status doris::FileFactory::create_file_reader(
     }
     case TFileType::FILE_HDFS: {
         FileReader* hdfs_reader = nullptr;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset,
-                                                        &hdfs_reader));
+        hdfs_reader = new HdfsFileReader(range.hdfs_params, range.path, start_offset);
         file_reader_ptr = new BufferedReader(profile, hdfs_reader);
         break;
     }
@@ -100,13 +109,12 @@ doris::Status doris::FileFactory::create_file_reader(
 
 // ============================
 // file scan node/unique ptr
-doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile,
-                                                     const TFileScanRangeParams& params,
-                                                     const std::string& path, int64_t start_offset,
-                                                     int64_t file_size, int64_t buffer_size,
-                                                     std::unique_ptr<FileReader>& file_reader) {
+Status FileFactory::create_file_reader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                                       const std::string& path, int64_t start_offset,
+                                       int64_t file_size, int64_t buffer_size,
+                                       std::unique_ptr<FileReader>& file_reader) {
     FileReader* file_reader_ptr;
-    doris::TFileType::type type = params.file_type;
+    TFileType::type type = params.file_type;
     switch (type) {
     case TFileType::FILE_LOCAL: {
         file_reader_ptr = new LocalFileReader(path, start_offset);
@@ -117,8 +125,7 @@ doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile,
         break;
     }
     case TFileType::FILE_HDFS: {
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, path, start_offset,
-                                                        &file_reader_ptr));
+        file_reader_ptr = new HdfsFileReader(params.hdfs_params, path, start_offset);
         break;
     }
     case TFileType::FILE_BROKER: {
@@ -138,12 +145,99 @@ doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile,
     return Status::OK();
 }
 
+Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
+                                       const FileSystemProperties& system_properties,
+                                       const FileDescription& file_description,
+                                       std::unique_ptr<io::FileSystem>* file_system,
+                                       io::FileReaderSPtr* file_reader) {
+    TFileType::type type = system_properties.system_type;
+    io::FileSystem* file_system_ptr = nullptr;
+    switch (type) {
+    case TFileType::FILE_LOCAL: {
+        RETURN_IF_ERROR(
+                io::global_local_filesystem()->open_file(file_description.path, file_reader));
+        break;
+    }
+    case TFileType::FILE_S3: {
+        RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path,
+                                         &file_system_ptr, file_reader));
+        break;
+    }
+    case TFileType::FILE_HDFS: {
+        RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path,
+                                           &file_system_ptr, file_reader));
+        break;
+    }
+    case TFileType::FILE_BROKER: {
+        RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
+                                             system_properties.properties, file_description.path,
+                                             &file_system_ptr, file_reader));
+        break;
+    }
+    default:
+        return Status::NotSupported("unsupported file reader type: {}", std::to_string(type));
+    }
+    file_system->reset(file_system_ptr);
+    return Status::OK();
+}
+
 // file scan node/stream load pipe
-doris::Status doris::FileFactory::create_pipe_reader(const TUniqueId& load_id,
-                                                     std::shared_ptr<FileReader>& file_reader) {
+Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader) {
+    *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
+    if (!(*file_reader)) {
+        return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
+    }
+    return Status::OK();
+}
+
+Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
+                                       std::shared_ptr<FileReader>& file_reader) {
     file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id);
     if (!file_reader) {
         return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
     }
     return Status::OK();
 }
+
+Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
+                                       io::FileSystem** hdfs_file_system,
+                                       io::FileReaderSPtr* reader) {
+    *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, "");
+    RETURN_IF_ERROR((dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect());
+    RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader));
+    return Status::OK();
+}
+
+Status FileFactory::create_hdfs_writer(const std::map<std::string, std::string>& properties,
+                                       const std::string& path,
+                                       std::unique_ptr<FileWriter>& writer) {
+    writer.reset(new HDFSWriter(properties, path));
+    return Status::OK();
+}
+
+Status FileFactory::create_s3_reader(const std::map<std::string, std::string>& prop,
+                                     const std::string& path, io::FileSystem** s3_file_system,
+                                     io::FileReaderSPtr* reader) {
+    S3URI s3_uri(path);
+    if (!s3_uri.parse()) {
+        return Status::InvalidArgument("s3 uri is invalid: {}", path);
+    }
+    S3Conf s3_conf;
+    RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf));
+    *s3_file_system = new io::S3FileSystem(s3_conf, "");
+    RETURN_IF_ERROR((dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect());
+    RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader));
+    return Status::OK();
+}
+
+Status FileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
+                                         const std::map<std::string, std::string>& prop,
+                                         const std::string& path,
+                                         io::FileSystem** broker_file_system,
+                                         io::FileReaderSPtr* reader) {
+    *broker_file_system = new io::BrokerFileSystem(broker_addr, prop);
+    RETURN_IF_ERROR((dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect());
+    RETURN_IF_ERROR((*broker_file_system)->open_file(path, reader));
+    return Status::OK();
+}
+} // namespace doris
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 48e22737eb..69d1158816 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -20,12 +20,29 @@
 #include "gen_cpp/Types_types.h"
 #include "io/file_reader.h"
 #include "io/file_writer.h"
+#include "io/fs/file_reader.h"
 
 namespace doris {
+namespace io {
+class FileSystem;
+}
 class ExecEnv;
 class TNetworkAddress;
 class RuntimeProfile;
 
+struct FileSystemProperties {
+    TFileType::type system_type;
+    std::map<std::string, std::string> properties;
+    THdfsParams hdfs_params;
+    std::vector<TNetworkAddress> broker_addresses;
+};
+
+struct FileDescription {
+    std::string path;
+    int64_t start_offset;
+    size_t file_size;
+};
+
 class FileFactory {
 public:
     // Create FileWriter
@@ -53,10 +70,34 @@ public:
                                      int64_t file_size, int64_t buffer_size,
                                      std::unique_ptr<FileReader>& file_reader);
 
+    static Status create_file_reader(RuntimeProfile* profile,
+                                     const FileSystemProperties& system_properties,
+                                     const FileDescription& file_description,
+                                     std::unique_ptr<io::FileSystem>* file_system,
+                                     io::FileReaderSPtr* file_reader);
+
     // Create FileReader for stream load pipe
+    static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
+
+    // [deprecated] Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id,
                                      std::shared_ptr<FileReader>& file_reader);
 
+    static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
+                                     io::FileSystem** hdfs_file_system, io::FileReaderSPtr* reader);
+
+    static Status create_hdfs_writer(const std::map<std::string, std::string>& properties,
+                                     const std::string& path, std::unique_ptr<FileWriter>& writer);
+
+    static Status create_s3_reader(const std::map<std::string, std::string>& prop,
+                                   const std::string& path, io::FileSystem** s3_file_system,
+                                   io::FileReaderSPtr* reader);
+
+    static Status create_broker_reader(const TNetworkAddress& broker_addr,
+                                       const std::map<std::string, std::string>& prop,
+                                       const std::string& path, io::FileSystem** hdfs_file_system,
+                                       io::FileReaderSPtr* reader);
+
     static TFileType::type convert_storage_type(TStorageBackendType::type type) {
         switch (type) {
         case TStorageBackendType::LOCAL:
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index ef03541387..39c9795e95 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -61,6 +61,14 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i
         return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
                                offset, _file_size, _path.native());
     }
+
+    auto handle = _fs->get_handle();
+    int res = hdfsSeek(handle->hdfs_fs, _hdfs_file, offset);
+    if (res != 0) {
+        return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
+                                     BackendOptions::get_localhost(), offset, hdfsGetLastError());
+    }
+
     size_t bytes_req = result.size;
     char* to = result.data;
     bytes_req = std::min(bytes_req, _file_size - offset);
@@ -69,8 +77,7 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i
         return Status::OK();
     }
 
-    auto handle = _fs->get_handle();
-    int64_t has_read = 0;
+    size_t has_read = 0;
     while (has_read < bytes_req) {
         int64_t loop_read =
                 hdfsRead(handle->hdfs_fs, _hdfs_file, to + has_read, bytes_req - has_read);
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index cafa7ca34c..b053aeebb4 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -63,14 +63,8 @@ private:
 HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path)
         : RemoteFileSystem(path, "", FileSystemType::HDFS),
           _hdfs_params(hdfs_params),
-          _path(path),
           _fs_handle(nullptr) {
     _namenode = _hdfs_params.fs_name;
-    // if the format of _path is hdfs://ip:port/path, replace it to /path.
-    // path like hdfs://ip:port/path can't be used by libhdfs3.
-    if (_path.find(_namenode) != std::string::npos) {
-        _path = _path.substr(_namenode.size());
-    }
 }
 
 HdfsFileSystem::~HdfsFileSystem() {
@@ -101,9 +95,12 @@ Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer
 
 Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    size_t file_len = -1;
+    size_t file_len = 0;
     RETURN_IF_ERROR(file_size(path, &file_len));
-    auto hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(), O_RDONLY, 0, 0, 0);
+
+    Path real_path = _covert_path(path);
+    auto hdfs_file =
+            hdfsOpenFile(_fs_handle->hdfs_fs, real_path.string().c_str(), O_RDONLY, 0, 0, 0);
     if (hdfs_file == nullptr) {
         if (_fs_handle->from_cache) {
             // hdfsFS may be disconnected if not used for a long time
@@ -130,9 +127,10 @@ Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
 
 Status HdfsFileSystem::delete_file(const Path& path) {
     CHECK_HDFS_HANDLE(_fs_handle);
+    Path real_path = _covert_path(path);
     // The recursive argument `is_recursive` is irrelevant if path is a file.
     int is_recursive = 0;
-    int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive);
+    int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), is_recursive);
     if (res == -1) {
         return Status::InternalError("Failed to delete file {}", path.string());
     }
@@ -141,7 +139,8 @@ Status HdfsFileSystem::delete_file(const Path& path) {
 
 Status HdfsFileSystem::create_directory(const Path& path) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, path.string().c_str());
+    Path real_path = _covert_path(path);
+    int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, real_path.string().c_str());
     if (res == -1) {
         return Status::InternalError("Failed to create directory {}", path.string());
     }
@@ -150,9 +149,10 @@ Status HdfsFileSystem::create_directory(const Path& path) {
 
 Status HdfsFileSystem::delete_directory(const Path& path) {
     CHECK_HDFS_HANDLE(_fs_handle);
+    Path real_path = _covert_path(path);
     // delete in recursive mode
     int is_recursive = 1;
-    int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive);
+    int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), is_recursive);
     if (res == -1) {
         return Status::InternalError("Failed to delete directory {}", path.string());
     }
@@ -161,7 +161,8 @@ Status HdfsFileSystem::delete_directory(const Path& path) {
 
 Status HdfsFileSystem::exists(const Path& path, bool* res) const {
     CHECK_HDFS_HANDLE(_fs_handle);
-    int is_exists = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str());
+    Path real_path = _covert_path(path);
+    int is_exists = hdfsExists(_fs_handle->hdfs_fs, real_path.string().c_str());
     if (is_exists == 0) {
         *res = true;
     } else {
@@ -172,7 +173,8 @@ Status HdfsFileSystem::exists(const Path& path, bool* res) const {
 
 Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const {
     CHECK_HDFS_HANDLE(_fs_handle);
-    hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, path.string().c_str());
+    Path real_path = _covert_path(path);
+    hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, real_path.string().c_str());
     if (file_info == nullptr) {
         return Status::InternalError("Failed to get file size of {}", path.string());
     }
@@ -183,9 +185,10 @@ Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const {
 
 Status HdfsFileSystem::list(const Path& path, std::vector<Path>* files) {
     CHECK_HDFS_HANDLE(_fs_handle);
+    Path real_path = _covert_path(path);
     int numEntries = 0;
     hdfsFileInfo* file_info =
-            hdfsListDirectory(_fs_handle->hdfs_fs, path.string().c_str(), &numEntries);
+            hdfsListDirectory(_fs_handle->hdfs_fs, real_path.string().c_str(), &numEntries);
     if (file_info == nullptr) {
         return Status::InternalError("Failed to list files/directors of {}", path.string());
     }
@@ -200,6 +203,17 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
     return _fs_handle;
 }
 
+Path HdfsFileSystem::_covert_path(const Path& path) const {
+    // if the format of path is hdfs://ip:port/path, replace it to /path.
+    // path like hdfs://ip:port/path can't be used by libhdfs3.
+    Path real_path(path);
+    if (path.string().find(_namenode) != std::string::npos) {
+        std::string real_path_str = path.string().substr(_namenode.size());
+        real_path = real_path_str;
+    }
+    return real_path;
+}
+
 // ************* HdfsFileSystemCache ******************
 int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
 
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index f86e73d1d3..01e8da58ca 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -119,9 +119,9 @@ public:
     HdfsFileSystemHandle* get_handle();
 
 private:
+    Path _covert_path(const Path& path) const;
     const THdfsParams& _hdfs_params;
     std::string _namenode;
-    std::string _path;
     // do not use std::shared_ptr or std::unique_ptr
     // _fs_handle is managed by HdfsFileSystemCache
     HdfsFileSystemHandle* _fs_handle;
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe_reader.h b/be/src/io/fs/kafka_consumer_pipe.h
similarity index 77%
rename from be/src/runtime/routine_load/kafka_consumer_pipe_reader.h
rename to be/src/io/fs/kafka_consumer_pipe.h
index 6555057c5f..6aab83c3b2 100644
--- a/be/src/runtime/routine_load/kafka_consumer_pipe_reader.h
+++ b/be/src/io/fs/kafka_consumer_pipe.h
@@ -17,17 +17,16 @@
 
 #pragma once
 
-#include "io/fs/stream_load_pipe_reader.h"
+#include "io/fs/stream_load_pipe.h"
 
 namespace doris {
 namespace io {
-class KafkaConsumerPipeReader : public StreamLoadPipeReader {
+class KafkaConsumerPipe : public StreamLoadPipe {
 public:
-    KafkaConsumerPipeReader(size_t max_buffered_bytes = 1024 * 1024,
-                            size_t min_chunk_size = 64 * 1024)
-            : StreamLoadPipeReader(max_buffered_bytes, min_chunk_size) {}
+    KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024)
+            : StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}
 
-    ~KafkaConsumerPipeReader() override = default;
+    ~KafkaConsumerPipe() override = default;
 
     Status append_with_line_delimiter(const char* data, size_t size) {
         Status st = append(data, size);
diff --git a/be/src/io/fs/stream_load_pipe_reader.cpp b/be/src/io/fs/stream_load_pipe.cpp
similarity index 76%
rename from be/src/io/fs/stream_load_pipe_reader.cpp
rename to be/src/io/fs/stream_load_pipe.cpp
index 6cc7b0a985..cc5132478c 100644
--- a/be/src/io/fs/stream_load_pipe_reader.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -15,32 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "stream_load_pipe_reader.h"
+#include "stream_load_pipe.h"
 
 #include <gen_cpp/internal_service.pb.h>
 
+#include "olap/iterators.h"
 #include "runtime/thread_context.h"
 #include "util/bit_util.h"
 
 namespace doris {
 namespace io {
-StreamLoadPipeReader::StreamLoadPipeReader(size_t max_buffered_bytes, size_t min_chunk_size,
-                                           int64_t total_length, bool use_proto)
+StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
+                               int64_t total_length, bool use_proto)
         : _buffered_bytes(0),
           _proto_buffered_bytes(0),
           _max_buffered_bytes(max_buffered_bytes),
           _min_chunk_size(min_chunk_size),
+          _total_length(total_length),
           _use_proto(use_proto) {}
 
-StreamLoadPipeReader::~StreamLoadPipeReader() {
+StreamLoadPipe::~StreamLoadPipe() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
     while (!_buf_queue.empty()) {
         _buf_queue.pop_front();
     }
 }
 
-Status StreamLoadPipeReader::read_at(size_t /*offset*/, Slice result, const IOContext& /*io_ctx*/,
-                                     size_t* bytes_read) {
+Status StreamLoadPipe::read_at(size_t /*offset*/, Slice result, const IOContext& /*io_ctx*/,
+                               size_t* bytes_read) {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
     *bytes_read = 0;
     size_t bytes_req = result.size;
@@ -79,15 +81,38 @@ Status StreamLoadPipeReader::read_at(size_t /*offset*/, Slice result, const IOCo
     return Status::OK();
 }
 
-Status StreamLoadPipeReader::append_and_flush(const char* data, size_t size,
-                                              size_t proto_byte_size) {
+// If _total_length == -1, this should be a Kafka routine load task,
+// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
+// Otherwise, this should be a stream load task that needs to read the specified amount of data.
+Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
+    if (_total_length < -1) {
+        return Status::InternalError("invalid, _total_length is: {}", _total_length);
+    } else if (_total_length == 0) {
+        // no data
+        *length = 0;
+        return Status::OK();
+    }
+
+    if (_total_length == -1) {
+        return _read_next_buffer(data, length);
+    }
+
+    // _total_length > 0, read the entire data
+    data->reset(new uint8_t[_total_length]);
+    Slice result(data->get(), _total_length);
+    IOContext io_ctx;
+    Status st = read_at(0, result, io_ctx, length);
+    return st;
+}
+
+Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
     ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
     buf->put_bytes(data, size);
     buf->flip();
     return _append(buf, proto_byte_size);
 }
 
-Status StreamLoadPipeReader::append(const char* data, size_t size) {
+Status StreamLoadPipe::append(const char* data, size_t size) {
     size_t pos = 0;
     if (_write_buf != nullptr) {
         if (size < _write_buf->remaining()) {
@@ -110,7 +135,7 @@ Status StreamLoadPipeReader::append(const char* data, size_t size) {
     return Status::OK();
 }
 
-Status StreamLoadPipeReader::append(const ByteBufferPtr& buf) {
+Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
     if (_write_buf != nullptr) {
         _write_buf->flip();
         RETURN_IF_ERROR(_append(_write_buf));
@@ -120,7 +145,7 @@ Status StreamLoadPipeReader::append(const ByteBufferPtr& buf) {
 }
 
 // read the next buffer from _buf_queue
-Status StreamLoadPipeReader::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t* length) {
+Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) {
     std::unique_lock<std::mutex> l(_lock);
     while (!_cancelled && !_finished && _buf_queue.empty()) {
         _get_cond.wait(l);
@@ -150,7 +175,7 @@ Status StreamLoadPipeReader::_read_next_buffer(std::unique_ptr<uint8_t[]>* data,
     return Status::OK();
 }
 
-Status StreamLoadPipeReader::_append(const ByteBufferPtr& buf, size_t proto_byte_size) {
+Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size) {
     {
         std::unique_lock<std::mutex> l(_lock);
         // if _buf_queue is empty, we append this buf without size check
@@ -180,7 +205,7 @@ Status StreamLoadPipeReader::_append(const ByteBufferPtr& buf, size_t proto_byte
 }
 
 // called when producer finished
-Status StreamLoadPipeReader::finish() {
+Status StreamLoadPipe::finish() {
     if (_write_buf != nullptr) {
         _write_buf->flip();
         _append(_write_buf);
@@ -195,7 +220,7 @@ Status StreamLoadPipeReader::finish() {
 }
 
 // called when producer/consumer failed
-void StreamLoadPipeReader::cancel(const std::string& reason) {
+void StreamLoadPipe::cancel(const std::string& reason) {
     {
         std::lock_guard<std::mutex> l(_lock);
         _cancelled = true;
diff --git a/be/src/io/fs/stream_load_pipe_reader.h b/be/src/io/fs/stream_load_pipe.h
similarity index 85%
rename from be/src/io/fs/stream_load_pipe_reader.h
rename to be/src/io/fs/stream_load_pipe.h
index a5cc95e3e8..59d391d5ad 100644
--- a/be/src/io/fs/stream_load_pipe_reader.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -28,13 +28,13 @@ namespace io {
 
 const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
 
-class StreamLoadPipeReader : public MessageBodySink, public FileReader {
+class StreamLoadPipe : public MessageBodySink, public FileReader {
 public:
-    StreamLoadPipeReader(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
-                         size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
-                         bool use_proto = false);
+    StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
+                   size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
+                   bool use_proto = false);
 
-    ~StreamLoadPipeReader() override;
+    ~StreamLoadPipe() override;
 
     Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0);
 
@@ -63,9 +63,11 @@ public:
     // called when producer/consumer failed
     void cancel(const std::string& reason) override;
 
+    Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length);
+
 private:
     // read the next buffer from _buf_queue
-    Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t* length);
+    Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length);
 
     Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0);
 
@@ -82,7 +84,7 @@ private:
     // The default is -1, which means that the data arrives in a stream
     // and the length is unknown.
     // size_t is unsigned, so use int64_t
-    // int64_t _total_length = -1;
+    int64_t _total_length = -1;
     bool _use_proto = false;
     std::deque<ByteBufferPtr> _buf_queue;
     std::condition_variable _put_cond;
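
The renamed StreamLoadPipe also gains read_one_message(): with _total_length == -1 (Kafka routine
load) it returns the next whole buffer from the queue, with 0 it returns no data, and with a
positive value it reads the entire declared body. A hedged sketch of how a consumer might use it
together with NewLoadStreamMgr; the load_id variable and the surrounding error handling are
illustrative assumptions, not taken from this patch:

    // Sketch only: look up the pipe registered for a load and read one message from it.
    std::shared_ptr<io::StreamLoadPipe> pipe =
            ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
    if (pipe == nullptr) {
        return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
    }
    std::unique_ptr<uint8_t[]> message;
    size_t length = 0;
    RETURN_IF_ERROR(pipe->read_one_message(&message, &length));
    // length == 0 means the pipe carried no data.
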
diff --git a/be/src/io/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp
deleted file mode 100644
index 3cbe2f4436..0000000000
--- a/be/src/io/hdfs_reader_writer.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/hdfs_reader_writer.h"
-
-#include "io/hdfs_file_reader.h"
-#include "io/hdfs_writer.h"
-
-namespace doris {
-
-Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const std::string& path,
-                                       int64_t start_offset, FileReader** reader) {
-    *reader = new HdfsFileReader(hdfs_params, path, start_offset);
-    return Status::OK();
-}
-
-Status HdfsReaderWriter::create_reader(const std::map<std::string, std::string>& hdfs_params,
-                                       const std::string& path, int64_t start_offset,
-                                       FileReader** reader) {
-    *reader = new HdfsFileReader(hdfs_params, path, start_offset);
-    return Status::OK();
-}
-
-Status HdfsReaderWriter::create_writer(const std::map<std::string, std::string>& properties,
-                                       const std::string& path,
-                                       std::unique_ptr<FileWriter>& writer) {
-    writer.reset(new HDFSWriter(properties, path));
-    return Status::OK();
-}
-} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0b36a2b8e8..e3ab690168 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -182,6 +182,15 @@ public:
     HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
     doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
 
+    // only for unit test
+    void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; }
+    void set_new_load_stream_mgr(NewLoadStreamMgr* new_load_stream_mgr) {
+        this->_new_load_stream_mgr = new_load_stream_mgr;
+    }
+    void set_stream_load_executor(StreamLoadExecutor* stream_load_executor) {
+        this->_stream_load_executor = stream_load_executor;
+    }
+
 private:
     Status _init(const std::vector<StorePath>& store_paths);
     void _destroy();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 134df9f221..c89cfc69ef 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -33,6 +33,7 @@
 #include "gen_cpp/QueryPlanExtra_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gutil/strings/substitute.h"
+#include "io/fs/stream_load_pipe.h"
 #include "opentelemetry/trace/scope.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "runtime/client_cache.h"
@@ -41,9 +42,8 @@
 #include "runtime/exec_env.h"
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/runtime_filter_mgr.h"
-#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
-#include "runtime/stream_load/stream_load_pipe.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
@@ -137,8 +137,8 @@ public:
 
     std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }
 
-    void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
-    std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }
+    void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
+    std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
 
     void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
 
@@ -172,7 +172,7 @@ private:
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
     // The pipe for data transfering, such as insert.
-    std::shared_ptr<StreamLoadPipe> _pipe;
+    std::shared_ptr<io::StreamLoadPipe> _pipe;
 
     // If set the true, this plan fragment will be executed only after FE send execution start rpc.
     bool _need_wait_execution_trigger = false;
@@ -528,14 +528,13 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         stream_load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
         stream_load_ctx->need_commit_self = true;
         stream_load_ctx->need_rollback = true;
-        // total_length == -1 means read one message from pipe in once time, don't care the length.
-        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
-                                                     64 * 1024 /* min_chunk_size */,
-                                                     -1 /* total_length */, true /* use_proto */);
+        auto pipe = std::make_shared<io::StreamLoadPipe>(
+                kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
+                -1 /* total_length */, true /* use_proto */);
         stream_load_ctx->body_sink = pipe;
         stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
 
-        RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_ctx->id, pipe));
+        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, pipe));
 
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
         set_pipe(params.params.fragment_instance_id, pipe);
@@ -562,7 +561,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
 }
 
 void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
-                           std::shared_ptr<StreamLoadPipe> pipe) {
+                           std::shared_ptr<io::StreamLoadPipe> pipe) {
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(fragment_instance_id);
@@ -572,7 +571,7 @@ void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
     }
 }
 
-std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
+std::shared_ptr<io::StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(fragment_instance_id);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index afd5408bbf..050db423dd 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -45,6 +45,10 @@ namespace pipeline {
 class PipelineFragmentContext;
 }
 
+namespace io {
+class StreamLoadPipe;
+}
+
 class QueryFragmentsCtx;
 class ExecEnv;
 class FragmentExecState;
@@ -54,7 +58,6 @@ class TExecPlanFragmentParams;
 class TExecPlanFragmentParamsList;
 class TUniqueId;
 class RuntimeFilterMergeController;
-class StreamLoadPipe;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
@@ -104,9 +107,9 @@ public:
     Status merge_filter(const PMergeFilterRequest* request,
                         butil::IOBufAsZeroCopyInputStream* attach_data);
 
-    void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe);
+    void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<io::StreamLoadPipe> pipe);
 
-    std::shared_ptr<StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
+    std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
 
 private:
     void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index 173ffbd82a..afd4d9f6f7 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -30,7 +30,6 @@ namespace doris {
 
 class KafkaConsumerPipe;
 class Status;
-class StreamLoadPipe;
 
 class DataConsumer {
 public:
@@ -156,7 +155,6 @@ private:
 
     KafkaEventCb _k_event_cb;
     RdKafka::KafkaConsumer* _k_consumer = nullptr;
-    std::shared_ptr<KafkaConsumerPipe> _k_consumer_pipe;
 };
 
 } // end namespace doris
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 0640591ece..869d427568 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -16,10 +16,10 @@
 // under the License.
 #include "runtime/routine_load/data_consumer_group.h"
 
+#include "io/fs/kafka_consumer_pipe.h"
 #include "librdkafka/rdkafka.h"
 #include "librdkafka/rdkafkacpp.h"
 #include "runtime/routine_load/data_consumer.h"
-#include "runtime/routine_load/kafka_consumer_pipe.h"
 #include "runtime/stream_load/stream_load_context.h"
 
 namespace doris {
@@ -96,8 +96,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
     int64_t left_rows = ctx->max_batch_rows;
     int64_t left_bytes = ctx->max_batch_size;
 
-    std::shared_ptr<KafkaConsumerPipe> kafka_pipe =
-            std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);
+    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
+            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
 
     LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
               << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
@@ -107,11 +107,11 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
     std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
 
     //improve performance
-    Status (KafkaConsumerPipe::*append_data)(const char* data, size_t size);
+    Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t size);
     if (ctx->format == TFileFormatType::FORMAT_JSON) {
-        append_data = &KafkaConsumerPipe::append_json;
+        append_data = &io::KafkaConsumerPipe::append_json;
     } else {
-        append_data = &KafkaConsumerPipe::append_with_line_delimiter;
+        append_data = &io::KafkaConsumerPipe::append_with_line_delimiter;
     }
 
     MonotonicStopWatch watch;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index a5535199e2..da8842ac59 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -23,9 +23,10 @@
 #include "gen_cpp/BackendService_types.h"
 #include "gen_cpp/FrontendService_types.h"
 #include "gen_cpp/Types_types.h"
+#include "io/fs/kafka_consumer_pipe.h"
+#include "olap/iterators.h"
 #include "runtime/exec_env.h"
 #include "runtime/routine_load/data_consumer_group.h"
-#include "runtime/routine_load/kafka_consumer_pipe.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "util/defer_op.h"
 #include "util/uid_util.h"
@@ -193,9 +194,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     TStatus tstatus;
     tstatus.status_code = TStatusCode::OK;
     put_result.status = tstatus;
-    put_result.params = std::move(task.params);
+    put_result.params = task.params;
     put_result.__isset.params = true;
-    ctx->put_result = std::move(put_result);
+    ctx->put_result = put_result;
     if (task.__isset.format) {
         ctx->format = task.format;
     }
@@ -267,10 +268,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
     HANDLE_ERROR(consumer_pool->get_consumer_grp(ctx, &consumer_grp), "failed to get consumers");
 
     // create and set pipe
-    std::shared_ptr<StreamLoadPipe> pipe;
+    std::shared_ptr<io::StreamLoadPipe> pipe;
     switch (ctx->load_src_type) {
     case TLoadSourceType::KAFKA: {
-        pipe = std::make_shared<KafkaConsumerPipe>();
+        pipe = std::make_shared<io::KafkaConsumerPipe>();
         Status st = std::static_pointer_cast<KafkaDataConsumerGroup>(consumer_grp)
                             ->assign_topic_partitions(ctx);
         if (!st.ok()) {
@@ -291,7 +292,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
     ctx->body_sink = pipe;
 
     // must put pipe before executing plan fragment
-    HANDLE_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe");
+    HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe");
 
 #ifndef BE_TEST
     // execute plan fragment, async
@@ -365,32 +366,31 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
         _exec_env->stream_load_executor()->rollback_txn(ctx);
         ctx->need_rollback = false;
     }
-    if (ctx->body_sink.get() != nullptr) {
+    if (ctx->body_sink != nullptr) {
         ctx->body_sink->cancel(err_msg);
     }
-
-    return;
 }
 
 // for test only
 Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) {
     auto mock_consumer = [this, ctx]() {
         ctx->ref();
-        std::shared_ptr<StreamLoadPipe> pipe = _exec_env->load_stream_mgr()->get(ctx->id);
-        bool eof = false;
+        std::shared_ptr<io::StreamLoadPipe> pipe = _exec_env->new_load_stream_mgr()->get(ctx->id);
         std::stringstream ss;
         while (true) {
             char one;
             int64_t len = 1;
-            int64_t read_bytes = 0;
-            Status st = pipe->read((uint8_t*)&one, len, &read_bytes, &eof);
+            size_t read_bytes = 0;
+            Slice result((uint8_t*)&one, len);
+            IOContext io_ctx;
+            Status st = pipe->read_at(0, result, io_ctx, &read_bytes);
             if (!st.ok()) {
                 LOG(WARNING) << "read failed";
                 ctx->promise.set_value(st);
                 break;
             }
 
-            if (eof) {
+            if (read_bytes == 0) {
                 ctx->promise.set_value(Status::OK());
                 break;
             }
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.cpp b/be/src/runtime/stream_load/new_load_stream_mgr.cpp
index 20cbb295cb..c7a57362fe 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.cpp
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.cpp
@@ -22,8 +22,8 @@ namespace doris {
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(new_stream_load_pipe_count, MetricUnit::NOUNIT);
 
 NewLoadStreamMgr::NewLoadStreamMgr() {
-    // Each StreamLoadPipeReader has a limited buffer size (default 1M), it's not needed to count the
-    // actual size of all StreamLoadPipeReader.
+    // Each StreamLoadPipe has a limited buffer size (default 1M), it's not needed to count the
+    // actual size of all StreamLoadPipe.
     REGISTER_HOOK_METRIC(new_stream_load_pipe_count, [this]() { return _stream_map.size(); });
 }
 
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h
index c009fbc71e..60cd3aa446 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -21,7 +21,7 @@
 #include <mutex>
 #include <unordered_map>
 
-#include "io/fs/stream_load_pipe_reader.h"
+#include "io/fs/stream_load_pipe.h"
 #include "util/doris_metrics.h"
 #include "util/uid_util.h"
 
@@ -34,7 +34,7 @@ public:
     NewLoadStreamMgr();
     ~NewLoadStreamMgr();
 
-    Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipeReader> stream) {
+    Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipe> stream) {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _stream_map.find(id);
         if (it != std::end(_stream_map)) {
@@ -45,7 +45,7 @@ public:
         return Status::OK();
     }
 
-    std::shared_ptr<io::StreamLoadPipeReader> get(const UniqueId& id) {
+    std::shared_ptr<io::StreamLoadPipe> get(const UniqueId& id) {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _stream_map.find(id);
         if (it == std::end(_stream_map)) {
@@ -67,6 +67,6 @@ public:
 
 private:
     std::mutex _lock;
-    std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipeReader>> _stream_map;
+    std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipe>> _stream_map;
 };
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 795b0f304b..0fc27a27f9 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -29,6 +29,7 @@
 #include "gen_cpp/FrontendService_types.h"
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
 #include "service/backend_options.h"
 #include "util/string_util.h"
@@ -92,7 +93,7 @@ public:
             need_rollback = false;
         }
 
-        _exec_env->load_stream_mgr()->remove(id);
+        _exec_env->new_load_stream_mgr()->remove(id);
     }
 
     std::string to_json() const;
diff --git a/be/src/util/hdfs_storage_backend.cpp b/be/src/util/hdfs_storage_backend.cpp
index 71df59c086..83c25716f9 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -18,7 +18,6 @@
 #include "util/hdfs_storage_backend.h"
 
 #include "io/hdfs_file_reader.h"
-#include "io/hdfs_reader_writer.h"
 #include "io/hdfs_writer.h"
 #include "olap/file_helper.h"
 #include "util/hdfs_util.h"
@@ -55,7 +54,7 @@ Status HDFSStorageBackend::close() {
 // if the format of path is hdfs://ip:port/path, replace it to /path.
 // path like hdfs://ip:port/path can't be used by libhdfs3.
 std::string HDFSStorageBackend::parse_path(const std::string& path) {
-    if (path.find(hdfs_file_prefix) != path.npos) {
+    if (path.find(hdfs_file_prefix) != std::string::npos) {
         std::string temp = path.substr(hdfs_file_prefix.size());
         std::size_t pos = temp.find_first_of('/');
         return temp.substr(pos);
@@ -143,7 +142,7 @@ Status HDFSStorageBackend::list(const std::string& 
remote_path, bool contain_md5
         // get filename
         std::filesystem::path file_path(file_name_with_path);
         std::string file_name = file_path.filename();
-        size_t pos = file_name.find_last_of(".");
+        size_t pos = file_name.find_last_of('.');
         if (pos == std::string::npos || pos == file_name.size() - 1) {
             // Not found checksum separator, ignore this file
             continue;
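
The only functional content of parse_path is the prefix stripping described in the comment above: libhdfs3 cannot open a fully qualified hdfs://ip:port/path URI, so the scheme and authority are cut off. A standalone sketch of that normalization (hdfs_file_prefix is assumed to be "hdfs://"):

    // Sketch of the normalization performed by HDFSStorageBackend::parse_path.
    std::string parse_path(const std::string& path) {
        static const std::string hdfs_file_prefix = "hdfs://";       // assumed value
        if (path.find(hdfs_file_prefix) != std::string::npos) {
            std::string temp = path.substr(hdfs_file_prefix.size()); // drop "hdfs://"
            std::size_t pos = temp.find_first_of('/');               // skip "ip:port"
            return temp.substr(pos);                                 // keep "/path"
        }
        return path;                                                 // already a bare path
    }
    // parse_path("hdfs://127.0.0.1:8020/user/doris/a.csv") -> "/user/doris/a.csv"
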
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 428f6b4f91..4426ba63a7 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -281,8 +281,8 @@ set(VEC_FILES
   exec/format/json/new_json_reader.cpp
   exec/format/table/table_format_reader.cpp
   exec/format/table/iceberg_reader.cpp
-  exec/format/file_reader/new_file_factory.cpp
   exec/format/file_reader/new_plain_text_line_reader.cpp
+  exec/format/file_reader/new_plain_binary_line_reader.cpp
   )
 
 if (WITH_MYSQL)
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 48f8d84dc7..63bdbd69f2 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -23,14 +23,15 @@
 #include "common/consts.h"
 #include "common/status.h"
 #include "exec/decompressor.h"
-#include "exec/plain_binary_line_reader.h"
-#include "exec/plain_text_line_reader.h"
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
+#include "io/file_factory.h"
 #include "util/string_util.h"
 #include "util/utf8_check.h"
 #include "vec/core/block.h"
-#include "vec/exec/scan/vfile_scanner.h"
+#include "vec/exec/format/file_reader/new_plain_binary_line_reader.h"
+#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
+#include "vec/exec/scan/vscanner.h"
 
 namespace doris::vectorized {
 
@@ -45,6 +46,8 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* 
profile, ScannerCounte
           _params(params),
           _range(range),
           _file_slot_descs(file_slot_descs),
+          _file_system(nullptr),
+          _file_reader(nullptr),
           _line_reader(nullptr),
           _line_reader_eof(false),
           _text_converter(nullptr),
@@ -76,7 +79,7 @@ CsvReader::CsvReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params
     _size = _range.size;
 }
 
-CsvReader::~CsvReader() {}
+CsvReader::~CsvReader() = default;
 
 Status CsvReader::init_reader(bool is_load) {
     // set the skip lines and start offset
@@ -102,18 +105,23 @@ Status CsvReader::init_reader(bool is_load) {
         _skip_lines = 1;
     }
 
-    // create and open file reader
-    FileReader* real_reader = nullptr;
+    FileSystemProperties system_properties;
+    system_properties.system_type = _params.file_type;
+    system_properties.properties = _params.properties;
+    system_properties.hdfs_params = _params.hdfs_params;
+
+    FileDescription file_description;
+    file_description.path = _range.path;
+    file_description.start_offset = start_offset;
+    file_description.file_size = _range.__isset.file_size ? _range.file_size : 
0;
+
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
_file_reader_s));
-        real_reader = _file_reader_s.get();
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _params, _range.path, start_offset, 
_range.file_size, 0, _file_reader));
-        real_reader = _file_reader.get();
+                _profile, system_properties, file_description, &_file_system, 
&_file_reader));
     }
-    RETURN_IF_ERROR(real_reader->open());
-    if (real_reader->size() == 0 && _params.file_type != 
TFileType::FILE_STREAM &&
+    if (_file_reader->size() == 0 && _params.file_type != 
TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
         return Status::EndOfFile("Empty File");
     }
@@ -135,11 +143,13 @@ Status CsvReader::init_reader(bool is_load) {
     case TFileFormatType::FORMAT_CSV_LZ4FRAME:
     case TFileFormatType::FORMAT_CSV_LZOP:
     case TFileFormatType::FORMAT_CSV_DEFLATE:
-        _line_reader.reset(new PlainTextLineReader(_profile, real_reader, 
_decompressor.get(),
-                                                   _size, _line_delimiter, 
_line_delimiter_length));
+        _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader, 
_decompressor.get(),
+                                                      _size, _line_delimiter,
+                                                      _line_delimiter_length, 
start_offset));
+
         break;
     case TFileFormatType::FORMAT_PROTO:
-        _line_reader.reset(new PlainBinaryLineReader(real_reader));
+        _line_reader.reset(new NewPlainBinaryLineReader(_file_reader, 
_params.file_type));
         break;
     default:
         return Status::InternalError(
@@ -495,11 +505,20 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
         }
     }
 
-    // create and open file reader
-    RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, 
_range.path, start_offset,
-                                                    _range.file_size, 0, 
_file_reader));
-    RETURN_IF_ERROR(_file_reader->open());
-    if (_file_reader->size() == 0) {
+    FileSystemProperties system_properties;
+    system_properties.system_type = _params.file_type;
+    system_properties.properties = _params.properties;
+    system_properties.hdfs_params = _params.hdfs_params;
+
+    FileDescription file_description;
+    file_description.path = _range.path;
+    file_description.start_offset = start_offset;
+    file_description.file_size = _range.__isset.file_size ? _range.file_size : 
0;
+
+    RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
system_properties, file_description,
+                                                    &_file_system, 
&_file_reader));
+    if (_file_reader->size() == 0 && _params.file_type != 
TFileType::FILE_STREAM &&
+        _params.file_type != TFileType::FILE_BROKER) {
         return Status::EndOfFile("Empty File");
     }
 
@@ -513,8 +532,10 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
     // _decompressor may be nullptr if this is not a compressed file
     RETURN_IF_ERROR(_create_decompressor());
 
-    _line_reader.reset(new PlainTextLineReader(_profile, _file_reader.get(), 
_decompressor.get(),
-                                               _size, _line_delimiter, 
_line_delimiter_length));
+    _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader, 
_decompressor.get(),
+                                                  _size, _line_delimiter, 
_line_delimiter_length,
+                                                  start_offset));
+
     return Status::OK();
 }
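
Both code paths above follow the same recipe introduced by this change: fill a FileSystemProperties with the storage type and its connection properties, fill a FileDescription with the path, start offset and (optional) size, then ask FileFactory for a FileSystem plus a shared FileReader, or for the stream load pipe when the source is FILE_STREAM. Condensed, the flow looks like this (the field names are the ones used above):

    FileSystemProperties system_properties;
    system_properties.system_type = _params.file_type;
    system_properties.properties = _params.properties;
    system_properties.hdfs_params = _params.hdfs_params;

    FileDescription file_description;
    file_description.path = _range.path;
    file_description.start_offset = start_offset;
    file_description.file_size = _range.__isset.file_size ? _range.file_size : 0;

    std::unique_ptr<io::FileSystem> file_system;
    io::FileReaderSPtr file_reader;
    if (_params.file_type == TFileType::FILE_STREAM) {
        // stream load: reuse the pipe registered under this load id
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader));
    } else {
        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties,
                                                        file_description, &file_system,
                                                        &file_reader));
    }
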
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.h 
b/be/src/vec/exec/format/csv/csv_reader.h
index 5bb14523e3..3a22e0aa14 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -17,10 +17,11 @@
 
 #pragma once
 
+#include "io/fs/file_reader.h"
 #include "vec/exec/format/generic_reader.h"
+
 namespace doris {
 
-class FileReader;
 class LineReader;
 class TextConverter;
 class Decompressor;
@@ -63,13 +64,13 @@ private:
     bool _is_array(const Slice& slice);
 
     // used for parse table schema of csv file.
+    // Currently, this feature is used for table valued functions.
     Status _prepare_parse(size_t* read_line, bool* is_parse_name);
     Status _parse_col_nums(size_t* col_nums);
     Status _parse_col_names(std::vector<std::string>* col_names);
     // TODO(ftw): parse type
     Status _parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* 
col_types);
 
-private:
     RuntimeState* _state;
     RuntimeProfile* _profile;
     ScannerCounter* _counter;
@@ -84,11 +85,8 @@ private:
     // True if this is a load task
     bool _is_load = false;
 
-    // _file_reader_s is for stream load pipe reader,
-    // and _file_reader is for other file reader.
-    // TODO: refactor this to use only shared_ptr or unique_ptr
-    std::unique_ptr<FileReader> _file_reader;
-    std::shared_ptr<FileReader> _file_reader_s;
+    std::unique_ptr<io::FileSystem> _file_system;
+    io::FileReaderSPtr _file_reader;
     std::unique_ptr<LineReader> _line_reader;
     bool _line_reader_eof;
     std::unique_ptr<TextConverter> _text_converter;
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp 
b/be/src/vec/exec/format/file_reader/new_file_factory.cpp
deleted file mode 100644
index a02b5ebdba..0000000000
--- a/be/src/vec/exec/format/file_reader/new_file_factory.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/format/file_reader/new_file_factory.h"
-
-#include "io/broker_reader.h"
-#include "io/broker_writer.h"
-#include "io/buffered_reader.h"
-#include "io/fs/broker_file_system.h"
-#include "io/fs/file_system.h"
-#include "io/fs/hdfs_file_system.h"
-#include "io/fs/s3_file_system.h"
-#include "io/hdfs_file_reader.h"
-#include "io/hdfs_writer.h"
-#include "io/local_file_reader.h"
-#include "io/local_file_writer.h"
-#include "io/s3_reader.h"
-#include "io/s3_writer.h"
-#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
-#include "runtime/stream_load/new_load_stream_mgr.h"
-#include "util/s3_util.h"
-
-namespace doris {
-
-Status NewFileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
-                                          const std::vector<TNetworkAddress>& 
broker_addresses,
-                                          const std::map<std::string, 
std::string>& properties,
-                                          const std::string& path, int64_t 
start_offset,
-                                          std::unique_ptr<FileWriter>& 
file_writer) {
-    switch (type) {
-    case TFileType::FILE_LOCAL: {
-        file_writer.reset(new LocalFileWriter(path, start_offset));
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        file_writer.reset(new BrokerWriter(env, broker_addresses, properties, 
path, start_offset));
-        break;
-    }
-    case TFileType::FILE_S3: {
-        file_writer.reset(new S3Writer(properties, path, start_offset));
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        RETURN_IF_ERROR(create_hdfs_writer(
-                const_cast<std::map<std::string, std::string>&>(properties), 
path, file_writer));
-        break;
-    }
-    default:
-        return Status::InternalError("unsupported file writer type: {}", 
std::to_string(type));
-    }
-
-    return Status::OK();
-}
-
-// ============================
-// broker scan node/unique ptr
-Status NewFileFactory::create_file_reader(TFileType::type type, ExecEnv* env,
-                                          RuntimeProfile* profile,
-                                          const std::vector<TNetworkAddress>& 
broker_addresses,
-                                          const std::map<std::string, 
std::string>& properties,
-                                          const TBrokerRangeDesc& range, 
int64_t start_offset,
-                                          std::unique_ptr<FileReader>& 
file_reader) {
-    FileReader* file_reader_ptr;
-    switch (type) {
-    case TFileType::FILE_LOCAL: {
-        file_reader_ptr = new LocalFileReader(range.path, start_offset);
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        file_reader_ptr = new BufferedReader(
-                profile,
-                new BrokerReader(env, broker_addresses, properties, 
range.path, start_offset,
-                                 range.__isset.file_size ? range.file_size : 
0));
-        break;
-    }
-    case TFileType::FILE_S3: {
-        file_reader_ptr =
-                new BufferedReader(profile, new S3Reader(properties, 
range.path, start_offset));
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        FileReader* hdfs_reader = nullptr;
-        RETURN_IF_ERROR(
-                create_hdfs_reader(range.hdfs_params, range.path, 
start_offset, &hdfs_reader));
-        file_reader_ptr = new BufferedReader(profile, hdfs_reader);
-        break;
-    }
-    default:
-        return Status::InternalError("unsupported file reader type: " + 
std::to_string(type));
-    }
-    file_reader.reset(file_reader_ptr);
-
-    return Status::OK();
-}
-
-// ============================
-// file scan node/unique ptr
-Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/,
-                                          const FileSystemProperties& 
system_properties,
-                                          const FileDescription& 
file_description,
-                                          std::unique_ptr<io::FileSystem>* 
file_system,
-                                          io::FileReaderSPtr* file_reader) {
-    TFileType::type type = system_properties.system_type;
-    io::FileSystem* file_system_ptr = nullptr;
-    switch (type) {
-    case TFileType::FILE_S3: {
-        RETURN_IF_ERROR(create_s3_reader(system_properties.properties, 
file_description.path,
-                                         &file_system_ptr, file_reader));
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, 
file_description.path,
-                                           &file_system_ptr, file_reader));
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        
RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
-                                             system_properties.properties, 
file_description.path,
-                                             &file_system_ptr, file_reader));
-        break;
-    }
-    default:
-        return Status::NotSupported("unsupported file reader type: {}", 
std::to_string(type));
-    }
-    file_system->reset(file_system_ptr);
-    return Status::OK();
-}
-
-// file scan node/stream load pipe
-Status NewFileFactory::create_pipe_reader(const TUniqueId& load_id,
-                                          io::FileReaderSPtr* file_reader) {
-    *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
-    if (!(*file_reader)) {
-        return Status::InternalError("unknown stream load id: {}", 
UniqueId(load_id).to_string());
-    }
-    return Status::OK();
-}
-
-Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, 
const std::string& path,
-                                          int64_t start_offset, FileReader** 
reader) {
-    *reader = new HdfsFileReader(hdfs_params, path, start_offset);
-    return Status::OK();
-}
-
-Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, 
const std::string& path,
-                                          io::FileSystem** hdfs_file_system,
-                                          io::FileReaderSPtr* reader) {
-    *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, path);
-    (dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect();
-    (*hdfs_file_system)->open_file(path, reader);
-    return Status::OK();
-}
-
-Status NewFileFactory::create_hdfs_writer(const std::map<std::string, 
std::string>& properties,
-                                          const std::string& path,
-                                          std::unique_ptr<FileWriter>& writer) 
{
-    writer.reset(new HDFSWriter(properties, path));
-    return Status::OK();
-}
-
-Status NewFileFactory::create_s3_reader(const std::map<std::string, 
std::string>& prop,
-                                        const std::string& path, 
io::FileSystem** s3_file_system,
-                                        io::FileReaderSPtr* reader) {
-    S3URI s3_uri(path);
-    if (!s3_uri.parse()) {
-        return Status::InvalidArgument("s3 uri is invalid: {}", path);
-    }
-    S3Conf s3_conf;
-    RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, 
&s3_conf));
-    *s3_file_system = new io::S3FileSystem(s3_conf, "");
-    (dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect();
-    (*s3_file_system)->open_file(s3_uri.get_key(), reader);
-    return Status::OK();
-}
-
-Status NewFileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
-                                            const std::map<std::string, 
std::string>& prop,
-                                            const std::string& path,
-                                            io::FileSystem** 
broker_file_system,
-                                            io::FileReaderSPtr* reader) {
-    *broker_file_system = new io::BrokerFileSystem(broker_addr, prop);
-    (dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect();
-    (*broker_file_system)->open_file(path, reader);
-    return Status::OK();
-}
-} // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h 
b/be/src/vec/exec/format/file_reader/new_file_factory.h
deleted file mode 100644
index aa510718ba..0000000000
--- a/be/src/vec/exec/format/file_reader/new_file_factory.h
+++ /dev/null
@@ -1,115 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "gen_cpp/Types_types.h"
-#include "io/file_reader.h"
-#include "io/file_writer.h"
-#include "io/fs/file_reader.h"
-
-namespace doris {
-namespace io {
-class FileSystem;
-}
-
-class ExecEnv;
-class TNetworkAddress;
-class RuntimeProfile;
-
-struct FileSystemProperties {
-    TFileType::type system_type;
-    std::map<std::string, std::string> properties;
-    THdfsParams hdfs_params;
-    std::vector<TNetworkAddress> broker_addresses;
-};
-
-struct FileDescription {
-    std::string path;
-    int64_t start_offset;
-    size_t file_size;
-    size_t buffer_size;
-};
-
-class NewFileFactory {
-public:
-    // Create FileWriter
-    static Status create_file_writer(TFileType::type type, ExecEnv* env,
-                                     const std::vector<TNetworkAddress>& 
broker_addresses,
-                                     const std::map<std::string, std::string>& 
properties,
-                                     const std::string& path, int64_t 
start_offset,
-                                     std::unique_ptr<FileWriter>& file_writer);
-
-    /**
-     * Create FileReader for broker scan node related scanners and readers
-     */
-    static Status create_file_reader(TFileType::type type, ExecEnv* env, 
RuntimeProfile* profile,
-                                     const std::vector<TNetworkAddress>& 
broker_addresses,
-                                     const std::map<std::string, std::string>& 
properties,
-                                     const TBrokerRangeDesc& range, int64_t 
start_offset,
-                                     std::unique_ptr<FileReader>& file_reader);
-    /**
-     * Create FileReader for file scan node rlated scanners and readers
-     * If buffer_size > 0, use BufferedReader to wrap the underlying 
FileReader;
-     * Otherwise, return the underlying FileReader directly.
-     */
-    static Status create_file_reader(RuntimeProfile* profile,
-                                     const FileSystemProperties& 
system_properties,
-                                     const FileDescription& file_description,
-                                     std::unique_ptr<io::FileSystem>* 
file_system,
-                                     io::FileReaderSPtr* file_reader);
-
-    // Create FileReader for stream load pipe
-    static Status create_pipe_reader(const TUniqueId& load_id, 
io::FileReaderSPtr* file_reader);
-
-    // TODO(ftw): should be delete after new_hdfs_file_reader ready
-    static Status create_hdfs_reader(const THdfsParams& hdfs_params, const 
std::string& path,
-                                     int64_t start_offset, FileReader** 
reader);
-
-    static Status create_hdfs_reader(const THdfsParams& hdfs_params, const 
std::string& path,
-                                     io::FileSystem** hdfs_file_system, 
io::FileReaderSPtr* reader);
-
-    // TODO(ftw): should be delete after new_hdfs_file_writer ready
-    static Status create_hdfs_writer(const std::map<std::string, std::string>& 
properties,
-                                     const std::string& path, 
std::unique_ptr<FileWriter>& writer);
-
-    static Status create_s3_reader(const std::map<std::string, std::string>& 
prop,
-                                   const std::string& path, io::FileSystem** 
s3_file_system,
-                                   io::FileReaderSPtr* reader);
-
-    static Status create_broker_reader(const TNetworkAddress& broker_addr,
-                                       const std::map<std::string, 
std::string>& prop,
-                                       const std::string& path, 
io::FileSystem** hdfs_file_system,
-                                       io::FileReaderSPtr* reader);
-
-    static TFileType::type convert_storage_type(TStorageBackendType::type 
type) {
-        switch (type) {
-        case TStorageBackendType::LOCAL:
-            return TFileType::FILE_LOCAL;
-        case TStorageBackendType::S3:
-            return TFileType::FILE_S3;
-        case TStorageBackendType::BROKER:
-            return TFileType::FILE_BROKER;
-        case TStorageBackendType::HDFS:
-            return TFileType::FILE_HDFS;
-        default:
-            LOG(FATAL) << "not match type to convert, from type:" << type;
-        }
-        __builtin_unreachable();
-    }
-};
-} // namespace doris
diff --git 
a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp 
b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
new file mode 100644
index 0000000000..016d29ca1d
--- /dev/null
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "new_plain_binary_line_reader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include "io/fs/file_reader.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/iterators.h"
+
+namespace doris {
+
+NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr 
file_reader,
+                                                   TFileType::type file_type)
+        : _file_reader(file_reader), _file_type(file_type) {}
+
+NewPlainBinaryLineReader::~NewPlainBinaryLineReader() {
+    close();
+}
+
+void NewPlainBinaryLineReader::close() {}
+
+Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, 
bool* eof) {
+    std::unique_ptr<uint8_t[]> file_buf;
+    size_t read_size = 0;
+    IOContext io_ctx;
+    io_ctx.reader_type = READER_QUERY;
+    switch (_file_type) {
+    case TFileType::FILE_LOCAL:
+    case TFileType::FILE_HDFS:
+    case TFileType::FILE_S3: {
+        size_t file_size = _file_reader->size();
+        file_buf.reset(new uint8_t[file_size]);
+        Slice result(file_buf.get(), file_size);
+        RETURN_IF_ERROR(_file_reader->read_at(0, result, io_ctx, &read_size));
+        break;
+    }
+    case TFileType::FILE_STREAM: {
+        RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
+                                ->read_one_message(&file_buf, &read_size));
+
+        break;
+    }
+    default: {
+        return Status::NotSupported("no supported file reader type: {}", 
_file_type);
+    }
+    }
+    *ptr = file_buf.release();
+    *size = read_size;
+    if (read_size == 0) {
+        *eof = true;
+    }
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/io/hdfs_reader_writer.h 
b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
similarity index 54%
rename from be/src/io/hdfs_reader_writer.h
rename to be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
index 3b27b22656..b8b3f398db 100644
--- a/be/src/io/hdfs_reader_writer.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
@@ -17,22 +17,27 @@
 
 #pragma once
 
-#include "gen_cpp/PlanNodes_types.h"
-#include "io/file_reader.h"
-#include "io/file_writer.h"
+#include <gen_cpp/Types_types.h>
+
+#include "exec/line_reader.h"
+#include "io/fs/file_reader.h"
 
 namespace doris {
 
-// TODO(ftw): This file should be deleted when new_file_factory.h replace 
file_factory.h
-class HdfsReaderWriter {
+class NewPlainBinaryLineReader : public LineReader {
 public:
-    static Status create_reader(const THdfsParams& hdfs_params, const 
std::string& path,
-                                int64_t start_offset, FileReader** reader);
+    NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, TFileType::type 
file_type);
+
+    ~NewPlainBinaryLineReader() override;
+
+    Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
 
-    static Status create_reader(const std::map<std::string, std::string>& 
properties,
-                                const std::string& path, int64_t start_offset, 
FileReader** reader);
+    void close() override;
 
-    static Status create_writer(const std::map<std::string, std::string>& 
properties,
-                                const std::string& path, 
std::unique_ptr<FileWriter>& writer);
+private:
+    io::FileReaderSPtr _file_reader;
+
+    TFileType::type _file_type;
 };
+
 } // namespace doris
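
NewPlainBinaryLineReader adapts an io::FileReaderSPtr to the LineReader interface that CsvReader uses for FORMAT_PROTO input: each read_line call produces one complete message (the whole file for local/HDFS/S3 sources, one pipe message for stream load) and sets eof once nothing more is available. A short usage sketch; the surrounding function and variable names are illustrative:

    NewPlainBinaryLineReader line_reader(file_reader, _params.file_type);
    bool eof = false;
    while (!eof) {
        const uint8_t* ptr = nullptr;
        size_t size = 0;
        RETURN_IF_ERROR(line_reader.read_line(&ptr, &size, &eof));
        if (size > 0) {
            // [ptr, ptr + size) holds one serialized batch to hand to the PB decoder
        }
    }
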
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index 56bc26c648..52af605154 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -33,7 +33,8 @@
 
 namespace doris {
 
-NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, 
io::FileReader* file_reader,
+NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
+                                               io::FileReaderSPtr file_reader,
                                                Decompressor* decompressor, 
size_t length,
                                                const std::string& 
line_delimiter,
                                                size_t line_delimiter_length, 
size_t current_offset)
@@ -134,11 +135,6 @@ void NewPlainTextLineReader::extend_input_buf() {
         _input_buf_limit -= _input_buf_pos;
         _input_buf_pos = 0;
     } while (false);
-
-    // LOG(INFO) << "extend input buf."
-    //           << " input_buf_size: " << _input_buf_size
-    //           << " input_buf_pos: " << _input_buf_pos
-    //           << " input_buf_limit: " << _input_buf_limit;
 }
 
 void NewPlainTextLineReader::extend_output_buf() {
@@ -176,11 +172,6 @@ void NewPlainTextLineReader::extend_output_buf() {
         _output_buf_limit -= _output_buf_pos;
         _output_buf_pos = 0;
     } while (false);
-
-    // LOG(INFO) << "extend output buf."
-    //           << " output_buf_size: " << _output_buf_size
-    //           << " output_buf_pos: " << _output_buf_pos
-    //           << " output_buf_limit: " << _output_buf_limit;
 }
 
 Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, 
bool* eof) {
@@ -241,9 +232,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** 
ptr, size_t* size, bool
                     IOContext io_ctx;
                     RETURN_IF_ERROR(
                             _file_reader->read_at(_current_offset, file_slice, 
io_ctx, &read_len));
+                    _current_offset += read_len;
+                    if (read_len == 0) {
+                        _file_eof = true;
+                    }
                     COUNTER_UPDATE(_bytes_read_counter, read_len);
                 }
-                // LOG(INFO) << "after read file: _file_eof: " << _file_eof << 
" read_len: " << read_len;
                 if (_file_eof || read_len == 0) {
                     if (!_stream_end) {
                         return Status::InternalError(
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index a39e578577..99debf6d18 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -18,19 +18,17 @@
 #pragma once
 
 #include "exec/line_reader.h"
+#include "io/fs/file_reader.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
-namespace io {
-class FileReader;
-}
 
 class Decompressor;
 class Status;
 
 class NewPlainTextLineReader : public LineReader {
 public:
-    NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* 
file_reader,
+    NewPlainTextLineReader(RuntimeProfile* profile, io::FileReaderSPtr 
file_reader,
                            Decompressor* decompressor, size_t length,
                            const std::string& line_delimiter, size_t 
line_delimiter_length,
                            size_t current_offset);
@@ -60,7 +58,7 @@ private:
     void extend_output_buf();
 
     RuntimeProfile* _profile;
-    io::FileReader* _file_reader;
+    io::FileReaderSPtr _file_reader;
     Decompressor* _decompressor;
     // the min length that should be read.
     // -1 means endless (for stream load)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index e179f91eea..13e86b1ba9 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -18,12 +18,14 @@
 #include "vec/exec/format/json/new_json_reader.h"
 
 #include "common/compiler_util.h"
-#include "exec/plain_text_line_reader.h"
 #include "exprs/json_functions.h"
 #include "io/file_factory.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/iterators.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
+#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
 #include "vec/exec/scan/vscanner.h"
 namespace doris::vectorized {
 using namespace ErrorCode;
@@ -38,9 +40,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
           _params(params),
           _range(range),
           _file_slot_descs(file_slot_descs),
+          _file_system(nullptr),
           _file_reader(nullptr),
-          _file_reader_s(nullptr),
-          _real_file_reader(nullptr),
           _line_reader(nullptr),
           _reader_eof(false),
           _skip_first_line(false),
@@ -49,7 +50,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
           _value_allocator(_value_buffer, sizeof(_value_buffer)),
           _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
           _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator),
-          _scanner_eof(scanner_eof) {
+          _scanner_eof(scanner_eof),
+          _current_offset(0) {
     _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
     _read_timer = ADD_TIMER(_profile, "ReadTime");
     _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
@@ -64,9 +66,6 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const 
TFileScanRangeParams
           _params(params),
           _range(range),
           _file_slot_descs(file_slot_descs),
-          _file_reader(nullptr),
-          _file_reader_s(nullptr),
-          _real_file_reader(nullptr),
           _line_reader(nullptr),
           _reader_eof(false),
           _skip_first_line(false),
@@ -159,11 +158,11 @@ Status 
NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
     if (_line_reader != nullptr) {
         RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof));
     } else {
-        int64_t length = 0;
-        RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, 
&length));
+        size_t read_size = 0;
+        RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size));
         json_str = json_str_ptr.get();
-        size = length;
-        if (length == 0) {
+        size = read_size;
+        if (read_size == 0) {
             eof = true;
         }
     }
@@ -286,15 +285,25 @@ Status NewJsonReader::_open_file_reader() {
         start_offset -= 1;
     }
 
+    _current_offset = start_offset;
+
+    FileSystemProperties system_properties;
+    system_properties.system_type = _params.file_type;
+    system_properties.properties = _params.properties;
+    system_properties.hdfs_params = _params.hdfs_params;
+
+    FileDescription file_description;
+    file_description.path = _range.path;
+    file_description.start_offset = start_offset;
+    file_description.file_size = _range.__isset.file_size ? _range.file_size : 
0;
+
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
_file_reader_s));
-        _real_file_reader = _file_reader_s.get();
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _params, _range.path, start_offset, 
_range.file_size, 0, _file_reader));
-        _real_file_reader = _file_reader.get();
+                _profile, system_properties, file_description, &_file_system, 
&_file_reader));
     }
-    return _real_file_reader->open();
+    return Status::OK();
 }
 
 Status NewJsonReader::_open_line_reader() {
@@ -306,8 +315,9 @@ Status NewJsonReader::_open_line_reader() {
     } else {
         _skip_first_line = false;
     }
-    _line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader, 
nullptr, size,
-                                               _line_delimiter, 
_line_delimiter_length));
+    _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader, 
nullptr, size,
+                                                  _line_delimiter, 
_line_delimiter_length,
+                                                  _current_offset));
     return Status::OK();
 }
 
@@ -509,13 +519,12 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
     if (_line_reader != nullptr) {
         RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
     } else {
-        int64_t length = 0;
-        RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, 
&length));
-        json_str = json_str_ptr.get();
-        *size = length;
-        if (length == 0) {
+        RETURN_IF_ERROR(_read_one_message(&json_str_ptr, size));
+        json_str = json_str_ptr.release();
+        if (*size == 0) {
             *eof = true;
         }
+        _current_offset += *size;
     }
 
     _bytes_read_counter += *size;
@@ -877,4 +886,28 @@ std::string NewJsonReader::_print_json_value(const 
rapidjson::Value& value) {
     return std::string(buffer.GetString());
 }
 
+Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, 
size_t* read_size) {
+    IOContext io_ctx;
+    io_ctx.reader_type = READER_QUERY;
+    switch (_params.file_type) {
+    case TFileType::FILE_LOCAL:
+    case TFileType::FILE_HDFS:
+    case TFileType::FILE_S3: {
+        size_t file_size = _file_reader->size();
+        file_buf->reset(new uint8_t[file_size]);
+        Slice result(file_buf->get(), file_size);
+        RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, io_ctx, 
read_size));
+        break;
+    }
+    case TFileType::FILE_STREAM: {
+        RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
+                                ->read_one_message(file_buf, read_size));
+        break;
+    }
+    default: {
+        return Status::NotSupported("no supported file reader type: {}", 
_params.file_type);
+    }
+    }
+    return Status::OK();
+}
 } // namespace doris::vectorized
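
_read_one_message is the JSON counterpart of the dispatch in NewPlainBinaryLineReader: local, HDFS and S3 files are read in one shot from _current_offset into a buffer sized to the whole file, while FILE_STREAM pulls one message from the StreamLoadPipe. Its caller in _parse_json_doc then advances the offset, roughly as follows (an illustrative condensation of the code above):

    std::unique_ptr<uint8_t[]> json_buf;
    size_t size = 0;
    RETURN_IF_ERROR(_read_one_message(&json_buf, &size));
    if (size == 0) {
        *eof = true;              // nothing left in the file or the pipe
        return Status::OK();
    }
    _current_offset += size;      // keep the next read_at starting point in sync
    // [json_buf.get(), json_buf.get() + size) is the raw JSON document to parse
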
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 5a057d32fe..98aae55ea4 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -22,6 +22,7 @@
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/writer.h>
 
+#include "io/fs/file_reader.h"
 #include "vec/exec/format/generic_reader.h"
 namespace doris {
 
@@ -92,7 +93,8 @@ private:
 
     std::string _print_json_value(const rapidjson::Value& value);
 
-private:
+    Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* 
read_size);
+
     Status (NewJsonReader::*_vhandle_json_callback)(
             std::vector<vectorized::MutableColumnPtr>& columns,
             const std::vector<SlotDescriptor*>& slot_descs, bool* 
is_empty_row, bool* eof);
@@ -103,12 +105,8 @@ private:
     const TFileRangeDesc& _range;
     const std::vector<SlotDescriptor*>& _file_slot_descs;
 
-    // _file_reader_s is for stream load pipe reader,
-    // and _file_reader is for other file reader.
-    // TODO: refactor this to use only shared_ptr or unique_ptr
-    std::unique_ptr<FileReader> _file_reader;
-    std::shared_ptr<FileReader> _file_reader_s;
-    FileReader* _real_file_reader;
+    std::unique_ptr<io::FileSystem> _file_system;
+    io::FileReaderSPtr _file_reader;
     std::unique_ptr<LineReader> _line_reader;
     bool _reader_eof;
 
@@ -134,9 +132,8 @@ private:
     char _value_buffer[4 * 1024 * 1024]; // 4MB
     char _parse_buffer[512 * 1024];      // 512KB
 
-    typedef rapidjson::GenericDocument<rapidjson::UTF8<>, 
rapidjson::MemoryPoolAllocator<>,
-                                       rapidjson::MemoryPoolAllocator<>>
-            Document;
+    using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, 
rapidjson::MemoryPoolAllocator<>,
+                                                
rapidjson::MemoryPoolAllocator<>>;
     rapidjson::MemoryPoolAllocator<> _value_allocator;
     rapidjson::MemoryPoolAllocator<> _parse_allocator;
     Document _origin_json_doc;   // origin json document object from parsed 
json string
@@ -145,6 +142,8 @@ private:
 
     bool* _scanner_eof;
 
+    size_t _current_offset;
+
     RuntimeProfile::Counter* _bytes_read_counter;
     RuntimeProfile::Counter* _read_timer;
     RuntimeProfile::Counter* _file_read_timer;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 2748160e97..c6867b0d83 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -23,6 +23,9 @@
 #include "cctz/time_zone.h"
 #include "gutil/strings/substitute.h"
 #include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "olap/iterators.h"
+#include "util/slice.h"
 #include "vec/columns/column_array.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -47,10 +50,11 @@ void ORCFileInputStream::read(void* buf, uint64_t length, 
uint64_t offset) {
     SCOPED_RAW_TIMER(&_statistics.read_time);
     uint64_t has_read = 0;
     char* out = reinterpret_cast<char*>(buf);
+    IOContext io_ctx;
     while (has_read < length) {
-        int64_t loop_read;
-        Status st = _file_reader->readat(offset + has_read, length - has_read, 
&loop_read,
-                                         out + has_read);
+        size_t loop_read;
+        Slice result(out + has_read, length - has_read);
+        Status st = _file_reader->read_at(offset + has_read, result, io_ctx, 
&loop_read);
         if (!st.ok()) {
             throw orc::ParseError(
                     strings::Substitute("Failed to read $0: $1", _file_name, 
st.to_string()));
@@ -87,7 +91,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
           _scan_params(params),
           _scan_range(range),
           _ctz(ctz),
-          _column_names(column_names) {}
+          _column_names(column_names),
+          _file_system(nullptr) {}
 
 OrcReader::~OrcReader() {
     close();
@@ -132,12 +137,22 @@ Status OrcReader::init_reader(
         std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
     SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
     if (_file_reader == nullptr) {
-        std::unique_ptr<FileReader> inner_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_scan_params, _scan_range.path,
-                                                        
_scan_range.start_offset,
-                                                        _scan_range.file_size, 
0, inner_reader));
-        RETURN_IF_ERROR(inner_reader->open());
-        _file_reader = new ORCFileInputStream(_scan_range.path, 
inner_reader.release());
+        io::FileReaderSPtr inner_reader;
+
+        FileSystemProperties system_properties;
+        system_properties.system_type = _scan_params.file_type;
+        system_properties.properties = _scan_params.properties;
+        system_properties.hdfs_params = _scan_params.hdfs_params;
+
+        FileDescription file_description;
+        file_description.path = _scan_range.path;
+        file_description.start_offset = _scan_range.start_offset;
+        file_description.file_size = _scan_range.__isset.file_size ? 
_scan_range.file_size : 0;
+
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                _profile, system_properties, file_description, &_file_system, 
&inner_reader));
+
+        _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
     }
     if (_file_reader->getLength() == 0) {
         return Status::EndOfFile("Empty orc file");
@@ -179,12 +194,22 @@ Status OrcReader::init_reader(
 Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
     if (_file_reader == nullptr) {
-        std::unique_ptr<FileReader> inner_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_scan_params, _scan_range.path,
-                                                        
_scan_range.start_offset,
-                                                        _scan_range.file_size, 
0, inner_reader));
-        RETURN_IF_ERROR(inner_reader->open());
-        _file_reader = new ORCFileInputStream(_scan_range.path, 
inner_reader.release());
+        io::FileReaderSPtr inner_reader;
+
+        FileSystemProperties system_properties;
+        system_properties.system_type = _scan_params.file_type;
+        system_properties.properties = _scan_params.properties;
+        system_properties.hdfs_params = _scan_params.hdfs_params;
+
+        FileDescription file_description;
+        file_description.path = _scan_range.path;
+        file_description.start_offset = _scan_range.start_offset;
+        file_description.file_size = _scan_range.__isset.file_size ? 
_scan_range.file_size : 0;
+
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                _profile, system_properties, file_description, &_file_system, 
&inner_reader));
+
+        _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
     }
     if (_file_reader->getLength() == 0) {
         return Status::EndOfFile("Empty orc file");
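
The loop in ORCFileInputStream::read above exists because the new io::FileReader::read_at contract reports how many bytes actually landed in the Slice and may return fewer than requested; callers that need an exact count keep reading until they have it. A hedged read-fully helper with the same shape:

    // Sketch of a read-fully helper over read_at(offset, Slice, IOContext, &bytes_read).
    Status read_fully(const io::FileReaderSPtr& reader, uint64_t offset, char* out,
                      uint64_t length) {
        IOContext io_ctx;
        uint64_t has_read = 0;
        while (has_read < length) {
            size_t loop_read = 0;
            Slice result(out + has_read, length - has_read);
            RETURN_IF_ERROR(reader->read_at(offset + has_read, result, io_ctx, &loop_read));
            if (loop_read == 0) {
                return Status::InternalError("unexpected end of file after {} bytes", has_read);
            }
            has_read += loop_read;
        }
        return Status::OK();
    }
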
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 98c2fc7c02..4f5f4dba10 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -21,7 +21,7 @@
 
 #include "common/config.h"
 #include "exec/olap_common.h"
-#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/exec/format/format_common.h"
@@ -37,15 +37,10 @@ public:
         int64_t read_bytes = 0;
     };
 
-    ORCFileInputStream(const std::string& file_name, FileReader* file_reader)
+    ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr 
file_reader)
             : _file_name(file_name), _file_reader(file_reader) {};
 
-    ~ORCFileInputStream() override {
-        if (_file_reader != nullptr) {
-            _file_reader->close();
-            delete _file_reader;
-        }
-    }
+    ~ORCFileInputStream() override = default;
 
     uint64_t getLength() const override { return _file_reader->size(); }
 
@@ -60,7 +55,7 @@ public:
 private:
     Statistics _statistics;
     const std::string& _file_name;
-    FileReader* _file_reader;
+    io::FileReaderSPtr _file_reader;
 };
 
 class OrcReader : public GenericReader {
@@ -82,7 +77,7 @@ public:
 
     ~OrcReader() override;
     // for test
-    void set_file_reader(const std::string& file_name, FileReader* 
file_reader) {
+    void set_file_reader(const std::string& file_name, io::FileReaderSPtr 
file_reader) {
         _file_reader = new ORCFileInputStream(file_name, file_reader);
     }
 
@@ -283,6 +278,8 @@ private:
     orc::ReaderOptions _reader_options;
     orc::RowReaderOptions _row_reader_options;
 
+    std::unique_ptr<io::FileSystem> _file_system;
+
     // only for decimal
     DecimalScaleParams _decimal_scale_params;
 };
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h 
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 5811f034bd..269f6a4368 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -23,7 +23,8 @@
 
 #include "common/logging.h"
 #include "gen_cpp/parquet_types.h"
-#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
+#include "olap/iterators.h"
 #include "util/coding.h"
 #include "util/thrift_util.h"
 #include "vparquet_file_metadata.h"
@@ -33,12 +34,14 @@ namespace doris::vectorized {
 constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
 
-static Status parse_thrift_footer(FileReader* file, 
std::shared_ptr<FileMetaData>& file_metadata) {
+static Status parse_thrift_footer(io::FileReaderSPtr file,
+                                  std::shared_ptr<FileMetaData>& 
file_metadata) {
     uint8_t footer[PARQUET_FOOTER_SIZE];
     int64_t file_size = file->size();
-    int64_t bytes_read = 0;
-    RETURN_IF_ERROR(file->readat(file_size - PARQUET_FOOTER_SIZE, 
PARQUET_FOOTER_SIZE, &bytes_read,
-                                 footer));
+    size_t bytes_read = 0;
+    Slice result(footer, PARQUET_FOOTER_SIZE);
+    IOContext io_ctx;
+    RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result, 
io_ctx, &bytes_read));
     DCHECK_EQ(bytes_read, PARQUET_FOOTER_SIZE);
 
     // validate magic
@@ -57,8 +60,9 @@ static Status parse_thrift_footer(FileReader* file, 
std::shared_ptr<FileMetaData
     tparquet::FileMetaData t_metadata;
     // deserialize footer
     uint8_t meta_buff[metadata_size];
-    RETURN_IF_ERROR(file->readat(file_size - PARQUET_FOOTER_SIZE - 
metadata_size, metadata_size,
-                                 &bytes_read, meta_buff));
+    Slice res(meta_buff, metadata_size);
+    RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - 
metadata_size, res, io_ctx,
+                                  &bytes_read));
     DCHECK_EQ(bytes_read, metadata_size);
     RETURN_IF_ERROR(deserialize_thrift_msg(meta_buff, &metadata_size, true, 
&t_metadata));
     file_metadata.reset(new FileMetaData(t_metadata));
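
parse_thrift_footer relies on the standard Parquet trailer layout: the last 8 bytes of the file are a 4-byte little-endian length of the serialized thrift FileMetaData followed by the magic bytes "PAR1", and the metadata itself sits immediately before that trailer. A small sketch of decoding just those 8 bytes (assuming a little-endian host; the real code uses the helpers from util/coding.h):

    // Sketch: decode the 8-byte Parquet trailer (4-byte LE metadata length + "PAR1").
    bool decode_parquet_trailer(const uint8_t footer[8], uint32_t* metadata_size) {
        if (memcmp(footer + 4, PARQUET_VERSION_NUMBER, 4) != 0) {
            return false;                        // magic mismatch: not a parquet file
        }
        memcpy(metadata_size, footer, 4);        // little-endian thrift footer length
        return true;
    }
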
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 75f43b4730..a7d95225b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -26,7 +26,7 @@
 
 namespace doris::vectorized {
 
-Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                                    const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
                                    const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz,
@@ -80,8 +80,8 @@ void ParquetColumnReader::_generate_read_ranges(int64_t 
start_index, int64_t end
     }
 }
 
-Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, 
tparquet::ColumnChunk* chunk,
-                                size_t max_buf_size) {
+Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+                                tparquet::ColumnChunk* chunk, size_t 
max_buf_size) {
     _stream_reader =
             new BufferedFileStreamReader(file, _metadata->start_offset(), 
_metadata->size(),
                                          std::min((size_t)_metadata->size(), 
max_buf_size));
@@ -273,8 +273,8 @@ void ArrayColumnReader::_reserve_def_levels_buf(size_t 
size) {
     }
 }
 
-Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, 
tparquet::ColumnChunk* chunk,
-                               size_t max_buf_size) {
+Status ArrayColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+                               tparquet::ColumnChunk* chunk, size_t 
max_buf_size) {
     _stream_reader =
             new BufferedFileStreamReader(file, _metadata->start_offset(), 
_metadata->size(),
                                          std::min((size_t)_metadata->size(), 
max_buf_size));
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index de0ec185b9..0f61c5742c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -107,8 +107,8 @@ public:
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t 
batch_size,
                                     size_t* read_rows, bool* eof) = 0;
-    static Status create(FileReader* file, FieldSchema* field, const 
ParquetReadColumn& column,
-                         const tparquet::RowGroup& row_group,
+    static Status create(io::FileReaderSPtr file, FieldSchema* field,
+                         const ParquetReadColumn& column, const 
tparquet::RowGroup& row_group,
                          const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz,
                          std::unique_ptr<ParquetColumnReader>& reader, size_t 
max_buf_size);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
@@ -139,7 +139,7 @@ public:
     ScalarColumnReader(const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz)
             : ParquetColumnReader(row_ranges, ctz) {};
     ~ScalarColumnReader() override { close(); };
-    Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* 
chunk,
+    Status init(io::FileReaderSPtr file, FieldSchema* field, 
tparquet::ColumnChunk* chunk,
                 size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
@@ -155,7 +155,7 @@ public:
     ArrayColumnReader(const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz)
             : ParquetColumnReader(row_ranges, ctz) {};
     ~ArrayColumnReader() override { close(); };
-    Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* 
chunk,
+    Status init(io::FileReaderSPtr file, FieldSchema* field, 
tparquet::ColumnChunk* chunk,
                 size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 24441896a9..b406919b4c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -25,7 +25,7 @@ namespace doris::vectorized {
 
 const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
 
-RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
+RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
                                const std::vector<ParquetReadColumn>& 
read_columns,
                                const int32_t row_group_id, const 
tparquet::RowGroup& row_group,
                                cctz::time_zone* ctz,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 53e8a052c0..d53e32de2a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -19,6 +19,7 @@
 
 #include "exec/text_converter.h"
 #include "io/file_reader.h"
+#include "io/fs/file_reader.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vparquet_column_reader.h"
@@ -99,7 +100,7 @@ public:
         PositionDeleteContext(const PositionDeleteContext& filter) = default;
     };
 
-    RowGroupReader(doris::FileReader* file_reader,
+    RowGroupReader(io::FileReaderSPtr file_reader,
                    const std::vector<ParquetReadColumn>& read_columns, const 
int32_t row_group_id,
                    const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
                    const PositionDeleteContext& position_delete_ctx,
@@ -131,7 +132,7 @@ private:
             Block* block, size_t rows,
             const std::unordered_map<std::string, VExprContext*>& 
missing_columns);
 
-    doris::FileReader* _file_reader;
+    io::FileReaderSPtr _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> 
_column_readers;
     const std::vector<ParquetReadColumn>& _read_columns;
     const int32_t _row_group_id;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 3d00f52355..ecf4d23259 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "io/file_factory.h"
+#include "olap/iterators.h"
 #include "parquet_pred_cmp.h"
 #include "parquet_thrift_util.h"
 #include "vec/exprs/vbloom_predicate.h"
@@ -135,18 +136,26 @@ void ParquetReader::close() {
 }
 
 Status ParquetReader::_open_file() {
+    FileSystemProperties system_properties;
+    system_properties.system_type = _scan_params.file_type;
+    system_properties.properties = _scan_params.properties;
+    system_properties.hdfs_params = _scan_params.hdfs_params;
+
+    FileDescription file_description;
+    file_description.path = _scan_range.path;
+    file_description.start_offset = _scan_range.start_offset;
+    file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0;
+
     if (_file_reader == nullptr) {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path,
-                                                        _scan_range.start_offset,
-                                                        _scan_range.file_size, 0, _file_reader));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                _profile, system_properties, file_description, &_file_system, &_file_reader));
     }
     if (_file_metadata == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
-        RETURN_IF_ERROR(_file_reader->open());
         if (_file_reader->size() == 0) {
             return Status::EndOfFile("Empty Parquet File");
         }
-        RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
+        RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
     }
     return Status::OK();
 }
@@ -403,7 +412,7 @@ Status ParquetReader::_next_row_group_reader() {
 
     RowGroupReader::PositionDeleteContext position_delete_ctx =
             _get_position_delete_ctx(row_group, row_group_index);
-    _current_group_reader.reset(new RowGroupReader(_file_reader.get(), 
_read_columns,
+    _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns,
                                                    
row_group_index.row_group_id, row_group, _ctz,
                                                    position_delete_ctx, 
_lazy_read_ctx));
     return _current_group_reader->init(_file_metadata->schema(), 
candidate_row_ranges,
@@ -493,16 +502,17 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         return Status::OK();
     }
     uint8_t col_index_buff[page_index._column_index_size];
-    int64_t bytes_read = 0;
-    RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
-                                         page_index._column_index_size, 
&bytes_read,
-                                         col_index_buff));
+    size_t bytes_read = 0;
+    Slice result(col_index_buff, page_index._column_index_size);
+    IOContext io_ctx;
+    RETURN_IF_ERROR(
+            _file_reader->read_at(page_index._column_index_start, result, 
io_ctx, &bytes_read));
     auto& schema_desc = _file_metadata->schema();
     std::vector<RowRange> skipped_row_ranges;
     uint8_t off_index_buff[page_index._offset_index_size];
-    RETURN_IF_ERROR(_file_reader->readat(page_index._offset_index_start,
-                                         page_index._offset_index_size, 
&bytes_read,
-                                         off_index_buff));
+    Slice res(off_index_buff, page_index._offset_index_size);
+    RETURN_IF_ERROR(
+            _file_reader->read_at(page_index._offset_index_start, res, io_ctx, 
&bytes_read));
     for (auto& read_col : _read_columns) {
         auto conjunct_iter = 
_colname_to_value_range->find(read_col._file_slot_name);
         if (_colname_to_value_range->end() == conjunct_iter) {
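
The two ParquetReader hunks above carry the core of the migration: _open_file() now fills a
FileSystemProperties/FileDescription pair and hands them to FileFactory::create_file_reader(),
which returns both an io::FileSystem and an io::FileReaderSPtr, and raw reads go through
io::FileReader::read_at() with a caller-owned Slice plus an IOContext instead of the old
readat() call. A minimal sketch of that read path, condensed from the patch (the function name,
parameter names, and buffer size below are illustrative only, not part of the commit):

    // Sketch: open a file via the new factory and read a byte range.
    // Assumes the headers added by this patch: "io/file_factory.h",
    // "io/fs/file_reader.h", "io/fs/file_system.h", "olap/iterators.h".
    Status read_range_sketch(RuntimeProfile* profile, const TFileScanRangeParams& params,
                             const TFileRangeDesc& range) {
        FileSystemProperties system_properties;
        system_properties.system_type = params.file_type;
        system_properties.properties = params.properties;
        system_properties.hdfs_params = params.hdfs_params;

        FileDescription file_description;
        file_description.path = range.path;
        file_description.start_offset = range.start_offset;
        file_description.file_size = range.__isset.file_size ? range.file_size : 0;

        std::unique_ptr<io::FileSystem> file_system;
        io::FileReaderSPtr file_reader;
        RETURN_IF_ERROR(FileFactory::create_file_reader(
                profile, system_properties, file_description, &file_system, &file_reader));

        // read_at() fills the Slice and reports the byte count through a size_t.
        uint8_t buf[1024]; // illustrative buffer size
        Slice result(buf, sizeof(buf));
        IOContext io_ctx;
        size_t bytes_read = 0;
        RETURN_IF_ERROR(file_reader->read_at(0, result, io_ctx, &bytes_read));
        return Status::OK();
    }
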
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 2bfc74f823..2f2d37aec5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -26,7 +26,8 @@
 #include "common/status.h"
 #include "exec/olap_common.h"
 #include "gen_cpp/parquet_types.h"
-#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
 #include "vec/core/block.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exprs/vexpr_context.h"
@@ -61,7 +62,7 @@ public:
 
     ~ParquetReader() override;
     // for test
-    void set_file_reader(FileReader* file_reader) { 
_file_reader.reset(file_reader); }
+    void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = 
file_reader; }
 
     Status init_reader(const std::vector<std::string>& column_names, bool 
filter_groups = true) {
         // without predicate
@@ -151,7 +152,8 @@ private:
     RuntimeProfile* _profile;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
-    std::unique_ptr<FileReader> _file_reader = nullptr;
+    std::unique_ptr<io::FileSystem> _file_system = nullptr;
+    io::FileReaderSPtr _file_reader = nullptr;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
     std::unique_ptr<RowGroupReader> _current_group_reader;
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp 
b/be/test/runtime/routine_load_task_executor_test.cpp
index 2e5a1e93c6..4cc3122714 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -23,7 +23,7 @@
 #include "gen_cpp/FrontendService_types.h"
 #include "gen_cpp/HeartbeatService_types.h"
 #include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
 #include "util/cpu_info.h"
 
@@ -38,8 +38,8 @@ extern TStreamLoadPutResult k_stream_load_put_result;
 
 class RoutineLoadTaskExecutorTest : public testing::Test {
 public:
-    RoutineLoadTaskExecutorTest() {}
-    virtual ~RoutineLoadTaskExecutorTest() {}
+    RoutineLoadTaskExecutorTest() = default;
+    ~RoutineLoadTaskExecutorTest() override = default;
 
     void SetUp() override {
         k_stream_load_begin_result = TLoadTxnBeginResult();
@@ -47,24 +47,20 @@ public:
         k_stream_load_rollback_result = TLoadTxnRollbackResult();
         k_stream_load_put_result = TStreamLoadPutResult();
 
-        _env._master_info = new TMasterInfo();
-        _env._load_stream_mgr = new LoadStreamMgr();
-        _env._stream_load_executor = new StreamLoadExecutor(&_env);
+        _env.set_master_info(new TMasterInfo());
+        _env.set_new_load_stream_mgr(new NewLoadStreamMgr());
+        _env.set_stream_load_executor(new StreamLoadExecutor(&_env));
 
         config::routine_load_thread_pool_size = 5;
         config::max_consumer_num_per_group = 3;
     }
 
     void TearDown() override {
-        delete _env._master_info;
-        _env._master_info = nullptr;
-        delete _env._load_stream_mgr;
-        _env._load_stream_mgr = nullptr;
-        delete _env._stream_load_executor;
-        _env._stream_load_executor = nullptr;
+        delete _env.master_info();
+        delete _env.new_load_stream_mgr();
+        delete _env.stream_load_executor();
     }
 
-private:
     ExecEnv _env;
 };
 
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp 
b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 1424cf3203..319feb0b8c 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -18,7 +18,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "io/local_file_reader.h"
+#include "io/fs/local_file_system.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -89,8 +89,9 @@ TEST_F(ParquetReaderTest, normal) {
     DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
 
     auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
-    LocalFileReader* reader =
-            new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+    io::FileReaderSPtr reader;
+    local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader);
 
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index d1f59e9234..f0712cf7c5 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -24,8 +24,8 @@
 
 #include "exec/schema_scanner.h"
 #include "io/buffered_reader.h"
-#include "io/file_reader.h"
-#include "io/local_file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "olap/iterators.h"
 #include "runtime/string_value.h"
 #include "util/runtime_profile.h"
 #include "util/timezone_utils.h"
@@ -47,13 +47,14 @@ public:
 };
 
 TEST_F(ParquetThriftReaderTest, normal) {
-    LocalFileReader 
reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0);
-
-    auto st = reader.open();
+    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+    io::FileReaderSPtr reader;
+    auto st = 
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/localfile.parquet",
+                                  &reader);
     EXPECT_TRUE(st.ok());
 
     std::shared_ptr<FileMetaData> meta_data;
-    parse_thrift_footer(&reader, meta_data);
+    parse_thrift_footer(reader, meta_data);
     tparquet::FileMetaData t_metadata = meta_data->to_thrift();
 
     LOG(WARNING) << "=====================================";
@@ -77,13 +78,15 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
     //   `hobby` array<map<string,string>>,
     //   `friend` map<string,string>,
     //   `mark` struct<math:int,english:int>)
-    LocalFileReader 
reader("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet", 0);
 
-    auto st = reader.open();
+    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+    io::FileReaderSPtr reader;
+    auto st = 
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet",
+                                  &reader);
     EXPECT_TRUE(st.ok());
 
     std::shared_ptr<FileMetaData> metadata;
-    parse_thrift_footer(&reader, metadata);
+    parse_thrift_footer(reader, metadata);
     tparquet::FileMetaData t_metadata = metadata->to_thrift();
     FieldDescriptor schemaDescriptor;
     schemaDescriptor.parse_from_thrift(t_metadata.schema);
@@ -146,7 +149,7 @@ static int fill_nullable_column(ColumnPtr& doris_column, 
level_t* definitions, s
     return null_cnt;
 }
 
-static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk* column_chunk,
+static Status get_column_values(io::FileReaderSPtr file_reader, 
tparquet::ColumnChunk* column_chunk,
                                 FieldSchema* field_schema, ColumnPtr& 
doris_column,
                                 DataTypePtr& data_type, level_t* definitions) {
     tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
@@ -280,14 +283,15 @@ static void read_parquet_data_and_check(const 
std::string& parquet_file,
      * `list_string` array<string>) // 14
      */
 
-    LocalFileReader reader(parquet_file, 0);
-    auto st = reader.open();
+    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+    io::FileReaderSPtr reader;
+    auto st = local_fs->open_file(parquet_file, &reader);
     EXPECT_TRUE(st.ok());
 
     std::unique_ptr<vectorized::Block> block;
     create_block(block);
     std::shared_ptr<FileMetaData> metadata;
-    parse_thrift_footer(&reader, metadata);
+    parse_thrift_footer(reader, metadata);
     tparquet::FileMetaData t_metadata = metadata->to_thrift();
     FieldDescriptor schema_descriptor;
     schema_descriptor.parse_from_thrift(t_metadata.schema);
@@ -297,7 +301,7 @@ static void read_parquet_data_and_check(const std::string& 
parquet_file,
         auto& column_name_with_type = block->get_by_position(c);
         auto& data_column = column_name_with_type.column;
         auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[c],
+        get_column_values(reader, &t_metadata.row_groups[0].columns[c],
                           
const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column,
                           data_type, defs);
     }
@@ -306,7 +310,7 @@ static void read_parquet_data_and_check(const std::string& 
parquet_file,
         auto& column_name_with_type = block->get_by_position(14);
         auto& data_column = column_name_with_type.column;
         auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
+        get_column_values(reader, &t_metadata.row_groups[0].columns[13],
                           
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
                           data_type, defs);
     }
@@ -315,19 +319,20 @@ static void read_parquet_data_and_check(const 
std::string& parquet_file,
         auto& column_name_with_type = block->get_by_position(15);
         auto& data_column = column_name_with_type.column;
         auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
+        get_column_values(reader, &t_metadata.row_groups[0].columns[9],
                           
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
                           data_type, defs);
     }
 
-    LocalFileReader result(result_file, 0);
-    auto rst = result.open();
+    io::FileReaderSPtr result;
+    auto rst = local_fs->open_file(result_file, &result);
     EXPECT_TRUE(rst.ok());
-    uint8_t result_buf[result.size() + 1];
-    result_buf[result.size()] = '\0';
-    int64_t bytes_read;
-    bool eof;
-    result.read(result_buf, result.size(), &bytes_read, &eof);
+    uint8_t result_buf[result->size() + 1];
+    result_buf[result->size()] = '\0';
+    size_t bytes_read;
+    Slice res(result_buf, result->size());
+    IOContext io_ctx;
+    result->read_at(0, res, io_ctx, &bytes_read);
     ASSERT_STREQ(block->dump_data(0, rows).c_str(), 
reinterpret_cast<char*>(result_buf));
 }
 
@@ -400,13 +405,15 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
         lazy_read_ctx.all_read_columns.emplace_back(slot->col_name());
         read_columns.emplace_back(ParquetReadColumn(7, slot->col_name()));
     }
-    LocalFileReader 
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
-    auto st = file_reader.open();
+    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+    io::FileReaderSPtr file_reader;
+    auto st = 
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
+                                  &file_reader);
     EXPECT_TRUE(st.ok());
 
     // prepare metadata
     std::shared_ptr<FileMetaData> meta_data;
-    parse_thrift_footer(&file_reader, meta_data);
+    parse_thrift_footer(file_reader, meta_data);
     tparquet::FileMetaData t_metadata = meta_data->to_thrift();
 
     cctz::time_zone ctz;
@@ -414,10 +421,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     auto row_group = t_metadata.row_groups[0];
     std::shared_ptr<RowGroupReader> row_group_reader;
     RowGroupReader::PositionDeleteContext 
position_delete_ctx(row_group.num_rows, 0);
-    row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, 
row_group, &ctz,
+    row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, 
row_group, &ctz,
                                               position_delete_ctx, 
lazy_read_ctx));
     std::vector<RowRange> row_ranges;
     row_ranges.emplace_back(0, row_group.num_rows);
+
     auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
     auto stg = row_group_reader->init(meta_data->schema(), row_ranges, 
col_offsets);
     EXPECT_TRUE(stg.ok());
@@ -435,14 +443,16 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     auto stb = row_group_reader->next_batch(&block, 1024, &read_rows, 
&batch_eof);
     EXPECT_TRUE(stb.ok());
 
-    LocalFileReader 
result("./be/test/exec/test_data/parquet_scanner/group-reader.txt", 0);
-    auto rst = result.open();
+    io::FileReaderSPtr result;
+    auto rst = 
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/group-reader.txt",
+                                   &result);
     EXPECT_TRUE(rst.ok());
-    uint8_t result_buf[result.size() + 1];
-    result_buf[result.size()] = '\0';
-    int64_t bytes_read;
-    bool eof;
-    result.read(result_buf, result.size(), &bytes_read, &eof);
+    uint8_t result_buf[result->size() + 1];
+    result_buf[result->size()] = '\0';
+    size_t bytes_read;
+    Slice res(result_buf, result->size());
+    IOContext io_ctx;
+    result->read_at(0, res, io_ctx, &bytes_read);
     ASSERT_STREQ(block.dump_data(0, 10).c_str(), 
reinterpret_cast<char*>(result_buf));
 }
 } // namespace vectorized
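
The unit tests follow the same pattern throughout: build an io::LocalFileSystem, open the fixture
file to get an io::FileReaderSPtr, read it with read_at(), and pass the shared reader straight to
parse_thrift_footer(). A condensed, non-authoritative sketch of that test-side flow (status checks
abbreviated; the fixture path is one of those referenced above):

    // Sketch of the local-file read path the parquet tests switch to.
    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
    io::FileReaderSPtr reader;
    auto st = local_fs->open_file(
            "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader);
    EXPECT_TRUE(st.ok());

    // Read the whole file into a null-terminated buffer, as the checks above do.
    uint8_t buf[reader->size() + 1];
    buf[reader->size()] = '\0';
    Slice res(buf, reader->size());
    IOContext io_ctx;
    size_t bytes_read = 0;
    reader->read_at(0, res, io_ctx, &bytes_read);

    // The footer parser now takes the shared reader directly.
    std::shared_ptr<FileMetaData> meta_data;
    parse_thrift_footer(reader, meta_data);
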
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out 
b/regression-test/data/load_p0/stream_load/test_json_load.out
index ebf706f594..d0de96b7d7 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -12,32 +12,6 @@
 10     hefei   23456710
 200    changsha        3456789
 
--- !select1 --
-1      beijing 2345671
-2      shanghai        2345672
-3      guangzhou       2345673
-4      shenzhen        2345674
-5      hangzhou        2345675
-6      nanjing 2345676
-7      wuhan   2345677
-8      chengdu 2345678
-9      xian    2345679
-10     hefei   23456710
-200    changsha        3456789
-
--- !select2 --
-10     beijing 2345671
-20     shanghai        2345672
-30     guangzhou       2345673
-40     shenzhen        2345674
-50     hangzhou        2345675
-60     nanjing 2345676
-70     wuhan   2345677
-80     chengdu 2345678
-90     xian    2345679
-100    hefei   23456710
-200    changsha        3456789
-
 -- !select2 --
 10     beijing 2345671
 20     shanghai        2345672
@@ -64,32 +38,6 @@
 10     23456710
 200    755
 
--- !select3 --
-1      2345671
-2      2345672
-3      2345673
-4      2345674
-5      2345675
-6      2345676
-7      2345677
-8      2345678
-9      2345679
-10     23456710
-200    755
-
--- !select4 --
-1      210
-2      220
-3      230
-4      240
-5      250
-6      260
-7      270
-8      280
-9      290
-10     300
-200    755
-
 -- !select4 --
 1      210
 2      220
@@ -116,32 +64,6 @@
 10     2345676
 200    755
 
--- !select5 --
-1      1454547
-2      1244264
-3      528369
-4      594201
-5      594201
-6      2345672
-7      2345673
-8      2345674
-9      2345675
-10     2345676
-200    755
-
--- !select6 --
-10     1454547
-20     1244264
-30     528369
-40     594201
-50     594201
-60     2345672
-70     2345673
-80     2345674
-90     2345675
-100    2345676
-200    755
-
 -- !select6 --
 10     1454547
 20     1244264
@@ -163,22 +85,6 @@
 100    2345676
 200    755
 
--- !select7 --
-60     2345672
-70     2345673
-80     2345674
-90     2345675
-100    2345676
-200    755
-
--- !select8 --
-60     2345672
-70     2345673
-80     2345674
-90     2345675
-100    2345676
-200    755
-
 -- !select8 --
 60     2345672
 70     2345673
@@ -195,33 +101,9 @@
 50     guangzhou       2345675
 200    changsha        3456789
 
--- !select9 --
-10     beijing 2345671
-20     shanghai        2345672
-30     hangzhou        2345673
-40     shenzhen        2345674
-50     guangzhou       2345675
-200    changsha        3456789
-
 -- !select10 --
 200    changsha        3456789
 
--- !select10 --
-200    changsha        3456789
-
--- !select11 --
-1      beijing 2345671
-2      shanghai        2345672
-3      guangzhou       2345673
-4      shenzhen        2345674
-5      hangzhou        2345675
-6      nanjing 2345676
-7      wuhan   2345677
-8      chengdu 2345678
-9      xian    2345679
-10     hefei   23456710
-200    changsha        3456789
-
 -- !select11 --
 1      beijing 2345671
 2      shanghai        2345672
@@ -248,28 +130,6 @@
 10     \N      23456710
 200    changsha        3456789
 
--- !select12 --
-1      \N      2345671
-2      shanghai        2345672
-3      beijing 2345673
-4      shenzhen        2345674
-5      hangzhou        2345675
-6      nanjing 2345676
-7      \N      2345677
-8      chengdu 2345678
-9      \N      2345679
-10     \N      23456710
-200    changsha        3456789
-
--- !select13 --
-2      shanghai        2345672
-3      beijing 2345673
-4      shenzhen        2345674
-5      hangzhou        2345675
-6      nanjing 2345676
-8      chengdu 2345678
-200    hangzhou        12345
-
 -- !select13 --
 2      shanghai        2345672
 3      beijing 2345673
@@ -287,22 +147,6 @@
 50     2345675 \N
 200    changsha        3456789
 
--- !select14 --
-10     2345671 \N
-20     2345672 \N
-30     2345673 \N
-40     2345674 \N
-50     2345675 \N
-200    changsha        3456789
-
--- !select15 --
-10     beijing 2345671
-20     shanghai        2345672
-30     hangzhou        2345673
-40     shenzhen        2345674
-50     guangzhou       2345675
-200    changsha        3456789
-
 -- !select15 --
 10     beijing 2345671
 20     shanghai        2345672
@@ -320,12 +164,3 @@
 6      fuyang  2345676
 200    changsha        3456789
 
--- !select16 --
-1      xihu    2345671
-2      xiaoshan        2345672
-3      binjiang        2345673
-4      shangcheng      2345674
-5      tonglu  2345675
-6      fuyang  2345676
-200    changsha        3456789
-
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 513d3e14dd..d2c5d15fe0 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -114,7 +114,7 @@ suite("test_json_load", "p0") {
         assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
     }
     
-    def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, 
format_flag, exprs, json_paths, 
+    def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, 
json_paths, 
                         json_root, where_expr, fuzzy_flag, file_name, 
ignore_failure=false ->
         
         // load the json data
@@ -207,17 +207,7 @@ suite("test_json_load", "p0") {
         
         create_test_table1.call(testTable)
 
-        load_json_data.call('false', 'test_json_load_case1', 'true', '', 
'json', '', '', '', '', '', 'simple_json.json')
-
-        sql "sync"
-        qt_select1 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table1.call(testTable)
-
-        load_json_data.call('true', 'test_json_load_case1_2', 'true', '', 
'json', '', '', '', '', '', 'simple_json.json')
+        load_json_data.call('test_json_load_case1_2', 'true', '', 'json', '', 
'', '', '', '', 'simple_json.json')
 
         sql "sync"
         qt_select1 "select * from ${testTable} order by id"
@@ -232,17 +222,7 @@ suite("test_json_load", "p0") {
 
         create_test_table1.call(testTable)
 
-        load_json_data.call('false', 'test_json_load_case2', 'true', '', 
'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
-
-        sql "sync" 
-        qt_select2 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table1.call(testTable)
-
-        load_json_data.call('true', 'test_json_load_case2_2', 'true', '', 
'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
+        load_json_data.call('test_json_load_case2_2', 'true', '', 'json', 'id= 
id * 10', '', '', '', '', 'simple_json.json')
 
         sql "sync" 
         qt_select2 "select * from ${testTable} order by id"
@@ -257,18 +237,7 @@ suite("test_json_load", "p0") {
         
         create_test_table2.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case3', 'true', '', 
'json', '', '[\"$.id\", \"$.code\"]',
-                            '', '', '', 'simple_json.json')
-
-        sql "sync"
-        qt_select3 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table2.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case3_2', 'true', '', 
'json', '', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('test_json_load_case3_2', 'true', '', 'json', '', 
'[\"$.id\", \"$.code\"]',
                             '', '', '', 'simple_json.json')
 
         sql "sync"
@@ -284,18 +253,7 @@ suite("test_json_load", "p0") {
         
         create_test_table2.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case4', 'true', '', 
'json', 'code = id * 10 + 200', '[\"$.id\"]',
-                            '', '', '', 'simple_json.json')
-
-        sql "sync"
-        qt_select4 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table2.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case4_2', 'true', '', 
'json', 'code = id * 10 + 200', '[\"$.id\"]',
+        load_json_data.call('test_json_load_case4_2', 'true', '', 'json', 
'code = id * 10 + 200', '[\"$.id\"]',
                             '', '', '', 'simple_json.json')
 
         sql "sync"
@@ -311,18 +269,7 @@ suite("test_json_load", "p0") {
         
         create_test_table2.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case5', 'true', 'true', 
'json', '', '[\"$.id\", \"$.code\"]',
-                            '', '', '', 'multi_line_json.json')
-        
-        sql "sync"
-        qt_select5 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table2.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true', 
'json', '', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('test_json_load_case5_2', 'true', 'true', 'json', 
'', '[\"$.id\", \"$.code\"]',
                             '', '', '', 'multi_line_json.json')
         
         sql "sync"
@@ -338,19 +285,7 @@ suite("test_json_load", "p0") {
 
         create_test_table2.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case6', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
-                            '', '', '', 'multi_line_json.json')
-
-        sql "sync"
-        qt_select6 "select * from ${testTable} order by id"
-
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table2.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('test_json_load_case6_2', 'true', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '', '', '', 'multi_line_json.json')
 
         sql "sync"
@@ -366,18 +301,7 @@ suite("test_json_load", "p0") {
 
         create_test_table2.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case7', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
-                            '', 'id > 50', '', 'multi_line_json.json')
-
-        sql "sync"
-        qt_select7 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table2.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('test_json_load_case7_2', 'true', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '', 'id > 50', '', 'multi_line_json.json')
 
         sql "sync"
@@ -393,19 +317,7 @@ suite("test_json_load", "p0") {
 
         create_test_table2.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case8', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
-                            '', 'id > 50', 'true', 'multi_line_json.json')
-
-        sql "sync"
-        qt_select8 "select * from ${testTable} order by id"
-
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table2.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('test_json_load_case8_2', 'true', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '', 'id > 50', 'true', 'multi_line_json.json')
 
         sql "sync"
@@ -421,18 +333,7 @@ suite("test_json_load", "p0") {
 
         create_test_table1.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case9', '', 'true', 
'json', 'id= id * 10', '',
-                            '$.item', '', 'true', 'nest_json.json')
-
-        sql "sync"
-        qt_select9 "select * from ${testTable} order by id"
-
-        // test new json reader
-         sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table1.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case9_2', '', 'true', 
'json', 'id= id * 10', '',
+        load_json_data.call('test_json_load_case9_2', '', 'true', 'json', 'id= 
id * 10', '',
                             '$.item', '', 'true', 'nest_json.json')
 
         sql "sync"
@@ -448,19 +349,7 @@ suite("test_json_load", "p0") {
 
         create_test_table1.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case10', '', 'true', 
'json', 'id= id * 10', '',
-                            '$.item', '', 'true', 'invalid_json.json', true)
-
-        sql "sync"
-        qt_select10 "select * from ${testTable} order by id"
-
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table1.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case10_2', '', 'true', 
'json', 'id= id * 10', '',
+        load_json_data.call('test_json_load_case10_2', '', 'true', 'json', 
'id= id * 10', '',
                             '$.item', '', 'true', 'invalid_json.json', true)
 
         sql "sync"
@@ -476,17 +365,7 @@ suite("test_json_load", "p0") {
         
         create_test_table1.call(testTable)
 
-        load_json_data.call('false', 'test_json_load_case11', 'true', '', 
'json', '', '', '', '', '', 'simple_json2.json')
-
-        sql "sync"
-        qt_select11 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table1.call(testTable)
-
-        load_json_data.call('true', 'test_json_load_case11_2', 'true', '', 
'json', '', '', '', '', '', 'simple_json2.json')
+        load_json_data.call('test_json_load_case11_2', 'true', '', 'json', '', 
'', '', '', '', 'simple_json2.json')
 
         sql "sync"
         qt_select11 "select * from ${testTable} order by id"
@@ -501,17 +380,7 @@ suite("test_json_load", "p0") {
         
         create_test_table1.call(testTable)
 
-        load_json_data.call('false', 'test_json_load_case12', 'true', '', 
'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
-
-        sql "sync"
-        qt_select12 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table1.call(testTable)
-
-        load_json_data.call('true', 'test_json_load_case12_2', 'true', '', 
'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
+        load_json_data.call('test_json_load_case12_2', 'true', '', 'json', '', 
'', '', '', '', 'simple_json2_lack_one_column.json')
 
         sql "sync"
         qt_select12 "select * from ${testTable} order by id"
@@ -553,38 +422,6 @@ suite("test_json_load", "p0") {
         sql "sync"
         qt_select13 "select * from ${testTable} order by id"
 
-
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        create_test_table3.call(testTable)
-        // load the json data
-        streamLoad {
-            table "${testTable}"
-            
-            // set http request header params
-            set 'strip_outer_array', "true"
-            set 'format', "json"
-            set 'max_filter_ratio', '1'
-            file "simple_json2_lack_one_column.json" // import json file
-            time 10000 // limit inflight 10s
-
-            // if declared a check callback, the default check condition will 
ignore.
-            // So you must check all condition
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(json.NumberTotalRows, json.NumberLoadedRows + 
json.NumberUnselectedRows + json.NumberFilteredRows)
-                assertEquals(json.NumberFilteredRows, 4)
-                assertEquals(json.NumberLoadedRows, 6)
-                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
-            }
-        }
-        sql "sync"
-        qt_select13 "select * from ${testTable} order by id"
-
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
@@ -595,18 +432,7 @@ suite("test_json_load", "p0") {
 
         create_test_table1.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case14', '', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
-                            '$.item', '', 'true', 'nest_json.json')
-
-        sql "sync"
-        qt_select14 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-
-        create_test_table1.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case14_2', '', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('test_json_load_case14_2', '', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '$.item', '', 'true', 'nest_json.json')
 
         sql "sync"
@@ -622,18 +448,7 @@ suite("test_json_load", "p0") {
         
         create_test_table1.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case15', '', 'true', 
'json', 'id, code, city, id= id * 10',
-                            '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', 
'', 'true', 'nest_json.json')
-
-        sql "sync"
-        qt_select15 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table1.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case15_2', '', 'true', 
'json', 'id, code, city,id= id * 10',
+        load_json_data.call('test_json_load_case15_2', '', 'true', 'json', 
'id, code, city,id= id * 10',
                             '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', 
'', 'true', 'nest_json.json')
 
         sql "sync"
@@ -649,18 +464,7 @@ suite("test_json_load", "p0") {
         
         create_test_table1.call(testTable)
         
-        load_json_data.call('false', 'test_json_load_case16', 'true', '', 
'json', 'id, code, city',
-                            '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', 
'', 'true', 'nest_json_array.json')
-
-        sql "sync"
-        qt_select16 "select * from ${testTable} order by id"
-
-        // test new json reader
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table1.call(testTable)
-        
-        load_json_data.call('true', 'test_json_load_case16_2', 'true', '', 
'json', 'id, code, city',
+        load_json_data.call('test_json_load_case16_2', 'true', '', 'json', 
'id, code, city',
                             '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', 
'', 'true', 'nest_json_array.json')
 
         sql "sync"
@@ -714,7 +518,7 @@ suite("test_json_load", "p0") {
             sql "DROP TABLE IF EXISTS ${testTable}"
 
             test_invalid_json_array_table.call(testTable)
-            load_json_data.call('false', 'test_json_load_case19', 'true', '', 
'json', '', '',
+            load_json_data.call('test_json_load_case19', 'true', '', 'json', 
'', '',
                     '', '', '', 'invalid_json_array.json', true)
 
             sql "sync"

