wgtmac commented on code in PR #611:
URL: https://github.com/apache/iceberg-cpp/pull/611#discussion_r3031487799


##########
src/iceberg/test/incremental_changelog_scan_test.cc:
##########
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/expression/expressions.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/scan_test_base.h"
+
+namespace iceberg {
+
+namespace {
+
+/// \brief Sort changelog scan tasks for deterministic ordering.
+/// Sorts by change_ordinal, then by operation type name, then by file path.
+void SortTasks(std::vector<std::shared_ptr<ChangelogScanTask>>& tasks) {
+  std::ranges::sort(tasks, [](const auto& t1, const auto& t2) {
+    if (t1->change_ordinal() != t2->change_ordinal()) {
+      return t1->change_ordinal() < t2->change_ordinal();
+    }
+    if (t1->operation() != t2->operation()) {
+      return static_cast<uint8_t>(t1->operation()) <
+             static_cast<uint8_t>(t2->operation());
+    }
+    return t1->data_file()->file_path < t2->data_file()->file_path;
+  });
+}
+
+}  // namespace
+
+class IncrementalChangelogScanTest : public ScanTestBase {};
+
+TEST_P(IncrementalChangelogScanTest, DataFilters) {

Review Comment:
   Java's `testDataFilters` uses `withUnavailableLocations` to verify that 
snap1's manifest is actually pruned and never read. This test only checks 
result correctness but doesn't verify manifest pruning. Should we add a similar 
one?



##########
src/iceberg/table_scan.h:
##########
@@ -102,14 +102,87 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   std::shared_ptr<Expression> residual_filter_;
 };
 
+enum class ChangelogOperation : uint8_t {
+  kInsert,
+  kDelete,
+  kUpdateBefore,
+  kUpdateAfter,
+};
+
 /// \brief A scan task for reading changelog entries between snapshots.
 class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
  public:
+  /// \brief Construct an AddedRowsScanTask.
+  ///
+  /// \param change_ordinal Position in the changelog order (0-based).
+  /// \param commit_snapshot_id The snapshot ID that committed this change.
+  /// \param data_file The data file containing the added rows.
+  /// \param delete_files Delete files that apply to this data file.
+  /// \param residual_filter Optional residual filter to apply after reading.
+  ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
+                    std::shared_ptr<DataFile> data_file,
+                    std::vector<std::shared_ptr<DataFile>> delete_files = {},
+                    std::shared_ptr<Expression> residual_filter = nullptr)
+      : change_ordinal_(change_ordinal),
+        commit_snapshot_id_(commit_snapshot_id),
+        data_file_(std::move(data_file)),
+        delete_files_(std::move(delete_files)),
+        residual_filter_(std::move(residual_filter)) {}
+
   Kind kind() const override { return Kind::kChangelogScanTask; }
-  // TODO(): Return actual values once member fields are implemented
-  int64_t size_bytes() const override { return 0; }
-  int32_t files_count() const override { return 0; }
-  int64_t estimated_row_count() const override { return 0; }
+
+  int64_t size_bytes() const override;
+  int32_t files_count() const override;
+  int64_t estimated_row_count() const override;
+
+  virtual ChangelogOperation operation() const = 0;
+
+  /// \brief The position of this change in the changelog order (0-based).
+  virtual int32_t change_ordinal() const { return change_ordinal_; }
+
+  /// \brief The snapshot ID that committed this change.
+  virtual int64_t commit_snapshot_id() const { return commit_snapshot_id_; }

Review Comment:
   Ditto: this doesn't need to be virtual either.



##########
src/iceberg/table_scan.h:
##########
@@ -102,14 +102,87 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   std::shared_ptr<Expression> residual_filter_;
 };
 
+enum class ChangelogOperation : uint8_t {
+  kInsert,
+  kDelete,
+  kUpdateBefore,
+  kUpdateAfter,
+};
+
 /// \brief A scan task for reading changelog entries between snapshots.
 class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
  public:
+  /// \brief Construct an AddedRowsScanTask.
+  ///
+  /// \param change_ordinal Position in the changelog order (0-based).
+  /// \param commit_snapshot_id The snapshot ID that committed this change.
+  /// \param data_file The data file containing the added rows.
+  /// \param delete_files Delete files that apply to this data file.
+  /// \param residual_filter Optional residual filter to apply after reading.
+  ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
+                    std::shared_ptr<DataFile> data_file,
+                    std::vector<std::shared_ptr<DataFile>> delete_files = {},
+                    std::shared_ptr<Expression> residual_filter = nullptr)
+      : change_ordinal_(change_ordinal),
+        commit_snapshot_id_(commit_snapshot_id),
+        data_file_(std::move(data_file)),
+        delete_files_(std::move(delete_files)),
+        residual_filter_(std::move(residual_filter)) {}
+
   Kind kind() const override { return Kind::kChangelogScanTask; }
-  // TODO(): Return actual values once member fields are implemented
-  int64_t size_bytes() const override { return 0; }
-  int32_t files_count() const override { return 0; }
-  int64_t estimated_row_count() const override { return 0; }
+
+  int64_t size_bytes() const override;
+  int32_t files_count() const override;
+  int64_t estimated_row_count() const override;
+
+  virtual ChangelogOperation operation() const = 0;
+
+  /// \brief The position of this change in the changelog order (0-based).
+  virtual int32_t change_ordinal() const { return change_ordinal_; }
+
+  /// \brief The snapshot ID that committed this change.
+  virtual int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
+
+  /// \brief The data file containing the added rows.
+  const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
+
+  /// \brief Delete files that apply to this data file.
+  const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
+    return delete_files_;
+  }
+
+  /// \brief Residual filter to apply after reading.
+  const std::shared_ptr<Expression>& residual_filter() const { return 
residual_filter_; }
+
+ protected:
+  int32_t change_ordinal_;
+  int64_t commit_snapshot_id_;
+  std::shared_ptr<DataFile> data_file_;
+  std::vector<std::shared_ptr<DataFile>> delete_files_;
+  std::shared_ptr<Expression> residual_filter_;
+};
+
+/// \brief A scan task for reading rows that were added between snapshots.
+///
+/// This task represents data files that were added to the table, along with 
any
+/// delete files that should be applied when reading the data.
+class ICEBERG_EXPORT AddedRowsScanTask : public ChangelogScanTask {

Review Comment:
   Let's add the comment from Java, which helps readers understand what this class is for.
   
   ```
   /**
    * A scan task for inserts generated by adding a data file to the table.
    *
    * <p>Note that added data files may have matching delete files. This may happen if a matching
    * position delete file is committed in the same snapshot or if changes for multiple snapshots are
    * squashed together.
    *
    * <p>Suppose snapshot S1 adds data files F1, F2, F3 and a position delete file, D1, that marks
    * particular records in F1 as deleted. A scan for changes generated by S1 should include the
    * following tasks:
    *
    * <ul>
    *   <li>AddedRowsScanTask(file=F1, deletes=[D1], snapshot=S1)
    *   <li>AddedRowsScanTask(file=F2, deletes=[], snapshot=S1)
    *   <li>AddedRowsScanTask(file=F3, deletes=[], snapshot=S1)
    * </ul>
    *
    * <p>Readers consuming these tasks should produce added records with metadata like change ordinal
    * and commit snapshot ID.
    */
   ```



##########
src/iceberg/table_scan.h:
##########
@@ -133,8 +206,24 @@ struct TableScanContext {
 
   // Validate the context parameters to see if they have conflicts.
   [[nodiscard]] Status Validate() const;
+
+  /// \brief Returns true if this scan is a current lineage scan, which means 
it does not
+  /// specify from/to snapshot IDs.
+  bool IsScanCurrentLineage() const;
+
+  /// \brief Get the snapshot ID to scan up to (inclusive) based on the 
context.
+  Result<int64_t> ToSnapshotIdInclusive(const TableMetadata& metadata) const;
+
+  /// \brief Get the snapshot ID to scan from (exclusive) based on the context.
+  Result<std::optional<int64_t>> FromSnapshotIdExclusive(
+      const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
 };
 
+// Internal validation functions for IncrementalScanBuilder

Review Comment:
   Is it better to move them to snapshot_util.h/cc?



##########
src/iceberg/table_scan.cc:
##########
@@ -762,7 +784,126 @@ IncrementalChangelogScan::PlanFiles() const {
 Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
 IncrementalChangelogScan::PlanFiles(std::optional<int64_t> 
from_snapshot_id_exclusive,
                                     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalChangelogScan::PlanFiles is not 
implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::pair<std::shared_ptr<Snapshot>, 
std::unique_ptr<SnapshotCache>>>
+      changelog_snapshots;
+
+  for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
+    auto operation = snapshot->Operation();
+    if (!operation.has_value() || operation.value() != 
DataOperation::kReplace) {
+      auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
+                              snapshot_cache->DeleteManifests(io_));
+      if (!delete_manifests.empty()) {
+        return NotSupported(
+            "Delete files are currently not supported in changelog scans");
+      }
+      changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
+    }
+  }
+  if (changelog_snapshots.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::unordered_map<int64_t, int32_t> snapshot_ordinals;
+  int32_t ordinal = 0;
+  for (const auto& snapshot : changelog_snapshots) {
+    snapshot_ids.insert(snapshot.first->snapshot_id);
+    snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
+  }
+
+  std::vector<ManifestFile> data_manifests;
+  std::unordered_set<std::string> seen_manifest_paths;
+  for (const auto& snapshot : changelog_snapshots) {
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, 
snapshot.second->DataManifests(io_));
+    for (auto& manifest : manifests) {
+      if (snapshot_ids.contains(manifest.added_snapshot_id) &&
+          seen_manifest_paths.insert(manifest.manifest_path).second) {
+        data_manifests.push_back(manifest);
+      }
+    }
+  }
+  if (data_manifests.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  TableMetadataCache metadata_cache(metadata_.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto manifest_group,
+      ManifestGroup::Make(io_, schema_, specs_by_id, 
std::move(data_manifests), {}));
+
+  manifest_group->CaseSensitive(context_.case_sensitive)
+      .Select(ScanColumns())
+      .FilterData(filter())
+      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
+        return entry.snapshot_id.has_value() &&
+               snapshot_ids.contains(entry.snapshot_id.value());
+      })
+      .IgnoreExisting()
+      .ColumnsToKeepStats(context_.columns_to_keep_stats);
+
+  if (context_.ignore_residuals) {
+    manifest_group->IgnoreResiduals();
+  }
+
+  auto create_tasks_func =
+      [&snapshot_ordinals](
+          std::vector<ManifestEntry>&& entries,
+          const TaskContext& ctx) -> 
Result<std::vector<std::shared_ptr<ScanTask>>> {
+    std::vector<std::shared_ptr<ScanTask>> tasks;
+    tasks.reserve(entries.size());
+
+    for (auto& entry : entries) {
+      if (!entry.snapshot_id.has_value() || entry.data_file == nullptr) {
+        continue;

Review Comment:
   This should not happen. Use ICEBERG_PRECHECK to return an error instead?



##########
src/iceberg/table_scan.cc:
##########
@@ -762,7 +784,126 @@ IncrementalChangelogScan::PlanFiles() const {
 Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
 IncrementalChangelogScan::PlanFiles(std::optional<int64_t> 
from_snapshot_id_exclusive,
                                     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalChangelogScan::PlanFiles is not 
implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::pair<std::shared_ptr<Snapshot>, 
std::unique_ptr<SnapshotCache>>>
+      changelog_snapshots;
+
+  for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
+    auto operation = snapshot->Operation();
+    if (!operation.has_value() || operation.value() != 
DataOperation::kReplace) {
+      auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
+                              snapshot_cache->DeleteManifests(io_));
+      if (!delete_manifests.empty()) {
+        return NotSupported(
+            "Delete files are currently not supported in changelog scans");
+      }
+      changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
+    }
+  }
+  if (changelog_snapshots.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::unordered_map<int64_t, int32_t> snapshot_ordinals;
+  int32_t ordinal = 0;
+  for (const auto& snapshot : changelog_snapshots) {
+    snapshot_ids.insert(snapshot.first->snapshot_id);
+    snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
+  }
+
+  std::vector<ManifestFile> data_manifests;
+  std::unordered_set<std::string> seen_manifest_paths;
+  for (const auto& snapshot : changelog_snapshots) {
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, 
snapshot.second->DataManifests(io_));
+    for (auto& manifest : manifests) {
+      if (snapshot_ids.contains(manifest.added_snapshot_id) &&
+          seen_manifest_paths.insert(manifest.manifest_path).second) {
+        data_manifests.push_back(manifest);
+      }
+    }
+  }
+  if (data_manifests.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  TableMetadataCache metadata_cache(metadata_.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto manifest_group,
+      ManifestGroup::Make(io_, schema_, specs_by_id, 
std::move(data_manifests), {}));
+
+  manifest_group->CaseSensitive(context_.case_sensitive)
+      .Select(ScanColumns())
+      .FilterData(filter())
+      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
+        return entry.snapshot_id.has_value() &&
+               snapshot_ids.contains(entry.snapshot_id.value());
+      })
+      .IgnoreExisting()
+      .ColumnsToKeepStats(context_.columns_to_keep_stats);
+
+  if (context_.ignore_residuals) {
+    manifest_group->IgnoreResiduals();
+  }
+
+  auto create_tasks_func =
+      [&snapshot_ordinals](
+          std::vector<ManifestEntry>&& entries,
+          const TaskContext& ctx) -> 
Result<std::vector<std::shared_ptr<ScanTask>>> {
+    std::vector<std::shared_ptr<ScanTask>> tasks;
+    tasks.reserve(entries.size());
+
+    for (auto& entry : entries) {
+      if (!entry.snapshot_id.has_value() || entry.data_file == nullptr) {
+        continue;
+      }
+
+      int64_t commit_snapshot_id = entry.snapshot_id.value();
+      auto ordinal_it = snapshot_ordinals.find(commit_snapshot_id);
+      if (ordinal_it == snapshot_ordinals.end()) {
+        continue;

Review Comment:
   Same question here.



##########
src/iceberg/test/scan_test_base.h:
##########
@@ -261,6 +261,87 @@ class ScanTestBase : public testing::TestWithParam<int8_t> 
{
     });
   }
 
+  /// \brief Create a delete snapshot with the given files.
+  std::shared_ptr<Snapshot> MakeDeleteSnapshot(

Review Comment:
   Should we add partition_spec as an input to `MakeDeleteSnapshot` and 
`MakeOverwriteSnapshot`?



##########
src/iceberg/table_scan.cc:
##########
@@ -762,7 +784,126 @@ IncrementalChangelogScan::PlanFiles() const {
 Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
 IncrementalChangelogScan::PlanFiles(std::optional<int64_t> 
from_snapshot_id_exclusive,
                                     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalChangelogScan::PlanFiles is not 
implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::pair<std::shared_ptr<Snapshot>, 
std::unique_ptr<SnapshotCache>>>
+      changelog_snapshots;
+
+  for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
+    auto operation = snapshot->Operation();
+    if (!operation.has_value() || operation.value() != 
DataOperation::kReplace) {
+      auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
+                              snapshot_cache->DeleteManifests(io_));
+      if (!delete_manifests.empty()) {
+        return NotSupported(
+            "Delete files are currently not supported in changelog scans");
+      }
+      changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
+    }
+  }
+  if (changelog_snapshots.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::unordered_map<int64_t, int32_t> snapshot_ordinals;
+  int32_t ordinal = 0;
+  for (const auto& snapshot : changelog_snapshots) {
+    snapshot_ids.insert(snapshot.first->snapshot_id);
+    snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
+  }
+
+  std::vector<ManifestFile> data_manifests;
+  std::unordered_set<std::string> seen_manifest_paths;
+  for (const auto& snapshot : changelog_snapshots) {
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, 
snapshot.second->DataManifests(io_));
+    for (auto& manifest : manifests) {
+      if (snapshot_ids.contains(manifest.added_snapshot_id) &&
+          seen_manifest_paths.insert(manifest.manifest_path).second) {
+        data_manifests.push_back(manifest);
+      }
+    }
+  }
+  if (data_manifests.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  TableMetadataCache metadata_cache(metadata_.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto manifest_group,
+      ManifestGroup::Make(io_, schema_, specs_by_id, 
std::move(data_manifests), {}));
+
+  manifest_group->CaseSensitive(context_.case_sensitive)
+      .Select(ScanColumns())
+      .FilterData(filter())
+      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
+        return entry.snapshot_id.has_value() &&
+               snapshot_ids.contains(entry.snapshot_id.value());
+      })
+      .IgnoreExisting()
+      .ColumnsToKeepStats(context_.columns_to_keep_stats);
+
+  if (context_.ignore_residuals) {
+    manifest_group->IgnoreResiduals();
+  }
+
+  auto create_tasks_func =
+      [&snapshot_ordinals](
+          std::vector<ManifestEntry>&& entries,
+          const TaskContext& ctx) -> 
Result<std::vector<std::shared_ptr<ScanTask>>> {
+    std::vector<std::shared_ptr<ScanTask>> tasks;
+    tasks.reserve(entries.size());
+
+    for (auto& entry : entries) {
+      if (!entry.snapshot_id.has_value() || entry.data_file == nullptr) {
+        continue;
+      }
+
+      int64_t commit_snapshot_id = entry.snapshot_id.value();
+      auto ordinal_it = snapshot_ordinals.find(commit_snapshot_id);
+      if (ordinal_it == snapshot_ordinals.end()) {
+        continue;
+      }
+      int32_t change_ordinal = ordinal_it->second;
+
+      if (ctx.drop_stats) {
+        ContentFileUtil::DropAllStats(*entry.data_file);
+      } else if (!ctx.columns_to_keep_stats.empty()) {
+        ContentFileUtil::DropUnselectedStats(*entry.data_file, 
ctx.columns_to_keep_stats);
+      }
+
+      ICEBERG_ASSIGN_OR_RAISE(auto residual,
+                              
ctx.residuals->ResidualFor(entry.data_file->partition));

Review Comment:
   Why do we need this specifically for changelog scan?



##########
src/iceberg/table_scan.h:
##########
@@ -102,14 +102,87 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   std::shared_ptr<Expression> residual_filter_;
 };
 
+enum class ChangelogOperation : uint8_t {
+  kInsert,
+  kDelete,
+  kUpdateBefore,
+  kUpdateAfter,
+};
+
 /// \brief A scan task for reading changelog entries between snapshots.
 class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
  public:
+  /// \brief Construct an AddedRowsScanTask.
+  ///
+  /// \param change_ordinal Position in the changelog order (0-based).
+  /// \param commit_snapshot_id The snapshot ID that committed this change.
+  /// \param data_file The data file containing the added rows.
+  /// \param delete_files Delete files that apply to this data file.
+  /// \param residual_filter Optional residual filter to apply after reading.
+  ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
+                    std::shared_ptr<DataFile> data_file,
+                    std::vector<std::shared_ptr<DataFile>> delete_files = {},
+                    std::shared_ptr<Expression> residual_filter = nullptr)
+      : change_ordinal_(change_ordinal),
+        commit_snapshot_id_(commit_snapshot_id),
+        data_file_(std::move(data_file)),
+        delete_files_(std::move(delete_files)),
+        residual_filter_(std::move(residual_filter)) {}
+
   Kind kind() const override { return Kind::kChangelogScanTask; }
-  // TODO(): Return actual values once member fields are implemented
-  int64_t size_bytes() const override { return 0; }
-  int32_t files_count() const override { return 0; }
-  int64_t estimated_row_count() const override { return 0; }
+
+  int64_t size_bytes() const override;
+  int32_t files_count() const override;
+  int64_t estimated_row_count() const override;
+
+  virtual ChangelogOperation operation() const = 0;
+
+  /// \brief The position of this change in the changelog order (0-based).
+  virtual int32_t change_ordinal() const { return change_ordinal_; }

Review Comment:
   This doesn't need to be virtual.



##########
src/iceberg/table_scan.cc:
##########
@@ -762,7 +784,126 @@ IncrementalChangelogScan::PlanFiles() const {
 Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
 IncrementalChangelogScan::PlanFiles(std::optional<int64_t> 
from_snapshot_id_exclusive,
                                     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalChangelogScan::PlanFiles is not 
implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::pair<std::shared_ptr<Snapshot>, 
std::unique_ptr<SnapshotCache>>>
+      changelog_snapshots;
+
+  for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
+    auto operation = snapshot->Operation();
+    if (!operation.has_value() || operation.value() != 
DataOperation::kReplace) {
+      auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
+                              snapshot_cache->DeleteManifests(io_));
+      if (!delete_manifests.empty()) {
+        return NotSupported(
+            "Delete files are currently not supported in changelog scans");
+      }
+      changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
+    }
+  }
+  if (changelog_snapshots.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::unordered_map<int64_t, int32_t> snapshot_ordinals;
+  int32_t ordinal = 0;
+  for (const auto& snapshot : changelog_snapshots) {
+    snapshot_ids.insert(snapshot.first->snapshot_id);
+    snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
+  }
+
+  std::vector<ManifestFile> data_manifests;
+  std::unordered_set<std::string> seen_manifest_paths;
+  for (const auto& snapshot : changelog_snapshots) {
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, 
snapshot.second->DataManifests(io_));
+    for (auto& manifest : manifests) {
+      if (snapshot_ids.contains(manifest.added_snapshot_id) &&
+          seen_manifest_paths.insert(manifest.manifest_path).second) {
+        data_manifests.push_back(manifest);
+      }
+    }
+  }
+  if (data_manifests.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  TableMetadataCache metadata_cache(metadata_.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto manifest_group,
+      ManifestGroup::Make(io_, schema_, specs_by_id, 
std::move(data_manifests), {}));

Review Comment:
   ```suggestion
         ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), /*delete_manifests=*/{}));
   ```



##########
src/iceberg/table_scan.cc:
##########
@@ -294,6 +296,24 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
   return MakeArrowArrayStream(std::move(reader));
 }
 
+// ChangelogScanTask implementation
+
+int64_t ChangelogScanTask::size_bytes() const {
+  int64_t total_size = data_file_->file_size_in_bytes;
+  for (const auto& delete_file : delete_files_) {
+    total_size +=
+        (delete_file->IsDeletionVector() ? 
delete_file->content_size_in_bytes.value_or(0)

Review Comment:
   nit: add an `ICEBERG_DCHECK(delete_file->content_size_in_bytes.has_value())`, 
as it is unlikely to be unset, but just in case.



##########
src/iceberg/table_scan.h:
##########
@@ -102,14 +102,87 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   std::shared_ptr<Expression> residual_filter_;
 };
 
+enum class ChangelogOperation : uint8_t {
+  kInsert,
+  kDelete,
+  kUpdateBefore,
+  kUpdateAfter,
+};
+
 /// \brief A scan task for reading changelog entries between snapshots.
 class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
  public:
+  /// \brief Construct an AddedRowsScanTask.
+  ///
+  /// \param change_ordinal Position in the changelog order (0-based).
+  /// \param commit_snapshot_id The snapshot ID that committed this change.
+  /// \param data_file The data file containing the added rows.
+  /// \param delete_files Delete files that apply to this data file.
+  /// \param residual_filter Optional residual filter to apply after reading.
+  ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
+                    std::shared_ptr<DataFile> data_file,
+                    std::vector<std::shared_ptr<DataFile>> delete_files = {},
+                    std::shared_ptr<Expression> residual_filter = nullptr)
+      : change_ordinal_(change_ordinal),
+        commit_snapshot_id_(commit_snapshot_id),
+        data_file_(std::move(data_file)),
+        delete_files_(std::move(delete_files)),
+        residual_filter_(std::move(residual_filter)) {}
+
   Kind kind() const override { return Kind::kChangelogScanTask; }
-  // TODO(): Return actual values once member fields are implemented
-  int64_t size_bytes() const override { return 0; }
-  int32_t files_count() const override { return 0; }
-  int64_t estimated_row_count() const override { return 0; }
+
+  int64_t size_bytes() const override;
+  int32_t files_count() const override;
+  int64_t estimated_row_count() const override;
+
+  virtual ChangelogOperation operation() const = 0;
+
+  /// \brief The position of this change in the changelog order (0-based).
+  virtual int32_t change_ordinal() const { return change_ordinal_; }
+
+  /// \brief The snapshot ID that committed this change.
+  virtual int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
+
+  /// \brief The data file containing the added rows.
+  const std::shared_ptr<DataFile>& data_file() const { return data_file_; }

Review Comment:
   The meanings of `data_file()` and `delete_files()` vary across different 
`ChangelogOperation`s. Let's move their definitions to the subclasses to match 
the Java implementation. The evidence is that the comment above does not apply 
to the Java `BaseDeletedRowsScanTask` class.



##########
src/iceberg/table_scan.cc:
##########
@@ -762,7 +784,126 @@ IncrementalChangelogScan::PlanFiles() const {
 Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
 IncrementalChangelogScan::PlanFiles(std::optional<int64_t> 
from_snapshot_id_exclusive,
                                     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalChangelogScan::PlanFiles is not 
implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::pair<std::shared_ptr<Snapshot>, 
std::unique_ptr<SnapshotCache>>>
+      changelog_snapshots;
+
+  for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
+    auto operation = snapshot->Operation();
+    if (!operation.has_value() || operation.value() != 
DataOperation::kReplace) {
+      auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
+                              snapshot_cache->DeleteManifests(io_));
+      if (!delete_manifests.empty()) {
+        return NotSupported(
+            "Delete files are currently not supported in changelog scans");
+      }
+      changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
+    }
+  }
+  if (changelog_snapshots.empty()) {
+    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::unordered_map<int64_t, int32_t> snapshot_ordinals;
+  int32_t ordinal = 0;

Review Comment:
   nit: we can use `snapshot_ordinals.size()` to replace `ordinal`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to