wgtmac commented on code in PR #216:
URL: https://github.com/apache/iceberg-cpp/pull/216#discussion_r2418554442
##########
src/iceberg/test/manifest_reader_writer_test.cc:
##########
@@ -133,6 +141,20 @@ class ManifestReaderV1Test : public ManifestReaderTestBase
{
}
return manifest_entries;
}
+
+ void TestWriteManifest(const std::string& manifest_list_path,
+ std::shared_ptr<PartitionSpec> partition_spec,
+ const std::vector<ManifestEntry>& manifest_entries) {
+ std::cout << "Writing manifest list to " << manifest_list_path <<
std::endl;
Review Comment:
Remove this debug output?
##########
src/iceberg/test/manifest_reader_writer_test.cc:
##########
@@ -209,6 +246,20 @@ class ManifestReaderV2Test : public ManifestReaderTestBase
{
std::vector<ManifestEntry> PrepareMetadataInheritanceTestData() {
return CreateV2TestData(/*sequence_number=*/15, /*partition_spec_id*/ 12);
}
+
+ void TestWriteManifest(int64_t snapshot_id, const std::string&
manifest_list_path,
+ std::shared_ptr<PartitionSpec> partition_spec,
+ const std::vector<ManifestEntry>& manifest_entries) {
+ std::cout << "Writing manifest list to " << manifest_list_path <<
std::endl;
Review Comment:
Remove this debug output as well?
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
+ protected:
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
protected:
ArrowArray array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>& list_value);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>& list_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>& map_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, std::vector<uint8_t>>&
map_value);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(const
ManifestEntry& entry);
+ virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file);
+
+ protected:
+ std::shared_ptr<PartitionSpec> partition_spec_;
+ std::shared_ptr<Schema> manifest_schema_;
+ std::unordered_map<std::string, std::string> metadata_;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestFile`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
public:
ManifestFileAdapter() = default;
- ~ManifestFileAdapter() override = default;
+ ~ManifestFileAdapter() override;
virtual Status Append(const ManifestFile& file) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return
manifest_list_schema_; }
+
+ protected:
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestFile& file);
+ static Status AppendPartitions(ArrowArray* arrow_array,
+ const std::shared_ptr<ListType>&
partition_type,
+ const std::vector<PartitionFieldSummary>&
partitions);
+
+ virtual Result<int64_t> GetSequenceNumber(const ManifestFile& file);
+ virtual Result<int64_t> GetWrappedMinSequenceNumber(const ManifestFile&
file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(const
ManifestFile& file);
+
+ protected:
+ std::shared_ptr<Schema> manifest_list_schema_;
+ std::unordered_map<std::string, std::string> metadata_;
Review Comment:
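nit: these two variables can be moved to the base class `ManifestAdapter`, since `ManifestEntryAdapter` declares equivalent members (`manifest_schema_` and `metadata_`).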
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
+ protected:
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
protected:
ArrowArray array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>& list_value);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>& list_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>& map_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, std::vector<uint8_t>>&
map_value);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(const
ManifestEntry& entry);
+ virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file);
Review Comment:
```suggestion
virtual Result<std::optional<std::string>> GetReferenceDataFile(
const DataFile& file);
virtual Result<std::optional<int64_t>> GetFirstRowId(
const DataFile& file);
virtual Result<std::optional<int64_t>> GetContentOffset(
const DataFile& file);
virtual Result<std::optional<int64_t>> GetContentSizeInBytes(
const DataFile& file);
```
Let's be consistent with `GetSequenceNumber`: take the input by const reference and drop the `Wrapped` prefix from the names.
##########
src/iceberg/arrow/nanoarrow_error_transform_internal.h:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("nanoarrow error: {}", error.message); \
Review Comment:
```suggestion
return iceberg::InvalidArrowData("nanoarrow error: {}", error.message); \
```
##########
src/iceberg/partition_spec.cc:
##########
@@ -57,6 +60,49 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
std::span<const PartitionField> PartitionSpec::fields() const { return
fields_; }
+Result<std::shared_ptr<Schema>> PartitionSpec::partition_schema() {
+ if (fields_.empty()) {
+ return nullptr;
+ }
+ {
+ std::scoped_lock<std::mutex> lock(mutex_);
+ if (partition_schema_ != nullptr) {
+ return partition_schema_;
+ }
+ }
+
+ std::vector<SchemaField> partition_fields;
+ for (const auto& partition_field : fields_) {
+ // Get the source field from the original schema by source_id
+ ICEBERG_ASSIGN_OR_RAISE(auto source_field,
+
schema_->FindFieldById(partition_field.source_id()));
+ if (!source_field.has_value()) {
+ return InvalidSchema("Cannot find source field for partition field:{}",
Review Comment:
This is not resolved yet.
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
+ protected:
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
Review Comment:
```suggestion
static Status AppendField(ArrowArray* arrowArray,
std::span<const uint8_t> value);
```
##########
src/iceberg/arrow/nanoarrow_error_transform_internal.h:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("nanoarrow error: {}", error.message); \
Review Comment:
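Should this header include the headers it depends on? The macro uses `NANOARROW_OK` and `InvalidArrowData`, so as written it relies on every includer having already included their declarations.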
##########
src/iceberg/test/manifest_reader_writer_test.cc:
##########
@@ -230,4 +281,21 @@ TEST_F(ManifestReaderV2Test, MetadataInheritanceTest) {
TestManifestReadingWithManifestFile(manifest_file, expected_entries);
}
+TEST_F(ManifestReaderV2Test, WriteNonPartitionedTest) {
+ auto expected_entries = PrepareNonPartitionedTestData();
+ auto write_manifest_path = CreateNewTempFilePath();
+ TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr,
expected_entries);
+ TestManifestReadingByPath(write_manifest_path, expected_entries);
+}
+
+TEST_F(ManifestReaderV2Test, WriteInheritancePartitionedTest) {
+ auto expected_entries = PrepareMetadataInheritanceTestData();
+ auto write_manifest_path = CreateNewTempFilePath();
+ TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr,
expected_entries);
+ for (auto& entry : expected_entries) {
+ entry.data_file->partition_spec_id = PartitionSpec::kInitialSpecId;
Review Comment:
Why do we need this? Isn't `PartitionSpec::kInitialSpecId` already the default value?
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("nanoarrow error: {}", status); \
+ }
+
+namespace {
+static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = {};
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayInitFromSchema(&array_,
&schema_, &error),
+ error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(&array_));
+ return {};
+}
+
+Result<ArrowArray*> ManifestAdapter::FinishAppending() {
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishBuildingDefault(&array_,
&error),
+ error);
+ return &array_;
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view
value) {
+ ArrowStringView view(value.data(), value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value) {
+ ArrowBufferViewData data;
+ data.as_char = reinterpret_cast<const char*>(value.data());
+ ArrowBufferView view(data, value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+ return {};
+}
+
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+ if (array_.release != nullptr) {
+ ArrowArrayRelease(&array_);
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryStructType() {
+ if (partition_spec_ == nullptr) {
Review Comment:
This shouldn't happen: even an unpartitioned table has a special (unpartitioned) partition spec, so `partition_spec_` should never be null here.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("nanoarrow error: {}", status); \
+ }
Review Comment:
```suggestion
```
Remove it and use `ICEBERG_NANOARROW_RETURN_IF_NOT_OK` instead.
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
+ protected:
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
protected:
ArrowArray array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>& list_value);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>& list_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>& map_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, std::vector<uint8_t>>&
map_value);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(const
ManifestEntry& entry);
+ virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file);
+
+ protected:
+ std::shared_ptr<PartitionSpec> partition_spec_;
+ std::shared_ptr<Schema> manifest_schema_;
+ std::unordered_map<std::string, std::string> metadata_;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestFile`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
public:
ManifestFileAdapter() = default;
- ~ManifestFileAdapter() override = default;
+ ~ManifestFileAdapter() override;
virtual Status Append(const ManifestFile& file) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return
manifest_list_schema_; }
+
+ protected:
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestFile& file);
+ static Status AppendPartitions(ArrowArray* arrow_array,
+ const std::shared_ptr<ListType>&
partition_type,
+ const std::vector<PartitionFieldSummary>&
partitions);
+
+ virtual Result<int64_t> GetSequenceNumber(const ManifestFile& file);
+ virtual Result<int64_t> GetWrappedMinSequenceNumber(const ManifestFile&
file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(const
ManifestFile& file);
Review Comment:
```suggestion
virtual Result<int64_t> GetMinSequenceNumber(const ManifestFile& file);
virtual Result<std::optional<int64_t>> GetFirstRowId(const ManifestFile&
file);
```
##########
src/iceberg/partition_spec.h:
##########
@@ -67,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable
{
/// \brief Get a view of the partition fields.
std::span<const PartitionField> fields() const;
+ Result<std::shared_ptr<Schema>> GetPartitionSchema();
Review Comment:
```suggestion
Result<std::shared_ptr<StructType>> PartitionType();
```
Using `Schema` here is overkill because the partition type does not need a `schema_id`.
##########
src/iceberg/test/manifest_reader_writer_test.cc:
##########
@@ -144,6 +166,21 @@ TEST_F(ManifestReaderV1Test, PartitionedTest) {
partition_schema);
}
+TEST_F(ManifestReaderV1Test, WritePartitionedTest) {
+ iceberg::SchemaField partition_field(1000, "order_ts_hour",
iceberg::int32(), true);
+ auto partition_schema =
+ std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
+ auto identity_transform = Transform::Identity();
+ std::vector<PartitionField> fields{
+ PartitionField(1000, 1000, "order_ts_hour", identity_transform)};
Review Comment:
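Usually `source_id` and `field_id` should not be the same: `source_id` refers to a column in the table schema, while `field_id` identifies the partition field itself.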
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("nanoarrow error: {}", status); \
+ }
+
+namespace {
+static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = {};
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayInitFromSchema(&array_,
&schema_, &error),
+ error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(&array_));
+ return {};
+}
+
+Result<ArrowArray*> ManifestAdapter::FinishAppending() {
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishBuildingDefault(&array_,
&error),
+ error);
+ return &array_;
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view
value) {
+ ArrowStringView view(value.data(), value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value) {
+ ArrowBufferViewData data;
+ data.as_char = reinterpret_cast<const char*>(value.data());
+ ArrowBufferView view(data, value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+ return {};
+}
+
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+ if (array_.release != nullptr) {
+ ArrowArrayRelease(&array_);
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryStructType() {
+ if (partition_spec_ == nullptr) {
+ return ManifestEntry::TypeFromPartitionType(nullptr);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_schema,
partition_spec_->GetPartitionSchema());
+ return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+}
+
+Status ManifestEntryAdapter::AppendPartition(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+ const std::vector<Literal>& partitions) {
+ if (arrow_array->n_children != partition_type->fields().size()) [[unlikely]]
{
+ return InvalidManifest("Arrow array of partition does not match partition
type.");
+ }
+ if (partitions.size() != partition_type->fields().size()) [[unlikely]] {
+ return InvalidManifest("Literal list of partition does not match partition
type.");
+ }
+ auto fields = partition_type->fields();
+
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& partition = partitions[i];
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+ if (partition.IsNull()) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ continue;
+ }
+ switch (field.type()->type_id()) {
Review Comment:
We might need more test cases for different partition types. Could you please add a TODO comment for this in the test file?
##########
src/iceberg/partition_spec.h:
##########
@@ -83,6 +87,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable
{
const int32_t spec_id_;
std::vector<PartitionField> fields_;
int32_t last_assigned_field_id_;
+ std::mutex mutex_;
+ std::shared_ptr<Schema> partition_schema_;
Review Comment:
Same as above: cache a `std::shared_ptr<StructType>` (the partition type) instead of a `std::shared_ptr<Schema>`.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
+
+namespace {
+// Value written to the v1 `block_size_in_bytes` data file field (always 64 MiB).
+constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}  // namespace
+
+namespace iceberg {
+
+// Resets the adapter to an empty, appendable array built from `schema_`.
+// Fails if rows are still buffered (size_ > 0).
+Status ManifestAdapter::StartAppending() {
+  if (size_ > 0) {
+    return InvalidArgument("Adapter buffer not empty, cannot start appending.");
+  }
+  // A plain shared_ptr<ArrowArray> never invokes the Arrow release callback, so
+  // replacing array_ leaked the buffers of an earlier array. This deleter
+  // releases the array when its last owner (the adapter or a holder of the
+  // FinishAppending() result) drops it. A null `release` means the array was
+  // already released, e.g. by the adapter's destructor.
+  array_ = std::shared_ptr<ArrowArray>(new ArrowArray{}, [](ArrowArray* array) {
+    if (array->release != nullptr) {
+      ArrowArrayRelease(array);
+    }
+    delete array;
+  });
+  size_ = 0;
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+      ArrowArrayInitFromSchema(array_.get(), &schema_, &error), error);
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(array_.get()));
+  return {};
+}
+
+// Finalizes the array built since StartAppending() and returns it.
+// Returns an error instead of dereferencing a null or uninitialized array
+// when StartAppending() has not successfully run.
+Result<std::shared_ptr<ArrowArray>> ManifestAdapter::FinishAppending() {
+  if (array_ == nullptr || array_->release == nullptr) {
+    return InvalidArgument("Adapter buffer not initialized, call StartAppending first.");
+  }
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+      ArrowArrayFinishBuildingDefault(array_.get(), &error), error);
+  return array_;
+}
+
+// Scalar append helpers: each appends one value to `arrowArray` and turns a
+// nanoarrow failure code into an InvalidArrowData error.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+  return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+  return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+  return {};
+}
+
+// Appends a string value described by (data pointer, byte length).
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view value) {
+  ArrowStringView view;
+  view.data = value.data();
+  view.size_bytes = static_cast<int64_t>(value.size());
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+  return {};
+}
+
+// Appends a byte sequence, viewed as chars for nanoarrow's buffer view.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+                                    const std::span<const uint8_t>& value) {
+  ArrowBufferView view;
+  view.data.as_char = reinterpret_cast<const char*>(value.data());
+  view.size_bytes = static_cast<int64_t>(value.size());
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+  return {};
+}
+
+// Releases the Arrow array (if one was allocated and is still live) and then
+// the schema. A null `release` callback means never initialized or already
+// released.
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+  if (ArrowArray* const array = array_.get();
+      array != nullptr && array->release != nullptr) {
+    ArrowArrayRelease(array);
+  }
+  if (schema_.release != nullptr) {
+    ArrowSchemaRelease(&schema_);
+  }
+}
+
+// Builds the manifest entry struct type, embedding the partition struct derived
+// from `partition_spec_` when one is set.
+Result<std::shared_ptr<StructType>>
+ManifestEntryAdapter::GetManifestEntryStructType() {
+  if (partition_spec_ != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_schema, partition_spec_->partition_schema());
+    return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+  }
+  // No partition spec: there is no partition struct to embed.
+  return ManifestEntry::TypeFromPartitionType(nullptr);
+}
+
+// Appends one partition tuple as a single element of the partition struct
+// array. Each literal goes into the child array at the same position; null
+// literals become null slots. Fails if the Arrow children, the partition type
+// and the literal list disagree in length, or if a field type is unsupported.
+Status ManifestEntryAdapter::AppendPartition(
+    ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+    const std::vector<Literal>& partitions) {
+  const auto fields = partition_type->fields();
+  // n_children is int64_t while size() is size_t; compare with matching signedness.
+  if (arrow_array->n_children != static_cast<int64_t>(fields.size())) [[unlikely]] {
+    return InvalidManifest("Arrow array of partition does not match partition type.");
+  }
+  if (partitions.size() != fields.size()) [[unlikely]] {
+    return InvalidManifest("Literal list of partition does not match partition type.");
+  }
+
+  for (size_t i = 0; i < fields.size(); ++i) {
+    const auto& partition = partitions[i];
+    const auto& field = fields[i];
+    ArrowArray* array = arrow_array->children[i];
+    if (partition.IsNull()) {
+      NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+      continue;
+    }
+    switch (field.type()->type_id()) {
+      case TypeId::kBoolean:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<uint64_t>(std::get<bool>(partition.value()) ? 1 : 0)));
+        break;
+      case TypeId::kInt:
+      case TypeId::kDate:
+        // Both are read from the literal as int32_t.
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+        break;
+      case TypeId::kLong:
+      case TypeId::kTime:
+      case TypeId::kTimestamp:
+      case TypeId::kTimestampTz:
+        // All are read from the literal as int64_t.
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<int64_t>(partition.value())));
+        break;
+      case TypeId::kFloat:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<double>(std::get<float>(partition.value()))));
+        break;
+      case TypeId::kDouble:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<double>(partition.value())));
+        break;
+      case TypeId::kString:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<std::string>(partition.value())));
+        break;
+      case TypeId::kFixed:
+      case TypeId::kBinary:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<std::vector<uint8_t>>(partition.value())));
+        break;
+      case TypeId::kDecimal:
+        // NOTE(review): this appends the 16 raw bytes via ArrowArrayAppendBytes.
+        // If the Arrow column for decimals is a decimal128 array rather than a
+        // binary one, nanoarrow may reject it and ArrowArrayAppendDecimal would
+        // be needed. Confirm against ToArrowSchema and cover with a test.
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<std::array<uint8_t, 16>>(partition.value())));
+        break;
+      case TypeId::kUuid:
+      case TypeId::kStruct:
+      case TypeId::kList:
+      case TypeId::kMap:
+        // TODO(xiao.dong) currently literal does not support those types
+      default:
+        return InvalidManifest("Unsupported partition type: {}", field.ToString());
+    }
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Appends one list row: each element goes into the list's child values array
+// (children[0]), then the row is closed on the list array itself.
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+                                        const std::vector<int32_t>& list_value) {
+  ArrowArray* const items = arrow_array->children[0];
+  for (const int32_t item : list_value) {
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(items, static_cast<int64_t>(item)));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// int64_t overload of the list append above.
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+                                        const std::vector<int64_t>& list_value) {
+  ArrowArray* const items = arrow_array->children[0];
+  for (const int64_t item : list_value) {
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(items, item));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Appends one map row. children[0] of the map array is a two-child struct
+// (key, value). Each pair is appended to those children and then finished as
+// one struct element, so the struct's length stays in step with its children.
+// The previous version never finished the struct elements. Finally the map
+// row itself is closed.
+Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array,
+                                       const std::map<int32_t, int64_t>& map_value) {
+  auto map_array = arrow_array->children[0];
+  if (map_array->n_children != 2) {
+    return InvalidManifest("Invalid map array.");
+  }
+  auto key_array = map_array->children[0];
+  auto value_array = map_array->children[1];
+  for (const auto& [key, value] : map_value) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast<int64_t>(key)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(map_array));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Binary-valued overload of the map append above; same per-entry finishing.
+Status ManifestEntryAdapter::AppendMap(
+    ArrowArray* arrow_array, const std::map<int32_t, std::vector<uint8_t>>& map_value) {
+  auto map_array = arrow_array->children[0];
+  if (map_array->n_children != 2) {
+    return InvalidManifest("Invalid map array.");
+  }
+  auto key_array = map_array->children[0];
+  auto value_array = map_array->children[1];
+  for (const auto& [key, value] : map_value) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast<int64_t>(key)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(map_array));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Appends one DataFile as a single element of the data_file struct array.
+// Children are filled by matching each schema field id. Optional fields that
+// are absent become null slots. Unknown field ids are an error.
+//
+// first_row_id, content_offset and content_size_in_bytes are read through the
+// GetWrapped*() getters, as referenced_data_file already was, so any overrides
+// of those getters apply consistently. NOTE(review): this assumes the getters
+// are virtual hooks; their declarations are not visible here.
+Status ManifestEntryAdapter::AppendDataFile(
+    ArrowArray* arrow_array, const std::shared_ptr<StructType>& data_file_type,
+    const std::shared_ptr<DataFile>& file) {
+  // Appends the engaged value of an optional, or a null slot when empty.
+  auto append_optional = [](ArrowArray* array, const auto& value) -> Status {
+    if (value.has_value()) {
+      return AppendField(array, value.value());
+    }
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+    return {};
+  };
+
+  const auto fields = data_file_type->fields();
+  for (size_t i = 0; i < fields.size(); ++i) {
+    const auto& field = fields[i];
+    ArrowArray* array = arrow_array->children[i];
+
+    switch (field.field_id()) {
+      case 134:  // content (optional int32)
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, static_cast<int64_t>(file->content)));
+        break;
+      case 100:  // file_path (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_path));
+        break;
+      case 101:  // file_format (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, ToString(file->file_format)));
+        break;
+      case 102: {  // partition (required struct)
+        auto partition_type = internal::checked_pointer_cast<StructType>(field.type());
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendPartition(array, partition_type, file->partition));
+        break;
+      }
+      case 103:  // record_count (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->record_count));
+        break;
+      case 104:  // file_size_in_bytes (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_size_in_bytes));
+        break;
+      case 105:  // block_size_in_bytes (compatible in v1), always 64MB for v1
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, kBlockSizeInBytesV1));
+        break;
+      case 108:  // column_sizes (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->column_sizes));
+        break;
+      case 109:  // value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->value_counts));
+        break;
+      case 110:  // null_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->null_value_counts));
+        break;
+      case 137:  // nan_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->nan_value_counts));
+        break;
+      case 125:  // lower_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->lower_bounds));
+        break;
+      case 128:  // upper_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->upper_bounds));
+        break;
+      case 131:  // key_metadata (optional binary); empty is written as null
+        if (file->key_metadata.empty()) {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        } else {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->key_metadata));
+        }
+        break;
+      case 132:  // split_offsets (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->split_offsets));
+        break;
+      case 135:  // equality_ids (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->equality_ids));
+        break;
+      case 140:  // sort_order_id (optional int32), widened to int64 for the append
+        if (file->sort_order_id.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(array, static_cast<int64_t>(file->sort_order_id.value())));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 142: {  // first_row_id (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetWrappedFirstRowId(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, first_row_id));
+        break;
+      }
+      case 143: {  // referenced_data_file (optional string)
+        ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file,
+                                GetWrappedReferenceDataFile(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, referenced_data_file));
+        break;
+      }
+      case 144: {  // content_offset (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto content_offset, GetWrappedContentOffset(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, content_offset));
+        break;
+      }
+      case 145: {  // content_size_in_bytes (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto content_size_in_bytes,
+                                GetWrappedContentSizeInBytes(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, content_size_in_bytes));
+        break;
+      }
+      default:
+        return InvalidManifest("Unknown data file field id: {} ", field.field_id());
+    }
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Base implementations of the per-field getters. Each returns the value stored
+// on the manifest entry or data file unchanged.
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetSequenceNumber(
+    const ManifestEntry& entry) {
+  const auto& sequence_number = entry.sequence_number;
+  return sequence_number;
+}
+
+Result<std::optional<std::string>> ManifestEntryAdapter::GetWrappedReferenceDataFile(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.referenced_data_file;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedFirstRowId(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.first_row_id;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentOffset(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.content_offset;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentSizeInBytes(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.content_size_in_bytes;
+}
+
+Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+ const auto& fields = manifest_schema_->fields();
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& field = fields[i];
+ auto array = array_->children[i];
+
+ switch (field.field_id()) {
+ case 0: // status (required int32)
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<int64_t>(static_cast<int32_t>(entry.status))));
+ break;
+ case 1: // snapshot_id (optional int64)
+ if (entry.snapshot_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
entry.snapshot_id.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 2: // data_file (required struct)
+ if (entry.data_file) {
+ // Get the data file type from the field
+ auto data_file_type =
internal::checked_pointer_cast<StructType>(field.type());
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendDataFile(array, data_file_type, entry.data_file));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
Review Comment:
This is not resolved yet.
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
Review Comment:
I think `StructType` is redundant.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("nanoarrow error: {}", status); \
+ }
+
+namespace {
+// Value written to the v1 `block_size_in_bytes` data file field (always 64 MiB).
+constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}  // namespace
+
+namespace iceberg {
+
+// Re-initializes `array_` from `schema_` and puts it into appending mode.
+// Refuses to restart while rows are buffered, since they would be discarded.
+Status ManifestAdapter::StartAppending() {
+  if (size_ > 0) {
+    return InvalidArgument("Adapter buffer not empty, cannot start appending.");
+  }
+  size_ = 0;
+  // NOTE(review): overwriting `array_` does not run the release callback of an
+  // array initialized by an earlier call, which would leak its buffers.
+  // Releasing it first is only safe if `array_` is zero-initialized at
+  // construction; confirm in the header before changing this.
+  array_ = {};
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayInitFromSchema(&array_, &schema_, &error),
+                                     error);
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(&array_));
+  return {};
+}
+
+// Finalizes the array built since StartAppending() using nanoarrow's default
+// finish/validation routine. The returned pointer refers to this adapter's own
+// array.
+Result<ArrowArray*> ManifestAdapter::FinishAppending() {
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishBuildingDefault(&array_, &error),
+                                     error);
+  ArrowArray* const built = &array_;
+  return built;
+}
+
+// Scalar append helpers: each appends one value to `arrowArray` and turns a
+// nanoarrow failure code into an InvalidArrowData error.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+  return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+  return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+  return {};
+}
+
+// Appends a string value described by (data pointer, byte length).
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view value) {
+  ArrowStringView view;
+  view.data = value.data();
+  view.size_bytes = static_cast<int64_t>(value.size());
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+  return {};
+}
+
+// Appends a byte sequence, viewed as chars for nanoarrow's buffer view.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+                                    const std::span<const uint8_t>& value) {
+  ArrowBufferView view;
+  view.data.as_char = reinterpret_cast<const char*>(value.data());
+  view.size_bytes = static_cast<int64_t>(value.size());
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+  return {};
+}
+
+// Releases the Arrow array and schema held by this adapter. A null `release`
+// callback marks a structure that was never initialized or is already released,
+// so only live structures are released (array first, then schema).
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+  const bool array_is_live = array_.release != nullptr;
+  const bool schema_is_live = schema_.release != nullptr;
+  if (array_is_live) ArrowArrayRelease(&array_);
+  if (schema_is_live) ArrowSchemaRelease(&schema_);
+}
+
+// Builds the manifest entry struct type, embedding the partition struct derived
+// from `partition_spec_` when one is set.
+Result<std::shared_ptr<StructType>>
+ManifestEntryAdapter::GetManifestEntryStructType() {
+  if (partition_spec_ != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_schema,
+                            partition_spec_->GetPartitionSchema());
+    return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+  }
+  // No partition spec: there is no partition struct to embed.
+  return ManifestEntry::TypeFromPartitionType(nullptr);
+}
+
+// Appends one partition tuple as a single element of the partition struct
+// array. Each literal goes into the child array at the same position; null
+// literals become null slots. Fails if the Arrow children, the partition type
+// and the literal list disagree in length, or if a field type is unsupported.
+Status ManifestEntryAdapter::AppendPartition(
+    ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+    const std::vector<Literal>& partitions) {
+  const auto fields = partition_type->fields();
+  // n_children is int64_t while size() is size_t; compare with matching signedness.
+  if (arrow_array->n_children != static_cast<int64_t>(fields.size())) [[unlikely]] {
+    return InvalidManifest("Arrow array of partition does not match partition type.");
+  }
+  if (partitions.size() != fields.size()) [[unlikely]] {
+    return InvalidManifest("Literal list of partition does not match partition type.");
+  }
+
+  for (size_t i = 0; i < fields.size(); ++i) {
+    const auto& partition = partitions[i];
+    const auto& field = fields[i];
+    ArrowArray* array = arrow_array->children[i];
+    if (partition.IsNull()) {
+      NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+      continue;
+    }
+    switch (field.type()->type_id()) {
+      case TypeId::kBoolean:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<uint64_t>(std::get<bool>(partition.value()) ? 1 : 0)));
+        break;
+      case TypeId::kInt:
+      case TypeId::kDate:
+        // Both are read from the literal as int32_t.
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+        break;
+      case TypeId::kLong:
+      case TypeId::kTime:
+      case TypeId::kTimestamp:
+      case TypeId::kTimestampTz:
+        // All are read from the literal as int64_t.
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<int64_t>(partition.value())));
+        break;
+      case TypeId::kFloat:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<double>(std::get<float>(partition.value()))));
+        break;
+      case TypeId::kDouble:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<double>(partition.value())));
+        break;
+      case TypeId::kString:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<std::string>(partition.value())));
+        break;
+      case TypeId::kFixed:
+      case TypeId::kBinary:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<std::vector<uint8_t>>(partition.value())));
+        break;
+      case TypeId::kDecimal:
+        // NOTE(review): this appends the 16 raw bytes via ArrowArrayAppendBytes.
+        // If the Arrow column for decimals is a decimal128 array rather than a
+        // binary one, nanoarrow may reject it and ArrowArrayAppendDecimal would
+        // be needed. Confirm against ToArrowSchema and cover with a test.
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, std::get<std::array<uint8_t, 16>>(partition.value())));
+        break;
+      case TypeId::kUuid:
+      case TypeId::kStruct:
+      case TypeId::kList:
+      case TypeId::kMap:
+        // TODO(xiao.dong) currently literal does not support those types
+      default:
+        return InvalidManifest("Unsupported partition type: {}", field.ToString());
+    }
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Appends one list row: each element goes into the list's child values array
+// (children[0]), then the row is closed on the list array itself.
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+                                        const std::vector<int32_t>& list_value) {
+  ArrowArray* const items = arrow_array->children[0];
+  for (const int32_t item : list_value) {
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(items, static_cast<int64_t>(item)));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// int64_t overload of the list append above.
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+                                        const std::vector<int64_t>& list_value) {
+  ArrowArray* const items = arrow_array->children[0];
+  for (const int64_t item : list_value) {
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(items, item));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Appends one map row. children[0] of the map array is a two-child struct
+// (key, value). Every pair is appended to those children and finished as one
+// struct element, then the map row itself is closed.
+Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array,
+                                       const std::map<int32_t, int64_t>& map_value) {
+  ArrowArray* const entries = arrow_array->children[0];
+  if (entries->n_children != 2) {
+    return InvalidManifest("Invalid map array.");
+  }
+  ArrowArray* const keys = entries->children[0];
+  ArrowArray* const values = entries->children[1];
+  for (const auto& entry : map_value) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(keys, static_cast<int64_t>(entry.first)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(values, entry.second));
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(entries));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Binary-valued overload of the map append above.
+Status ManifestEntryAdapter::AppendMap(
+    ArrowArray* arrow_array, const std::map<int32_t, std::vector<uint8_t>>& map_value) {
+  ArrowArray* const entries = arrow_array->children[0];
+  if (entries->n_children != 2) {
+    return InvalidManifest("Invalid map array.");
+  }
+  ArrowArray* const keys = entries->children[0];
+  ArrowArray* const values = entries->children[1];
+  for (const auto& entry : map_value) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(keys, static_cast<int64_t>(entry.first)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(values, entry.second));
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(entries));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Appends one DataFile as a single element of the data_file struct array.
+// Children are filled by matching each schema field id. Optional fields that
+// are absent become null slots. Unknown field ids are an error.
+//
+// first_row_id, content_offset and content_size_in_bytes are read through the
+// GetWrapped*() getters, as referenced_data_file already was, so any overrides
+// of those getters apply consistently. NOTE(review): this assumes the getters
+// are virtual hooks; their declarations are not visible here.
+Status ManifestEntryAdapter::AppendDataFile(
+    ArrowArray* arrow_array, const std::shared_ptr<StructType>& data_file_type,
+    const std::shared_ptr<DataFile>& file) {
+  // Appends the engaged value of an optional, or a null slot when empty.
+  auto append_optional = [](ArrowArray* array, const auto& value) -> Status {
+    if (value.has_value()) {
+      return AppendField(array, value.value());
+    }
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+    return {};
+  };
+
+  const auto fields = data_file_type->fields();
+  for (size_t i = 0; i < fields.size(); ++i) {
+    const auto& field = fields[i];
+    ArrowArray* array = arrow_array->children[i];
+
+    switch (field.field_id()) {
+      case 134:  // content (optional int32)
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, static_cast<int64_t>(file->content)));
+        break;
+      case 100:  // file_path (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_path));
+        break;
+      case 101:  // file_format (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, ToString(file->file_format)));
+        break;
+      case 102: {  // partition (required struct)
+        auto partition_type = internal::checked_pointer_cast<StructType>(field.type());
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendPartition(array, partition_type, file->partition));
+        break;
+      }
+      case 103:  // record_count (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->record_count));
+        break;
+      case 104:  // file_size_in_bytes (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_size_in_bytes));
+        break;
+      case 105:  // block_size_in_bytes (compatible in v1), always 64MB for v1
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, kBlockSizeInBytesV1));
+        break;
+      case 108:  // column_sizes (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->column_sizes));
+        break;
+      case 109:  // value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->value_counts));
+        break;
+      case 110:  // null_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->null_value_counts));
+        break;
+      case 137:  // nan_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->nan_value_counts));
+        break;
+      case 125:  // lower_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->lower_bounds));
+        break;
+      case 128:  // upper_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->upper_bounds));
+        break;
+      case 131:  // key_metadata (optional binary); empty is written as null
+        if (file->key_metadata.empty()) {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        } else {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->key_metadata));
+        }
+        break;
+      case 132:  // split_offsets (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->split_offsets));
+        break;
+      case 135:  // equality_ids (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->equality_ids));
+        break;
+      case 140:  // sort_order_id (optional int32), widened to int64 for the append
+        if (file->sort_order_id.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(array, static_cast<int64_t>(file->sort_order_id.value())));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 142: {  // first_row_id (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetWrappedFirstRowId(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, first_row_id));
+        break;
+      }
+      case 143: {  // referenced_data_file (optional string)
+        ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file,
+                                GetWrappedReferenceDataFile(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, referenced_data_file));
+        break;
+      }
+      case 144: {  // content_offset (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto content_offset, GetWrappedContentOffset(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, content_offset));
+        break;
+      }
+      case 145: {  // content_size_in_bytes (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto content_size_in_bytes,
+                                GetWrappedContentSizeInBytes(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional(array, content_size_in_bytes));
+        break;
+      }
+      default:
+        return InvalidManifest("Unknown data file field id: {} ", field.field_id());
+    }
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+// Base implementations of the per-field getters. Each returns the value stored
+// on the manifest entry or data file unchanged.
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetSequenceNumber(
+    const ManifestEntry& entry) {
+  const auto& sequence_number = entry.sequence_number;
+  return sequence_number;
+}
+
+Result<std::optional<std::string>> ManifestEntryAdapter::GetWrappedReferenceDataFile(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.referenced_data_file;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedFirstRowId(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.first_row_id;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentOffset(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.content_offset;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentSizeInBytes(
+    const std::shared_ptr<DataFile>& file) {
+  const DataFile& data_file = *file;
+  return data_file.content_size_in_bytes;
+}
+
+// Appends one ManifestEntry as a new row of `array_`. Child columns are matched
+// by manifest_schema_ field id, then the row is finished and `size_` is bumped.
+Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+  // data_file is a required struct, so a missing file is an error rather than
+  // a null slot. Checking before any child is written avoids leaving the row
+  // partially appended.
+  if (entry.data_file == nullptr) {
+    return InvalidManifest("Missing required data_file field in manifest entry.");
+  }
+
+  const auto& fields = manifest_schema_->fields();
+  for (size_t i = 0; i < fields.size(); ++i) {
+    const auto& field = fields[i];
+    ArrowArray* array = array_.children[i];
+
+    switch (field.field_id()) {
+      case 0:  // status (required int32)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<int64_t>(static_cast<int32_t>(entry.status))));
+        break;
+      case 1:  // snapshot_id (optional int64)
+        if (entry.snapshot_id.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, entry.snapshot_id.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 2: {  // data_file (required struct); presence checked above
+        auto data_file_type = internal::checked_pointer_cast<StructType>(field.type());
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendDataFile(array, data_file_type, entry.data_file));
+        break;
+      }
+      case 3: {  // sequence_number (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry));
+        if (sequence_num.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      }
+      case 4:  // file_sequence_number (optional int64)
+        if (entry.file_sequence_number.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(array, entry.file_sequence_number.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      default:
+        return InvalidManifest("Unknown manifest entry field id: {}", field.field_id());
+    }
+  }
+
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(&array_));
+  size_++;
+  return {};
+}
+
+// Builds manifest_schema_ (and its Arrow form in schema_) by projecting the
+// manifest entry type onto `fields_ids`. The data_file field (id 2) is always
+// kept, with only its requested nested fields. Other top-level fields are kept
+// only when their id is requested.
+Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
+  ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_schema, GetManifestEntryStructType());
+
+  // Rebuilds the required data_file field with only the requested sub-fields.
+  auto project_data_file = [&fields_ids](const SchemaField& data_file) {
+    auto data_file_struct = internal::checked_pointer_cast<StructType>(data_file.type());
+    std::vector<SchemaField> kept;
+    for (const auto& child : data_file_struct->fields()) {
+      if (fields_ids.contains(child.field_id())) {
+        kept.emplace_back(child);
+      }
+    }
+    return SchemaField::MakeRequired(data_file.field_id(), std::string(data_file.name()),
+                                     std::make_shared<StructType>(kept));
+  };
+
+  std::vector<SchemaField> fields;
+  // TODO(xiao.dong) make this a common function to recursive handle
+  // all nested fields in schema
+  for (const auto& field : manifest_entry_schema->fields()) {
+    if (field.field_id() == 2) {
+      fields.emplace_back(project_data_file(field));
+    } else if (fields_ids.contains(field.field_id())) {
+      fields.emplace_back(field);
+    }
+  }
+  manifest_schema_ = std::make_shared<Schema>(fields);
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
+  return {};
+}
+
+// Releases the Arrow array and schema held by this adapter. A null `release`
+// callback marks a structure that was never initialized or is already released,
+// so only live structures are released (array first, then schema).
+ManifestFileAdapter::~ManifestFileAdapter() {
+  const bool array_is_live = array_.release != nullptr;
+  const bool schema_is_live = schema_.release != nullptr;
+  if (array_is_live) ArrowArrayRelease(&array_);
+  if (schema_is_live) ArrowSchemaRelease(&schema_);
+}
+
+Status ManifestFileAdapter::AppendPartitions(
+ ArrowArray* arrow_array, const std::shared_ptr<ListType>& partition_type,
+ const std::vector<PartitionFieldSummary>& partitions) {
+ auto& array = arrow_array->children[0];
+ if (array->n_children != 4) {
+ return InvalidManifestList("Invalid partition array.");
+ }
+ auto partition_struct =
+
internal::checked_pointer_cast<StructType>(partition_type->fields()[0].type());
+ auto fields = partition_struct->fields();
+ for (const auto& partition : partitions) {
+ for (const auto& field : fields) {
Review Comment:
This looks wrong. There is one summary per partition field.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
// Returns an InvalidArrowData error from the enclosing function when the
// nanoarrow call `status` does not yield NANOARROW_OK.
//
// The argument is evaluated exactly once. The earlier form repeated `status`
// inside the error message, which re-ran the failing nanoarrow call (e.g. a
// second append) and could report a different code. The do/while(0) wrapper
// makes the macro one statement, so it is safe in an unbraced if/else.
#define NANOARROW_RETURN_IF_FAILED(status)                                \
  do {                                                                    \
    const auto nanoarrow_status_ = (status);                              \
    if (nanoarrow_status_ != NANOARROW_OK) [[unlikely]] {                 \
      return InvalidArrowData("nanoarrow error: {}", nanoarrow_status_);  \
    }                                                                     \
  } while (false)

namespace {
// Value written for the V1 `block_size_in_bytes` data-file field (64 MiB).
constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
}  // namespace
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = {};
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayInitFromSchema(&array_,
&schema_, &error),
+ error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(&array_));
+ return {};
+}
+
+Result<ArrowArray*> ManifestAdapter::FinishAppending() {
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishBuildingDefault(&array_,
&error),
+ error);
+ return &array_;
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view
value) {
+ ArrowStringView view(value.data(), value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value) {
+ ArrowBufferViewData data;
+ data.as_char = reinterpret_cast<const char*>(value.data());
+ ArrowBufferView view(data, value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+ return {};
+}
+
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+ if (array_.release != nullptr) {
+ ArrowArrayRelease(&array_);
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryStructType() {
+ if (partition_spec_ == nullptr) {
+ return ManifestEntry::TypeFromPartitionType(nullptr);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_schema,
partition_spec_->GetPartitionSchema());
+ return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+}
+
+Status ManifestEntryAdapter::AppendPartition(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+ const std::vector<Literal>& partitions) {
+ if (arrow_array->n_children != partition_type->fields().size()) [[unlikely]]
{
+ return InvalidManifest("Arrow array of partition does not match partition
type.");
+ }
+ if (partitions.size() != partition_type->fields().size()) [[unlikely]] {
+ return InvalidManifest("Literal list of partition does not match partition
type.");
+ }
+ auto fields = partition_type->fields();
+
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& partition = partitions[i];
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+ if (partition.IsNull()) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ continue;
+ }
+ switch (field.type()->type_id()) {
+ case TypeId::kBoolean:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
+ static_cast<uint64_t>(std::get<bool>(partition.value()) == true ?
1L : 0L)));
+ break;
+ case TypeId::kInt:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+ break;
+ case TypeId::kLong:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<int64_t>(partition.value())));
+ break;
+ case TypeId::kFloat:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<double>(std::get<float>(partition.value()))));
+ break;
+ case TypeId::kDouble:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<double>(partition.value())));
+ break;
+ case TypeId::kString:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<std::string>(partition.value())));
+ break;
+ case TypeId::kFixed:
+ case TypeId::kBinary:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
std::get<std::vector<uint8_t>>(partition.value())));
+ break;
+ case TypeId::kDate:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+ break;
+ case TypeId::kTime:
+ case TypeId::kTimestamp:
+ case TypeId::kTimestampTz:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<int64_t>(partition.value())));
+ break;
+ case TypeId::kDecimal:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<std::array<uint8_t,
16>>(partition.value())));
+ break;
+ case TypeId::kUuid:
+ case TypeId::kStruct:
+ case TypeId::kList:
+ case TypeId::kMap:
+ // TODO(xiao.dong) currently literal does not support those types
+ default:
+ return InvalidManifest("Unsupported partition type: {}",
field.ToString());
+ }
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>&
list_value) {
+ auto list_array = arrow_array->children[0];
+ for (const auto& value : list_value) {
+ NANOARROW_RETURN_IF_FAILED(
+ ArrowArrayAppendInt(list_array, static_cast<int64_t>(value)));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>&
list_value) {
+ auto list_array = arrow_array->children[0];
+ for (const auto& value : list_value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(list_array, value));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>&
map_value) {
+ auto map_array = arrow_array->children[0];
+ if (map_array->n_children != 2) {
+ return InvalidManifest("Invalid map array.");
+ }
+ for (const auto& [key, value] : map_value) {
+ auto key_array = map_array->children[0];
+ auto value_array = map_array->children[1];
+ ICEBERG_RETURN_UNEXPECTED(AppendField(key_array,
static_cast<int64_t>(key)));
+ ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(map_array));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendMap(
+ ArrowArray* arrow_array, const std::map<int32_t, std::vector<uint8_t>>&
map_value) {
+ auto map_array = arrow_array->children[0];
+ if (map_array->n_children != 2) {
+ return InvalidManifest("Invalid map array.");
+ }
+ for (const auto& [key, value] : map_value) {
+ auto key_array = map_array->children[0];
+ auto value_array = map_array->children[1];
+ ICEBERG_RETURN_UNEXPECTED(AppendField(key_array,
static_cast<int64_t>(key)));
+ ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(map_array));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendDataFile(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file) {
+ auto fields = data_file_type->fields();
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+
+ switch (field.field_id()) {
+ case 134: // content (optional int32)
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, static_cast<int64_t>(file->content)));
+ break;
+ case 100: // file_path (required string)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_path));
+ break;
+ case 101: // file_format (required string)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
ToString(file->file_format)));
+ break;
+ case 102: // partition (required struct)
+ {
+ auto partition_type =
internal::checked_pointer_cast<StructType>(field.type());
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendPartition(array, partition_type, file->partition));
+ } break;
+ case 103: // record_count (required int64)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->record_count));
+ break;
+ case 104: // file_size_in_bytes (required int64)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file->file_size_in_bytes));
+ break;
+ case 105: // block_size_in_bytes (compatible in v1)
+ // always 64MB for v1
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, kBlockSizeInBytesV1));
+ break;
+ case 108: // column_sizes (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->column_sizes));
+ break;
+ case 109: // value_counts (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->value_counts));
+ break;
+ case 110: // null_value_counts (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->null_value_counts));
+ break;
+ case 137: // nan_value_counts (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->nan_value_counts));
+ break;
+ case 125: // lower_bounds (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->lower_bounds));
+ break;
+ case 128: // upper_bounds (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->upper_bounds));
+ break;
+ case 131: // key_metadata (optional binary)
+ if (!file->key_metadata.empty()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->key_metadata));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 132: // split_offsets (optional list)
+ ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->split_offsets));
+ break;
+ case 135: // equality_ids (optional list)
+ ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->equality_ids));
+ break;
+ case 140: // sort_order_id (optional int32)
+ if (file->sort_order_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<int64_t>(file->sort_order_id.value())));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 142: // first_row_id (optional int64)
+ if (file->first_row_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file->first_row_id.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 143: // referenced_data_file (optional string)
+ {
+ ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file,
+ GetWrappedReferenceDataFile(file));
+ if (referenced_data_file.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
referenced_data_file.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ }
+ case 144: // content_offset (optional int64)
+ if (file->content_offset.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file->content_offset.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 145: // content_size_in_bytes (optional int64)
+ if (file->content_size_in_bytes.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, file->content_size_in_bytes.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ default:
+ return InvalidManifest("Unknown data file field id: {} ",
field.field_id());
+ }
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetSequenceNumber(
+ const ManifestEntry& entry) {
+ return entry.sequence_number;
+}
+
+Result<std::optional<std::string>>
ManifestEntryAdapter::GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file) {
+ return file->referenced_data_file;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file) {
+ return file->first_row_id;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file) {
+ return file->content_offset;
+}
+
+Result<std::optional<int64_t>>
ManifestEntryAdapter::GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file) {
+ return file->content_size_in_bytes;
+}
+
+Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+ const auto& fields = manifest_schema_->fields();
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& field = fields[i];
+ auto array = array_.children[i];
+
+ switch (field.field_id()) {
+ case 0: // status (required int32)
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<int64_t>(static_cast<int32_t>(entry.status))));
+ break;
+ case 1: // snapshot_id (optional int64)
+ if (entry.snapshot_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
entry.snapshot_id.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 2: // data_file (required struct)
+ if (entry.data_file) {
+ // Get the data file type from the field
+ auto data_file_type =
internal::checked_pointer_cast<StructType>(field.type());
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendDataFile(array, data_file_type, entry.data_file));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 3: // sequence_number (optional int64)
+ {
+ ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry));
+ if (sequence_num.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ }
+ case 4: // file_sequence_number (optional int64)
+ if (entry.file_sequence_number.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, entry.file_sequence_number.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ default:
+ return InvalidManifest("Unknown manifest entry field id: {}",
field.field_id());
+ }
+ }
+
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(&array_));
+ size_++;
+ return {};
+}
+
+Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>&
fields_ids) {
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_schema,
GetManifestEntryStructType())
+ auto fields_span = manifest_entry_schema->fields();
+ std::vector<SchemaField> fields;
+ // TODO(xiao.dong) make this a common function to recursive handle
+ // all nested fields in schema
+ for (const auto& field : fields_span) {
+ if (field.field_id() == 2) {
+ // handle data_file field
+ auto data_file_struct =
internal::checked_pointer_cast<StructType>(field.type());
+ std::vector<SchemaField> data_file_fields;
+ for (const auto& data_file_field : data_file_struct->fields()) {
+ if (fields_ids.contains(data_file_field.field_id())) {
+ data_file_fields.emplace_back(data_file_field);
+ }
+ }
+ auto type = std::make_shared<StructType>(data_file_fields);
+ auto data_file_field = SchemaField::MakeRequired(
+ field.field_id(), std::string(field.name()), std::move(type));
+ fields.emplace_back(std::move(data_file_field));
+ } else {
+ if (fields_ids.contains(field.field_id())) {
+ fields.emplace_back(field);
+ }
+ }
+ }
+ manifest_schema_ = std::make_shared<Schema>(fields);
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
+ return {};
+}
+
+ManifestFileAdapter::~ManifestFileAdapter() {
+ if (array_.release != nullptr) {
+ ArrowArrayRelease(&array_);
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Status ManifestFileAdapter::AppendPartitions(
+ ArrowArray* arrow_array, const std::shared_ptr<ListType>& partition_type,
+ const std::vector<PartitionFieldSummary>& partitions) {
Review Comment:
```suggestion
Status ManifestFileAdapter::AppendPartitionSummary(
ArrowArray* arrow_array, const std::shared_ptr<ListType>& partition_type,
const std::vector<PartitionFieldSummary>& summaries) {
```
It is misleading to call it `partitions`: each element is a summary of one partition field, not a partition.
##########
src/iceberg/partition_spec.h:
##########
@@ -67,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable
{
/// \brief Get a view of the partition fields.
std::span<const PartitionField> fields() const;
+ Result<std::shared_ptr<Schema>> GetPartitionSchema();
Review Comment:
BTW, we need to add a test case for this. Perhaps add a TODO comment in
partition_spec_test.cc?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]