wgtmac commented on code in PR #169:
URL: https://github.com/apache/iceberg-cpp/pull/169#discussion_r2272214692


##########
src/iceberg/file_writer.h:
##########
@@ -65,6 +67,19 @@ class ICEBERG_EXPORT Writer {
   ///
   /// \return Status of write results.
   virtual Status Write(ArrowArray data) = 0;
+
+  /// \brief Get the file statistics.
+  virtual std::shared_ptr<Metrics> metrics() = 0;
+
+  /// \brief Get the file length.
+  virtual int64_t length() = 0;
+
+  /// \brief Get the file length.
+  /// Returns a list of recommended split locations, if applicable, null 
otherwise.
+  /// When available, this information is used for planning scan tasks whose 
boundaries
+  /// are determined by these offsets. The returned list must be sorted in 
ascending order
+  /// Only valid after the file is closed.

Review Comment:
   ```suggestion
     /// \brief Returns a list of recommended split locations, if applicable, 
empty otherwise.
     /// When available, this information is used for planning scan tasks whose 
boundaries
     /// are determined by these offsets. The returned list must be sorted in 
ascending order.
     /// Only valid after the file is closed.
   ```



##########
src/iceberg/avro/avro_writer.cc:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/avro/avro_writer.h"
+
+#include <memory>
+
+#include <arrow/array/builder_base.h>
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/result.h>
+#include <avro/DataFile.hh>
+#include <avro/GenericDatum.hh>
+#include <avro/NodeImpl.hh>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/avro/avro_stream_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::avro {
+
+namespace {
+
+Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const 
WriterOptions& options,
+                                                             int64_t 
buffer_size) {
+  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+  auto result = io->fs()->OpenOutputStream(options.path);
+  if (!result.ok()) {
+    return IOError("Failed to open file {} for {}", options.path,
+                   result.status().message());
+  }
+  return std::make_unique<AvroOutputStream>(result.MoveValueUnsafe(), 
buffer_size);
+}
+
+}  // namespace
+
+// A stateful context to keep track of the writing progress.
+struct WriteContext {};
+
+class AvroWriter::Impl {
+ public:
+  Status Open(const WriterOptions& options) {
+    write_schema_ = options.schema;
+
+    auto root = std::make_shared<::avro::NodeRecord>();
+    ToAvroNodeVisitor visitor;
+    for (const auto& field : write_schema_->fields()) {
+      ::avro::NodePtr node;
+      ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node));
+      root->addLeaf(node);
+    }

Review Comment:
   ```suggestion
       ::avro::NodePtr root;
       ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, 
&root));
   ```



##########
src/iceberg/avro/avro_writer.cc:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/avro/avro_writer.h"
+
+#include <memory>
+
+#include <arrow/array/builder_base.h>
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/result.h>
+#include <avro/DataFile.hh>
+#include <avro/GenericDatum.hh>
+#include <avro/NodeImpl.hh>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/avro/avro_stream_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::avro {
+
+namespace {
+
+Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const 
WriterOptions& options,
+                                                             int64_t 
buffer_size) {
+  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+  auto result = io->fs()->OpenOutputStream(options.path);
+  if (!result.ok()) {
+    return IOError("Failed to open file {} for {}", options.path,
+                   result.status().message());
+  }
+  return std::make_unique<AvroOutputStream>(result.MoveValueUnsafe(), 
buffer_size);
+}
+
+}  // namespace
+
+// A stateful context to keep track of the writing progress.
+struct WriteContext {};

Review Comment:
   Do we really need a context? The writer is much simpler than the reader implementation.



##########
src/iceberg/file_writer.h:
##########
@@ -65,6 +67,19 @@ class ICEBERG_EXPORT Writer {
   ///
   /// \return Status of write results.
   virtual Status Write(ArrowArray data) = 0;
+
+  /// \brief Get the file statistics.
+  virtual std::shared_ptr<Metrics> metrics() = 0;
+
+  /// \brief Get the file length.

Review Comment:
   ```suggestion
     /// \brief Get the file length.
     /// Only valid after the file is closed.
   ```



##########
src/iceberg/manifest_writer.h:
##########
@@ -35,9 +35,16 @@ namespace iceberg {
 class ICEBERG_EXPORT ManifestWriter {
  public:
   virtual ~ManifestWriter() = default;
+
+  /// \brief Write manifest entries to file
+  /// \param entries List of manifest entries to write.
+  /// \return Status::OK() if all entries were written successfully
   virtual Status WriteManifestEntries(
       const std::vector<ManifestEntry>& entries) const = 0;
 
+  /// \brief Close writer and flush to storage.
+  virtual void Close() = 0;

Review Comment:
   ```suggestion
     virtual Status Close() = 0;
   ```



##########
src/iceberg/metrics.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/metrics.h
+
+#include <unordered_map>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+/// \brief Iceberg file format metrics
+class ICEBERG_EXPORT Metrics {
+ public:
+  Metrics() = default;
+
+  explicit Metrics(int64_t row_count,
+                   std::unordered_map<int64_t, int64_t> column_sizes = {},
+                   std::unordered_map<int64_t, int64_t> value_counts = {},
+                   std::unordered_map<int64_t, int64_t> null_value_counts = {},
+                   std::unordered_map<int64_t, int64_t> nan_value_counts = {},
+                   std::unordered_map<int64_t, Literal> lower_bounds = {},
+                   std::unordered_map<int64_t, Literal> upper_bounds = {})
+      : row_count_(row_count),
+        column_sizes_(std::move(column_sizes)),
+        value_counts_(std::move(value_counts)),
+        null_value_counts_(std::move(null_value_counts)),
+        nan_value_counts_(std::move(nan_value_counts)),
+        lower_bounds_(std::move(lower_bounds)),
+        upper_bounds_(std::move(upper_bounds)) {}
+
+ private:
+  int64_t row_count_ = 0;
+  std::unordered_map<int64_t, int64_t> column_sizes_;
+  std::unordered_map<int64_t, int64_t> value_counts_;
+  std::unordered_map<int64_t, int64_t> null_value_counts_;
+  std::unordered_map<int64_t, int64_t> nan_value_counts_;
+  std::unordered_map<int64_t, Literal> lower_bounds_;
+  std::unordered_map<int64_t, Literal> upper_bounds_;
+};

Review Comment:
   ```suggestion
   struct ICEBERG_EXPORT Metrics {
     int64_t row_count = 0;
     std::unordered_map<int64_t, int64_t> column_sizes;
     std::unordered_map<int64_t, int64_t> value_counts;
     std::unordered_map<int64_t, int64_t> null_value_counts;
     std::unordered_map<int64_t, int64_t> nan_value_counts;
     std::unordered_map<int64_t, Literal> lower_bounds;
     std::unordered_map<int64_t, Literal> upper_bounds;
   };
   ```
   
   What about making it a simple struct to enable aggregate initialization?



##########
src/iceberg/file_writer.h:
##########
@@ -65,6 +67,19 @@ class ICEBERG_EXPORT Writer {
   ///
   /// \return Status of write results.
   virtual Status Write(ArrowArray data) = 0;
+
+  /// \brief Get the file statistics.
+  virtual std::shared_ptr<Metrics> metrics() = 0;

Review Comment:
   ```suggestion
     virtual Metrics metrics() = 0;
   ```
   
   Perhaps we can just return a simple struct instead of a shared_ptr?



##########
src/iceberg/file_writer.h:
##########
@@ -65,6 +67,19 @@ class ICEBERG_EXPORT Writer {
   ///
   /// \return Status of write results.
   virtual Status Write(ArrowArray data) = 0;
+
+  /// \brief Get the file statistics.

Review Comment:
   ```suggestion
     /// \brief Get the file statistics.
     /// Only valid after the file is closed.
   ```



##########
src/iceberg/metrics.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/metrics.h

Review Comment:
   Could you add a comment here? E.g. `Iceberg file format metrics`.



##########
src/iceberg/file_writer.h:
##########
@@ -65,6 +67,19 @@ class ICEBERG_EXPORT Writer {
   ///
   /// \return Status of write results.
   virtual Status Write(ArrowArray data) = 0;
+
+  /// \brief Get the file statistics.
+  virtual std::shared_ptr<Metrics> metrics() = 0;
+
+  /// \brief Get the file length.
+  virtual int64_t length() = 0;
+
+  /// \brief Get the file length.
+  /// Returns a list of recommended split locations, if applicable, null 
otherwise.
+  /// When available, this information is used for planning scan tasks whose 
boundaries
+  /// are determined by these offsets. The returned list must be sorted in 
ascending order
+  /// Only valid after the file is closed.
+  virtual std::vector<int64_t> splitOffsets() = 0;

Review Comment:
   ```suggestion
     virtual std::vector<int64_t> split_offsets() = 0;
   ```



##########
src/iceberg/manifest_writer.h:
##########
@@ -51,8 +58,15 @@ class ICEBERG_EXPORT ManifestWriter {
 class ICEBERG_EXPORT ManifestListWriter {
  public:
   virtual ~ManifestListWriter() = default;
+
+  /// \brief Write manifest file list to manifest list file.
+  /// \param files List of manifest files to write.
+  /// \return Status::OK() if all files were written successfully
   virtual Status WriteManifestFiles(const std::vector<ManifestFile>& files) 
const = 0;
 
+  /// \brief Close writer and flush to storage.
+  virtual void Close() = 0;

Review Comment:
   ```suggestion
     virtual Status Close() = 0;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to