snazy commented on code in PR #1844: URL: https://github.com/apache/polaris/pull/1844#discussion_r2185141900
########## service/common/src/main/java/org/apache/polaris/service/events/BeforeCatalogCreatedEvent.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +/** Emitted when Polaris intends to create a table. */ +public record BeforeCatalogCreatedEvent(String eventId, String catalogName) Review Comment: There are new events added in this PR, which is out of scope (not mentioned in the PR summary) of the already big PR. These are not "samples", but used in production code. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -200,6 +202,61 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE }); } + /** + * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL queries have the same + * parameterized form. + * + * @param preparedQueries : queries to be executed + * @return : Number of rows modified / inserted. + * @throws SQLException : Exception during Query Execution. 
+ */ + public int executeBatchUpdate(List<QueryGenerator.PreparedQuery> preparedQueries) + throws SQLException { + if (preparedQueries.isEmpty()) { + return 0; + } + int batchSize = 100; + AtomicInteger successCount = new AtomicInteger(); + return withRetries( + () -> { + String sql = preparedQueries.get(0).sql(); + try (Connection connection = borrowConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + + try { + for (int i = 1; i <= preparedQueries.size(); i++) { + List<Object> params = preparedQueries.get(i - 1).parameters(); + for (int j = 0; j < params.size(); j++) { + statement.setObject(j + 1, params.get(j)); + } + + statement.addBatch(); // Add to batch + + if (i % batchSize == 0) { + successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum()); + } + } + + // Execute remaining queries in the batch + successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum()); + success = true; + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + successCount.set(0); + } + connection.setAutoCommit(autoCommit); Review Comment: Why this at all and why isn't this in its own `finally` block? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeCatalogCreatedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +/** + * Represents an event listener that can respond to notable moments during Polaris's execution. + * Event details are documented under the event objects themselves. 
+ */ +public class PolarisEventListener { + + /** {@link BeforeRequestRateLimitedEvent} */ + public void onBeforeRequestRateLimited( + BeforeRequestRateLimitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link BeforeTableCommitedEvent} */ + public void onBeforeTableCommited( + BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link AfterTableCommitedEvent} */ + public void onAfterTableCommited( + AfterTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link BeforeViewCommitedEvent} */ + public void onBeforeViewCommited( + BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link AfterViewCommitedEvent} */ + public void onAfterViewCommited( + AfterViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link BeforeTableRefreshedEvent} */ + public void onBeforeTableRefreshed( + BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link AfterTableRefreshedEvent} */ + public void onAfterTableRefreshed( + AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link BeforeViewRefreshedEvent} */ + public void onBeforeViewRefreshed( + BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link AfterViewRefreshedEvent} */ + public void onAfterViewRefreshed( + AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link BeforeTaskAttemptedEvent} */ + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, CallContext callCtx) {} + + /** {@link AfterTaskAttemptedEvent} */ + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext callCtx) {} + + /** {@link BeforeTableCreatedEvent} */ + public void onBeforeTableCreated( + BeforeTableCreatedEvent event, CallContext callCtx, 
SecurityContext securityContext) {} + + /** {@link AfterTableCreatedEvent} */ + public void onAfterTableCreated( + AfterTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link BeforeCatalogCreatedEvent} */ + public void onBeforeCatalogCreated( + BeforeCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {} Review Comment: This is weird - there's a `CallContext` here and also in this and other events. ########## service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java: ########## @@ -20,13 +20,6 @@ import org.apache.polaris.core.context.CallContext; -/** - * Emitted before an attempt of an async task, such as manifest file cleanup, begins. - * - * @param taskEntityId The ID of the TaskEntity - * @param callContext The CallContext the task is being executed under. - * @param attempt The attempt number. Each retry of the task will have its own attempt number. The - * initial (non-retried) attempt starts counting from 1. - */ -public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext callContext, int attempt) +public record BeforeTaskAttemptedEvent( + String eventId, long taskEntityId, CallContext callContext, int attempt) Review Comment: I didn't realize before that some `record`s for events contain a `CallContext` - that should really be removed, as it is _runtime_ specific information referencing _resources_, not pure event related information. There is no way to re-construct an event without the original `CallContext`. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.Serializable; +import java.util.Map; + +public class PolarisEvent implements Serializable { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier recorded + private String additionalParameters; + + public String getCatalogId() { + return catalogId; + } + + public String getId() { + return id; + } + + public String getRequestId() 
{ + return requestId; + } + + public String getEventType() { + return eventType; + } + + public long getTimestampMs() { + return timestampMs; + } + + public String getPrincipalName() { + return principalName; + } + + public ResourceType getResourceType() { + return resourceType; + } + + public String getResourceIdentifier() { + return resourceIdentifier; + } + + public String getAdditionalParameters() { + return additionalParameters != null ? additionalParameters : EMPTY_MAP_STRING; + } + + public Map<String, String> getAdditionalParametersAsMap() { + try { + return additionalParameters != null + ? MAPPER.readValue(this.additionalParameters, Map.class) + : Map.of(); + } catch (JsonProcessingException ex) { + throw new IllegalStateException( + String.format( + "Failed to deserialize json. additionalParameters %s", this.additionalParameters), + ex); + } + } + + @JsonCreator Review Comment: Where is this JSON-serialized? ########## persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java: ########## @@ -92,6 +96,53 @@ void testExecuteUpdate_failure() throws Exception { assertThrows(SQLException.class, () -> datasourceOperations.executeUpdate(query)); } + @Test + void testExecuteBatchUpdate_success() throws Exception { Review Comment: I know other tests in the code base do it as well, but repeating terms (here: `test`) doesn't make sense - it's clear that this is a test, it's annotated with `@Test` ########## persistence/relational-jdbc/src/main/resources/postgres/schema-v1.sql: ########## @@ -114,3 +114,17 @@ CREATE TABLE IF NOT EXISTS policy_mapping_record ( ); CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS events ( Review Comment: How are Polaris upgrades handled? 
########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; 
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + @Override + public final void onBeforeRequestRateLimited( + BeforeRequestRateLimitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableCommited( + BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCommited( + AfterTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewCommited( + BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewCommited( + AfterViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableRefreshed( + BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableRefreshed( + AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewRefreshed( + BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewRefreshed( + AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onBeforeTableCreated( + BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCreated( + AfterTableCreatedEvent 
event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map<String, String> additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalParameters(additionalParameters); + + addToBuffer(polarisEvent, callCtx); + } + + @Override + public void onAfterCatalogCreated( + AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + addToBuffer(polarisEvent, callCtx); + } + + private long getTimestamp(CallContext callCtx) { + return callCtx.getPolarisCallContext().getClock().millis(); + } + + private String getRequestId(CallContext callCtx) { + return callCtx.getPolarisCallContext().getRequestId(); + } Review Comment: Why do these values become different? 
########## service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java: ########## @@ -56,7 +58,11 @@ public void filter(ContainerRequestContext ctx) throws IOException { if (!rateLimiter.canProceed()) { polarisEventListener.onBeforeRequestRateLimited( new BeforeRequestRateLimitedEvent( - ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString())); + PolarisEvent.createEventId(), + ctx.getMethod(), + ctx.getUriInfo().getAbsolutePath().toString()), + CallContext.getCurrentContext(), + ctx.getSecurityContext()); ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build()); Review Comment: This is quite a dangerous call site. An attacker can easily overload the system when rate-limiting kicks in. I consider this a legit security issue that would justify a CVE. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.persistence.relational.jdbc.models; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; + +public class ModelEvent implements Converter<PolarisEvent> { Review Comment: Tests for the getters, the builder and implementations are completely missing. ########## service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java: ########## @@ -21,15 +21,6 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.TableIdentifier; -/** Review Comment: I see a bunch of Javadoc being removed. What's wrong with having Javadocs? ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java: ########## @@ -261,6 +262,12 @@ public void writeToGrantRecords( runActionInTransaction(callCtx, () -> this.writeToGrantRecordsInCurrentTxn(callCtx, grantRec)); } + @Override + public void writeEvents(@Nonnull List<PolarisEvent> events) { + throw new UnsupportedOperationException( + "Not implemented for EclipseLink, as it is deprecated."); Review Comment: Eclipselink?? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + @Override + public final void onBeforeRequestRateLimited( + BeforeRequestRateLimitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableCommited( + BeforeTableCommitedEvent event, CallContext 
callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCommited( + AfterTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewCommited( + BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewCommited( + AfterViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableRefreshed( + BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableRefreshed( + AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewRefreshed( + BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewRefreshed( + AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onBeforeTableCreated( + BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCreated( + AfterTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map<String, String> additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + 
TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalParameters(additionalParameters); + + addToBuffer(polarisEvent, callCtx); + } + + @Override + public void onAfterCatalogCreated( + AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + addToBuffer(polarisEvent, callCtx); + } Review Comment: It is unclear why these are special implementations. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { Review Comment: Persistence implementation specific stuff should be in persistence, not here. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.Serializable; +import java.util.Map; + +public class PolarisEvent implements Serializable { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier recorded + private String additionalParameters; + + public String getCatalogId() { + return catalogId; + } + + public String getId() { + return id; + } + + public String getRequestId() { + return requestId; + } + + public String getEventType() { + return eventType; + } + + public long getTimestampMs() { + return timestampMs; + } + + public String 
getPrincipalName() { + return principalName; + } + + public ResourceType getResourceType() { + return resourceType; + } + + public String getResourceIdentifier() { + return resourceIdentifier; + } + + public String getAdditionalParameters() { + return additionalParameters != null ? additionalParameters : EMPTY_MAP_STRING; + } + + public Map<String, String> getAdditionalParametersAsMap() { + try { + return additionalParameters != null + ? MAPPER.readValue(this.additionalParameters, Map.class) + : Map.of(); + } catch (JsonProcessingException ex) { + throw new IllegalStateException( + String.format( + "Failed to deserialize json. additionalParameters %s", this.additionalParameters), + ex); + } + } + + @JsonCreator + public PolarisEvent( + @JsonProperty("catalog_id") String catalogId, + @JsonProperty("id") String id, + @JsonProperty("request_id") String requestId, + @JsonProperty("event_type") String eventType, + @JsonProperty("timestamp_ms") long timestampMs, + @JsonProperty("actor") String actor, + @JsonProperty("resource_type") ResourceType resourceType, + @JsonProperty("resource_identifier") String resourceIdentifier) { + this.catalogId = catalogId; + this.id = id; + this.requestId = requestId; + this.eventType = eventType; + this.timestampMs = timestampMs; + this.principalName = actor; + this.resourceType = resourceType; + this.resourceIdentifier = resourceIdentifier; + } + + // Needed for Kryo Deserialization Review Comment: Why Kryo? ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.relational.jdbc.models; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; + +public class ModelEvent implements Converter<PolarisEvent> { Review Comment: Using `@PolarisImmutable` would cut the LoC to ~ 1/3. ########## service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java: ########## @@ -21,15 +21,6 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.TableIdentifier; -/** - * Emitted when Polaris intends to perform a commit to a table. There is no guarantee on the order - * of this event relative to the validation checks we've performed, which means the commit may still - * fail Polaris-side validation checks. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. 
- */ public record BeforeTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + String eventId, TableIdentifier tableIdentifier, TableMetadata base, TableMetadata metadata) Review Comment: IIUC these will become _VERY_ big payloads in persisted events, causing issues in/with the backend database and runtime memory issues. I consider having `TableMetadata` in events a serious issue that has to be solved. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + @Override + public final void onBeforeRequestRateLimited( + BeforeRequestRateLimitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableCommited( + BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCommited( + AfterTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewCommited( + BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewCommited( + AfterViewCommitedEvent event, CallContext 
callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableRefreshed( + BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableRefreshed( + AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewRefreshed( + BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewRefreshed( + AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onBeforeTableCreated( + BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCreated( + AfterTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map<String, String> additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalParameters(additionalParameters); + + addToBuffer(polarisEvent, callCtx); + } + + @Override + public void onAfterCatalogCreated( + AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + 
event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + addToBuffer(polarisEvent, callCtx); + } + + private long getTimestamp(CallContext callCtx) { + return callCtx.getPolarisCallContext().getClock().millis(); + } + + private String getRequestId(CallContext callCtx) { + return callCtx.getPolarisCallContext().getRequestId(); + } + + private String getUsername(SecurityContext securityContext) { Review Comment: `Principal.getName()` is not guaranteed to be a username ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>(); Review Comment: This "buffer" can easily cause serious GC and heap problems (OOMs); see my comment about having `TableMetadata` in events. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.collect.Streams; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.core.SecurityContext; +import java.util.ArrayList; +import java.util.List; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEvent; + +/** Event listener that stores all emitted events forever. Not recommended for use in production. 
*/ +@ApplicationScoped +@Identifier("test") +public class TestPolarisEventListener extends PolarisEventListener { Review Comment: This is test code, which should not live in `src/main` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
