This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac5228c05 [feature-wip][multi-catalog]Support prefetch for orc file format (#11292)
0ac5228c05 is described below

commit 0ac5228c0535fa42efed5ec4e1a1850c5e753da0
Author: huangzhaowei <huangzhaowei....@bytedance.com>
AuthorDate: Tue Aug 2 11:01:15 2022 +0800

    [feature-wip][multi-catalog]Support prefetch for orc file format (#11292)
    
    Refactor the prefetch code in parquet and support prefetch for orc file format
---
 be/src/exec/arrow/arrow_reader.cpp   | 61 +++++++++++++++++++++++
 be/src/exec/arrow/arrow_reader.h     | 16 +++++-
 be/src/exec/arrow/orc_reader.cpp     | 40 +++++++--------
 be/src/exec/arrow/orc_reader.h       |  3 +-
 be/src/exec/arrow/parquet_reader.cpp | 94 ++++++++----------------------------
 be/src/exec/arrow/parquet_reader.h   | 15 ++----
 6 files changed, 120 insertions(+), 109 deletions(-)

diff --git a/be/src/exec/arrow/arrow_reader.cpp 
b/be/src/exec/arrow/arrow_reader.cpp
index 9d20697148..5d1785f744 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -48,6 +48,11 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, 
int64_t batch_size,
 
 ArrowReaderWrap::~ArrowReaderWrap() {
     close();
+    _closed = true;
+    _queue_writer_cond.notify_one();
+    if (_thread.joinable()) {
+        _thread.join();
+    }
 }
 
 void ArrowReaderWrap::close() {
@@ -76,6 +81,62 @@ Status ArrowReaderWrap::column_indices(const 
std::vector<SlotDescriptor*>& tuple
     return Status::OK();
 }
 
+Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, 
bool* eof) {
+    std::unique_lock<std::mutex> lock(_mtx);
+    while (!_closed && _queue.empty()) {
+        if (_batch_eof) {
+            _include_column_ids.clear();
+            *eof = true;
+            _batch_eof = false;
+            return Status::OK();
+        }
+        _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
+    }
+    if (UNLIKELY(_closed)) {
+        return Status::InternalError(_status.message());
+    }
+    *batch = _queue.front();
+    _queue.pop_front();
+    _queue_writer_cond.notify_one();
+    return Status::OK();
+}
+
+void ArrowReaderWrap::prefetch_batch() {
+    auto insert_batch = [this](const auto& batch) {
+        std::unique_lock<std::mutex> lock(_mtx);
+        while (!_closed && _queue.size() == _max_queue_size) {
+            _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
+        }
+        if (UNLIKELY(_closed)) {
+            return;
+        }
+        _queue.push_back(batch);
+        _queue_reader_cond.notify_one();
+    };
+    int current_group = _current_group;
+    int total_groups = _total_groups;
+    while (true) {
+        if (_closed || current_group >= total_groups) {
+            _batch_eof = true;
+            _queue_reader_cond.notify_one();
+            return;
+        }
+        if (filter_row_group(current_group)) {
+            current_group++;
+            continue;
+        }
+
+        arrow::RecordBatchVector batches;
+        read_batches(batches, current_group);
+        if (!_status.ok()) {
+            _closed = true;
+            return;
+        }
+        std::for_each(batches.begin(), batches.end(), insert_batch);
+        current_group++;
+    }
+}
+
 ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
 
 ArrowFile::~ArrowFile() {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 1389426fc9..704ca0750e 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -92,13 +92,17 @@ public:
         return Status::NotSupported("Not Implemented read");
     }
     // for vec
-    virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, 
bool* eof) = 0;
+    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
     std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
     virtual Status size(int64_t* size) { return Status::NotSupported("Not 
Implemented size"); }
 
+    void prefetch_batch();
+
 protected:
     virtual Status column_indices(const std::vector<SlotDescriptor*>& 
tuple_slot_descs);
+    virtual void read_batches(arrow::RecordBatchVector& batches, int 
current_group) = 0;
+    virtual bool filter_row_group(int current_group) = 0;
 
 protected:
     const int64_t _batch_size;
@@ -110,6 +114,16 @@ protected:
     std::map<std::string, int> _map_column; // column-name <---> column-index
     std::vector<int> _include_column_ids;   // columns that need to get from 
file
     std::shared_ptr<Statistics> _statistics;
+
+    std::atomic<bool> _closed = false;
+    std::atomic<bool> _batch_eof = false;
+    arrow::Status _status;
+    std::mutex _mtx;
+    std::condition_variable _queue_reader_cond;
+    std::condition_variable _queue_writer_cond;
+    std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+    const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
+    std::thread _thread;
 };
 
 } // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 78d51d8376..0db5640369 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -71,11 +71,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* 
tuple_desc,
     }
     RETURN_IF_ERROR(column_indices(tuple_slot_descs));
 
-    bool eof = false;
-    RETURN_IF_ERROR(_next_stripe_reader(&eof));
-    if (eof) {
-        return Status::EndOfFile("end of file");
-    }
+    _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
 
     return Status::OK();
 }
@@ -143,23 +139,23 @@ Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
     return Status::OK();
 }
 
-Status ORCReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, 
bool* eof) {
-    *eof = false;
-    do {
-        auto st = _rb_reader->ReadNext(batch);
-        if (!st.ok()) {
-            LOG(WARNING) << "failed to get next batch, errmsg=" << st;
-            return Status::InternalError(st.ToString());
-        }
-        if (*batch == nullptr) {
-            // try next stripe
-            RETURN_IF_ERROR(_next_stripe_reader(eof));
-            if (*eof) {
-                break;
-            }
-        }
-    } while (*batch == nullptr);
-    return Status::OK();
+void ORCReaderWrap::read_batches(arrow::RecordBatchVector& batches, int 
current_group) {
+    bool eof = false;
+    Status status = _next_stripe_reader(&eof);
+    if (!status.ok()) {
+        _closed = true;
+        return;
+    }
+    if (eof) {
+        _closed = true;
+        return;
+    }
+
+    _status = _rb_reader->ReadAll(&batches);
+}
+
+bool ORCReaderWrap::filter_row_group(int current_group) {
+    return false;
 }
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index fe8c5c54a4..1e6f0f83e6 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -40,11 +40,12 @@ public:
                        const std::vector<SlotDescriptor*>& tuple_slot_descs,
                        const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
-    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) 
override;
 
 private:
     Status _next_stripe_reader(bool* eof);
     Status _seek_start_stripe();
+    void read_batches(arrow::RecordBatchVector& batches, int current_group) 
override;
+    bool filter_row_group(int current_group) override;
 
 private:
     // orc file reader object
diff --git a/be/src/exec/arrow/parquet_reader.cpp 
b/be/src/exec/arrow/parquet_reader.cpp
index c0f45f52af..8d119146b4 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -46,14 +46,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* 
file_reader, int64_t batch_size
           _range_start_offset(range_start_offset),
           _range_size(range_size) {}
 
-ParquetReaderWrap::~ParquetReaderWrap() {
-    _closed = true;
-    _queue_writer_cond.notify_one();
-    if (_thread.joinable()) {
-        _thread.join();
-    }
-}
-
 Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
                                       const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
                                       const std::vector<ExprContext*>& 
conjunct_ctxs,
@@ -111,7 +103,7 @@ Status ParquetReaderWrap::init_reader(const 
TupleDescriptor* tuple_desc,
             _row_group_reader->init_filter_groups(tuple_desc, _map_column, 
_include_column_ids,
                                                   file_size);
         }
-        _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
+        _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
         return Status::OK();
     } catch (parquet::ParquetException& e) {
         std::stringstream str_error;
@@ -184,26 +176,6 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) {
     return Status::OK();
 }
 
-Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* 
batch, bool* eof) {
-    std::unique_lock<std::mutex> lock(_mtx);
-    while (!_closed && _queue.empty()) {
-        if (_batch_eof) {
-            _include_column_ids.clear();
-            *eof = true;
-            _batch_eof = false;
-            return Status::OK();
-        }
-        _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
-    }
-    if (UNLIKELY(_closed)) {
-        return Status::InternalError(_status.message());
-    }
-    *batch = _queue.front();
-    _queue.pop_front();
-    _queue_writer_cond.notify_one();
-    return Status::OK();
-}
-
 Status ParquetReaderWrap::handle_timestamp(const 
std::shared_ptr<arrow::TimestampArray>& ts_array,
                                            uint8_t* buf, int32_t* wbytes) {
     const auto type = 
std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
@@ -546,50 +518,6 @@ Status ParquetReaderWrap::read(Tuple* tuple, const 
std::vector<SlotDescriptor*>&
     return read_record_batch(eof);
 }
 
-void ParquetReaderWrap::prefetch_batch() {
-    auto insert_batch = [this](const auto& batch) {
-        std::unique_lock<std::mutex> lock(_mtx);
-        while (!_closed && _queue.size() == _max_queue_size) {
-            _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
-        }
-        if (UNLIKELY(_closed)) {
-            return;
-        }
-        _queue.push_back(batch);
-        _queue_reader_cond.notify_one();
-    };
-    int current_group = 0;
-    int total_groups = _total_groups;
-    while (true) {
-        if (_closed || current_group >= total_groups) {
-            _batch_eof = true;
-            _queue_reader_cond.notify_one();
-            return;
-        }
-        if (config::parquet_predicate_push_down) {
-            auto filter_group_set = _row_group_reader->filter_groups();
-            if (filter_group_set.end() != 
filter_group_set.find(current_group)) {
-                // find filter group, skip
-                current_group++;
-                continue;
-            }
-        }
-        _status = _reader->GetRecordBatchReader({current_group}, 
_include_column_ids, &_rb_reader);
-        if (!_status.ok()) {
-            _closed = true;
-            return;
-        }
-        arrow::RecordBatchVector batches;
-        _status = _rb_reader->ReadAll(&batches);
-        if (!_status.ok()) {
-            _closed = true;
-            return;
-        }
-        std::for_each(batches.begin(), batches.end(), insert_batch);
-        current_group++;
-    }
-}
-
 Status ParquetReaderWrap::read_next_batch() {
     std::unique_lock<std::mutex> lock(_mtx);
     while (!_closed && _queue.empty()) {
@@ -609,4 +537,24 @@ Status ParquetReaderWrap::read_next_batch() {
     return Status::OK();
 }
 
+void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int 
current_group) {
+    _status = _reader->GetRecordBatchReader({current_group}, 
_include_column_ids, &_rb_reader);
+    if (!_status.ok()) {
+        _closed = true;
+        return;
+    }
+    _status = _rb_reader->ReadAll(&batches);
+}
+
+bool ParquetReaderWrap::filter_row_group(int current_group) {
+    if (config::parquet_predicate_push_down) {
+        auto filter_group_set = _row_group_reader->filter_groups();
+        if (filter_group_set.end() != filter_group_set.find(current_group)) {
+            // find filter group, skip
+            return true;
+        }
+    }
+    return false;
+}
+
 } // namespace doris
diff --git a/be/src/exec/arrow/parquet_reader.h 
b/be/src/exec/arrow/parquet_reader.h
index 3bf4cf4814..95774f60b0 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -64,7 +64,7 @@ public:
     // batch_size is not use here
     ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
                       int64_t range_start_offset, int64_t range_size);
-    ~ParquetReaderWrap() override;
+    ~ParquetReaderWrap() override = default;
 
     // Read
     Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
@@ -75,7 +75,6 @@ public:
                        const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
     Status init_parquet_type();
-    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) 
override;
 
 private:
     void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, 
const uint8_t* value,
@@ -86,8 +85,9 @@ private:
                             int32_t* wbtyes);
 
 private:
-    void prefetch_batch();
     Status read_next_batch();
+    void read_batches(arrow::RecordBatchVector& batches, int current_group) 
override;
+    bool filter_row_group(int current_group) override;
 
 private:
     // parquet file reader object
@@ -104,16 +104,7 @@ private:
     int64_t _range_size;
 
 private:
-    std::atomic<bool> _closed = false;
-    std::atomic<bool> _batch_eof = false;
-    arrow::Status _status;
-    std::mutex _mtx;
-    std::condition_variable _queue_reader_cond;
-    std::condition_variable _queue_writer_cond;
-    std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
     std::unique_ptr<doris::RowGroupReader> _row_group_reader;
-    const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
-    std::thread _thread;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to