lidavidm commented on code in PR #30:
URL: https://github.com/apache/iceberg-cpp/pull/30#discussion_r2009380801


##########
src/iceberg/file_io.h:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+// Forward declarations
+class Reader;
+class Writer;
+
+/// \brief An interface used to read input files using Reader and AsyncReader
+class ICEBERG_EXPORT InputFile {
+ public:
+  explicit InputFile(std::string location) : location_(std::move(location)) {}
+
+  virtual ~InputFile() = default;
+
+  /// \brief Checks whether the file exists.
+  virtual bool exists() const = 0;

Review Comment:
   nit: I think Google style uses UpperCamelCase for methods (e.g. `Exists()`) 
and snake_case only for trivial accessors (e.g. `location()`).



##########
src/iceberg/io/fs_file_io.cc:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/io/fs_file_io.h"
+
+#include <fcntl.h>
+
+#include <cassert>
+#include <filesystem>
+#include <format>
+
+#include <sys/stat.h>
+
+#include "iceberg/exception.h"
+#include "iceberg/io/fs_file_reader.h"
+#include "iceberg/io/fs_file_writer.h"
+
+namespace iceberg::io {
+
+bool FsInputFile::exists() const { return std::filesystem::exists(location()); 
}
+
+int64_t FsInputFile::getLength() const {
+  struct stat stat_buffer;
+  if (stat(location().c_str(), &stat_buffer) != 0) {
+    throw IcebergError(std::format(
+        "Failed to get file length. File does not exist or is inaccessible: 
{}",
+        location_));
+  }
+  return stat_buffer.st_size;
+}
+
+std::unique_ptr<Reader> FsInputFile::newReader() {
+  return std::make_unique<FsFileReader>(location_);
+}
+
+void FsOutputFile::create() {
+  // Check if the file already exists
+  std::ifstream existing_file(location_);
+  bool file_exists = existing_file.good();
+  existing_file.close();
+
+  if (file_exists) {
+    throw IcebergError(std::format("File already exists: {}", location_));
+  }
+
+  // Create or overwrite the file by opening it in truncating mode
+  std::ofstream new_file(location_, std::ios::binary | std::ios::out | 
std::ios::trunc);
+  if (!new_file.is_open()) {
+    throw IcebergError(std::format("Failed to create or overwrite file: {}", 
location_));
+  }
+  new_file.close();
+}
+
+std::unique_ptr<Writer> FsOutputFile::newWriter() {
+  return std::make_unique<FsFileWriter>(location_);
+}
+
+std::shared_ptr<InputFile> FsFileIO::newInputFile(const std::string& location) 
{
+  // Check if the file exists
+  if (!fileExists(location)) {
+    throw IcebergError(std::format("InputFile does not exist: {}", location));
+  }
+
+  // Create and return an FsInputFile instance
+  return std::make_shared<FsInputFile>(location);
+}
+
+std::shared_ptr<OutputFile> FsFileIO::newOutputFile(const std::string& 
location) {
+  return std::make_shared<FsOutputFile>(location);
+}
+
+void FsFileIO::DeleteFile(const std::string& location) {
+  // Check if the file exists
+  if (!fileExists(location)) {
+    throw IcebergError(std::format("InputFile does not exist: {}", location));
+  }

Review Comment:
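   Is this check needed? It looks like a TOCTOU (time-of-check to time-of-use) 
race. Can't we check `ec` instead to tell whether the delete failed because the 
file did not exist? (`std::filesystem::remove(p, ec)` returns `false` with `ec` 
cleared when the file did not exist, and sets `ec` on an actual error.)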



##########
src/iceberg/io/fs_file_reader.cc:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/io/fs_file_reader.h"
+
+#include <format>
+
+#include "iceberg/exception.h"
+
+namespace iceberg::io {
+
+FsFileReader::FsFileReader(std::string file_path) : 
file_path_(std::move(file_path)) {
+  // Open the file in binary mode
+  input_file_.open(file_path_, std::ios::binary | std::ios::in);
+  if (!input_file_.is_open()) {
+    throw IcebergError(std::format("Failed to open file: {}", file_path_));
+  }
+
+  // Calculate the file size
+  input_file_.seekg(0, std::ios::end);
+  file_size_ = input_file_.tellg();
+  input_file_.seekg(0, std::ios::beg);
+
+  if (file_size_ < 0) {
+    throw IcebergError(std::format("Failed to determine file size: {}", 
file_path_));
+  }
+}
+
+FsFileReader::~FsFileReader() {
+  if (input_file_.is_open()) {
+    input_file_.close();
+  }
+}
+
+int64_t FsFileReader::read(ReadRange range, void* buffer) {
+  if (!input_file_.is_open()) {
+    throw IcebergError("File is not open for reading");
+  }
+
+  if (range.offset < 0 || range.offset + range.length > file_size_) {
+    throw IcebergError(std::format("Invalid read range: [{}, {})", 
range.offset,
+                                   range.offset + range.length));
+  }
+
+  // Seek to the starting position
+  input_file_.seekg(range.offset, std::ios::beg);
+  if (input_file_.fail()) {
+    throw IcebergError(std::format("Failed to seek to offset: {}", 
range.offset));
+  }
+
+  // Read the data into the buffer
+  input_file_.read(static_cast<char*>(buffer), range.length);
+  auto bytes_read = static_cast<int64_t>(input_file_.gcount());
+
+  return bytes_read;  // Return actual bytes read
+}
+
+std::future<int64_t> FsFileReader::readAsync(ReadRange range, void* buffer) {

Review Comment:
   This API probably also needs a way to specify a thread pool. Maybe also some 
way to tag the I/O task? (I'm thinking of use cases such as prioritizing I/O for 
different files.)



##########
src/iceberg/io/fs_file_writer.cc:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/io/fs_file_writer.h"
+
+#include "iceberg/exception.h"
+
+namespace iceberg::io {
+
+FsFileWriter::FsFileWriter(std::string file_path) : 
file_path_(std::move(file_path)) {
+  // Open the file in binary write mode, truncating any existing file
+  output_file_.open(file_path_, std::ios::binary | std::ios::out | 
std::ios::trunc);
+  if (!output_file_.is_open()) {
+    throw IcebergError(std::format("Failed to open file for writing: {}", 
file_path_));
+  }
+  // Calculate the file size after opening the file
+  output_file_.seekp(0, std::ios::end);
+  file_size_ = output_file_.tellp();
+}
+
+FsFileWriter::~FsFileWriter() {
+  if (output_file_.is_open()) {
+    output_file_.close();
+  }
+}
+
+int64_t FsFileWriter::write(int64_t offset, const void* buffer, int64_t 
length) {
+  if (!output_file_.is_open()) {
+    throw IcebergError(std::format("File is not open for writing: {}", 
file_path_));
+  }
+
+  if (offset < 0) {
+    throw IcebergError(
+        std::format("Invalid write range. Offset must be non-negative: {}", 
offset));
+  }
+
+  // Seek the position to write
+  output_file_.seekp(offset, std::ios::beg);
+  if (output_file_.fail()) {
+    throw IcebergError(std::format("Failed to seek to offset: {}", offset));
+  }
+
+  // Write data to the file
+  output_file_.write(static_cast<const char*>(buffer), length);
+  if (output_file_.fail()) {
+    throw IcebergError("Failed to write data to file.");
+  }
+
+  // Update the file size based on the last written position
+  file_size_ = std::max(file_size_, offset + length);
+
+  return length;  // Return number of bytes successfully written
+}
+
+std::future<int64_t> FsFileWriter::writeAsync(int64_t offset, const void* 
buffer,
+                                              int64_t length) {
+  return std::async(std::launch::async, [this, offset, buffer, length]() {
+    return this->write(offset, buffer, length);
+  });
+}
+
+void FsFileWriter::flush() {

Review Comment:
   Do we need an async flush?



##########
src/iceberg/file_io.h:
##########


Review Comment:
   Do we need a way to go back from an input/output file to the corresponding 
FileIO?



##########
src/iceberg/io/fs_file_io.cc:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/io/fs_file_io.h"
+
+#include <fcntl.h>
+
+#include <cassert>
+#include <filesystem>
+#include <format>
+
+#include <sys/stat.h>
+
+#include "iceberg/exception.h"
+#include "iceberg/io/fs_file_reader.h"
+#include "iceberg/io/fs_file_writer.h"
+
+namespace iceberg::io {
+
+bool FsInputFile::exists() const { return std::filesystem::exists(location()); 
}
+
+int64_t FsInputFile::getLength() const {
+  struct stat stat_buffer;
+  if (stat(location().c_str(), &stat_buffer) != 0) {
+    throw IcebergError(std::format(
+        "Failed to get file length. File does not exist or is inaccessible: 
{}",
+        location_));
+  }
+  return stat_buffer.st_size;
+}
+
+std::unique_ptr<Reader> FsInputFile::newReader() {
+  return std::make_unique<FsFileReader>(location_);
+}
+
+void FsOutputFile::create() {
+  // Check if the file already exists
+  std::ifstream existing_file(location_);
+  bool file_exists = existing_file.good();
+  existing_file.close();
+
+  if (file_exists) {
+    throw IcebergError(std::format("File already exists: {}", location_));
+  }
+
+  // Create or overwrite the file by opening it in truncating mode
+  std::ofstream new_file(location_, std::ios::binary | std::ios::out | 
std::ios::trunc);
+  if (!new_file.is_open()) {
+    throw IcebergError(std::format("Failed to create or overwrite file: {}", 
location_));
+  }
+  new_file.close();
+}
+
+std::unique_ptr<Writer> FsOutputFile::newWriter() {
+  return std::make_unique<FsFileWriter>(location_);
+}
+
+std::shared_ptr<InputFile> FsFileIO::newInputFile(const std::string& location) 
{
+  // Check if the file exists
+  if (!fileExists(location)) {
+    throw IcebergError(std::format("InputFile does not exist: {}", location));
+  }
+
+  // Create and return an FsInputFile instance
+  return std::make_shared<FsInputFile>(location);
+}
+
+std::shared_ptr<OutputFile> FsFileIO::newOutputFile(const std::string& 
location) {
+  return std::make_shared<FsOutputFile>(location);
+}
+
+void FsFileIO::DeleteFile(const std::string& location) {
+  // Check if the file exists
+  if (!fileExists(location)) {
+    throw IcebergError(std::format("InputFile does not exist: {}", location));
+  }
+  std::error_code ec;
+  if (std::filesystem::remove(location, ec) == false) {
+    throw IcebergError(
+        std::format("Failed to delete file: {}, error code: {}", location, 
ec.message()));
+  }
+}
+
+bool FsFileIO::fileExists(const std::string& location) {
+  std::ifstream file(location);
+  return file.good();

Review Comment:
   Why not use `std::filesystem::exists` as above?



##########
src/iceberg/io/fs_file_io.h:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "iceberg/file_io.h"
+#include "iceberg/io/iceberg_io_export.h"
+
+namespace iceberg::io {
+
+class ICEBERG_IO_EXPORT FsInputFile : public InputFile {

Review Comment:
   nit: do these need to be in the public header? Users should never interact 
with the concrete subclasses directly, right?



##########
src/iceberg/io/fs_file_reader.cc:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/io/fs_file_reader.h"
+
+#include <format>
+
+#include "iceberg/exception.h"
+
+namespace iceberg::io {
+
+FsFileReader::FsFileReader(std::string file_path) : 
file_path_(std::move(file_path)) {
+  // Open the file in binary mode
+  input_file_.open(file_path_, std::ios::binary | std::ios::in);
+  if (!input_file_.is_open()) {
+    throw IcebergError(std::format("Failed to open file: {}", file_path_));
+  }
+
+  // Calculate the file size
+  input_file_.seekg(0, std::ios::end);
+  file_size_ = input_file_.tellg();
+  input_file_.seekg(0, std::ios::beg);
+
+  if (file_size_ < 0) {
+    throw IcebergError(std::format("Failed to determine file size: {}", 
file_path_));
+  }
+}
+
+FsFileReader::~FsFileReader() {
+  if (input_file_.is_open()) {
+    input_file_.close();
+  }
+}
+
+int64_t FsFileReader::read(ReadRange range, void* buffer) {
+  if (!input_file_.is_open()) {
+    throw IcebergError("File is not open for reading");
+  }
+
+  if (range.offset < 0 || range.offset + range.length > file_size_) {
+    throw IcebergError(std::format("Invalid read range: [{}, {})", 
range.offset,
+                                   range.offset + range.length));
+  }
+
+  // Seek to the starting position
+  input_file_.seekg(range.offset, std::ios::beg);
+  if (input_file_.fail()) {
+    throw IcebergError(std::format("Failed to seek to offset: {}", 
range.offset));
+  }
+
+  // Read the data into the buffer
+  input_file_.read(static_cast<char*>(buffer), range.length);
+  auto bytes_read = static_cast<int64_t>(input_file_.gcount());
+
+  return bytes_read;  // Return actual bytes read
+}
+
+std::future<int64_t> FsFileReader::readAsync(ReadRange range, void* buffer) {

Review Comment:
   Or, more generally, an executor rather than specifically a thread pool. I 
guess that's C++26 (`std::execution`) though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to