adutra commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2306925833


##########
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java:
##########
@@ -24,5 +24,5 @@
  * @param method The request's HTTP method
  * @param absolutePath The request's absolute path
  */
-public record BeforeRequestRateLimitedEvent(String method, String absolutePath)
+public record BeforeRequestRateLimitedEvent(String eventId, String method, 
String absolutePath)

Review Comment:
   nit: Does the notion of before/after apply for a rate limiting event? I 
would go with just `RequestRateLimitedEvent`.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Emitted when Polaris creates a table. */
+public record AfterTableCreatedEvent(
+    String eventId, String catalogName, TableMetadata metadata, 
TableIdentifier identifier)

Review Comment:
   A few minor nits here: 
   
   1. all other records have the identifier before the metadata, but not this 
one.
   2. why do some events have the catalog name, but not all of them? Shouldn't 
this be always included?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+/** Emitted Polaris creates a catalog. */
+public record AfterCatalogCreatedEvent(String eventId, String catalogName)

Review Comment:
   I note a few inconsistencies here:
   
   1. There is no `BeforeCatalogCreatedEvent`;
   2. There is no `BeforeTableCreatedEvent`;
   3. There is no `BeforeViewCreatedEvent` and no `AfterViewCreatedEvent`.
   
   Are you planning to add the missing events little by little?



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java:
##########
@@ -1959,4 +1960,14 @@ private List<PolarisBaseEntity> 
loadPoliciesFromMappingRecords(
             .collect(Collectors.toList());
     return ms.lookupEntities(callCtx, policyEntityIds);
   }
+
+  @Nonnull
+  @Override
+  public void writeEvents(

Review Comment:
   This implementation is identical to the default method implementation and 
therefore could be removed.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {

Review Comment:
   No need to call `buffer.get()`:
   
   ```suggestion
       ConcurrentLinkedQueue<PolarisEvent> queue =
           buffer.computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>());
       queue.add(polarisEvent);
       if (queue.size() >= maxBufferSize) {
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {
+      futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)), 1);
+    }
+  }
+
+  @VisibleForTesting
+  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    EventAndContext head = queue.peek();
+    if (head == null) {
+      return;
+    }
+
+    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());
+
+    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
+      // Atomically replace old queue with new queue
+      boolean replaced = buffer.replace(realmId, queue, new 
ConcurrentLinkedQueue<>());

Review Comment:
   Nit: this is still slightly inefficient, as `replace` performs an `equals` 
operation on the map values, and this can be expensive since the values are 
queues. And the replacement has more chances to fail as more threads call 
`addToBuffer` concurrently. That's fine for now, but there are probably better 
ways to handle this buffer flush.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =

Review Comment:
   Do we need a `ConcurrentLinkedQueue`? It seems a non-concurrent collection, 
e.g. `ArrayList`, would be enough. The map must be concurrent, but individual 
values maybe not.
   
   What worries me is that you are calling `ConcurrentLinkedQueue.size()` 
often, but that method does not run in constant time.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalProperties;

Review Comment:
   Not opposed to leaving this as is and improving later, but just out of 
curiosity, why can't this property be a `Map<String, String>`?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")

Review Comment:
   nit: same here, I'd choose some simpler name, maybe just `in-memory` since 
we already have 2 or 3 beans with such a name.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();

Review Comment:
   We should look into reusing the CDI-managed `ObjectMapper` instead, do you 
mind adding a TODO here?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java:
##########
@@ -26,10 +26,10 @@
  * of this event relative to the validation checks we've performed, which 
means the commit may still
  * fail Polaris-side validation checks.
  *
- * @param identifier The identifier.
+ * @param tableIdentifier The identifier.
  * @param base The old metadata.
  * @param metadata The new metadata.
  */
 public record BeforeTableCommitedEvent(
-    TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+    String eventId, TableIdentifier tableIdentifier, TableMetadata base, 
TableMetadata metadata)

Review Comment:
   nit: to clearly distinguish the two metadata objects, how about naming them 
`oldMetadata` and `newMetadata` , or `beforeMetadata` and `afterMetadata`?



##########
runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java:
##########
@@ -167,6 +173,8 @@ public Response createCatalog(
     validateExternalCatalog(catalog);
     Catalog newCatalog = 
CatalogEntity.of(adminService.createCatalog(request)).asCatalog();
     LOGGER.info("Created new catalog {}", newCatalog);
+    polarisEventListener.onAfterCatalogCreated(

Review Comment:
   Just to make sure we are on the same page: this kind of pattern is going to 
be replaced by `@Decorator` beans, correct?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();

Review Comment:
   ```suggestion
     private final Set<Future<?>> futures = ConcurrentHashMap.newKeySet();
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}

Review Comment:
   `PolarisCallContext` is a request-scoped CDI bean; in order to store 
instances of this bean in an application-scoped map, you are calling 
`PolarisCallContext.copy()` to create a "detached" copy of the bean. That's  OK 
for now, but not great – I recall some discussions about removing the `copy()` 
method.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import java.time.Duration;
+
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer")

Review Comment:
   Nit: `persistence-in-memory-buffer` sounds contradictory to me, as it can 
either be "persistent" or "in-memory", but not both.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {
+      futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)), 1);
+    }
+  }
+
+  @VisibleForTesting
+  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    EventAndContext head = queue.peek();
+    if (head == null) {
+      return;
+    }
+
+    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());
+
+    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
+      // Atomically replace old queue with new queue
+      boolean replaced = buffer.replace(realmId, queue, new 
ConcurrentLinkedQueue<>());
+      if (!replaced) {
+        // Another thread concurrently modified the buffer, so do not continue
+        return;
+      }
+
+      metaStoreManagerFactory
+          .getOrCreateMetaStoreManager(() -> realmId)
+          .writeEvents(
+              head.callContext(),
+              new 
ArrayList<>(queue.stream().map(EventAndContext::polarisEvent).toList()));

Review Comment:
   Why is the list mutable? I think 
`queue.stream().map(EventAndContext::polarisEvent).toList()` is enough.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+
+  // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer`
+  @Override
+  public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent 
event) {}
+
+  @Override
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+  @Override
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+  @Override
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+  @Override
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+  @Override
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+  @Override
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+  @Override
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTableCreated(AfterTableCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalProperties(additionalParameters);
+    addToBuffer(polarisEvent);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent);
+  }
+
+  protected record ContextSpecificInformation(long timestamp, String 
principalName) {}
+
+  abstract ContextSpecificInformation getContextSpecificInformation();
+
+  abstract String getRequestId();
+
+  abstract void addToBuffer(org.apache.polaris.core.entity.PolarisEvent event);

Review Comment:
   Buffering events is imho an implementation detail – not every listener needs 
to buffer events. How about naming this method to something more generic, e.g. 
`processEvent`?
   
   (and BTW: _all_ methods should call `processEvent` including 
`onBeforeRequestRateLimited` – it's up to the implementation to decide what to 
do with the event.)



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOGGER.warn("Executor did not shut down cleanly");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      for (String realmId : buffer.keySet()) {
+        try {
+          checkAndFlushBufferIfNecessary(realmId, true);
+        } catch (Exception e) {
+          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
+        }
+      }
+    }
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent) {
+    String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callContext.getPolarisCallContext().copy()));
+    if (buffer.get(realmId).size() >= maxBufferSize) {
+      futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)), 1);
+    }
+  }
+
+  @VisibleForTesting
+  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    EventAndContext head = queue.peek();
+    if (head == null) {
+      return;
+    }
+
+    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());
+
+    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
+      // Atomically replace old queue with new queue
+      boolean replaced = buffer.replace(realmId, queue, new 
ConcurrentLinkedQueue<>());
+      if (!replaced) {
+        // Another thread concurrently modified the buffer, so do not continue
+        return;
+      }
+
+      metaStoreManagerFactory

Review Comment:
   I wonder if this isn't better as it would avoid the need to store 
`PolarisCallContext` in the buffer:
   
   ```java
    RealmContext realmContext = () -> realmId;
     PolarisMetaStoreManager metaStoreManager =
         metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
     BasePersistence basePersistence = 
metaStoreManagerFactory.getOrCreateSession(realmContext);
     metaStoreManager.writeEvents(
         new PolarisCallContext(realmContext, basePersistence, 
polarisDiagnostics),
         queue.stream().toList());
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+
+  // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer`
+  @Override
+  public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent 
event) {}
+
+  @Override
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+  @Override
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+  @Override
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+  @Override
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+  @Override
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+  @Override
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+  @Override
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  @Override
+  public void onAfterTableCreated(AfterTableCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalProperties(additionalParameters);
+    addToBuffer(polarisEvent);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {
+    ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation();
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(),
+            event.getClass().getSimpleName(),
+            contextSpecificInformation.timestamp(),
+            contextSpecificInformation.principalName(),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent);
+  }
+
+  protected record ContextSpecificInformation(long timestamp, String 
principalName) {}

Review Comment:
   nit: I'm not sure I see a lot of value in this record, as opposed to just 
exposing two methods: `timestamp()` and `principalName()`.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {

Review Comment:
   Why call `checkAndFlushBufferIfNecessary` here? This creates unnecessary 
concurrent flushes. Imo this method should just be cleaning up completed 
futures.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());

Review Comment:
   ```suggestion
         futures.removeIf(Future::isDone);
   ```



##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.ws.rs.container.ContainerRequestContext;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisMetaStoreManager polarisMetaStoreManager;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
+    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+    when(polarisCallContext.copy()).thenReturn(polarisCallContext);
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
+    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any()))
+        .thenReturn(polarisMetaStoreManager);
+
+    InMemoryBufferEventListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(InMemoryBufferEventListenerConfiguration.class);
+    
when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE);
+    
when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS);
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, clock, eventListenerConfiguration);
+
+    // Use reflection to set the callContext field
+    try {
+      java.lang.reflect.Field field =
+          
InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField("callContext");
+      field.setAccessible(true);
+      field.set(eventListener, callContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set callContext field", e);
+    }
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+    eventListener.checkAndFlushBufferIfNecessary(realmId, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {
+    String realm1 = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
+    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
+
+    // Add the last event for realm1 and verify that it did trigger the flush
+    PolarisEvent triggeringEvent = createSampleEvent();
+    RealmContext realmContext = () -> realm1;
+    when(callContext.getRealmContext()).thenReturn(realmContext);
+    eventListener.addToBuffer(triggeringEvent);
+    eventsAddedToBuffer.add(triggeringEvent);
+
+    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
+    // service
+    eventListener.checkAndFlushBufferIfNecessary(realm1, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+    verify(polarisMetaStoreManager, times(0))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBufferRealm2));
+  }
+
+  @Test
+  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {
+    String realmId = "realm1";
+    int threadCount = 10;
+    List<Thread> threads = new ArrayList<>();
+    ConcurrentLinkedQueue<Exception> exceptions = new 
ConcurrentLinkedQueue<>();
+
+    // Pre-populate the buffer with events
+    List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+
+    // Each thread will call checkAndFlushBufferIfNecessary concurrently
+    for (int i = 0; i < threadCount; i++) {
+      Thread t =
+          new Thread(
+              () -> {
+                try {
+                  eventListener.checkAndFlushBufferIfNecessary(realmId, false);
+                } catch (Exception e) {
+                  exceptions.add(e);
+                }
+              });
+      threads.add(t);
+    }
+    // Start all threads
+    threads.forEach(Thread::start);
+    // Wait for all threads to finish
+    for (Thread t : threads) {
+      t.join();
+    }
+    // There should be no exceptions
+    if (!exceptions.isEmpty()) {
+      throw new AssertionError(
+          "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: 
", exceptions.peek());
+    }
+    // Only one flush should occur
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), eq(events));
+  }
+
+  @Test
+  public void testRequestIdFunctionalityWithContainerRequestContext() {
+    // Test when containerRequestContext has requestId property
+    ContainerRequestContext mockContainerRequestContext =
+        Mockito.mock(ContainerRequestContext.class);
+    String expectedRequestId = "custom-request-id-123";
+
+    
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true);
+    
when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId);
+
+    // Use reflection to set the containerRequestContext field
+    try {
+      java.lang.reflect.Field field =
+          InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField(
+              "containerRequestContext");
+      field.setAccessible(true);
+      field.set(eventListener, mockContainerRequestContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set containerRequestContext 
field", e);
+    }

Review Comment:
   Why? Reflection is not necessary here:
   
   ```suggestion
        eventListener.containerRequestContext = mockContainerRequestContext;
   ```



##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.ws.rs.container.ContainerRequestContext;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisMetaStoreManager polarisMetaStoreManager;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
+    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+    when(polarisCallContext.copy()).thenReturn(polarisCallContext);
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
+    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any()))
+        .thenReturn(polarisMetaStoreManager);
+
+    InMemoryBufferEventListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(InMemoryBufferEventListenerConfiguration.class);
+    
when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE);
+    
when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS);
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, clock, eventListenerConfiguration);
+
+    // Use reflection to set the callContext field
+    try {
+      java.lang.reflect.Field field =
+          
InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField("callContext");
+      field.setAccessible(true);
+      field.set(eventListener, callContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set callContext field", e);
+    }
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+    eventListener.checkAndFlushBufferIfNecessary(realmId, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {
+    String realm1 = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
+    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
+
+    // Add the last event for realm1 and verify that it did trigger the flush
+    PolarisEvent triggeringEvent = createSampleEvent();
+    RealmContext realmContext = () -> realm1;
+    when(callContext.getRealmContext()).thenReturn(realmContext);
+    eventListener.addToBuffer(triggeringEvent);
+    eventsAddedToBuffer.add(triggeringEvent);
+
+    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
+    // service
+    eventListener.checkAndFlushBufferIfNecessary(realm1, false);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+    verify(polarisMetaStoreManager, times(0))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBufferRealm2));
+  }
+
+  @Test
+  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {

Review Comment:
   Thanks for adding this test. It's not testing however the real-life scenario 
where many threads call `addToBuffer` concurrently and potentially enqueue a 
flush task. Without this, the test kind of loses its purpose, because there is 
no concurrent updates to the queue, only concurrent flushes.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush = eventListenerConfiguration.bufferTime();
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
+
+    executor = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId, false);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();

Review Comment:
   Nit: that's not quite the recommended way to shut down the executor, cf. 
https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ExecutorService.html



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor;
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}

Review Comment:
   FYI: https://github.com/apache/polaris/pull/2294



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to