adnanhemani commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2184268258


##########
polaris-core/src/main/java/org/apache/polaris/core/utils/CachedSupplier.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.utils;
+
+import java.util.function.Supplier;
+
+public class CachedSupplier<T> implements Supplier<T> {

Review Comment:
   This has been tackled by #1988. Removing all references to this.



##########
runtime/defaults/src/main/resources/application.properties:
##########
@@ -120,12 +120,16 @@ 
polaris.features."SUPPORTED_EXTERNAL_CATALOG_AUTHENTICATION_TYPES"=["OAUTH", "BE
 # polaris.persistence.type=eclipse-link
 # polaris.persistence.type=in-memory-atomic
 polaris.persistence.type=in-memory
+# polaris.persistence.type=relational-jdbc
 
 polaris.secrets-manager.type=in-memory
 
 polaris.file-io.type=default
 
 polaris.event-listener.type=no-op
+# polaris.event-listener.type=persistence-file-buffer

Review Comment:
   Updated the properties as per latest implementation.



##########
service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import java.util.Optional;
+
+public interface EventListenerConfiguration {
+  Optional<Long> bufferTime();

Review Comment:
   Good call, I didn't know Quarkus supported Duration - thanks for pointing me 
in this direction!



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();
+  private final ScheduledExecutorService thread = 
Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Sure - done!



##########
service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisConfigurationStore configurationStore;
+  private BasePersistence basePersistence;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final long CONFIG_TIME_TO_FLUSH_IN_MS = 500;
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    basePersistence = mock(BasePersistence.class);
+    Supplier basePersistenceSupplier = () -> basePersistence;
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    when(metaStoreManagerFactory.getOrCreateSessionSupplier(Mockito.any()))
+        .thenReturn(basePersistenceSupplier);
+
+    EventListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(EventListenerConfiguration.class);
+    when(eventListenerConfiguration.maxBufferSize())
+        .thenReturn(Optional.of(CONFIG_MAX_BUFFER_SIZE));
+    when(eventListenerConfiguration.bufferTime())
+        .thenReturn(Optional.of(CONFIG_TIME_TO_FLUSH_IN_MS));
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, configurationStore, clock, 
eventListenerConfiguration);
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS * 2, ChronoUnit.MILLIS);
+    eventListener.checkAndFlushBufferIfNecessary(realmId);
+    verify(basePersistence, times(1)).writeEvents(eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {

Review Comment:
   Even though I don't believe this should occur, I've added a test which tests 
this.



##########
service/common/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Emitted when Polaris intends to create a table. */

Review Comment:
   Good catch, went too fast and copy pasted it over accidentally from the 
Before version of this event.



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.relational.jdbc.models;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+
+public class ModelEvent implements Converter<PolarisEvent> {

Review Comment:
   Changed.



##########
service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java:
##########
@@ -20,13 +20,6 @@
 
 import org.apache.polaris.core.context.CallContext;
 
-/**
- * Emitted before an attempt of an async task, such as manifest file cleanup, 
begins.
- *
- * @param taskEntityId The ID of the TaskEntity
- * @param callContext The CallContext the task is being executed under.
- * @param attempt The attempt number. Each retry of the task will have its own 
attempt number. The
- *     initial (non-retried) attempt starts counting from 1.
- */
-public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext 
callContext, int attempt)
+public record BeforeTaskAttemptedEvent(
+    String eventId, long taskEntityId, CallContext callContext, int attempt)

Review Comment:
   Agreed that we should not have a `CallContext` as part of the event record 
itself. But while this change is technically not in the scope of this PR, I 
will make it anyways as we are sending in the CallContext into the event 
listener with this PR.



##########
service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java:
##########
@@ -21,15 +21,6 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.TableIdentifier;
 
-/**
- * Emitted when Polaris intends to perform a commit to a table. There is no 
guarantee on the order
- * of this event relative to the validation checks we've performed, which 
means the commit may still
- * fail Polaris-side validation checks.
- *
- * @param identifier The identifier.
- * @param base The old metadata.
- * @param metadata The new metadata.
- */
 public record BeforeTableCommitedEvent(
-    TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+    String eventId, TableIdentifier tableIdentifier, TableMetadata base, 
TableMetadata metadata)

Review Comment:
   > IIUC these will become VERY big payloads in persisted events
   
   I agree with this statement. But not with this:
   
   > I consider having TableMetadata in events a serious issue that has to be 
solved.
   
   Having the TableMetadata POJO in the event definition in itself is not an 
issue. The server has already loaded this into its memory for this call. But 
persisting this POJO into the database may be problematic - and that's why this 
PR does not actually do this.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+  @Override
+  public final void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, CallContext callCtx, 
SecurityContext securityContext) {}
+
+  @Override
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewCommited(
+      BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewCommited(
+      AfterViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTableRefreshed(
+      BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableRefreshed(
+      AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewRefreshed(
+      BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewRefreshed(
+      AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, 
CallContext callCtx) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext 
callCtx) {}
+
+  @Override
+  public void onBeforeTableCreated(
+      BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCreated(
+      AfterTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalParameters(additionalParameters);
+
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(
+      AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent, callCtx);
+  }

Review Comment:
   Just part of what's mentioned in the scope of the PR. The remaining/new 
events will be instrumented accordingly in a future PR - I did not want to add 
all events immediately to ensure that the focus of the PR stays on the main 
logic instead of the implementation of all events.



##########
polaris-core/src/main/java/org/apache/polaris/core/PolarisCallContext.java:
##########
@@ -85,6 +91,10 @@ public Clock getClock() {
     return clock;
   }
 
+  public String getRequestId() {

Review Comment:
   I agree, this class is combining a lot of unrelated things...but I can't 
think of any better other place to keep the request ID. If you have a 
suggestion, I'm happy to investigate it!
   
   > I would prefer to use the same request ID coming from HTTP headers if 
available, and if not, use a random UUID. Wdyt?
   
   I did not see this when I searched through the repository - but this is a 
great call. Let me see if I can integrate this through.



##########
service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import java.util.Optional;
+
+public interface EventListenerConfiguration {
+  Optional<Long> bufferTime();
+
+  Optional<Integer> maxBufferSize();

Review Comment:
   Added Javadoc.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;

Review Comment:
   Sorry about that and thanks for catching, changed!



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;

Review Comment:
   Good call, changed!



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;

Review Comment:
   Good catch, changed!



##########
service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisConfigurationStore configurationStore;
+  private BasePersistence basePersistence;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final long CONFIG_TIME_TO_FLUSH_IN_MS = 500;
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    basePersistence = mock(BasePersistence.class);
+    Supplier basePersistenceSupplier = () -> basePersistence;

Review Comment:
   Removed as part of some further refactoring to ensure that buffer flushes 
are going through `PolarisMetaStoreManager` rather than directly through 
`BasePersistence`.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();

Review Comment:
   Changed.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();
+  private final ScheduledExecutorService thread = 
Executors.newSingleThreadScheduledExecutor();
+  private final long timeToFlush;
+  private final int maxBufferSize;
+  private Future<?> backgroundTask;
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore polarisConfigurationStore,
+      Clock clock,
+      EventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.polarisConfigurationStore = polarisConfigurationStore;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse((long) 30 * 1000); // 
30s default
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    backgroundTask =
+        thread.scheduleAtFixedRate(this::runCleanup, 0, timeToFlush, 
TimeUnit.MILLISECONDS);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    backgroundTask.cancel(false);
+    thread.shutdownNow();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+    buffer.computeIfAbsent(realmId, k -> new ArrayList<>()).add(polarisEvent);
+    checkAndFlushBufferIfNecessary(realmId);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {

Review Comment:
   This is a good point, I've changed the logic heavily to become thread-safe. 
But I've also made the change to run all cleanup only on the executor service - 
so in theory, we should never have 2 threads calling this function to begin 
with.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {

Review Comment:
   Yes, this is a POJO. Not sure what @PolarisImmutable helps with? I don't see 
any other entity types that have that decorator?
   
   Serializable was needed before for the file-base buffers, but I didn't 
really hurt to keep either. But I've removed that now.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalParameters;
+
+  public String getCatalogId() {
+    return catalogId;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
+  }
+
+  public String getResourceIdentifier() {
+    return resourceIdentifier;
+  }
+
+  public String getAdditionalParameters() {
+    return additionalParameters != null ? additionalParameters : 
EMPTY_MAP_STRING;
+  }
+
+  public Map<String, String> getAdditionalParametersAsMap() {

Review Comment:
   It is unused - but we have similar methods in other Polaris entity types 
that are similarly unused at the current moment but sometimes a use case does 
show up (personally seen this on an older PR). I'll remove it for now, but 
there's a decent chance I'll have to reintroduce it with some other event type.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeCatalogCreatedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+/**
+ * Represents an event listener that can respond to notable moments during 
Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ */
+public class PolarisEventListener {

Review Comment:
   I turned it back to a ABC (not sure why I undid that change tbh) - I think 
the original goal was to make it easy for implementors to only override 
whichever set of methods they'd like instead of having to manually implement 
every single type of event, even if they want most events to remain no-ops.
   
   Let me know if you have hard thoughts on this.



##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java:
##########
@@ -42,8 +42,8 @@
 import org.apache.polaris.service.context.DefaultRealmContextResolver;
 import org.apache.polaris.service.context.RealmContextResolver;
 import org.apache.polaris.service.context.TestRealmContextResolver;
-import org.apache.polaris.service.events.PolarisEventListener;
-import org.apache.polaris.service.events.TestPolarisEventListener;
+import org.apache.polaris.service.events.listeners.PolarisEventListener;
+import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
 import 
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
 import org.eclipse.microprofile.config.Config;

Review Comment:
   Good eye - I've changed it in the next revision.



##########
service/common/src/main/java/org/apache/polaris/service/config/DefaultConfigurationStore.java:
##########
@@ -47,7 +46,10 @@ public DefaultConfigurationStore(
   }
 
   @Override
-  public <T> @Nullable T getConfiguration(@Nonnull RealmContext realmContext, 
String configName) {
+  public <T> @Nullable T getConfiguration(RealmContext realmContext, String 
configName) {
+    if (realmContext == null) {

Review Comment:
   Sorry, I remember reverting this in the latest commit but don't know what 
happened. Thanks for pointing this out.



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java:
##########
@@ -200,6 +202,61 @@ public int executeUpdate(QueryGenerator.PreparedQuery 
preparedQuery) throws SQLE
         });
   }
 
+  /**
+   * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL 
queries have the same
+   * parameterized form.
+   *
+   * @param preparedQueries : queries to be executed
+   * @return : Number of rows modified / inserted.
+   * @throws SQLException : Exception during Query Execution.
+   */
+  public int executeBatchUpdate(List<QueryGenerator.PreparedQuery> 
preparedQueries)
+      throws SQLException {
+    if (preparedQueries.isEmpty()) {
+      return 0;
+    }
+    int batchSize = 100;
+    AtomicInteger successCount = new AtomicInteger();
+    return withRetries(
+        () -> {
+          String sql = preparedQueries.get(0).sql();

Review Comment:
   > Am I correct that this is relying on the assumption that all PreparedQuery 
instances have the exact same SQL query?
   
   Yes, that is written in the Javadoc above this method.
   
   > A proper signature would be something like: `public int 
executeBatchUpdate(QueryGenerator.PreparedBatchQuery preparedBatchQuery)`
   
   I've implemented it in the next revision, but makes the code a bit less 
clean imo. Please do check and let me know if there are any suggestions on this.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeCatalogCreatedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+/**
+ * Represents an event listener that can respond to notable moments during 
Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ */
+public class PolarisEventListener {
+
+  /** {@link BeforeRequestRateLimitedEvent} */
+  public void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, CallContext callCtx, 
SecurityContext securityContext) {}
+
+  /** {@link BeforeTableCommitedEvent} */
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterTableCommitedEvent} */
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link BeforeViewCommitedEvent} */
+  public void onBeforeViewCommited(
+      BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterViewCommitedEvent} */
+  public void onAfterViewCommited(
+      AfterViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link BeforeTableRefreshedEvent} */
+  public void onBeforeTableRefreshed(
+      BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterTableRefreshedEvent} */
+  public void onAfterTableRefreshed(
+      AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link BeforeViewRefreshedEvent} */
+  public void onBeforeViewRefreshed(
+      BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterViewRefreshedEvent} */
+  public void onAfterViewRefreshed(
+      AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link BeforeTaskAttemptedEvent} */
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, 
CallContext callCtx) {}
+
+  /** {@link AfterTaskAttemptedEvent} */
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext 
callCtx) {}
+
+  /** {@link BeforeTableCreatedEvent} */
+  public void onBeforeTableCreated(
+      BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterTableCreatedEvent} */
+  public void onAfterTableCreated(
+      AfterTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link BeforeCatalogCreatedEvent} */
+  public void onBeforeCatalogCreated(
+      BeforeCatalogCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}

Review Comment:
   Yes, this is intentional - `CallContext` is sent along the event record 
rather than part of it. We need constructs and information from within the 
`CallContext` to be able to process these events. Let me know if there is a 
question about this here after seeing the usage pattern of the `CallContext`.



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java:
##########
@@ -200,6 +202,61 @@ public int executeUpdate(QueryGenerator.PreparedQuery 
preparedQuery) throws SQLE
         });
   }
 
+  /**
+   * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL 
queries have the same
+   * parameterized form.
+   *
+   * @param preparedQueries : queries to be executed
+   * @return : Number of rows modified / inserted.
+   * @throws SQLException : Exception during Query Execution.
+   */
+  public int executeBatchUpdate(List<QueryGenerator.PreparedQuery> 
preparedQueries)
+      throws SQLException {
+    if (preparedQueries.isEmpty()) {
+      return 0;
+    }
+    int batchSize = 100;
+    AtomicInteger successCount = new AtomicInteger();
+    return withRetries(
+        () -> {
+          String sql = preparedQueries.get(0).sql();
+          try (Connection connection = borrowConnection();
+              PreparedStatement statement = connection.prepareStatement(sql)) {
+            boolean autoCommit = connection.getAutoCommit();
+            boolean success = false;
+            connection.setAutoCommit(false);
+
+            try {
+              for (int i = 1; i <= preparedQueries.size(); i++) {
+                List<Object> params = preparedQueries.get(i - 1).parameters();
+                for (int j = 0; j < params.size(); j++) {
+                  statement.setObject(j + 1, params.get(j));
+                }
+
+                statement.addBatch(); // Add to batch
+
+                if (i % batchSize == 0) {
+                  
successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum());
+                }
+              }
+
+              // Execute remaining queries in the batch
+              
successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum());
+              success = true;
+            } finally {
+              if (success) {
+                connection.commit();
+              } else {
+                connection.rollback();
+                successCount.set(0);
+              }
+              connection.setAutoCommit(autoCommit);

Review Comment:
   We do need to reset the `autoCommit`. Putting this into its own `finally` 
block is a good call - changed!



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.relational.jdbc.models;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+
+public class ModelEvent implements Converter<PolarisEvent> {

Review Comment:
   Added.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.collect.Streams;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEvent;
+
+/** Event listener that stores all emitted events forever. Not recommended for 
use in production. */
+@ApplicationScoped
+@Identifier("test")
+public class TestPolarisEventListener extends PolarisEventListener {

Review Comment:
   Was discussed on the original PR here: 
https://github.com/apache/polaris/pull/922/files#r2000612762
   
   I'll keep the same stance as the original thread - we can take this as a 
followup task (by creating a GH issue) to clean this and all other similarly 
implemented beans. But I don't want to cross the wires for that effort with 
this PR, which is already quite large as-is.



##########
persistence/relational-jdbc/src/main/resources/postgres/schema-v1.sql:
##########
@@ -114,3 +114,17 @@ CREATE TABLE IF NOT EXISTS policy_mapping_record (
 );
 
 CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record 
(realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, 
target_id);
+
+CREATE TABLE IF NOT EXISTS events (

Review Comment:
   This is a fair point, I've made a `schema-v3.sql` in the next revision.



##########
service/common/src/main/java/org/apache/polaris/service/events/BeforeCatalogCreatedEvent.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+/** Emitted when Polaris intends to create a table. */
+public record BeforeCatalogCreatedEvent(String eventId, String catalogName)

Review Comment:
   I figured that if we are adding the "After" version of the event, then we 
might as well add the "Before" to keep the pairing of the events together. But 
if we are being pedantic about sticking specifically to the explicitly 
mentioned scope, then I can remove this and reintroduce it in a different PR.



##########
service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java:
##########
@@ -21,15 +21,6 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.TableIdentifier;
 
-/**

Review Comment:
   Sorry, they got lost in the shuffle of this PR. Restored, thanks for calling 
this out.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalParameters;
+
+  public String getCatalogId() {
+    return catalogId;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
+  }
+
+  public String getResourceIdentifier() {
+    return resourceIdentifier;
+  }
+
+  public String getAdditionalParameters() {
+    return additionalParameters != null ? additionalParameters : 
EMPTY_MAP_STRING;
+  }
+
+  public Map<String, String> getAdditionalParametersAsMap() {
+    try {
+      return additionalParameters != null
+          ? MAPPER.readValue(this.additionalParameters, Map.class)
+          : Map.of();
+    } catch (JsonProcessingException ex) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to deserialize json. additionalParameters %s", 
this.additionalParameters),
+          ex);
+    }
+  }
+
+  @JsonCreator

Review Comment:
   Picked this up from the other entities, but I think you're right that it's 
not needed. Changed in following revision.



##########
persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java:
##########
@@ -92,6 +96,53 @@ void testExecuteUpdate_failure() throws Exception {
     assertThrows(SQLException.class, () -> 
datasourceOperations.executeUpdate(query));
   }
 
+  @Test
+  void testExecuteBatchUpdate_success() throws Exception {

Review Comment:
   Changed.



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java:
##########
@@ -261,6 +262,12 @@ public void writeToGrantRecords(
     runActionInTransaction(callCtx, () -> 
this.writeToGrantRecordsInCurrentTxn(callCtx, grantRec));
   }
 
+  @Override
+  public void writeEvents(@Nonnull List<PolarisEvent> events) {
+    throw new UnsupportedOperationException(
+        "Not implemented for EclipseLink, as it is deprecated.");

Review Comment:
   Currently, that is the only non-testing Transactional Persistence 
implementation. But I've changed the wording for now - we can revisit 
implementing this in the future if there is a need.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalParameters;
+
+  public String getCatalogId() {
+    return catalogId;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
+  }
+
+  public String getResourceIdentifier() {
+    return resourceIdentifier;
+  }
+
+  public String getAdditionalParameters() {
+    return additionalParameters != null ? additionalParameters : 
EMPTY_MAP_STRING;
+  }
+
+  public Map<String, String> getAdditionalParametersAsMap() {
+    try {
+      return additionalParameters != null
+          ? MAPPER.readValue(this.additionalParameters, Map.class)
+          : Map.of();
+    } catch (JsonProcessingException ex) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to deserialize json. additionalParameters %s", 
this.additionalParameters),
+          ex);
+    }
+  }
+
+  @JsonCreator
+  public PolarisEvent(
+      @JsonProperty("catalog_id") String catalogId,
+      @JsonProperty("id") String id,
+      @JsonProperty("request_id") String requestId,
+      @JsonProperty("event_type") String eventType,
+      @JsonProperty("timestamp_ms") long timestampMs,
+      @JsonProperty("actor") String actor,
+      @JsonProperty("resource_type") ResourceType resourceType,
+      @JsonProperty("resource_identifier") String resourceIdentifier) {
+    this.catalogId = catalogId;
+    this.id = id;
+    this.requestId = requestId;
+    this.eventType = eventType;
+    this.timestampMs = timestampMs;
+    this.principalName = actor;
+    this.resourceType = resourceType;
+    this.resourceIdentifier = resourceIdentifier;
+  }
+
+  // Needed for Kryo Deserialization

Review Comment:
   Changed.



##########
service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java:
##########
@@ -56,7 +58,11 @@ public void filter(ContainerRequestContext ctx) throws 
IOException {
     if (!rateLimiter.canProceed()) {
       polarisEventListener.onBeforeRequestRateLimited(
           new BeforeRequestRateLimitedEvent(
-              ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString()));
+              PolarisEvent.createEventId(),
+              ctx.getMethod(),
+              ctx.getUriInfo().getAbsolutePath().toString()),
+          CallContext.getCurrentContext(),
+          ctx.getSecurityContext());
       
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build());

Review Comment:
   Referring back to the original discussion on this: 
https://github.com/apache/polaris/pull/922/files#r2001535434
   
   I'm not taking any sides on that discussion - but will say that this PR is 
not aiming to change anything about this event from before and is therefore 
definitely not in the scope of this PR. Please feel free to discuss this event 
outside this PR.
   
   If it helps - I can confirm, I will not be attempting to persist this event 
in this event listener implementation.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();

Review Comment:
   As discussed on the mailing list thread, users are free to adjust their 
flush timeout and/or number of events reached before this buffer will flush to 
the persistence. If this is not favorable to you, please suggest how else you'd 
like us to solve this problem.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+  @Override
+  public final void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, CallContext callCtx, 
SecurityContext securityContext) {}
+
+  @Override
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewCommited(
+      BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewCommited(
+      AfterViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTableRefreshed(
+      BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableRefreshed(
+      AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewRefreshed(
+      BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewRefreshed(
+      AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, 
CallContext callCtx) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext 
callCtx) {}
+
+  @Override
+  public void onBeforeTableCreated(
+      BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCreated(
+      AfterTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalParameters(additionalParameters);
+
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(
+      AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  private long getTimestamp(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getClock().millis();
+  }
+
+  private String getRequestId(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getRequestId();
+  }
+
+  private String getUsername(SecurityContext securityContext) {

Review Comment:
   I don't think that is a requirement that it must be the username. Any unique 
identifier for that Principal should be fine. But if there's a better way to 
represent that principal, please do suggest.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {

Review Comment:
   This is not persistence-implementation specific. And it is still an event 
listener implementation at the end of the day.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+  @Override
+  public final void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, CallContext callCtx, 
SecurityContext securityContext) {}
+
+  @Override
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewCommited(
+      BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewCommited(
+      AfterViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTableRefreshed(
+      BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableRefreshed(
+      AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewRefreshed(
+      BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewRefreshed(
+      AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, 
CallContext callCtx) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext 
callCtx) {}
+
+  @Override
+  public void onBeforeTableCreated(
+      BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCreated(
+      AfterTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalParameters(additionalParameters);
+
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(
+      AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  private long getTimestamp(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getClock().millis();
+  }
+
+  private String getRequestId(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getRequestId();
+  }

Review Comment:
   Not sure if the question is why did these two methods get extracted 
specifically? If that's the question, the answer is just that these were 
longer, repetitive code that it was much easier to exact into these utility 
methods to maintain DRYness.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to