obelix74 commented on code in PR #3385: URL: https://github.com/apache/polaris/pull/3385#discussion_r2849077673
########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetricsPersistence.java: ########## @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.relational.jdbc; + +import static org.apache.polaris.persistence.relational.jdbc.QueryGenerator.PreparedQuery; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RequestIdSupplier; +import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord; +import org.apache.polaris.core.persistence.metrics.MetricsPersistence; +import org.apache.polaris.core.persistence.metrics.MetricsQueryCriteria; +import org.apache.polaris.core.persistence.metrics.ReportIdToken; +import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.pagination.Token; +import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport; +import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JDBC implementation of {@link MetricsPersistence}. + * + * <p>This class provides direct JDBC persistence for metrics reports, converting between SPI record + * types ({@link ScanMetricsRecord}, {@link CommitMetricsRecord}) and JDBC model types ({@link + * ModelScanMetricsReport}, {@link ModelCommitMetricsReport}). + * + * <p>This implementation assumes that metrics tables exist. The producer ({@link + * JdbcMetricsPersistenceProducer}) checks for metrics table availability at startup and returns + * {@link MetricsPersistence#NOOP} if tables are not present. + */ +public class JdbcMetricsPersistence implements MetricsPersistence { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMetricsPersistence.class); + + private final DatasourceOperations datasourceOperations; + private final String realmId; + private final PolarisPrincipal polarisPrincipal; + private final RequestIdSupplier requestIdSupplier; + + /** + * Creates a new JdbcMetricsPersistence instance. + * + * @param datasourceOperations the datasource operations for JDBC access + * @param realmId the realm ID for multi-tenancy + * @param polarisPrincipal the authenticated principal for the current request + * @param requestIdSupplier supplier for obtaining the request ID + */ + public JdbcMetricsPersistence( + DatasourceOperations datasourceOperations, + String realmId, + PolarisPrincipal polarisPrincipal, + RequestIdSupplier requestIdSupplier) { + this.datasourceOperations = datasourceOperations; + this.realmId = realmId; + this.polarisPrincipal = polarisPrincipal; + this.requestIdSupplier = requestIdSupplier; + } + + @Override + public void writeScanReport(@Nonnull ScanMetricsRecord record) { + // Obtain request context fields + String principalName = polarisPrincipal != null ? polarisPrincipal.getName() : null; + String requestId = requestIdSupplier.getRequestId(); + String otelTraceId = null; + String otelSpanId = null; + + // Get OpenTelemetry context if available + SpanContext spanContext = Span.current().getSpanContext(); + if (spanContext.isValid()) { + otelTraceId = spanContext.getTraceId(); + otelSpanId = spanContext.getSpanId(); + } + + ModelScanMetricsReport model = + SpiModelConverter.toModelScanReport( + record, realmId, principalName, requestId, otelTraceId, otelSpanId); + writeScanMetricsReport(model); + } + + @Override + public void writeCommitReport(@Nonnull CommitMetricsRecord record) { + // Obtain request context fields + String principalName = polarisPrincipal != null ? polarisPrincipal.getName() : null; + String requestId = requestIdSupplier.getRequestId(); + String otelTraceId = null; + String otelSpanId = null; + + // Get OpenTelemetry context if available + SpanContext spanContext = Span.current().getSpanContext(); + if (spanContext.isValid()) { + otelTraceId = spanContext.getTraceId(); + otelSpanId = spanContext.getSpanId(); + } + + ModelCommitMetricsReport model = + SpiModelConverter.toModelCommitReport( + record, realmId, principalName, requestId, otelTraceId, otelSpanId); + writeCommitMetricsReport(model); + } + + @Override + @Nonnull + public Page<ScanMetricsRecord> queryScanReports( + @Nonnull MetricsQueryCriteria criteria, @Nonnull PageToken pageToken) { + // catalogId and tableId are required for queries + if (criteria.catalogId().isEmpty() || criteria.tableId().isEmpty()) { + return Page.fromItems(List.of()); + } + + int limit = pageToken.pageSize().orElse(100); + Long startTimeMs = criteria.startTime().map(t -> t.toEpochMilli()).orElse(null); + Long endTimeMs = criteria.endTime().map(t -> t.toEpochMilli()).orElse(null); + + // Extract composite cursor from page token if present + // The cursor is (timestamp_ms, report_id) for chronological pagination + var cursorToken = pageToken.valueAs(ReportIdToken.class); + Long cursorTimestampMs = cursorToken.map(ReportIdToken::timestampMs).orElse(null); + String cursorReportId = cursorToken.map(ReportIdToken::reportId).orElse(null); + + List<ModelScanMetricsReport> models = + queryScanMetricsReports( + criteria.catalogId().getAsLong(), + criteria.tableId().getAsLong(), + startTimeMs, + endTimeMs, + cursorTimestampMs, + cursorReportId, + limit); + + List<ScanMetricsRecord> records = + models.stream().map(SpiModelConverter::toScanMetricsRecord).collect(Collectors.toList()); + + // Build continuation token only when we might have more pages + Token nextToken = records.size() >= limit ? ReportIdToken.fromRecord(records.getLast()) : null; + + return Page.page(pageToken, records, nextToken); + } + + @Override + @Nonnull + public Page<CommitMetricsRecord> queryCommitReports( + @Nonnull MetricsQueryCriteria criteria, @Nonnull PageToken pageToken) { + // catalogId and tableId are required for queries + if (criteria.catalogId().isEmpty() || criteria.tableId().isEmpty()) { + return Page.fromItems(List.of()); + } + + int limit = pageToken.pageSize().orElse(100); + Long startTimeMs = criteria.startTime().map(t -> t.toEpochMilli()).orElse(null); + Long endTimeMs = criteria.endTime().map(t -> t.toEpochMilli()).orElse(null); + + // Extract composite cursor from page token if present + // The cursor is (timestamp_ms, report_id) for chronological pagination + var cursorToken = pageToken.valueAs(ReportIdToken.class); + Long cursorTimestampMs = cursorToken.map(ReportIdToken::timestampMs).orElse(null); + String cursorReportId = cursorToken.map(ReportIdToken::reportId).orElse(null); + + List<ModelCommitMetricsReport> models = + queryCommitMetricsReports( + criteria.catalogId().getAsLong(), + criteria.tableId().getAsLong(), + startTimeMs, + endTimeMs, + cursorTimestampMs, + cursorReportId, + limit); + + List<CommitMetricsRecord> records = + models.stream().map(SpiModelConverter::toCommitMetricsRecord).collect(Collectors.toList()); + + // Build continuation token only when we might have more pages + Token nextToken = records.size() >= limit ? ReportIdToken.fromRecord(records.getLast()) : null; + + return Page.page(pageToken, records, nextToken); + } + + // ========== Internal JDBC methods ========== + + /** + * Writes a scan metrics report to the database. + * + * <p>This operation is idempotent - writing the same reportId twice has no effect. The primary + * key (realm_id, report_id) constraint is used with ON CONFLICT DO NOTHING to ensure idempotency. + * + * @param report the scan metrics report to persist + */ + void writeScanMetricsReport(@Nonnull ModelScanMetricsReport report) { + try { + PreparedQuery pq = + buildIdempotentInsertQuery( + ModelScanMetricsReport.ALL_COLUMNS, + ModelScanMetricsReport.TABLE_NAME, + report.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(), + datasourceOperations.getDatabaseType()); + // Note: updated may be 0 if the report already exists (idempotent insert) + datasourceOperations.executeUpdate(pq); + } catch (SQLException e) { + throw new RuntimeException( + String.format("Failed to write scan metrics report due to %s", e.getMessage()), e); + } + } + + /** + * Writes a commit metrics report to the database. + * + * <p>This operation is idempotent - writing the same reportId twice has no effect. The primary + * key (realm_id, report_id) constraint is used with ON CONFLICT DO NOTHING to ensure idempotency. + * + * @param report the commit metrics report to persist + */ + void writeCommitMetricsReport(@Nonnull ModelCommitMetricsReport report) { + try { + PreparedQuery pq = + buildIdempotentInsertQuery( + ModelCommitMetricsReport.ALL_COLUMNS, + ModelCommitMetricsReport.TABLE_NAME, + report.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(), + datasourceOperations.getDatabaseType()); + // Note: updated may be 0 if the report already exists (idempotent insert) + datasourceOperations.executeUpdate(pq); + } catch (SQLException e) { + throw new RuntimeException( + String.format("Failed to write commit metrics report due to %s", e.getMessage()), e); + } + } + + /** + * Builds an idempotent INSERT query that ignores conflicts on the primary key. + * + * <p>This ensures that duplicate reportIds are silently ignored, fulfilling the idempotency + * contract of the {@link MetricsPersistence} interface. + * + * @param columns the column names + * @param tableName the table name + * @param values the values to insert + * @param databaseType the database type (for dialect-specific syntax) + * @return a PreparedQuery with the idempotent INSERT statement + */ + private static PreparedQuery buildIdempotentInsertQuery( + List<String> columns, String tableName, List<Object> values, DatabaseType databaseType) { + String columnList = String.join(", ", columns); + String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", ")); + + String sql; + if (databaseType == DatabaseType.H2) { + // H2 uses MERGE INTO for idempotent inserts, but INSERT ... ON CONFLICT also works in newer + // versions + // Using standard SQL MERGE syntax for H2 compatibility + sql = + "MERGE INTO " + + QueryGenerator.getFullyQualifiedTableName(tableName) + + " (" + + columnList + + ") KEY (realm_id, report_id) VALUES (" + + placeholders + + ")"; + } else { + // PostgreSQL: Use INSERT ... ON CONFLICT DO NOTHING + sql = + "INSERT INTO " + + QueryGenerator.getFullyQualifiedTableName(tableName) + + " (" + + columnList + + ") VALUES (" + + placeholders + + ") ON CONFLICT (realm_id, report_id) DO NOTHING"; + } + return new PreparedQuery(sql, values); Review Comment: Fixed it in commit 9e3d4d0f409720ab4005db176190ebde29a9d2b6 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
