lidavidm commented on code in PR #154:
URL: https://github.com/apache/iceberg-cpp/pull/154#discussion_r2230887735
##########
cmake_modules/IcebergThirdpartyToolchain.cmake:
##########
@@ -65,8 +65,9 @@ function(resolve_arrow_dependency)
set(ARROW_BUILD_STATIC
ON
CACHE BOOL "" FORCE)
+ # Workaround undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
Review Comment:
Ah, parquet-cpp can embed Arrow schemas in the Parquet metadata, so we need
to be able to read IPC schemas. Possibly upstream should enable/require
ARROW_IPC when Parquet is being built.
##########
src/iceberg/parquet/parquet_data_util_internal.h:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "iceberg/schema_util.h"
+
+namespace arrow {
+class RecordBatch;
+class Schema;
+} // namespace arrow
+
+namespace iceberg::parquet {
+
+/// \brief Convert record batch read from a Parquet file to projected Iceberg Schema.
+///
+/// \param record_batch The record batch to convert.
+/// \param output_arrow_schema The Arrow schema to convert to.
+/// \param projected_schema The projected Iceberg schema.
+/// \param projection The projection from projected Iceberg schema to the record batch.
+/// \return The converted record batch.
+Result<std::shared_ptr<::arrow::RecordBatch>> ConvertRecordBatch(
Review Comment:
nit, but if the job is to project a record batch, maybe just call it
`ProjectRecordBatch`?
##########
src/iceberg/parquet/parquet_reader.cc:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_reader.h"
+
+#include <memory>
+
+#include <arrow/c/bridge.h>
+#include <arrow/memory_pool.h>
+#include <arrow/record_batch.h>
+#include <arrow/result.h>
+#include <arrow/type.h>
+#include <iceberg/result.h>
+#include <iceberg/schema_util.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/schema.h>
+#include <parquet/file_reader.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+#include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/parquet/parquet_schema_util_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream(
+ const ReaderOptions& options) {
+ ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
+ if (options.length) {
+ file_info.set_size(options.length.value());
+ }
+
+  auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+ auto result = io->fs()->OpenInputFile(file_info);
+ if (!result.ok()) {
+ return IOError("Failed to open file {} for reading: {}", options.path,
+ result.status().message());
+ }
+
+ return result.MoveValueUnsafe();
+}
+
+Result<SchemaProjection> BuildProjection(::parquet::arrow::FileReader* reader,
+ const Schema& read_schema) {
+ auto metadata = reader->parquet_reader()->metadata();
+
+ ICEBERG_ASSIGN_OR_RAISE(auto has_field_ids,
+ HasFieldIds(metadata->schema()->schema_root()));
+ if (!has_field_ids) {
+ // TODO(gangwu): apply name mapping to Parquet schema
+    return NotImplemented("Applying name mapping to Parquet schema is not implemented");
+ }
+
+ ::parquet::arrow::SchemaManifest schema_manifest;
+ auto schema_manifest_result = ::parquet::arrow::SchemaManifest::Make(
+ metadata->schema(), metadata->key_value_metadata(), reader->properties(),
+ &schema_manifest);
+ if (!schema_manifest_result.ok()) {
+ return ParquetError("Failed to make schema manifest: {}",
+ schema_manifest_result.message());
+ }
+
+ // Leverage SchemaManifest to project the schema
+  ICEBERG_ASSIGN_OR_RAISE(auto projection, Project(read_schema, schema_manifest));
+
+ return projection;
+}
+
+class EmptyRecordBatchReader : public ::arrow::RecordBatchReader {
+ public:
+ EmptyRecordBatchReader() = default;
+ ~EmptyRecordBatchReader() override = default;
+
+ std::shared_ptr<::arrow::Schema> schema() const override { return nullptr; }
+
+  ::arrow::Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* batch) override {
+ batch = nullptr;
+ return ::arrow::Status::OK();
+ }
+};
+
+} // namespace
+
+// A stateful context to keep track of the reading progress.
+struct ReadContext {
+ // The arrow schema to output record batches. It may be different with
+ // the schema of record batches returned by `record_batch_reader_`
+ // when there is any schema evolution.
+ std::shared_ptr<::arrow::Schema> output_arrow_schema_;
+ // The reader to read record batches from the Parquet file.
+ std::unique_ptr<::arrow::RecordBatchReader> record_batch_reader_;
+};
+
+// TODO(gangwu): list of work items
+// 1. Make the memory pool configurable
+// 2. Catch ParquetException and convert to Status/Result
+// 3. Add utility to convert Arrow Status/Result to Iceberg Status/Result
+// 4. Check field ids and apply name mapping if needed
+class ParquetReader::Impl {
+ public:
+ // Open the Parquet reader with the given options
+ Status Open(const ReaderOptions& options) {
+ if (options.projection == nullptr) {
+ return InvalidArgument("Projected schema is required by Parquet reader");
+ }
+
+ split_ = options.split;
+ read_schema_ = options.projection;
+
+ // TODO(gangwu): make memory pool configurable
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool();
+
+ // Prepare reader properties
+ ::parquet::ReaderProperties reader_properties(pool);
+ ::parquet::ArrowReaderProperties arrow_reader_properties;
+ arrow_reader_properties.set_batch_size(options.batch_size);
+ arrow_reader_properties.set_arrow_extensions_enabled(true);
+
+ // Open the Parquet file reader
+ ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options));
+ auto file_reader =
+        ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
+ auto make_reader_result = ::parquet::arrow::FileReader::Make(
+ pool, std::move(file_reader), arrow_reader_properties, &reader_);
+ if (!make_reader_result.ok()) {
+      return ParquetError("Failed to make file reader: {}", make_reader_result.message());
+ }
+
+ // Project read schema onto the Parquet file schema
+    ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_));
+
+ return {};
+ }
+
+ // Read the next batch of data
+ Result<std::optional<ArrowArray>> Next() {
+ if (!context_) {
+ ICEBERG_RETURN_UNEXPECTED(InitReadContext());
+ }
+
+ auto next_result = context_->record_batch_reader_->Next();
+ if (!next_result.ok()) {
+ return ParquetError("Failed to read next batch: {}",
+ next_result.status().message());
+ }
+
+ auto batch = next_result.MoveValueUnsafe();
+ if (!batch) {
+ return std::nullopt;
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(
+        batch, ConvertRecordBatch(std::move(batch), context_->output_arrow_schema_,
+ *read_schema_, projection_));
+
+ ArrowArray arrow_array;
+ auto export_result = ::arrow::ExportRecordBatch(*batch, &arrow_array);
+ if (!export_result.ok()) {
+ return ParquetError("Failed to export the Arrow record batch: {}",
+ export_result.message());
+ }
+ return arrow_array;
+ }
+
+ // Close the reader and release resources
+ Status Close() {
+ if (reader_ == nullptr) {
+ return {}; // Already closed
+ }
+
+ if (context_ != nullptr) {
+ auto close_result = context_->record_batch_reader_->Close();
+ if (!close_result.ok()) {
+ return ParquetError("Failed to close record batch reader: {}",
+ close_result.message());
+ }
+ context_.reset();
+ }
+
+ reader_.reset();
+ return {};
+ }
+
+ // Get the schema of the data
+ Result<ArrowSchema> Schema() {
+ if (!context_) {
+ ICEBERG_RETURN_UNEXPECTED(InitReadContext());
+ }
+
+ ArrowSchema arrow_schema;
+ auto export_result =
+ ::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema);
+ if (!export_result.ok()) {
+      return ParquetError("Failed to export Arrow schema: {}", export_result.message());
Review Comment:
Is this really a ParquetError? (Ditto for ExportRecordBatch above)
##########
src/iceberg/result.h:
##########
@@ -93,6 +94,7 @@ DEFINE_ERROR_FUNCTION(NotAllowed)
DEFINE_ERROR_FUNCTION(NotFound)
DEFINE_ERROR_FUNCTION(NotImplemented)
DEFINE_ERROR_FUNCTION(NotSupported)
+DEFINE_ERROR_FUNCTION(ParquetError)
Review Comment:
Hmm, Parquet itself has some exception classes too; would it be possible to
translate errors into IOError or some other error instead of requiring a
Parquet-specific class?
That said there's also InvalidArrowData. Should we call this
InvalidParquetData to match? (Maybe that's too specific, but perhaps
InvalidParquetData + IOError covers everything?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]