wgtmac commented on code in PR #105:
URL: https://github.com/apache/iceberg-cpp/pull/105#discussion_r2097552424


##########
src/iceberg/avro/avro_stream.cc:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "avro_stream.h"
+
+#include <format>
+
+#include <arrow/result.h>
+#include <iceberg/exception.h>

Review Comment:
   ```suggestion
   
   #include "iceberg/exception.h"
   ```
   
   nit



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;
+    auto arrow_out_ret = local_fs_->OpenOutputStream(path);
+    if (!arrow_out_ret.ok()) {

Review Comment:
   nit: use `ASSERT_THAT`



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;

Review Comment:
   Can we remove these `std::cout`?



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>

Review Comment:
   ```suggestion
   
   #include "iceberg/arrow/arrow_fs_file_io.h"
   #include "iceberg/avro/avro_stream.h"
   ```



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;
+    auto arrow_out_ret = local_fs_->OpenOutputStream(path);
+    if (!arrow_out_ret.ok()) {
+      throw std::runtime_error("Failed to open output stream: " +
+                               arrow_out_ret.status().message());
+    }
+    return 
std::make_shared<AvroOutputStream>(std::move(arrow_out_ret.ValueUnsafe()),
+                                              buffer_size);
+  }
+
+  std::shared_ptr<AvroInputStream> CreateInputStream(const std::string& path,
+                                                     int64_t buffer_size) {
+    std::cout << "CreateInputStream" << path << std::endl;
+    auto arrow_in_ret = local_fs_->OpenInputFile(path);
+    if (!arrow_in_ret.ok()) {
+      throw std::runtime_error("Failed to open input stream: " +
+                               arrow_in_ret.status().message());
+    }
+    return 
std::make_shared<AvroInputStream>(std::move(arrow_in_ret.ValueUnsafe()),
+                                             buffer_size);
+  }
+
+  void WriteDataToStream(const std::shared_ptr<AvroOutputStream>& 
avro_output_stream,
+                         const std::string& data) {
+    uint8_t* buf;
+    size_t buf_size;
+    ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
+    std::memcpy(buf, data.data(), data.size());
+    avro_output_stream->backup(1024 - data.size());
+    avro_output_stream->flush();
+  }
+
+  void ReadDataFromStream(const std::shared_ptr<AvroInputStream>& 
avro_input_stream,
+                          std::string& data) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&buf, &len));
+    data = std::string(reinterpret_cast<const char*>(buf), len);
+  }
+
+  void CheckStreamEof(const std::shared_ptr<AvroInputStream>& 
avro_input_stream) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_FALSE(avro_input_stream->next(&buf, &len));
+  }
+
+  int64_t buffer_size_ = 1024;
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::string temp_filepath_;
+};
+
+TEST_F(AVROStreamTest, TestAvroBasicStream) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+  {
+    const uint8_t* data{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&data, &len));
+    EXPECT_EQ(len, test_data.size());
+
+    EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
+    EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len), 
test_data);
+    std::cout << std::string(reinterpret_cast<const char*>(data), len) << 
std::endl;
+    ASSERT_FALSE(avro_input_stream->next(&data, &len));
+  }
+}
+
+TEST_F(AVROStreamTest, InputStreamBackup) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  // Create a test input stream
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+
+  // Read data
+  const uint8_t* data{};
+  size_t len{};
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, test_data.size());
+
+  // Backup 10 bytes
+  const size_t backupSize = 10;
+  avro_input_stream->backup(backupSize);
+
+  // Check byteCount after backup
+  EXPECT_EQ(avro_input_stream->byteCount(), test_data.size() - backupSize);
+
+  // Read the backed-up data again
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, backupSize);
+  EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
+            test_data.substr(test_data.size() - backupSize));  // NOLINT
+
+  // Check that we've reached the end of the stream
+  ASSERT_FALSE(avro_input_stream->next(&data, &len));
+}
+
+TEST_F(AVROStreamTest, InputStreamSkip) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  // Create a test input stream
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+
+  // Skip the first 10 bytes
+  const size_t skipSize = 10;
+  avro_input_stream->skip(skipSize);
+
+  // Check byteCount after skip
+  EXPECT_EQ(avro_input_stream->byteCount(), skipSize);
+
+  // Read the remaining data
+  const uint8_t* data{};
+  size_t len{};
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, test_data.size() - skipSize);
+  EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
+            test_data.substr(skipSize));  // NOLINT
+
+  // Check that we've reached the end of the stream
+  ASSERT_FALSE(avro_input_stream->next(&data, &len));
+}
+
+TEST_F(AVROStreamTest, InputStreamSeek) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  // Create a test input stream
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+
+  // Seek to position 15
+  const int64_t seekPos = 15;
+  avro_input_stream->seek(seekPos);
+
+  // Check byteCount after seek
+  EXPECT_EQ(avro_input_stream->byteCount(), static_cast<size_t>(seekPos));
+
+  // Read the remaining data
+  const uint8_t* data{};
+  size_t len{};
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, test_data.size() - seekPos);
+  EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
+            test_data.substr(seekPos));  // NOLINT

Review Comment:
   ```suggestion
               test_data.substr(seekPos));
   ```



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;
+    auto arrow_out_ret = local_fs_->OpenOutputStream(path);
+    if (!arrow_out_ret.ok()) {
+      throw std::runtime_error("Failed to open output stream: " +
+                               arrow_out_ret.status().message());
+    }
+    return 
std::make_shared<AvroOutputStream>(std::move(arrow_out_ret.ValueUnsafe()),
+                                              buffer_size);
+  }
+
+  std::shared_ptr<AvroInputStream> CreateInputStream(const std::string& path,
+                                                     int64_t buffer_size) {
+    std::cout << "CreateInputStream" << path << std::endl;
+    auto arrow_in_ret = local_fs_->OpenInputFile(path);
+    if (!arrow_in_ret.ok()) {
+      throw std::runtime_error("Failed to open input stream: " +
+                               arrow_in_ret.status().message());
+    }
+    return 
std::make_shared<AvroInputStream>(std::move(arrow_in_ret.ValueUnsafe()),
+                                             buffer_size);
+  }
+
+  void WriteDataToStream(const std::shared_ptr<AvroOutputStream>& 
avro_output_stream,
+                         const std::string& data) {
+    uint8_t* buf;
+    size_t buf_size;
+    ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
+    std::memcpy(buf, data.data(), data.size());
+    avro_output_stream->backup(1024 - data.size());
+    avro_output_stream->flush();
+  }
+
+  void ReadDataFromStream(const std::shared_ptr<AvroInputStream>& 
avro_input_stream,
+                          std::string& data) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&buf, &len));
+    data = std::string(reinterpret_cast<const char*>(buf), len);
+  }
+
+  void CheckStreamEof(const std::shared_ptr<AvroInputStream>& 
avro_input_stream) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_FALSE(avro_input_stream->next(&buf, &len));
+  }
+
+  int64_t buffer_size_ = 1024;
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::string temp_filepath_;
+};
+
+TEST_F(AVROStreamTest, TestAvroBasicStream) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+  {
+    const uint8_t* data{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&data, &len));
+    EXPECT_EQ(len, test_data.size());
+
+    EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
+    EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len), 
test_data);
+    std::cout << std::string(reinterpret_cast<const char*>(data), len) << 
std::endl;
+    ASSERT_FALSE(avro_input_stream->next(&data, &len));
+  }
+}
+
+TEST_F(AVROStreamTest, InputStreamBackup) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  // Create a test input stream
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+
+  // Read data
+  const uint8_t* data{};
+  size_t len{};
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, test_data.size());
+
+  // Backup 10 bytes
+  const size_t backupSize = 10;
+  avro_input_stream->backup(backupSize);
+
+  // Check byteCount after backup
+  EXPECT_EQ(avro_input_stream->byteCount(), test_data.size() - backupSize);
+
+  // Read the backed-up data again
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, backupSize);
+  EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
+            test_data.substr(test_data.size() - backupSize));  // NOLINT

Review Comment:
   ```suggestion
               test_data.substr(test_data.size() - backupSize));
   ```



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;
+    auto arrow_out_ret = local_fs_->OpenOutputStream(path);
+    if (!arrow_out_ret.ok()) {
+      throw std::runtime_error("Failed to open output stream: " +
+                               arrow_out_ret.status().message());
+    }
+    return 
std::make_shared<AvroOutputStream>(std::move(arrow_out_ret.ValueUnsafe()),
+                                              buffer_size);
+  }
+
+  std::shared_ptr<AvroInputStream> CreateInputStream(const std::string& path,
+                                                     int64_t buffer_size) {
+    std::cout << "CreateInputStream" << path << std::endl;

Review Comment:
   Ditto: can we remove this `std::cout` as well?



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;
+    auto arrow_out_ret = local_fs_->OpenOutputStream(path);
+    if (!arrow_out_ret.ok()) {
+      throw std::runtime_error("Failed to open output stream: " +
+                               arrow_out_ret.status().message());
+    }
+    return 
std::make_shared<AvroOutputStream>(std::move(arrow_out_ret.ValueUnsafe()),
+                                              buffer_size);
+  }
+
+  std::shared_ptr<AvroInputStream> CreateInputStream(const std::string& path,
+                                                     int64_t buffer_size) {
+    std::cout << "CreateInputStream" << path << std::endl;
+    auto arrow_in_ret = local_fs_->OpenInputFile(path);
+    if (!arrow_in_ret.ok()) {
+      throw std::runtime_error("Failed to open input stream: " +
+                               arrow_in_ret.status().message());
+    }
+    return 
std::make_shared<AvroInputStream>(std::move(arrow_in_ret.ValueUnsafe()),
+                                             buffer_size);
+  }
+
+  void WriteDataToStream(const std::shared_ptr<AvroOutputStream>& 
avro_output_stream,
+                         const std::string& data) {
+    uint8_t* buf;
+    size_t buf_size;
+    ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
+    std::memcpy(buf, data.data(), data.size());
+    avro_output_stream->backup(1024 - data.size());
+    avro_output_stream->flush();
+  }
+
+  void ReadDataFromStream(const std::shared_ptr<AvroInputStream>& 
avro_input_stream,
+                          std::string& data) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&buf, &len));
+    data = std::string(reinterpret_cast<const char*>(buf), len);
+  }
+
+  void CheckStreamEof(const std::shared_ptr<AvroInputStream>& 
avro_input_stream) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_FALSE(avro_input_stream->next(&buf, &len));
+  }
+
+  int64_t buffer_size_ = 1024;
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::string temp_filepath_;
+};
+
+TEST_F(AVROStreamTest, TestAvroBasicStream) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+  {
+    const uint8_t* data{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&data, &len));
+    EXPECT_EQ(len, test_data.size());
+
+    EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
+    EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len), 
test_data);
+    std::cout << std::string(reinterpret_cast<const char*>(data), len) << 
std::endl;
+    ASSERT_FALSE(avro_input_stream->next(&data, &len));
+  }
+}
+
+TEST_F(AVROStreamTest, InputStreamBackup) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  // Create a test input stream
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+
+  // Read data
+  const uint8_t* data{};
+  size_t len{};
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, test_data.size());
+
+  // Backup 10 bytes
+  const size_t backupSize = 10;
+  avro_input_stream->backup(backupSize);
+
+  // Check byteCount after backup
+  EXPECT_EQ(avro_input_stream->byteCount(), test_data.size() - backupSize);
+
+  // Read the backed-up data again
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, backupSize);
+  EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
+            test_data.substr(test_data.size() - backupSize));  // NOLINT
+
+  // Check that we've reached the end of the stream
+  ASSERT_FALSE(avro_input_stream->next(&data, &len));
+}
+
+TEST_F(AVROStreamTest, InputStreamSkip) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  // Create a test input stream
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+
+  // Skip the first 10 bytes
+  const size_t skipSize = 10;
+  avro_input_stream->skip(skipSize);
+
+  // Check byteCount after skip
+  EXPECT_EQ(avro_input_stream->byteCount(), skipSize);
+
+  // Read the remaining data
+  const uint8_t* data{};
+  size_t len{};
+  ASSERT_TRUE(avro_input_stream->next(&data, &len));
+  EXPECT_EQ(len, test_data.size() - skipSize);
+  EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
+            test_data.substr(skipSize));  // NOLINT

Review Comment:
   ```suggestion
               test_data.substr(skipSize));
   ```



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+  }
+
+  std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
+                                                       int64_t buffer_size) {
+    std::cout << "CreateOutputStream" << path << std::endl;
+    auto arrow_out_ret = local_fs_->OpenOutputStream(path);
+    if (!arrow_out_ret.ok()) {
+      throw std::runtime_error("Failed to open output stream: " +
+                               arrow_out_ret.status().message());
+    }
+    return 
std::make_shared<AvroOutputStream>(std::move(arrow_out_ret.ValueUnsafe()),
+                                              buffer_size);
+  }
+
+  std::shared_ptr<AvroInputStream> CreateInputStream(const std::string& path,
+                                                     int64_t buffer_size) {
+    std::cout << "CreateInputStream" << path << std::endl;
+    auto arrow_in_ret = local_fs_->OpenInputFile(path);
+    if (!arrow_in_ret.ok()) {
+      throw std::runtime_error("Failed to open input stream: " +
+                               arrow_in_ret.status().message());
+    }
+    return 
std::make_shared<AvroInputStream>(std::move(arrow_in_ret.ValueUnsafe()),
+                                             buffer_size);
+  }
+
+  void WriteDataToStream(const std::shared_ptr<AvroOutputStream>& 
avro_output_stream,
+                         const std::string& data) {
+    uint8_t* buf;
+    size_t buf_size;
+    ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
+    std::memcpy(buf, data.data(), data.size());
+    avro_output_stream->backup(1024 - data.size());
+    avro_output_stream->flush();
+  }
+
+  void ReadDataFromStream(const std::shared_ptr<AvroInputStream>& 
avro_input_stream,
+                          std::string& data) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_TRUE(avro_input_stream->next(&buf, &len));
+    data = std::string(reinterpret_cast<const char*>(buf), len);
+  }
+
+  void CheckStreamEof(const std::shared_ptr<AvroInputStream>& 
avro_input_stream) {
+    const uint8_t* buf{};
+    size_t len{};
+    ASSERT_FALSE(avro_input_stream->next(&buf, &len));
+  }
+
+  int64_t buffer_size_ = 1024;
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::string temp_filepath_;
+};
+
+TEST_F(AVROStreamTest, TestAvroBasicStream) {
+  // Write test data
+  const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
+  WriteDataToStream(avro_output_stream, test_data);
+
+  auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
+  {

Review Comment:
   It seems that this scoped `{ ... }` block is unnecessary.



##########
test/avro_stream_test.cc:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <gtest/gtest.h>
+#include <iceberg/arrow/arrow_fs_file_io.h>
+#include <iceberg/avro/avro_stream.h>
+
+#include "temp_file_test_base.h"
+
+namespace iceberg::avro {
+
+class AVROStreamTest : public TempFileTestBase {
+ public:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(

Review Comment:
   `file_io_` is not used anywhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to