wsjz commented on code in PR #11601:
URL: https://github.com/apache/doris/pull/11601#discussion_r945666374


##########
be/src/vec/exec/format/parquet/vparquet_column_reader.cpp:
##########
@@ -19,50 +19,82 @@
 
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
+#include <vec/columns/columns_number.h>
 
 #include "schema_desc.h"
 #include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
-Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field,
-                                const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
-                                int64_t chunk_size) {
-    // todo1: init column chunk reader
-    // BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
-    // _chunk_reader(&stream_reader, chunk, field);
-    // _chunk_reader.init();
-    return Status();
-}
-
-Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
-                                   const FieldSchema* field, const ParquetReadColumn& column,
-                                   const TypeDescriptor& col_type,
+Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+                                   const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
-                                   const ParquetColumnReader* reader) {
+                                   std::vector<RowRange>& row_ranges,
+                                   std::unique_ptr<ParquetColumnReader>& reader) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
         return Status::Corruption("not supported type");
     }
     if (field->type.type == TYPE_ARRAY) {
         return Status::Corruption("not supported array type yet");
     } else {
+        VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index;
+        tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
-        RETURN_IF_ERROR(scalar_reader->init(file, field,
-                                            &row_group.columns[field->physical_column_index],
-                                            col_type, chunk_size));
-        reader = scalar_reader;
+        scalar_reader->init_column_metadata(chunk);
+        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+        reader.reset(scalar_reader);
     }
     return Status::OK();
 }
 
-Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta,
-                                            ColumnPtr* data) {
-    // todo2: read data with chunk reader to load page data
-    // while (_chunk_reader.has_next) {
-    // _chunk_reader.next_page();
-    // _chunk_reader.load_page_data();
-    // }
-    return Status();
+void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chunk) {
+    auto chunk_meta = chunk.meta_data;
+    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_len = chunk_meta.total_compressed_size;
+    _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta));
+}
+
+void ParquetColumnReader::_skipped_pages() {}
+
+Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
+                                std::vector<RowRange>& row_ranges) {
+    BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size());
+    _row_ranges.reset(&row_ranges);
+    _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
+    _chunk_reader->init();
+    return Status::OK();
+}
+
+Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                                            size_t batch_size, int64_t* read_rows, bool* eof) {
+    if (_chunk_reader->num_values() <= 0) {
+        // seek to next page header
+        _chunk_reader->next_page();
+        if (_row_ranges->size() != 0) {
+            _skipped_pages();
+        }
+        // load data to decoder
+        _chunk_reader->load_page_data();
+    }
+    size_t read_values =
+            _chunk_reader->num_values() < batch_size ? _chunk_reader->num_values() : batch_size;

Review Comment:
   The default size of a block is `batch_size`, so I think we should limit it here.
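   
   For example, something like the sketch below (not the PR's actual code; it assumes `doris_column->size()` reflects the rows already appended to the block in this batch):
   
   ```cpp
   // sketch only: cap each read by the rows still missing from the output block,
   // so a single call can never grow the block beyond batch_size
   size_t already_read = doris_column->size();  // assumption: rows appended so far
   size_t remaining = batch_size > already_read ? batch_size - already_read : 0;
   size_t num_values = static_cast<size_t>(_chunk_reader->num_values());
   size_t read_values = num_values < remaining ? num_values : remaining;
   ```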



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

