adnanhemani commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2308704688


##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalProperties;

Review Comment:
   I believe it has to do with the serializability of the record in and out of 
the database. But I'll readily admit, I did not test this and am just following 
the existing patterns in other similar entities.
   
   Maybe someone should do this in the future and if possible, change this for 
all entities.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java:
##########
@@ -26,10 +26,10 @@
  * of this event relative to the validation checks we've performed, which 
means the commit may still
  * fail Polaris-side validation checks.
  *
- * @param identifier The identifier.
+ * @param tableIdentifier The identifier.
  * @param base The old metadata.
  * @param metadata The new metadata.
  */
 public record BeforeTableCommitedEvent(
-    TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+    String eventId, TableIdentifier tableIdentifier, TableMetadata base, 
TableMetadata metadata)

Review Comment:
   Changed!



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+  @Override
+  public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent 
event) {}
+
+  @Override
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+  @Override
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+  @Override
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+  @Override
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+  @Override
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+  @Override
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+  @Override
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTableCreated(AfterTableCreatedEvent event) {

Review Comment:
   I added this in a comment from a previous review but I am planning to add 
almost all events eventually to this (basically all that you see except the 
Request Rate Limited ones). I will add a TODO to this code to make this 
apparent.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+/** Emitted Polaris creates a catalog. */
+public record AfterCatalogCreatedEvent(String eventId, String catalogName)

Review Comment:
   Yes, ideally the other instrumentation PRs I have in my fork (which I will 
surface to the main repo shortly) will take care of this.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();

Review Comment:
   Added!



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import java.time.Duration;
+
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer")

Review Comment:
   I was trying to say this is an event listener that sinks events into the 
persistence layer using an in-memory buffer. I know this makes it longer, but 
would something like `persistence-sink-through-in-memory-buffer` make it more 
clear? 
   
   I'm afraid that `in-memory` doesn't actually capture the goal of the event 
listener, which is to sink the events into the persistence layer.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+
+  // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer`
+  @Override
+  public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent 
event) {}
+
+  @Override
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+  @Override
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+  @Override
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+  @Override
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+  @Override
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+  @Override
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+  @Override
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTableCreated(AfterTableCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalProperties(additionalParameters);
+    addToBuffer(polarisEvent);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent);
+  }
+
+  protected record ContextSpecificInformation(long timestamp, String 
principalName) {}
+
+  abstract ContextSpecificInformation getContextSpecificInformation();
+
+  abstract String getRequestId();
+
+  abstract void addToBuffer(org.apache.polaris.core.entity.PolarisEvent event);

Review Comment:
   Changed. On the topic of RateLimited events, I think the default for events 
that will eventually find themselves in the Persistence layer, it should be 
that they are not processed in any OSS implementation. If any implementation 
would like to actually persist this event, they should be free to override this 
method.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {

Review Comment:
   This is how we can do a best-effort flush according to the `timeToFlush` set 
by the user. Was there a different way you were thinking of this?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =

Review Comment:
   Unfortunately, I haven't found a different Queue implementation which solves 
all of the following constraints:
   * Thread-safe for multiple threads writing in an append-only manner.
   * Constant-time adds
   * No locking required on adds
   * Unbounded size
   
   At the risk of over-engineering this, I will add a new class which can help 
us keep track of the size in constant time in a best-effort manner. It may not 
always be completely accurate (since it is not atomic) - but should be good 
enough to determine whether we should flush or not IMO.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();

Review Comment:
   Updated to reflect this documentation.



##########
runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java:
##########
@@ -167,6 +173,8 @@ public Response createCatalog(
     validateExternalCatalog(catalog);
     Catalog newCatalog = 
CatalogEntity.of(adminService.createCatalog(request)).asCatalog();
     LOGGER.info("Created new catalog {}", newCatalog);
+    polarisEventListener.onAfterCatalogCreated(

Review Comment:
   That's correct. I will rebase this PR against the Decorator PR before 
merging - and then will remove any of these old event instrumentation I 
introduced as part of this PR. Putting a note on the draft PRs for this!



##########
runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Emitted when Polaris creates a table. */
+public record AfterTableCreatedEvent(
+    String eventId, String catalogName, TableMetadata metadata, 
TableIdentifier identifier)

Review Comment:
   1. Good call, changed.
   2. This is a good point as well - I was trying hard to not touch already 
existing events in this PR. But I think you're right, let me make these changes.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}

Review Comment:
   Removed - based on below comment.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java:
##########
@@ -24,5 +24,5 @@
  * @param method The request's HTTP method
  * @param absolutePath The request's absolute path
  */
-public record BeforeRequestRateLimitedEvent(String method, String absolutePath)
+public record BeforeRequestRateLimitedEvent(String eventId, String method, 
String absolutePath)

Review Comment:
   It currently is like this. I'm not opposed to making this change - but 
perhaps we don't make this change as part of this PR? WDYT?



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java:
##########
@@ -1959,4 +1960,14 @@ private List<PolarisBaseEntity> 
loadPoliciesFromMappingRecords(
             .collect(Collectors.toList());
     return ms.lookupEntities(callCtx, policyEntityIds);
   }
+
+  @Nonnull
+  @Override
+  public void writeEvents(

Review Comment:
   Removed.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();

Review Comment:
   I didn't know this existed - thanks for showing this!



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {
+      futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)), 1);
+    }
+  }
+
+  @VisibleForTesting
+  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    EventAndContext head = queue.peek();
+    if (head == null) {
+      return;
+    }
+
+    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());
+
+    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
+      // Atomically replace old queue with new queue
+      boolean replaced = buffer.replace(realmId, queue, new 
ConcurrentLinkedQueue<>());
+      if (!replaced) {
+        // Another thread concurrently modified the buffer, so do not continue
+        return;
+      }
+
+      metaStoreManagerFactory
+          .getOrCreateMetaStoreManager(() -> realmId)
+          .writeEvents(
+              head.callContext(),
+              new 
ArrayList<>(queue.stream().map(EventAndContext::polarisEvent).toList()));

Review Comment:
   Removed.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {
+      futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)), 1);
+    }
+  }
+
+  @VisibleForTesting
+  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    EventAndContext head = queue.peek();
+    if (head == null) {
+      return;
+    }
+
+    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());
+
+    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
+      // Atomically replace old queue with new queue
+      boolean replaced = buffer.replace(realmId, queue, new 
ConcurrentLinkedQueue<>());

Review Comment:
   Wanted to check one thing on this - I believe `equals` is not overridden 
from the Object class at any point by `ConcurrentLinkedQueue`, so technically 
this is only going to check whether or not it is the exact memory address, 
right?
   
   Am I missing some implementation override along the way?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {
+      futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)), 1);
+    }
+  }
+
+  @VisibleForTesting
+  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    EventAndContext head = queue.peek();
+    if (head == null) {
+      return;
+    }
+
+    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());
+
+    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
+      // Atomically replace old queue with new queue
+      boolean replaced = buffer.replace(realmId, queue, new 
ConcurrentLinkedQueue<>());
+      if (!replaced) {
+        // Another thread concurrently modified the buffer, so do not continue
+        return;
+      }
+
+      metaStoreManagerFactory

Review Comment:
   I'm glad you raised this - I thought it would be more frowned to recreate 
the metastoreManager each time instead of just keeping a handle on the 
PolarisCallContext. But this is the cleaner solution, I agree! Thank you!



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+
+  // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer`
+  @Override
+  public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent 
event) {}
+
+  @Override
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+  @Override
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+  @Override
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+  @Override
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+  @Override
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+  @Override
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+  @Override
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTableCreated(AfterTableCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalProperties(additionalParameters);
+    addToBuffer(polarisEvent);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent);
+  }
+
+  protected record ContextSpecificInformation(long timestamp, String 
principalName) {}

Review Comment:
   I did this to make the code more DRY - if there are more Context-Specific 
pieces of information, then we can add them here as well and keep the code 
tight. If you disagree, I'm okay with undoing this record.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {

Review Comment:
   Good call! Changed.



##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.ws.rs.container.ContainerRequestContext;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisMetaStoreManager polarisMetaStoreManager;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
+    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+    when(polarisCallContext.copy()).thenReturn(polarisCallContext);
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
+    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any()))
+        .thenReturn(polarisMetaStoreManager);
+
+    InMemoryBufferEventListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(InMemoryBufferEventListenerConfiguration.class);
+    
when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE);
+    
when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS);
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, clock, eventListenerConfiguration);
+
+    // Use reflection to set the callContext field
+    try {
+      java.lang.reflect.Field field =
+          
InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField("callContext");
+      field.setAccessible(true);
+      field.set(eventListener, callContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set callContext field", e);
+    }
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+    eventListener.checkAndFlushBufferIfNecessary(realmId, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {
+    String realm1 = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
+    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
+
+    // Add the last event for realm1 and verify that it did trigger the flush
+    PolarisEvent triggeringEvent = createSampleEvent();
+    RealmContext realmContext = () -> realm1;
+    when(callContext.getRealmContext()).thenReturn(realmContext);
+    eventListener.addToBuffer(triggeringEvent);
+    eventsAddedToBuffer.add(triggeringEvent);
+
+    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
+    // service
+    eventListener.checkAndFlushBufferIfNecessary(realm1, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+    verify(polarisMetaStoreManager, times(0))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBufferRealm2));
+  }
+
+  @Test
+  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {
+    String realmId = "realm1";
+    int threadCount = 10;
+    List<Thread> threads = new ArrayList<>();
+    ConcurrentLinkedQueue<Exception> exceptions = new 
ConcurrentLinkedQueue<>();
+
+    // Pre-populate the buffer with events
+    List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+
+    // Each thread will call checkAndFlushBufferIfNecessary concurrently
+    for (int i = 0; i < threadCount; i++) {
+      Thread t =
+          new Thread(
+              () -> {
+                try {
+                  eventListener.checkAndFlushBufferIfNecessary(realmId, false);
+                } catch (Exception e) {
+                  exceptions.add(e);
+                }
+              });
+      threads.add(t);
+    }
+    // Start all threads
+    threads.forEach(Thread::start);
+    // Wait for all threads to finish
+    for (Thread t : threads) {
+      t.join();
+    }
+    // There should be no exceptions
+    if (!exceptions.isEmpty()) {
+      throw new AssertionError(
+          "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: 
", exceptions.peek());
+    }
+    // Only one flush should occur
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), eq(events));
+  }
+
+  @Test
+  public void testRequestIdFunctionalityWithContainerRequestContext() {
+    // Test when containerRequestContext has requestId property
+    ContainerRequestContext mockContainerRequestContext =
+        Mockito.mock(ContainerRequestContext.class);
+    String expectedRequestId = "custom-request-id-123";
+
+    
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true);
+    
when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId);
+
+    // Use reflection to set the containerRequestContext field
+    try {
+      java.lang.reflect.Field field =
+          InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField(
+              "containerRequestContext");
+      field.setAccessible(true);
+      field.set(eventListener, mockContainerRequestContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set containerRequestContext 
field", e);
+    }

Review Comment:
   Sorry, this is overhang from a previous revision where this was not possible 
to do. Thank you for pointing this out! Changed all references to similar 
patterns.



##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.ws.rs.container.ContainerRequestContext;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisMetaStoreManager polarisMetaStoreManager;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
+    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+    when(polarisCallContext.copy()).thenReturn(polarisCallContext);
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
+    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any()))
+        .thenReturn(polarisMetaStoreManager);
+
+    InMemoryBufferEventListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(InMemoryBufferEventListenerConfiguration.class);
+    
when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE);
+    
when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS);
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, clock, eventListenerConfiguration);
+
+    // Use reflection to set the callContext field
+    try {
+      java.lang.reflect.Field field =
+          
InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField("callContext");
+      field.setAccessible(true);
+      field.set(eventListener, callContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set callContext field", e);
+    }
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+    eventListener.checkAndFlushBufferIfNecessary(realmId, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {
+    String realm1 = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
+    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
+
+    // Add the last event for realm1 and verify that it did trigger the flush
+    PolarisEvent triggeringEvent = createSampleEvent();
+    RealmContext realmContext = () -> realm1;
+    when(callContext.getRealmContext()).thenReturn(realmContext);
+    eventListener.addToBuffer(triggeringEvent);
+    eventsAddedToBuffer.add(triggeringEvent);
+
+    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
+    // service
+    eventListener.checkAndFlushBufferIfNecessary(realm1, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+    verify(polarisMetaStoreManager, times(0))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBufferRealm2));
+  }
+
+  @Test
+  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {

Review Comment:
   Added a new test in the revision that should address this.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());

Review Comment:
   Great catch, changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to