wgtmac commented on code in PR #552:
URL: https://github.com/apache/iceberg-cpp/pull/552#discussion_r2781137269
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
Review Comment:
Will this check ever fail? If not, should we remove the check or use
`ICEBERG_DCHECK` instead? Same question for below.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
Review Comment:
nit: use aggregate initialization
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
+ data_file->file_path = options_.path;
+ data_file->file_format = options_.format;
+ data_file->partition = options_.partition;
+ data_file->record_count = metrics.row_count.value_or(0);
Review Comment:
```suggestion
data_file->record_count = metrics.row_count.value_or(-1);
```
Java impl uses -1 when row count is unavailable.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
Review Comment:
nit: use aggregate initialization for `writer_options`
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
+ data_file->file_path = options_.path;
+ data_file->file_format = options_.format;
+ data_file->partition = options_.partition;
+ data_file->record_count = metrics.row_count.value_or(0);
+ data_file->file_size_in_bytes = length;
+ data_file->sort_order_id = options_.sort_order_id;
+ data_file->split_offsets = std::move(split_offsets);
+
+ // Convert metrics maps from unordered_map to map
+ for (const auto& [col_id, size] : metrics.column_sizes) {
+ data_file->column_sizes[col_id] = size;
Review Comment:
Do you think it makes sense to change `DataFile` and `Metrics` classes to
use `std::map` or `std::unordered_map` consistently so we don't need to use a
for-loop here?
cc @zhjwpku
##########
src/iceberg/test/data_writer_test.cc:
##########
@@ -17,7 +17,416 @@
* under the License.
*/
+#include "iceberg/data/data_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-namespace iceberg {} // namespace iceberg
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+using ::testing::HasSubstr;
+
+class DataWriterTest : public ::testing::Test {
+ protected:
+ static void SetUpTestSuite() {
+ parquet::RegisterAll();
+ avro::RegisterAll();
+ }
+
+ void SetUp() override {
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
Review Comment:
```suggestion
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(2, "name", string())});
```
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,124 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ explicit Impl(DataWriterOptions options) : options_(std::move(options)) {}
+
+ Status Initialize() {
+ WriterOptions writer_options;
+ writer_options.path = options_.path;
+ writer_options.schema = options_.schema;
+ writer_options.io = options_.io;
+ writer_options.properties = WriterProperties::FromMap(options_.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(writer_,
+ WriterFactoryRegistry::Open(options_.format,
writer_options));
+ return {};
+ }
+
+ Status Write(ArrowArray* data) {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->length();
+ }
+
+ Status Close() {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ if (closed_) {
+ return InvalidArgument("Writer already closed");
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
Review Comment:
I don't think a single writer (or reader) should support thread safety so it
is fine not to add comment like this.
##########
src/iceberg/test/data_writer_test.cc:
##########
@@ -17,7 +17,416 @@
* under the License.
*/
+#include "iceberg/data/data_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-namespace iceberg {} // namespace iceberg
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+using ::testing::HasSubstr;
+
+class DataWriterTest : public ::testing::Test {
Review Comment:
Can we try to consolidate the test cases since each of them only test a tiny
api with repeated boilerplate of creating writer and writing data? This may
lead to test cases explosion if more and more cases are like this.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
Review Comment:
```suggestion
ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
```
We should return `invalid state` instead of `invalid argument` in this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]