wgtmac commented on code in PR #701:
URL: https://github.com/apache/iceberg-cpp/pull/701#discussion_r3437367400
##########
src/iceberg/table_scan.cc:
##########
@@ -538,11 +565,39 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
.Select(ScanColumns())
.FilterData(filter())
.IgnoreDeleted()
- .ColumnsToKeepStats(context_.columns_to_keep_stats);
+ .ColumnsToKeepStats(context_.columns_to_keep_stats)
+ .ScanMetrics(scan_metrics);
if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
}
- return manifest_group->PlanFiles();
+ ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->PlanFiles());
+
+ timed.Stop();
+
+ if (context_.metrics_reporter) {
+ ICEBERG_ASSIGN_OR_RAISE(auto projected_schema, ResolveProjectedSchema());
+ const auto& schema_ptr = projected_schema.get();
+ std::vector<int32_t> projected_field_ids;
+ std::vector<std::string> projected_field_names;
+ for (const auto& field : schema_ptr->fields()) {
+ projected_field_ids.push_back(field.field_id());
+ projected_field_names.emplace_back(field.name());
+ }
+
+ ScanReport report{
+ .table_name = context_.table_name,
+ .snapshot_id = snapshot->snapshot_id,
+ .filter = context_.filter,
Review Comment:
This sends the raw scan filter to metrics reporters. Java sanitizes the
filter before reporting, so literal predicate values are not exposed. With the
current code, a scan like `email = 'x'` posts that value to the REST metrics
endpoint or any custom reporter.
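Here is a minimal sketch of the sanitizing step, for illustration only. `Expr`, `Sanitize`, and `ToString` are hypothetical stand-ins, not iceberg-cpp's expression API, and the `(hash-...)` placeholder only approximates Java's sanitized output. The idea is to rebuild the filter tree with every literal replaced before it is stored in `ScanReport::filter`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, simplified expression tree for illustration only;
// this is not iceberg-cpp's Expression API.
struct Expr {
  enum class Kind { kAnd, kOr, kEq };
  Kind kind = Kind::kEq;
  std::string column;                           // kEq only
  std::string literal;                          // kEq only
  std::vector<std::shared_ptr<Expr>> children;  // kAnd / kOr only
};

// Returns a copy of `e` in which every literal is replaced by an opaque
// placeholder, so a metrics reporter never sees raw predicate values.
std::shared_ptr<Expr> Sanitize(const Expr& e) {
  auto out = std::make_shared<Expr>(e);
  if (e.kind == Expr::Kind::kEq) {
    std::ostringstream os;
    os << "(hash-" << std::hex << std::hash<std::string>{}(e.literal) << ")";
    out->literal = os.str();
  } else {
    out->children.clear();
    for (const auto& child : e.children) {
      out->children.push_back(Sanitize(*child));
    }
  }
  return out;
}

// Renders an expression, e.g. "(a = 1 and b = 2)".
std::string ToString(const Expr& e) {
  if (e.kind == Expr::Kind::kEq) return e.column + " = " + e.literal;
  const std::string sep = e.kind == Expr::Kind::kAnd ? " and " : " or ";
  std::string s = "(";
  for (std::size_t i = 0; i < e.children.size(); ++i) {
    if (i > 0) s += sep;
    s += ToString(*e.children[i]);
  }
  return s + ")";
}
```

Hashing the literal, rather than dropping it, keeps equal values comparable across reports without exposing them. Note that `std::hash` is not stable across standard-library implementations, so a real version would want a fixed hash function.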
##########
src/iceberg/update/snapshot_update.h:
##########
@@ -54,6 +55,15 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
Kind kind() const override { return Kind::kUpdateSnapshot; }
bool IsRetryable() const override { return true; }
+ /// \brief Set the metrics reporter for this snapshot update.
+ ///
+ /// \param reporter The metrics reporter to use.
+ /// \return Reference to this for method chaining.
+ auto& ReportWith(this auto& self, std::shared_ptr<MetricsReporter> reporter) {
+ static_cast<SnapshotUpdate&>(self).reporter_ = std::move(reporter);
Review Comment:
This stores the reporter on `SnapshotUpdate`, but nothing reads `reporter_`
when building the `CommitReport`; `Transaction::Commit` uses
`ctx_->table->reporter()` instead. A caller that does
`fast_append->ReportWith(custom_reporter)` will not receive a commit report
unless the table already has that reporter. Java's
`SnapshotProducer.reportWith` overrides the operation reporter.
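Here is a sketch of the override semantics, assuming the commit path reads the reporter stored on the operation instead of `ctx_->table->reporter()`. `Reporter`, `RecordingReporter`, and `SnapshotUpdateSketch` are hypothetical names, not iceberg-cpp types. The operation starts with the table's reporter, and `ReportWith` replaces it.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reporter interface standing in for MetricsReporter.
struct Reporter {
  virtual ~Reporter() = default;
  virtual void Report(const std::string& report) = 0;
};

// Test double that records every report it receives.
struct RecordingReporter : Reporter {
  std::vector<std::string> reports;
  void Report(const std::string& report) override { reports.push_back(report); }
};

// Hypothetical snapshot update: initialized with the table's reporter;
// ReportWith() replaces it for this operation only.
class SnapshotUpdateSketch {
 public:
  explicit SnapshotUpdateSketch(std::shared_ptr<Reporter> table_reporter)
      : reporter_(std::move(table_reporter)) {}

  SnapshotUpdateSketch& ReportWith(std::shared_ptr<Reporter> reporter) {
    reporter_ = std::move(reporter);
    return *this;
  }

  // Called after a successful commit. The commit report goes to the reporter
  // stored on the operation, not to a fresh table-level lookup.
  void OnCommitted(const std::string& report) {
    if (reporter_) reporter_->Report(report);
  }

 private:
  std::shared_ptr<Reporter> reporter_;
};
```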
##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -201,10 +204,33 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> f
catalog_session_(std::move(catalog_session)),
snapshot_mode_(snapshot_mode) {
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null");
+ const auto& props = config_.configs();
+ auto it = props.find(std::string(kMetricsReporterImpl));
+ if (it != props.end() && !it->second.empty() &&
+ it->second != kMetricsReporterTypeNoop) {
+ if (auto r = MetricsReporters::Load(props); r.has_value()) {
Review Comment:
`MetricsReporters::Load` errors are silently ignored. If
`metrics-reporter-impl` is misspelled or the factory fails, the REST catalog
still initializes with no custom reporter and no diagnostic. Java fails catalog
initialization for an invalid metrics reporter; this should propagate the load
error.
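Here is one way to surface the error, sketched under some assumptions. Because a constructor cannot return a `Result`, the reporter is loaded in a fallible factory and any failure is returned to the caller. `Result`, `LoadReporter`, `CatalogSketch`, and the `"logging"`/`"noop"` values are hypothetical stand-ins, not iceberg-cpp's `Result` or `MetricsReporters::Load`.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-ins; not iceberg-cpp's Result or MetricsReporter.
struct Reporter { virtual ~Reporter() = default; };
struct LoggingReporter : Reporter {};

template <typename T>
struct Result {
  std::optional<T> value;
  std::string error;  // non-empty on failure
  bool ok() const { return value.has_value(); }
};

using Props = std::map<std::string, std::string>;
constexpr const char* kImplKey = "metrics-reporter-impl";

// Resolves the configured reporter. Absent or "noop" disables reporting;
// an unrecognized implementation is an error, not a silent fallback.
Result<std::shared_ptr<Reporter>> LoadReporter(const Props& props) {
  auto it = props.find(kImplKey);
  if (it == props.end() || it->second.empty() || it->second == "noop") {
    return {std::shared_ptr<Reporter>{}, ""};
  }
  if (it->second == "logging") return {std::make_shared<LoggingReporter>(), ""};
  return {std::nullopt, "unknown metrics reporter: " + it->second};
}

class CatalogSketch {
 public:
  // Fallible factory: a bad reporter config fails catalog creation
  // instead of being dropped inside a constructor.
  static Result<std::shared_ptr<CatalogSketch>> Make(const Props& props) {
    auto reporter = LoadReporter(props);
    if (!reporter.ok()) return {std::nullopt, reporter.error};
    return {std::shared_ptr<CatalogSketch>(new CatalogSketch(*reporter.value)), ""};
  }
  const std::shared_ptr<Reporter>& reporter() const { return reporter_; }

 private:
  explicit CatalogSketch(std::shared_ptr<Reporter> r) : reporter_(std::move(r)) {}
  std::shared_ptr<Reporter> reporter_;
};
```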
##########
src/iceberg/transaction.cc:
##########
@@ -378,6 +392,27 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
committed_ = true;
ctx_->table = std::move(commit_result.value());
+ // Fire CommitReport only when a new snapshot was produced (not for property-only
+ // commits).
+ const auto& reporter = ctx_->table->reporter();
+ if (reporter) {
+ auto snapshot_result = ctx_->table->metadata()->Snapshot();
+ if (snapshot_result.has_value() && snapshot_result.value() &&
+ snapshot_result.value()->snapshot_id != pre_commit_snapshot_id) {
+ const auto& snapshot = snapshot_result.value();
+ const auto op = snapshot->Operation();
+ CommitReport report{
+ .table_name = ctx_->table->name().ToString(),
Review Comment:
Commit reports have the same table-name problem as scan reports: this uses
`TableIdentifier::ToString()`, so the catalog name is missing. Java commit
reports use the table's full name (`catalog.namespace.table`), which avoids
collisions across catalogs.
##########
src/iceberg/table_scan.cc:
##########
@@ -538,11 +565,39 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
.Select(ScanColumns())
.FilterData(filter())
.IgnoreDeleted()
- .ColumnsToKeepStats(context_.columns_to_keep_stats);
+ .ColumnsToKeepStats(context_.columns_to_keep_stats)
+ .ScanMetrics(scan_metrics);
if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
}
- return manifest_group->PlanFiles();
+ ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->PlanFiles());
+
+ timed.Stop();
+
+ if (context_.metrics_reporter) {
+ ICEBERG_ASSIGN_OR_RAISE(auto projected_schema, ResolveProjectedSchema());
+ const auto& schema_ptr = projected_schema.get();
+ std::vector<int32_t> projected_field_ids;
+ std::vector<std::string> projected_field_names;
+ for (const auto& field : schema_ptr->fields()) {
Review Comment:
This only reports top-level projected fields. For nested projections, Java
uses `TypeUtil.getProjectedIds(schema())` and includes nested child IDs/names.
The C++ scan already uses `GetProjectedIdsVisitor` when resolving selected
columns; the report should use the same recursive IDs plus `FindColumnNameById`.
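Here is a sketch of the recursive collection, assuming a simplified struct-only schema. `Field` and `CollectProjected` are hypothetical; the actual change would more likely reuse `GetProjectedIdsVisitor` and `FindColumnNameById` as suggested above. The sketch records leaf field IDs with dotted full names. Lists and maps would need the same treatment and are omitted here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified schema node; not iceberg-cpp's SchemaField/Type.
struct Field {
  int32_t id;
  std::string name;
  std::vector<Field> children;  // empty for primitive (leaf) fields
};

// Walks the projected schema depth-first and records every leaf field's ID
// together with its dotted full name (e.g. "location.lat").
void CollectProjected(const std::vector<Field>& fields, const std::string& prefix,
                      std::vector<int32_t>* ids, std::vector<std::string>* names) {
  for (const auto& f : fields) {
    const std::string full = prefix.empty() ? f.name : prefix + "." + f.name;
    if (f.children.empty()) {
      ids->push_back(f.id);
      names->push_back(full);
    } else {
      CollectProjected(f.children, full, ids, names);
    }
  }
}
```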
##########
src/iceberg/delete_file_index.cc:
##########
@@ -624,6 +638,8 @@ Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
file.equality_ids.end());
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
files.emplace_back(std::move(entry));
+ } else {
+ if (scan_metrics_) scan_metrics_->skipped_delete_files->Increment(1);
Review Comment:
This counts delete files older than `min_sequence_number_` as
`skipped_delete_files`. Java drops those files in the same sequence-number
filter without counting them as skipped. Snapshots with old delete files will
report extra skipped delete files.
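Here is a sketch of the intended counting. It assumes that, as in Java, skips are counted only where the scan filter rejects a delete file, and that the sequence-number check discards files without counting them. `DeleteEntry`, `Counters`, and `LoadDeleteFiles` are hypothetical simplifications, not the `DeleteFileIndex::Builder` API.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical delete-file entry; not iceberg-cpp's ManifestEntry.
struct DeleteEntry {
  int64_t data_sequence_number;
  bool matches_filter;  // result of the partition/metrics filter
};

struct Counters {
  int64_t skipped_delete_files = 0;
};

// Keeps delete files that pass the scan filter and are newer than
// min_sequence_number. Only filter rejections count as "skipped"; files
// dropped by the sequence-number check are discarded without counting.
std::vector<DeleteEntry> LoadDeleteFiles(const std::vector<DeleteEntry>& entries,
                                         int64_t min_sequence_number,
                                         Counters* counters) {
  std::vector<DeleteEntry> kept;
  for (const auto& e : entries) {
    if (!e.matches_filter) {
      ++counters->skipped_delete_files;
      continue;
    }
    if (e.data_sequence_number > min_sequence_number) {
      kept.push_back(e);
    }
  }
  return kept;
}
```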
##########
src/iceberg/catalog/memory/in_memory_catalog.cc:
##########
@@ -351,7 +352,15 @@ InMemoryCatalog::InMemoryCatalog(
properties_(std::move(properties)),
file_io_(std::move(file_io)),
warehouse_location_(std::move(warehouse_location)),
- root_namespace_(std::make_unique<InMemoryNamespace>()) {}
+ root_namespace_(std::make_unique<InMemoryNamespace>()) {
+ auto it = properties_.find(std::string(kMetricsReporterImpl));
+ if (it != properties_.end() && !it->second.empty() &&
+ it->second != kMetricsReporterTypeNoop) {
+ if (auto r = MetricsReporters::Load(properties_); r.has_value()) {
Review Comment:
Same issue here: `MetricsReporters::Load` errors are ignored, so an invalid
`metrics-reporter-impl` silently disables reporting for the in-memory catalog.
This should propagate the load failure.
##########
src/iceberg/catalog/sql/sql_catalog.cc:
##########
@@ -128,9 +129,15 @@ Result<std::string> ResolveTableLocation(
SqlCatalog::SqlCatalog(SqlCatalogConfig config, std::shared_ptr<FileIO> file_io,
std::shared_ptr<CatalogStore> store)
- : config_(std::move(config)),
- file_io_(std::move(file_io)),
- store_(std::move(store)) {}
+ : config_(std::move(config)), file_io_(std::move(file_io)), store_(std::move(store)) {
+ auto it = config_.props.find(std::string(kMetricsReporterImpl));
+ if (it != config_.props.end() && !it->second.empty() &&
+ it->second != kMetricsReporterTypeNoop) {
+ if (auto r = MetricsReporters::Load(config_.props); r.has_value()) {
Review Comment:
Same issue here: `MetricsReporters::Load` errors are ignored, so an invalid
`metrics-reporter-impl` silently disables reporting for the SQL catalog. This
should propagate the load failure.
##########
src/iceberg/manifest/manifest_group.cc:
##########
@@ -220,6 +227,24 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> ManifestGroup::PlanFiles() {
for (auto& task : tasks) {
file_tasks.push_back(internal::checked_pointer_cast<FileScanTask>(task));
}
+
+ if (scan_metrics_) {
+ for (const auto& task : file_tasks) {
+ for (const auto& df : task->delete_files()) {
+ scan_metrics_->result_delete_files->Increment(1);
Review Comment:
These counters are updated per `FileScanTask`, so the same delete file is
counted once for every data file it applies to. Java increments the indexed and
per-type delete-file counters once, while building `DeleteFileIndex`. A global
or partition delete file that matches N data files will inflate
`indexed_delete_files` and the delete type counters by N here.
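Moving the counters into the `DeleteFileIndex` build is the closer match to Java. If the tally stays in `ManifestGroup::PlanFiles`, it has to deduplicate. The sketch below counts each distinct delete file once, keyed by path. `DeleteFile`, `Task`, `DeleteCounters`, and `CountDistinctDeletes` are hypothetical stand-ins for the real types and counters.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins for DataFile/FileScanTask; not iceberg-cpp types.
enum class DeleteType { kPosition, kEquality };
struct DeleteFile {
  std::string path;
  DeleteType type;
};
struct Task {
  std::vector<std::shared_ptr<DeleteFile>> delete_files;
};

struct DeleteCounters {
  int64_t indexed = 0;
  int64_t positional = 0;
  int64_t equality = 0;
};

// Counts each distinct delete file once, no matter how many data-file tasks
// it is attached to. Files are identified by path.
DeleteCounters CountDistinctDeletes(const std::vector<Task>& tasks) {
  DeleteCounters counters;
  std::unordered_set<std::string> seen;
  for (const auto& task : tasks) {
    for (const auto& df : task.delete_files) {
      if (!seen.insert(df->path).second) continue;  // already counted
      ++counters.indexed;
      if (df->type == DeleteType::kPosition) {
        ++counters.positional;
      } else {
        ++counters.equality;
      }
    }
  }
  return counters;
}
```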
##########
src/iceberg/table.cc:
##########
@@ -151,12 +154,23 @@ const std::shared_ptr<TableMetadata>& Table::metadata() const { return metadata_
const std::shared_ptr<Catalog>& Table::catalog() const { return catalog_; }
+const std::shared_ptr<MetricsReporter>& Table::reporter() const { return reporter_; }
+
+void Table::CombineReporter(std::shared_ptr<MetricsReporter> additional) {
+ reporter_ = MetricsReporters::Combine(reporter_, std::move(additional));
+}
+
Result<std::unique_ptr<LocationProvider>> Table::location_provider() const {
return LocationProvider::Make(metadata_->location, metadata_->properties);
}
Result<std::unique_ptr<DataTableScanBuilder>> Table::NewScan() const {
- return DataTableScanBuilder::Make(metadata_, io_);
+ ICEBERG_ASSIGN_OR_RAISE(auto builder, DataTableScanBuilder::Make(metadata_, io_));
+ builder->TableName(identifier_.ToString());
Review Comment:
This uses the table identifier, not the fully qualified table name. Java
reports `BaseTable.name()`, and REST builds that as `catalog.namespace.table`.
Scan reports from two catalogs with the same identifier will collide and differ
from Java REST payloads.
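Here is a sketch of building the fully qualified name, assuming the catalog name is available when the table is created. `Identifier` and `FullTableName` are hypothetical. The sketch does not escape dots inside names and ignores any catalog-specific naming rules Java may apply.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical identifier type; not iceberg-cpp's TableIdentifier.
struct Identifier {
  std::vector<std::string> ns;  // namespace levels
  std::string name;
};

// Joins catalog name, namespace levels, and table name with dots,
// e.g. "prod.sales.orders". An empty catalog name adds no leading dot.
std::string FullTableName(const std::string& catalog, const Identifier& id) {
  std::string out = catalog;
  auto append = [&out](const std::string& part) {
    if (!out.empty()) out += '.';
    out += part;
  };
  for (const auto& level : id.ns) append(level);
  append(id.name);
  return out;
}
```

With the catalog name included, the same `sales.orders` identifier in two catalogs yields two distinct report names.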
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.