snazy commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2253895594


##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisInMemoryBufferEventListenerConfiguration.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.quarkus.events;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.util.Optional;
+import 
org.apache.polaris.service.events.listeners.InMemoryBufferPersistenceListenerConfiguration;
+
+@StaticInitSafe

Review Comment:
   Why re-introduce? #1331



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    // Given that we are using a ConcurrentLinkedQueue, this should not lock 
any calls to `add` on
+    // the queue.
+    synchronized (queue) {

Review Comment:
   What does this synchronize with??



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;

Review Comment:
   This ties an event-listener implementation to JAX-RS, which IMHO should not 
be the case.



##########
service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.ws.rs.container.ContainerRequestContext;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisMetaStoreManager polarisMetaStoreManager;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
+    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+    when(polarisCallContext.copy()).thenReturn(polarisCallContext);
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
+    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any()))
+        .thenReturn(polarisMetaStoreManager);
+
+    InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(InMemoryBufferPersistenceListenerConfiguration.class);
+    when(eventListenerConfiguration.maxBufferSize())
+        .thenReturn(Optional.of(CONFIG_MAX_BUFFER_SIZE));
+    when(eventListenerConfiguration.bufferTime())
+        .thenReturn(Optional.of(CONFIG_TIME_TO_FLUSH_IN_MS));
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, clock, eventListenerConfiguration);
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+    eventListener.checkAndFlushBufferIfNecessary(realmId);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {
+    String realm1 = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
+    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
+
+    // Add the last event for realm1 and verify that it did trigger the flush
+    PolarisEvent triggeringEvent = createSampleEvent();
+    RealmContext realmContext = () -> realm1;
+    when(callContext.getRealmContext()).thenReturn(realmContext);
+    eventListener.addToBuffer(triggeringEvent, callContext);
+    eventsAddedToBuffer.add(triggeringEvent);
+
+    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
+    // service
+    eventListener.checkAndFlushBufferIfNecessary(realm1);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+    verify(polarisMetaStoreManager, times(0))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBufferRealm2));
+  }
+
+  @Test
+  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {
+    String realmId = "realm1";
+    int threadCount = 10;
+    List<Thread> threads = new ArrayList<>();
+    ConcurrentLinkedQueue<Exception> exceptions = new 
ConcurrentLinkedQueue<>();
+
+    // Pre-populate the buffer with events
+    List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+
+    // Each thread will call checkAndFlushBufferIfNecessary concurrently
+    for (int i = 0; i < threadCount; i++) {
+      Thread t =
+          new Thread(
+              () -> {
+                try {
+                  eventListener.checkAndFlushBufferIfNecessary(realmId);
+                } catch (Exception e) {
+                  exceptions.add(e);
+                }
+              });
+      threads.add(t);
+    }
+    // Start all threads
+    threads.forEach(Thread::start);
+    // Wait for all threads to finish
+    for (Thread t : threads) {
+      t.join();
+    }
+    // There should be no exceptions
+    if (!exceptions.isEmpty()) {
+      throw new AssertionError(
+          "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: 
", exceptions.peek());
+    }
+    // Only one flush should occur
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), eq(events));
+  }
+
+  @Test
+  public void testRequestIdFunctionalityWithContainerRequestContext() {
+    // Test when containerRequestContext has requestId property
+    ContainerRequestContext mockContainerRequestContext =
+        Mockito.mock(ContainerRequestContext.class);
+    String expectedRequestId = "custom-request-id-123";
+
+    
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true);
+    
when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId);
+
+    // Use reflection to set the containerRequestContext field
+    try {
+      java.lang.reflect.Field field =
+          InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField(
+              "containerRequestContext");
+      field.setAccessible(true);
+      field.set(eventListener, mockContainerRequestContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set containerRequestContext 
field", e);
+    }

Review Comment:
   Using Java reflection in tests for new code worries me, should be avoided.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+  @Override
+  public final void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, CallContext callCtx, 
SecurityContext securityContext) {}
+
+  @Override
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewCommited(
+      BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewCommited(
+      AfterViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTableRefreshed(
+      BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableRefreshed(
+      AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewRefreshed(
+      BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewRefreshed(
+      AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, 
CallContext callCtx) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext 
callCtx) {}
+
+  @Override
+  public void onBeforeTableCreated(
+      BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCreated(
+      AfterTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalParameters(additionalParameters);
+
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(
+      AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  private long getTimestamp(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getClock().millis();
+  }
+
+  private String getRequestId(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getRequestId();
+  }
+
+  private String getUsername(SecurityContext securityContext) {

Review Comment:
   The function says get**Username**, so I'd expect it to return that



##########
runtime/defaults/src/main/resources/application.properties:
##########
@@ -109,23 +109,29 @@ polaris.realm-context.header-name=Polaris-Realm
 polaris.realm-context.require-header=false
 
 
polaris.features."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false
-polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE"]
+polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"]
+polaris.features."ALLOW_INSECURE_STORAGE_TYPES"=true
+polaris.readiness.ignore-severe-issues=true

Review Comment:
   This exposes user to insecure settings _by default_.
   These changes must really be reverted.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalProperties;

Review Comment:
   This introduces another `Map`<->`String` JSON re-serialization dances. I'd 
strongly prefer to not do that mistake again.



##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisInMemoryBufferEventListenerConfiguration.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.quarkus.events;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.util.Optional;
+import 
org.apache.polaris.service.events.listeners.InMemoryBufferPersistenceListenerConfiguration;
+
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer")
+@ApplicationScoped
+public interface QuarkusPolarisInMemoryBufferEventListenerConfiguration
+    extends InMemoryBufferPersistenceListenerConfiguration {
+  /**
+   * @return the buffer time in milliseconds
+   */
+  @Override
+  @WithName("buffer-time")
+  @WithDefault("5000ms")
+  Optional<Duration> bufferTime();
+
+  /**
+   * @return the maximum number of cached entries
+   */
+  @Override
+  @WithName("max-buffer-size")
+  @WithDefault("5")
+  Optional<Integer> maxBufferSize();

Review Comment:
   Not `OptionalInt`?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();

Review Comment:
   This starts a thread _before_ the CDI container considers the instance 
started, right?



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalProperties;
+
+  public String getCatalogId() {
+    return catalogId;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
+  }
+
+  public String getResourceIdentifier() {
+    return resourceIdentifier;
+  }
+
+  public String getAdditionalProperties() {
+    return additionalProperties != null ? additionalProperties : 
EMPTY_MAP_STRING;
+  }
+
+  public PolarisEvent(
+      String catalogId,
+      String id,
+      String requestId,
+      String eventType,
+      long timestampMs,
+      String actor,
+      ResourceType resourceType,
+      String resourceIdentifier) {
+    this.catalogId = catalogId;
+    this.id = id;
+    this.requestId = requestId;
+    this.eventType = eventType;
+    this.timestampMs = timestampMs;
+    this.principalName = actor;
+    this.resourceType = resourceType;
+    this.resourceIdentifier = resourceIdentifier;
+  }
+
+  @JsonIgnore
+  public void setAdditionalProperties(Map<String, String> properties) {
+    try {
+      this.additionalProperties = properties == null ? null : 
MAPPER.writeValueAsString(properties);
+    } catch (JsonProcessingException ex) {
+      throw new IllegalStateException(
+          String.format("Failed to serialize json. properties %s", 
properties), ex);
+    }
+  }
+
+  public void setAdditionalProperties(String additionalProperties) {
+    this.additionalProperties = additionalProperties;
+  }
+
+  public enum ResourceType {
+    CATALOG,
+    NAMESPACE,
+    TABLE,
+    VIEW
+  }

Review Comment:
   What's the overall plan to add other types in the future?
   How can the various kinds of tables be differentiated?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();

Review Comment:
   What's the `Integer` for?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+/**
+ * Represents an event listener that can respond to notable moments during 
Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ */
+public abstract class PolarisEventListener {
+
+  /** {@link BeforeRequestRateLimitedEvent} */
+  public void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, SecurityContext securityContext) {}
+
+  /** {@link BeforeTableCommitedEvent} */
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterTableCommitedEvent} */
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}

Review Comment:
   Why does every function here need `CallContext` and `SecurityContext`?
   The former is already available in the implementing types.
   The latter is a JAX-RS type leaking into persistence concerns.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {

Review Comment:
   It says Polaris**Persistence**EventListener - so either the name is wrong or 
the type is not in the right package?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));

Review Comment:
   Why this explicit .cancel() ?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {

Review Comment:
   This is internal code, should not be `public`



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();

Review Comment:
   This doesn't guarantee that nothing will happen when the function returns, 
leading to errors



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    // Given that we are using a ConcurrentLinkedQueue, this should not lock 
any calls to `add` on
+    // the queue.
+    synchronized (queue) {
+      // Double-check inside synchronized block
+      if (queue.isEmpty()) {
+        return;
+      }
+
+      EventAndContext head = queue.peek();
+      if (head == null) {
+        return;
+      }
+
+      Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());

Review Comment:
   `clock.millis() - head.polarisEvent.getTimestampMs()` relies on the 
wall-clock, not the actual elapsed time.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {

Review Comment:
   This will run for each call to `addToBuffer()` - that's quite a lot of 
immediately scheduled futures.
   I still suspect a lot of elements piling up in the queue(s), where 
processing of those is being stalled.
   The implementation doesn't seem to be resilient against such scenarios, like 
millions of "rate limited" events to bring down the service.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to