evindj commented on code in PR #552:
URL: https://github.com/apache/iceberg-cpp/pull/552#discussion_r2757420855
##########
src/iceberg/test/data_writer_test.cc:
##########
@@ -17,7 +17,414 @@
* under the License.
*/
+#include "iceberg/data/data_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-namespace iceberg {} // namespace iceberg
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+namespace {
+
+using ::testing::HasSubstr;
+
+class DataWriterTest : public ::testing::Test {
+ protected:
+ static void SetUpTestSuite() {
+ parquet::RegisterAll();
+ avro::RegisterAll();
+ }
+
+ void SetUp() override {
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
+ partition_spec_ = PartitionSpec::Unpartitioned();
+ }
+
+ std::shared_ptr<::arrow::Array> CreateTestData() {
+ ArrowSchema arrow_c_schema;
+ ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema));
+ auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ return ::arrow::json::ArrayFromJSONString(
+ ::arrow::struct_(arrow_schema->fields()),
+ R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")
+ .ValueOrDie();
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> partition_spec_;
+};
+
+TEST_F(DataWriterTest, CreateWithParquetFormat) {
+ DataWriterOptions options{
+ .path = "test_data.parquet",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+
+ auto writer_result = DataWriter::Make(options);
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ ASSERT_NE(writer, nullptr);
+}
+
+TEST_F(DataWriterTest, CreateWithAvroFormat) {
+ DataWriterOptions options{
+ .path = "test_data.avro",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kAvro,
+ .io = file_io_,
+ };
+
+ auto writer_result = DataWriter::Make(options);
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ ASSERT_NE(writer, nullptr);
+}
Review Comment:
nit: The two tests are quite similar; it should be possible to factor the shared setup into a helper function to reduce duplication.
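For example, something along these lines (just a sketch; the helper name is a suggestion, and it assumes the fixture members shown above plus whatever concrete map type `DataWriterOptions::properties` uses):

```cpp
  // Hypothetical fixture helper that builds DataWriterOptions for a given format.
  DataWriterOptions MakeOptions(FileFormatType format, std::string path,
                                decltype(DataWriterOptions::properties) properties = {}) {
    return DataWriterOptions{
        .path = std::move(path),
        .schema = schema_,
        .spec = partition_spec_,
        .partition = PartitionValues{},
        .format = format,
        .io = file_io_,
        .properties = std::move(properties),
    };
  }

TEST_F(DataWriterTest, CreateWithParquetFormat) {
  auto writer_result = DataWriter::Make(MakeOptions(
      FileFormatType::kParquet, "test_data.parquet",
      {{"write.parquet.compression-codec", "uncompressed"}}));
  ASSERT_THAT(writer_result, IsOk());
  ASSERT_NE(writer_result.value(), nullptr);
}
```

The Avro test would then shrink to a single `MakeOptions(FileFormatType::kAvro, "test_data.avro")` call.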
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,124 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ explicit Impl(DataWriterOptions options) : options_(std::move(options)) {}
+
+ Status Initialize() {
+ WriterOptions writer_options;
+ writer_options.path = options_.path;
+ writer_options.schema = options_.schema;
+ writer_options.io = options_.io;
+ writer_options.properties = WriterProperties::FromMap(options_.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(writer_,
+ WriterFactoryRegistry::Open(options_.format, writer_options));
+ return {};
+ }
+
+ Status Write(ArrowArray* data) {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->length();
+ }
+
+ Status Close() {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ if (closed_) {
+ return InvalidArgument("Writer already closed");
+ }
Review Comment:
I could see a case for making Close idempotent. Is there a strong reason to return this error rather than, for example, treating a second Close as a no-op?
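For example, a sketch of the no-op variant (same structure as the Close shown above; only the already-closed branch changes, and the trailing return is assumed):

```cpp
Status Close() {
  if (!writer_) {
    return InvalidArgument("Writer not initialized");
  }
  if (closed_) {
    return {};  // idempotent: a second Close is a successful no-op
  }
  ICEBERG_RETURN_UNEXPECTED(writer_->Close());
  closed_ = true;
  return {};
}
```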
##########
src/iceberg/test/data_writer_test.cc:
##########
@@ -17,7 +17,414 @@
* under the License.
*/
+#include "iceberg/data/data_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-namespace iceberg {} // namespace iceberg
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+namespace {
+
+using ::testing::HasSubstr;
+
+class DataWriterTest : public ::testing::Test {
+ protected:
+ static void SetUpTestSuite() {
+ parquet::RegisterAll();
+ avro::RegisterAll();
+ }
+
+ void SetUp() override {
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
+ partition_spec_ = PartitionSpec::Unpartitioned();
+ }
+
+ std::shared_ptr<::arrow::Array> CreateTestData() {
+ ArrowSchema arrow_c_schema;
+ ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema));
+ auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ return ::arrow::json::ArrayFromJSONString(
+ ::arrow::struct_(arrow_schema->fields()),
+ R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")
+ .ValueOrDie();
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> partition_spec_;
+};
+
+TEST_F(DataWriterTest, CreateWithParquetFormat) {
+ DataWriterOptions options{
+ .path = "test_data.parquet",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+
+ auto writer_result = DataWriter::Make(options);
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ ASSERT_NE(writer, nullptr);
+}
+
+TEST_F(DataWriterTest, CreateWithAvroFormat) {
+ DataWriterOptions options{
+ .path = "test_data.avro",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kAvro,
+ .io = file_io_,
+ };
+
+ auto writer_result = DataWriter::Make(options);
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ ASSERT_NE(writer, nullptr);
+}
+
+TEST_F(DataWriterTest, WriteAndClose) {
+ DataWriterOptions options{
+ .path = "test_data.parquet",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+
+ auto writer_result = DataWriter::Make(options);
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ // Write data
+ auto test_data = CreateTestData();
+ ArrowArray arrow_array;
+ ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+
+ // Check length before close
+ auto length_result = writer->Length();
+ ASSERT_THAT(length_result, IsOk());
+ EXPECT_GT(length_result.value(), 0);
Review Comment:
nit: also check the size of the data passed to Write?
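For example (one way to read this nit; how much can be verified after the write depends on what DataWriter exposes), the test could at least pin the input size explicitly:

```cpp
  // Capture the input size up front so the expectation is explicit.
  auto test_data = CreateTestData();
  const int64_t expected_rows = test_data->length();
  ASSERT_EQ(expected_rows, 3);  // the JSON fixture has three rows

  ArrowArray arrow_array;
  ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
  ASSERT_THAT(writer->Write(&arrow_array), IsOk());
```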
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,124 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ explicit Impl(DataWriterOptions options) : options_(std::move(options)) {}
+
+ Status Initialize() {
+ WriterOptions writer_options;
+ writer_options.path = options_.path;
+ writer_options.schema = options_.schema;
+ writer_options.io = options_.io;
+ writer_options.properties = WriterProperties::FromMap(options_.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(writer_,
+ WriterFactoryRegistry::Open(options_.format, writer_options));
+ return {};
+ }
+
+ Status Write(ArrowArray* data) {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->length();
+ }
+
+ Status Close() {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ if (closed_) {
+ return InvalidArgument("Writer already closed");
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
Review Comment:
Should this class address thread safety?
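If it should, a minimal sketch would be to guard the mutable state with a mutex (this is only an illustration, not the PR's code; `mutex_` is a hypothetical member, and the other members are as in the PR):

```cpp
#include <mutex>

class DataWriter::Impl {
 public:
  Status Close() {
    std::lock_guard<std::mutex> lock(mutex_);  // serialize with Write()/Length()
    if (!writer_) {
      return InvalidArgument("Writer not initialized");
    }
    if (closed_) {
      return {};  // already closed
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    closed_ = true;
    return {};
  }

 private:
  std::mutex mutex_;  // guards writer_ and closed_
  // writer_, closed_, options_ as in the PR
};
```

Whether that is worth the cost probably depends on whether DataWriter is documented as safe to share across threads.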
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]